大 数 据 技 术
(教师版)
课 程 实 验 文 档
杭 州 量 算 科 技 有 限 公 司
目 录
一、
SPARK实验1 .................................................................................................................... 1
二、
SPARK单机模式安装 .......................................................................................................... 1 RDD介绍与操作 ................................................................................................................ 3 综合案例 ........................................................................................................................... 13 附加题 ............................................................................................................................... 14 SPARK实验2 .................................................................................................................. 16
三、
NETCAT安装 ..................................................................................................................... 16 SPARK STREAMING ............................................................................................................. 17 附加题 ............................................................................................................................... 20 MAPREDUCE实验 ........................................................................................................ 21
安装HADOOP环境 ........................................................................................................... 21 编写程序 .............................................................................................. 错误!未定义书签。 运行程序 ........................................................................................................................... 27 附加题 ............................................................................................................................... 28
附件(编写MAPREDUCE程序文件代码): ........................................................................... 29 四、
HIVE实验 ........................................................................................................................ 35
HIVE安装配置 .................................................................................................................. 35 HIVE入门 .......................................................................................................................... 36 HIVE的数据库 .................................................................................................................. 38 HIVE实现单词统计 .......................................................................................................... 46 附加题 ............................................................................................................................... 47
I
一、 Spark实验1
实验目的:
1、 了解Spark的生态圈和基本功能。
2、 掌握Spark单机模式的安装过程和参数配置。 3、 掌握RDD的多种操作方法。
Spark单机模式安装
1.1 下载资料
将大数据实验所需要的安装包和资料下载到/home/data目录下。 ① 第一步,新建一个目录。 mkdir /home/data
② 第二步,下载资料,进入data目录。 cd /home/data 下载资料: wget -O bigdata.tar
http://10.131.70.3/owncloud/index.php/s/SpfsWiPNj4P4NpO/download ③ 第三步,解压。 tar -xvf bigdata.tar 1.2 安装JDK
① 第一步,新建一个文件夹。 mkdir /usr/java
② 第二步,将JDK安装包拷贝到/usr/java目录下。
cp /home/data/BigData/package/jdk-8u65-linux-x64.tar.gz /usr/java ③ 第三步,解压。 cd /usr/java
tar xzf jdk-8u65-linux-x64.tar.gz
1
④ 第四步,设置环境变量。 vi /etc/profile
在/etc/profile文件最底下添加以下信息: export JAVA_HOME=/usr/java/jdk1.8.0_65 export JRE_HOME=/usr/java/jdk1.8.0_65/jre
export CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH export
PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$JAVA_HOME:$PATH
⑤ 第五步,加载环境变量。 source /etc/profile
⑥ 第六步,查看jdk版本。当出现jdk版本信息时,表示JDK已经安装成
功。 java -version 1.3 解压spark
将spark安装包拷贝到/usr/spark目录下,并解压。 mkdir /usr/spark cd /usr/spark
cp /home/data/BigData/package/spark-2.0.1-bin-hadoop2.7.tgz /usr/spark tar -xvf spark-2.0.1-bin-hadoop2.7.tgz 1.4 配置文件
进入Spark安装目录下的conf目录,复制conf spark-env.sh.template文件为 spark-env.sh。
cd /usr/spark/spark-2.0.1-bin-hadoop2.7/conf/ cp spark-env.sh.template spark-env.sh vi spark-env.sh
在其中修改,增加如下内容: SPARK_LOCAL_IP=127.0.0.1
2
1.5 Spark单机模式启动
在bin目录下执行: sh spark-shell --master=local
cd /usr/spark/spark-2.0.1-bin-hadoop2.7/bin ➢ Centos7.4输入启动: sh spark-shell --master=local ➢ Ubuntu16.04输入启动: ./spark-shell --master=local 出现以下界面,即为启动成功。
RDD介绍与操作
2.1 介绍
RDD就是带有分区的集合类型,弹性分布式数据集(RDD),特点是可以并行操作,并且是容错的。
1) RDD的创建(两种方法):
① 执行Transform操作(变换操作),使用的方式有两种,makeRDD和
parallelize。
➢ 第一种:parallelize方法。
scala> val r1=sc.parallelize(List(1,2,3,4),2) //2是分区数量。 ➢ 第二种:makeRDD方法。
scala> val r2=sc.makeRDD(List(1,2,3,4),2)
② 读取外部存储系统的数据集,如HDFS,HBase,或任何与Hadoop有关
3
的数据源。此方法同样有两种方式: ➢ 第一种:从文件中读取。
scala> val data1=sc.textFile(\"file:///home/word.txt\ ➢ 第二种:从分布式文件系统读。
scala>val data2=sc.textFile(\"hdfs://hadoop:9000/word.txt\ 2) 几个RDD的通用方法:
① 查看RDD的全部数量:scala>rdd.collect。
② 收集rdd中的数据组成Array返回,此方法将会把分布式存储的rdd中
的数据集中到一台机器中组建Array。
➢ 注意:在生产环境下一定要慎用这个方法,容易内存溢出。 ③ 查看RDD的分区数量:scala>rdd.partitions.size。 ④ 查看RDD每个分区的元素:scala>rdd.glom.collect。 ➢ 提示:此方法会将每个分区的元素以Array形式返回。 2.2 操作
针对RDD的操作分两种,一种是Transformation(变换),一种是Actions(执行)。
Transformation(变换)操作属于懒操作(算子),不会真正触发RDD的处理计算。变换方法的共同点:
➢ 不会马上触发计算;
➢ 每当调用一次变换方法,都会产生一个新的RDD。 Actions(执行)操作才会真正触发。 Transformations变换。
下面操作几个常用的函数。同学们可以选择操作其中的几个来熟悉RDD的用法。
① map(func)。
参数是函数,函数应用于RDD每一个元素,返回值是新的RDD。 案例展示:map 将函数应用到rdd的每个元素中。 scala> var r1=sc.makeRDD(List(1,2,3,4),2)
4
r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at scala> r1.map(x=>x*10) res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at scala> res0.collect res2: Array[Int] = Array(10, 20, 30, 40) ② flatMap(func)。 扁平化map,对RDD每个元素转换,然后再扁平化处理。 案例展示:flatMap 扁平map处理。 scala> var r2 = sc.makeRDD(List(\"hello world\ r2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at makeRDD at scala> r2.flatMap(_.split{\" \ res3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at scala> res3.collect res5: Array[String] = Array(hello, world, hello, count, hello, spark) map和flatMap有何不同? ➢ map: 对RDD每个元素转换。 ➢ flatMap: 对RDD每个元素转换, 然后再扁平化(即去除集合)。 ③ filter(func)。 参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD。 案例展示:filter 用来从rdd中过滤掉不符合条件的数据。 5 scala> val r3 = sc.makeRDD(List(1,3,5,7,9)) r3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at makeRDD at scala> r3.filter(_<5); res6: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at filter at scala> res6.collect res8: Array[Int] = Array(1, 3) ④ union(otherDataset)。 并集,也可以用++实现。 案例展示:union 取并集。 scala> val r4=sc.makeRDD(List(1,2,3),2) r4: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at ^ scala> val r5=sc.makeRDD(List(3,4,5),2) r5: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at makeRDD at scala> r4.union(r5) res9: org.apache.spark.rdd.RDD[Int] = UnionRDD[8] at union at scala> res9.collect res10: Array[Int] = Array(1, 2, 3, 3, 4, 5) ⑤ intersection(otherDataset)。 交集。 6 案例展示:取交集。 scala> val r6 = sc.makeRDD(List(1,3,5,7)); r6: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at makeRDD at scala> val r7 = sc.makeRDD(List(5,7,9,11)); r7: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at makeRDD at scala> r6.intersection(r7) res11: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[16] at intersection at scala> res11.collect res13: Array[Int] = Array(7, 5) ⑥ subtract(otherDataset)。 差集。 案例展示:取交集。 scala> val r8 = sc.makeRDD(List(1,3,5,7,9)); r8: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at makeRDD at scala> val r9 = sc.makeRDD(List(5,7,9,11,13)); r9: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at makeRDD at scala> r8.subtract(r9) res14: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[22] at subtract at 7 scala> res14.collect res15: Array[Int] = Array(1, 3) ⑦ distinct([numTasks]))。 没有参数,将RDD里的元素进行去重操作。 案例展示:去重。 scala> val r10 = sc.makeRDD(List(1,3,5,7,9,3,7,10,23,7)); r10: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at makeRDD at scala> r10.distinct res16: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[26] at distinct at scala> res16.collect res17: Array[Int] = Array(23, 1, 3, 7, 9, 10, 5) ⑧ groupByKey([numTasks])。 按key分组。 案例展示:按key分组。 scala>val r11= sc.makeRDD(List((\"hello\ r11: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[27] at makeRDD at scala> r11.groupByKey res18: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[28] at 8 groupByKey at scala> res18.collect res19: Array[(String, Iterable[Int])] = Array((hello,CompactBuffer(1, 1)), (word,CompactBuffer(1)), (spark,CompactBuffer(5)) ⑨ reduceByKey(func, [numTasks])。 按key相加。 案例展示:按key相加。 scala> val r12= sc.makeRDD(List((\"hello\ r12: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at makeRDD at scala> r12.reduceByKey(_+_) res23: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[31] at reduceByKey at scala> res23.collect res24: Array[(String, Int)] = Array((word,1), (hello,2), (spark,5)) ⑩ sortByKey([ascending], [numTasks])。 按key排序。 案例演示:按人名排序。 scala> val r13= sc.makeRDD(List((3,\"tom\"),(4,\"rose\"),(5,\"dawei\"),(3,\"cat\")),2 ) r13: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[32] at makeRDD at 9 scala> r13.map{case(x,y)=>(y,x)}.sortByKey() res25: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[36] at sortByKey at scala> res25.collect res26: Array[(String, Int)] = Array((cat,3), (dawei,5), (rose,4), (tom,3)) ⑪ join(otherDataset, [numTasks])。 join要求是list,里面是二元元组,相同的join,拼接,没有的话舍去。 案例演示:拼接。 scala> var r14 = sc.makeRDD(List((\"cat\ r14: org.apache.spark.rdd.RDD[(String, Any)] = ParallelCollectionRDD[37] at makeRDD at scala> var r15 = sc.makeRDD(List((\"cat\ r15: org.apache.spark.rdd.RDD[(String, Any)] = ParallelCollectionRDD[38] at makeRDD at scala> r14.join(r15) res27: org.apache.spark.rdd.RDD[(String, (Any, Any))] = MapPartitionsRDD[42] at join at scala> res27.collect res29: Array[(String, (Any, Any))] = Array((dog,(2,4)), (cat,(1,3))) Actions变化。 下面操作几个常用的函数,同学们可以选择操作其中的几个来熟悉RDD的用法。 ① reduce(func)。 并行整合所有RDD数据,例如求和操作。 10 ② collect()。 返回RDD所有元素,将rdd分布式存储在集群中不同分区的数据获取到一起,组成一个数组返回。 ➢ 注意:这个方法将会把所有数据收集到一个机器内,容易造成内存的溢出。 在生产环境下千万慎用。 ③ count()。 统计RDD元素个数。 案例演示:统计RDD元素个数。 scala> var r16 = sc.makeRDD(List(1,2,3,4,5),2) r16: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at scala> r16.count res0: Long = 5 ④ first()。 取RDD的第一个元素。 ⑤ take(n)。 获取前几个数据。 案例演示:取前两个元素。 scala> var r17 = sc.makeRDD(List(52,31,22,43,14,35)) r17: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at scala> r17.take(2) res2: Array[Int] = Array(52, 31) ⑥ takeOrdered(n, [ordering])。 先将rdd中的数据进行升序排序然后取前n个。 案例演示:将rdd数据进行升序,然后取前3个。 11 scala> var r18 = sc.makeRDD(List(52,31,22,43,14,35)) r18: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at scala> r18.takeOrdered(3) res3: Array[Int] = Array(14, 22, 31) ⑦ top(n)。 先将rdd中的数据进行降序排序然后取前n个。 案例演示:将rdd数据进行降序,然后取前3个。 scala> var r19=sc.makeRDD(List(52,31,22,43,14,35)) r19: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at makeRDD at scala> r19.top(3) res4: Array[Int] = Array(52, 43, 35) ⑧ saveAsTextFile(path)。 按照文本方式保存分区数据。 案例演示:将rdd数据以文本方式保存。 scala> val r20 = sc.makeRDD(List(1,2,3,4,5),2) r20: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at scala> r20.saveAsTextFile(\"file:///home/result\") [root@hadoop ~]# cd /home/result/ [root@hadoop result]# ls part-00000 part-00001 _SUCCESS ⑨ countByKey()。 按照数据集中的Key进行分组,计算各个K的对应的个数。 12 (K,V)返回 (K,Int) ⑩ foreach(func)。 在数据集上的每个元素上运行func方法。 综合案例 完成Top K案例。 ➢ 案例说明:Top K算法有两步,一是统计词频,二是找出词频最高的前K个 词。 ➢ 案例前提:将topk.txt放入到服务器的/home目录下。 cp /home/data/BigData/data/topk.txt /home。 其中单词按空格分隔,内容如图所示: ① 第一步,创建RDD。读取外部存储系统的数据集,这里我们从文件中读 取。 scala> var data = sc.textFile(\"file:///home/topk.txt\ data: org.apache.spark.rdd.RDD[String] = file:///home/topk.txt MapPartitionsRDD[11] at textFile at ② 第二步,统计每个单词出现的字数。 ➢ 首先利用flatMap扁平化,将文件中的单词按空格分隔。 效果:[hello,world,bye,world,....redis]。 ➢ 然后利用map,将后面函数应用于RDD每一个元素,返回值是新的RDD。 这里将x变成(x,1),即将每个单词出现一次,写为(单词,1)。 效果:[(hello,1),(world,1),(bye,1),(world,1),...(redis,1)] ➢ 最后利用reduceByKey按key相加。 效果:[(hello,4),(world,3),(bye,2),..(redis,2)] 13 scala>var count= data.flatMap{ x=>x.split(\" \") # 注意:双引号之间有空格 }.map{x=>(x,1)}.reduceByKey(_+_) count: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[14] at reduceByKey at ③ 第三步,找出词频最高的前K个词。 ➢ 首先利用map,将后面函数应用于RDD每一个元素,返回值是新的RDD。 这里将x,y翻转,即单词和词频翻转。 效果:[(4,hello),(3,world),(2,bye),...(2,redis)] ➢ 然后利用top,将数据进行降序排序取前3个。 效果:[(4,hello), (4,hadoop), (3,world)] scala> var result = count.map{case(x,y)=>(y,x)}.top(3) res8: Array[(Int, String)] = Array((4,hello), (4,hadoop), (3,world)) 可以看到词频最高的三个单词是hello,hadoop,world,分别出现4次,4次,3次。 附加题 ➢ 题目: 1、Spark有几种部署模式,每种模式特点是什么? 2、RDD中reduceBykey与groupByKey哪个性能好,为什么? ✓ 答案: 1、 1) 本地模式。 Spark不一定非要跑在hadoop集群,可以在本地,起多个线程的方式来指定。方便调试,本地模式分三类: ① local:只启动一个executor。 ② local[k]: 启动k个executor。 ③ local:启动跟cpu数目相同的 executor。 2) Standalone模式。 14 分布式部署集群,自带完整的服务,资源管理和任务监控是Spark自己监控,这个模式也是其他模式的基础。 3) Spark On Yarn模式。 分布式部署集群,资源和任务监控交给yarn管理。粗粒度资源分配方式,包含cluster和client运行模式,cluster 适合生产,driver运行在集群子节点,具有容错功能;client 适合调试,dirver运行在客户端。 4) Spark On Mesos模式。 运行在 mesos 资源管理器框架之上,由 mesos 负责资源管理,Spark负责任务调度和计算。 2、 1) reduceByKey:reduceByKey会在结果发送至reducer之前会对每个mapper在本地进行merge,有点类似于在MapReduce中的combiner。这样做的好处在于,在map端进行一次reduce之后,数据量会大幅度减小,从而减小传输,保证reduce端能够更快的进行结果计算。 2) groupByKey:groupByKey会对每一个RDD中的value值进行聚合形成一个序列(Iterator),此操作发生在reduce端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。同时如果数据量十分大,可能还会造成OutOfMemoryError。 通过以上对比可以发现在进行大量数据的reduce操作时候建议使用reduceByKey。不仅可以提高速度,还是可以防止使用groupByKey造成的内存溢出问题。 15 二、 Spark实验2 实验目的: 1、 了解Netcat和Sprak Streaming的基本功能。 2、 掌握Netcat创建过程及参数配置。 3、 掌握Sprak Streaming的操作及程序编写。 Spark Streaming是一种构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力,以吞吐量高和容错能力强著称。 本实验需要Netcat可以作为server以TCP或UDP方式侦听指定端口,来实时给Spark Streaming传输数据,而Spark Streaming接受到数据后实时处理数据。所以需要先安装Netcat。 Netcat安装 1.1 安装必要环境 请确保服务器已经安装gcc和make,如果已经安装,请跳过该步骤。 ➢ Ubuntu16.04安装gcc: apt install gcc ➢ Ubuntu16.04安装make: apt install make 1.2 解压 将netcat安装包拷贝到/usr/netcat目录下,并解压。 mkdir /usr/netcat cd /usr/netcat cp /home/data/BigData/package/netcat-0.7.1.tar.gz /usr/netcat tar -zxvf netcat-0.7.1.tar.gz 1.3 编译安装 切换至安装目录下,查看编译配置文件。 cd /usr/netcat/netcat-0.7.1 16 ./configure 编译安装: make && make install 1.4 配置 vi /etc/profile 在配置文件中加入: export NETCAT_HOME=/usr/netcat/netcat-0.7.1 export PATH=$PATH:$NETCAT_HOME/bin 生效配置: source /etc/profile 1.5 查看是否生效 nc -help 之后出现各种命令意思表明配置已生效。 1.6 启动netcat 启动netcat,侦听并接受连接端口9999。 nc -lv -p 9999 出现如下图所示,即为成功。 -l用于指定nc将处于侦听模式。指定该参数,则意味着nc被当作server,侦听并接受连接,而非向其它地址发起连接。 -v输出交互或出错信息,新手调试时尤为有用。 -p Spark Streaming Spark Streaming用于实时处理程序。这里netcat发送过来句子,以空格分开,每发送一次Sprak Streaming需要计算此句子中每个单词的数量。 2.1 编写程序 ① 启动spark单机模式。 17 在bin目录下执行:sh spark-shell --master=local[2],注意这里必须写[2],大于1个。 cd /usr/spark/spark-2.0.1-bin-hadoop2.7/bin ➢ Ubuntu16.04启动: ./spark-shell --master=local[2] ➢ Centos7.4启动: sh spark-shell --master=local[2] ② 导入Spark Streaming依赖包。 scala> import org.apache.spark.streaming._ import org.apache.spark.streaming._ ③ 创建Streaming上下文对象。 scala> val ssc = new StreamingContext(sc,Seconds(5)) ssc:org.apache.spark.streaming.StreamingContext=org.apache.spark.streaming.StreamingContext@1d944fc0 运行周期为5秒,表示流式计算每间隔5秒执行一次。这个时间的设置需要综合考虑程序的延时需求和集群的工作负载,应该大于每次的运行时间。 StreamingContext创建好之后,还需要下面这几步来实现一个完整的Spark流式计算: ➢ 创建一个输入DStream,用于接收数据。 ➢ 使用作用于DStream上的Transformation和Output操作来定义流式计算 (Spark程序是使用Transformation和Action操作)。 ➢ 启动计算,使用StreamingContext.start()。 等待计算结束(人为或错误),使用StreamingContext.awaitTermination();也可以手工结束计算,使用StreamingContext.stop()。 ④ 读取外部的数据集。 scala> val lines = ssc.socketTextStream(\"127.0.0.1\ lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@35ab4260 18 这里创建一个将连接到本地9999端口的数据流。 ⑤ 将读取到的数据按空格分隔进行扁平化处理。 scala> val words = lines.flatMap(_.split(\" \")) words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@1bea7b0 ⑥ 将上一步获取到的所有单词转换为(单词,1),表示每出现一次单词就记 录1。 scala> val pairs = words.map(word=>(word,1)) pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@42ef5216 ⑦ 将上一步结果的值按键相加,统计每个单词出现的次数。 scala> val wordCounts = pairs.reduceByKey(_+_) wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@1b30b6f8 ⑧ 打印结果。 scala> wordCounts.print() ⑨ 启动计算。 scala> ssc.start() 可以发现一开始没有数据传输过来的时候,Spark Streaming流式计算每5秒运行一次。 后来在nc输入一些单词。 19 Spark Streaming接受到数据就进行计算,并打印结果。 附加题 ➢ 题目: Spark生态圈有哪些组件,每个组件都有什么功能,适合什么应用场景? ✓ 答案: ① Spark core。 是其它组件的基础,spark的内核。主要包含:有向循环图、RDD、Lingage、Cache、broadcast等。 ② SparkStreaming。 是一个对实时数据流进行高通量、容错处理的流式处理系统将流式计算分解成一系列短小的批处理作业。 ③ Spark sql。 能够统一处理关系表和RDD,使得开发人员可以轻松地使用SQL命令进行外部查询 ④ MLBase。 是Spark生态圈的一部分专注于机器学习,让机器学习的门槛更低。MLBase分为四部分:MLlib、MLI、ML Optimizer和MLRuntime。 ⑤ GraphX。 是Spark中用于图和图并行计算。 20 三、 MapReduce实验 实验目的: 1、 了解MapReduce的基本功能。 2、 了解Hadoop的基本功能。 3、 掌握Hadoop的安装过程、参数配置及其操作方法。 4、 掌握MapReduce的程序编写及运行方法。 安装Hadoop环境 Hadoop的安装分为单机方式、伪分布式方式和完全分布式方式。 单机模式是Hadoop的默认模式。当首次解压Hadoop的源码包时,Hadoop无法了解硬件安装环境,便保守地选择了最小配置。在这种默认模式下所有3个XML文件均为空。当配置文件为空时,Hadoop会完全运行在本地。因为不需要与其他节点交互,单机模式就不使用HDFS,也不加载任何Hadoop的守护进程。该模式主要用于开发调试MapReduce程序的应用逻辑。 伪分布模式 Hadoop守护进程运行在本地机器上,模拟一个小规模的的集群。可以使用HDFS和MapReduce。 全分布模式 Hadoop守护进程运行在一个集群上。启动所有的守护进程,具有hadoop完整的功能,可以使用hdfs、mapreduce和yarn,并且这些守护进程运行在集群中,可以真正的利用集群提供高性能,在生产环境下使用。 我们主要是进行Hadoop的伪分布式安装。 1.1 配置Hosts ① 第一步,vi /etc/hosts。 先将里面的内容全部删除,然后添加以下两行。 127.0.0.1 hadoop 192.168.11.190 hadoop ➢ 注意:192.168.11.190应该为你主机对应的外网IP,hadoop为主机名可以自己 定义,但是下面有关于Hadoop的配置有关主机的地方都需要改为你自定义 21 的主机名。 ② 第二步,vi /etc/hostname。 再编辑/etc/hostname文件,将其中的内容改为指定的主机名。改为hadoop,你定义的主机名。 ③ 第三步,最后进行重启,让其生效。 1.2 配置主机名 ➢ 提示:centos7.4需要完成此步骤,ubuntu16.04跳过此步骤。 ➢ 注意:安装hadoop的集群主机名不能有下划线,不然会找不到主机,无法启 动。 ① 第一步,vi /etc/sysconfig/network。 配置主机名,例如: NETWORKING=yes HOSTNAME=hadoop ② 第二步,source /etc/sysconfig/network,使其生效。 1.3 关闭防火墙 ① Ubuntu16.04关闭防火墙: ➢ 查看防火墙状态。 ufw status ➢ 停止防火墙。 ufw disable ② Centos7.4关闭防火墙: ➢ 查看防火墙状态。 firewall-cmd --state ➢ 停止防火墙。 systemctl stop firewalld.service ➢ 禁止防火墙开机启动。 systemctl disable firewalld.service 22 1.4 配置免密码互通 Hadoop底层是基于RPC来实现的,所以就意味着Hadoop中的很多操作都是通过网络进行信号的传输,也就意味着如果想要访问其他的节点,都需要输入密码。生成自己的公钥和私钥,生成的公私钥将自动存放在/root/.ssh目录下。 ① 第一步,ssh-keygen ,一直按回车即可。 把生成的公钥copy到远程机器上。 ② 第二步,ssh-copy-id root@hadoop,先输入“yes”,然后输入密码liangsuan。 此时在远程主机的/root/.ssh/authorized_keys文件中保存了公钥,在known_hosts中保存了已知主机信息,当再次访问的时候就不需要输入密码了。 ③ 第三步,ssh 弹性IP,我这里是ssh 192.168.11.190。 通过此命令远程连接,检验是否可以不需密码连接。 1.5 安装JDK 参考spark实验1。 1.6 安装Hadoop ① 第一步,新建一个文件夹。 mkdir /usr/hadoop cd /usr/hadoop ② 第二步,将hadoop安装包拷贝到/usr/hadoop目录下。 cp /home/data/BigData/package/hadoop-2.7.1.tar.gz /usr/hadoop ③ 第三步,解压。 tar xzf hadoop-2.7.1.tar.gz 1.7 配置hadoop 先进入配置目录: cd /usr/hadoop/hadoop-2.7.1/etc/hadoop ① 第一步,修改 hadoop-env.sh。 vi hadoop-env.sh 主要是修改java_home的路径,在hadoop-env.sh的第25行,把export JAVA_HOME=${JAVA_HOME}修改成具体的路径,这里改成/usr/java/jdk1.8.0_65. 23 重新加载使修改生效: source hadoop-env.sh ② 第二步,修改 core-site.xml。 vi core-site.xml 增加namenode配置、文件存储位置配置。 ③ 第三步,修改 hdfs-site.xml。 vi hdfs-site.xml 配置包括自身在内的备份副本数量。 24 ④ 第四步,修改mapred-site.xml。 ➢ 在/etc/hadoop的目录下,只有一个mapred-site.xml.template文件,复制 一个: cp mapred-site.xml.template mapred-site.xml ➢ 通过vim打开: vi mapred-site.xml ➢ 配置mapreduce运行在yarn上: ⑤ 第五步,修改yarn-site.xml。 vi yarn-site.xml 配置: 25 ⑥ 第六步,修改 slaves.hadoop本身是一个主从结构。 vi slaves修改为hadoop。 ⑦ 第七步,配置hadoop的环境变量。 vi /etc/profile 添加以下两行: export HADOOP_HOME=/usr/hadoop/hadoop-2.7.1 export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin 重新加载profile使配置生效source /etc/profile。环境变量配置完成,测试环境变量是否生效echo $HADOOP_HOME。 ⑧ 第八步,重启linux。 如果上面的配置没有立即生效,就需要进行重启,不是必须的,但是建议重启。 Centos重启完之后配置的主机名失效,重新执行3.3,而Ubuntu则跳过即可。 vi /etc/sysconfig/network 配置主机名,例如: NETWORKING=yes HOSTNAME=hadoop source /etc/sysconfig/network,使其生效。 ⑨ 第九步,格式化namenode。进入 hadoop/bin 输入命令格式化: cd /usr/hadoop/hadoop-2.7.1/bin hadoop namenode -format 在格式化成功的时候,会有这样的输出: Storage directory /tmp/hadoop-root/dfs/name has been successfully formatted 如果出错,那就删除tmp目录,重新执行该命令。 1.8 hadoop的启动和停止 ➢ 启动hadoop。 26 在/usr/hadoop/hadoop-2.7.1/sbin目录下 start-all.sh。 ➢ 关闭hadoop。 在/usr/hadoop/hadoop-2.7.1/sbin目录下 stop-all.sh。 启动完成之后输入“jps”,检查伪分布式是否启动完成,共有5个进程。 ① HDFS的守护进程。 ➢ 主节点:Namenode、SecondaryNamenode。 ➢ 从节点:Datanode。 ② YARN的守护进程。 ➢ 主节点:ResourceManager。 ➢ 从节点:NodeManager。 1.9 通过浏览器访问hadoop管理页面 http://[server_ip]:50070。 运行程序 运行步骤 ① 第一步,新建Hadoop的HDFS输入目录“input”。 ➢ 进入到hadoop安装目录的bin目录下: cd /usr/hadoop/hadoop-2.7.1/bin ➢ 输入“hdfs dfs -mkdir /input”。 ② 第二步,将home目录下的aa.txt上传到HDFS的input目录下: hdfs dfs -put /home/aa.txt /input ③ 第三步,运行MapReduce程序: hadoop jar /home/wc.jar wc.WCDriver /input /output 27 在hadoop中运行jar任务需要使用的命令: ① hadoop jar [jar文件位置] [jar 主类] [HDFS输入位置] [HDFS输出位置], 其中: ➢ Jar:表示要运行的是一个基于Java的任务。 ➢ jar文件位置: 提供所要运行任务的jar文件位置,如果在当前操作目录 下,可直接使用文件名。 ➢ jar主类: 提供入口函数所在的类,格式为[包名.]类名。 ➢ HDFS输入位置: 指定输入文件在HDFS中的位置。 ➢ HDFS输出位置: 执行输出文件在HDFS中的存储位置,该位置必须不 存在,否则任务不会运行,该机制就是为了防止文件被覆盖出现意外丢失。 ② hadoop: hadoop脚本命令。 接下来,我们可以看到会先进行map,然后进行reduce。 最后我们输入hdfs dfs -cat /output/* 可以看到处理结果。 附加题 ➢ 题目: Mapreduce的并行计算和Spark有什么区别? ✓ 答案: ① hadoop的一个作业称为job,job里面分为map task和reduce task,每个task都是在自己的进程中运行的,当task结束时,进程也会结束。 28 ② spark用户提交的任务成为application,一个application对应一个sparkcontext,app中存在多个job,每触发一次action操作就会产生一个job。这些job可以并行或串行执行,每个job中有多个stage,stage是shuffle过程中DAGSchaduler通过RDD之间的依赖关系划分job而来的,每个stage里面有多个task,组成taskset有TaskSchaduler分发到各个executor中执行,executor的生命周期是和app一样的,即使没有job运行也是存在的,所以task可以快速启动读取内存进行计算。 ③ hadoop的job只有map和reduce操作,表达能力比较欠缺而且在mr过程中会重复的读写hdfs,造成大量的io操作,多个job需要自己管理关系。spark的迭代计算都是在内存中进行的,API中提供了大量的RDD操作如join,groupby等,而且通过DAG图可以实现良好的容错。 附件(编写MapReduce程序文件代码): 1. WCMapper代码: package wc; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /* * KEYIN:输入kv数据对中key的数据类型 * VALUEIN:输入kv数据对中value的数据类型 * KEYOUT:输出kv数据对中key的数据类型 * VALUEOUT:输出kv数据对中value的数据类型 */ public class WCMapper extends Mapper 29 /* * map方法是提供给map task进程来调用的,map task进程是每读取一行文本来调用一次我们自定义的map方法 * map task在调用map方法时,传递的参数: * 一行的起始偏移量LongWritable作为key * 一行的文本内容Text作为value */ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { //拿到一行文本内容,转换成String 类型 String line = value.toString(); //将这行文本切分成单词 String[] words=line.split(\" \"); //输出<单词,1> for(String word:words){ context.write(new Text(word), new IntWritable(1)); } } } 2. WCReducer代码: package wc; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; 30 import org.apache.hadoop.mapreduce.Reducer; /* * KEYIN:对应mapper阶段输出的key类型 * VALUEIN:对应mapper阶段输出的value类型 * KEYOUT:reduce处理完之后输出的结果kv对中key的类型 * VALUEOUT:reduce处理完之后输出的结果kv对中value的类型 */ public class WCReducer extends Reducer * reduce方法提供给reduce task进程来调用 * reduce task会将shuffle阶段分发过来的大量kv数据对进行聚合,聚合的机制是相同key的kv对聚合为一组 * 然后reduce task对每一组聚合kv调用一次我们自定义的reduce方法 * 比如: * 调用时传递的参数: * key:一组kv中的key * values:一组kv中所有value的迭代器 */ @Override protected void reduce(Text key, Iterable //定义一个计数器 int count = 0; //通过value这个迭代器,遍历这一组kv中所有的value,进行累 31 加 for(IntWritable value:values){ count+=value.get(); } //输出这个单词的统计结果 context.write(key, new IntWritable(count)); } } 3. WCDriver代码: package wc; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WCDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job wordCountJob = Job.getInstance(conf); //重要:指定本job所在的jar包 wordCountJob.setJarByClass(WCDriver.class); 32 //设置wordCountJob所用的mapper逻辑类为哪个类 wordCountJob.setMapperClass(WCMapper.class); //设置wordCountJob所用的reducer逻辑类为哪个类 wordCountJob.setReducerClass(WCReducer.class); //设置map阶段输出的kv数据类型 wordCountJob.setMapOutputKeyClass(Text.class); wordCountJob.setMapOutputValueClass(IntWritable.class); //设置最终输出的kv数据类型 wordCountJob.setOutputKeyClass(Text.class); wordCountJob.setOutputValueClass(IntWritable.class); //设置要处理的文本数据所存放的路径 FileInputFormat.setInputPaths(wordCountJob, \"hdfs://hadoop:9000/input/aa.txt\"); FileOutputFormat.setOutputPath(wordCountJob,new Path(\"hdfs://hadoop:9000/output/\")); if (!wordCountJob.waitForCompletion(true)) return; } } /usr/hadoop/hadoop-2.7.1/etc/*, 33 /usr/hadoop/hadoop-2.7.1/etc/hadoop/*, /usr/hadoop/hadoop-2.7.1/lib/*, /usr/hadoop/hadoop-2.7.1/share/hadoop/common/*, /usr/hadoop/hadoop-2.7.1/share/hadoop/common/lib/*, /usr/hadoop/hadoop-2.7.1/share/hadoop/mapreduce/*, /usr/hadoop/hadoop-2.7.1/share/hadoop/mapreduce/lib/*, /usr/hadoop/hadoop-2.7.1/share/hadoop/hdfs/*, /usr/hadoop/hadoop-2.7.1/share/hadoop/hdfs/lib/*, /usr/hadoop/hadoop-2.7.1/share/hadoop/yarn/*, /usr/hadoop/hadoop-2.7.1/share/hadoop/yarn/lib/* 34 四、 Hive实验 实验目的: 1、 了解Hive的基本功能。 2、 掌握Hive的安装过程及参数配置。 3、 掌握Hive的入门操作及数据库的安装方法。 4、 掌握Hive实现单词统计的操作方法。 Hive安装配置 1.1 前提准备 安装JDK,并配置JAVA_HOME。 安装Hadoop,并配置HADOOP_HOME。 1.2 解压 将hive安装包复制到/usr/hive目录中,进行解压。 ① 首先创建目录/usr/hive并进入该目录: mkdir /usr/hive cd /usr/hive ② 然后将hive安装包上传到该目录下,最后进行解压: cp /home/data/BigData/package/apache-hive-1.2.0-bin.tar.gz /usr/hive tar -zxvf apache-hive-1.2.0-bin.tar.gz 1.3 启动 启动好Hadoop,进入hive/bin目录,直接运行hive命令,即可进入hive提示符。 ➢ 首先要启动Hadoop: cd /usr/hadoop/hadoop-2.7.1/sbin start-all.sh ➢ 输入“jps”,观察hadoop的五个进程是否全部起来。 35 ➢ 然后启动Hive: cd /usr/hive/apache-hive-1.2.0-bin/bin ./hive Hive入门 首先在浏览器上输“IP:50070”访问 hadoop的管理页面。然后点击 “Utilities”,再点击“Browse the file system”,进入HDFS目录。 2.1 数据库 在hive中输入“create database” ,数据库名: hive> create database parkdb; OK Time taken: 1.575 seconds HDFS的目录/user/hive/warehouse下就多了一个park.db。 ➢ 结论1: hive内置的default库,对应的就是hdfs中/user/hive/warehouse 目录。 ➢ 结论2: Hive中的数据库就是底层HDFS中的一个[库名.db]文件夹。 2.2 表 在hive中输入: hive> use parkdb; 36 OK Time taken: 0.025 seconds hive> create table stu(id int,name string); OK Time taken: 0.941 seconds HDFS的目录/user/hive/warehouse/park.db下就多了一个stu。 ➢ 结论3:Hive中的的表就是底层HDFS中库名文件夹下以表名为名字的 子文件夹。 2.3 数据 将teacher.txt复制到/home目录下,其内容如下(中间以制表符分隔): cp /home/data/BigData/data/teacher.txt /home 在hive中输入: ① 创建teacher表,行之间以制表符\分隔。 hive> create table teacher(id int,name string) row format delimited fields terminated by \"\\"; OK Time taken: 0.195 seconds ② 插入数据到表中: hive> load data local inpath '/home/teacher.txt' into table teacher; Loading data to table parkdb.teacher Table parkdb.teacher stats: [numFiles=1, totalSize=47] OK 37 Time taken: 2.411 seconds ③ 查询数据: hive> select * from teacher; OK 1.孙悟空 2.唐僧 3.猪八戒 4.沙悟净 Time taken: 0.805 seconds, Fetched: 4 row(s)。 HDFS的目录/user/hive/warehouse/park.db/teacher下就多了一个teacher.txt。 ➢ 结论4:Hive表中的数据其实就是在HDFS中Hive表对应的文件夹下 的文件。 2.4 底层操作 在hive中输入: hive> select count(*) from teacher; 你会发现,进行了Hadoop的MapReduce操作。 ➢ 结论5:Hive中的hql会转换为底层的MR来执行。 Hive的数据库 3.1 元数据库 默认情况下Hive将这些元数据存储在hive内置的derby的数据库中。 ➢ derby存在的问题1: derby数据库是一种文件型的数据库,在进入时会检查当前目录下是否有 38 metastore_db文件夹用来存储数据库数据,如果有就直接使用,如果没有就创建,这样一旦换一个目录,元数据就找不到了。 ➢ erby存在的问题2: derby数据库是一个单用户的数据库,无法支持多用户同时操作,而hive如果使用derby作为元数据库,则也只能支持单用户操作。 所以Hive安装完成之后,通常都需要替换元数据库,目前Hive只支持derby和mysql两种元数据库,需要将元数据库替换为MySql。 3.2 安装mysql数据库 这里分为centos和ubuntu两个操作系统的mysql安装。 Centos7.4安装步骤: ① 上传解压。 将mysql安装包上传到/usr/mysql下,然后进行解压。 mkdir /usr/mysql cd /usr/mysql cp /home/data/BigData/package/Percona-...bundle.tar /usr/mysql tar -xvf Percona-Server-5.6.24-72.2......bundle.tar ② 执行rpm安装。 这里我们需要rpm安装4个文件,安装顺序不能错乱。 ➢ rpm -ivh Percona-Server-56-debuginfo-5.6.24-rel72.2.el6.x86_64.rpm ➢ rpm -ivh Percona-Server-shared-56-5.6.24-rel72.2.el6.x86_64.rpm ➢ rpm -ivh Percona-Server-client-56-5.6.24-rel72.2.el6.x86_64.rpm ➢ rpm -ivh Percona-Server-server-56-5.6.24-rel72.2.el6.x86_64.rpm 上面三步还不会报错,执行第四步的时候报错,报错如下: 是因为CentOS 7的默认数据库已经不再是MySQL了,而是MariaDB. MySQL安装时的mysql lib库与mariadb的库、包冲突了。 解决如下: 39 [root@hadoop mysql]# more /etc/redhat-release CentOS Linux release 7.4.1708 (Core) [root@hadoop mysql]# rpm -qa|grep mariadb mariadb-libs-5.5.56-2.el7.x86_64 [root@hadoop mysql]# rpm -e mariadb-libs-5.5.56-2.el7.x86_64 再一次执行第四步,发现安装成功。 rpm -ivh Percona-Server-server-56-5.6.24-rel72.2.el6.x86_64.rpm ③ 启动mysql服务。 ➢ service mysql start 启动服务。 ➢ service mysql stop 停止服务。 ➢ service mysql restart 重启服务。 ➢ service mysql status 查看服务状态。 ④ 修改密码。 我们配置访问密码,用户名root,密码root。 mysqladmin -u root password \"root\" 使用配置的用户名root,密码root进行登录。 mysql -uroot -proot 验证mysql命令是否有效。 mysql> show databases; ⑤ 关闭防火墙。 ➢ 查看防火墙状态: firewall-cmd --state ➢ 停止防火墙: systemctl stop firewalld.service ➢ 禁止防火墙开机启动: systemctl disable firewalld.service ⑥ 开启权限访问 默认安装的mysql没有外部访问权限,所以需要开启访问权限。 40 grant [权限] on [数据库名].[表名] to [用户名]@[web服务器的ip地址] identified by [密码]; mysql>grant all on *.* to 'root'@'%' identified by 'root'; mysql> flush privileges; ➢ 注意:必须重启mysql服务: service mysql restart Ubuntu16.04安装步骤: ① 安装mysql命令。 ➢ 首先安装mysql-server: apt-get install mysql-server 如果出现以下问题,解决方案如下,否则跳过该步骤。 E: dpkg was interrupted, you must manually run 'dpkg --configure -a' to correct the problem. 解决方案: dpkg --configure -a apt-get update 再次执行以下命令安装: apt-get install mysql-server ➢ 输入密码和确认密码:root。 apt install mysql-client apt install libmysqlclient-dev ② 安装成功后可以通过下面的命令测试是否安装成功。 netstat -tap | grep mysql 安装成功后的图示: ➢ 现在已经启动mysql,进入mysql。 mysql -uroot -proot 如果出现以下错误,请按以下方案解决,否则跳过。 41 mysql: [Warning] Using a password on the command line interface can be insecure. ERROR 1045 (28000): Access denied for user 'root'@'localhost' (using password: YES) 解决方案: ➢ 找到mysql的安装目录,我的是在/etc/mysql/mysql.conf.d ,该文件夹下 有个mysqld.cnf文件,使用管理员权限编辑:sudo vim mysqld.cnf,然后在文件最后一行加入:skip-grant-tables ,然后保存退出(大概就是跳过验证的意思吧)。 root@hadoop:/etc/mysql/mysql.conf.d# vim mysqld.cnf 在文件最后一行加入:skip-grant-tables ➢ 要重启mysql服务。 service mysql restart ➢ 接下来用空密码进入mysql管理命令行,切换到mysql库,直接命令: mysql 回车即可进入mysql命令模式,然后就是修改下root密码(其他用户也一样,只是权限不一样),修改密码语句如下: root@hadoop:/etc/mysql/mysql.conf.d# mysql mysql> update mysql.user set authentication_string=password('root') where user='root' and Host ='localhost'; Query OK, 1 row affected, 1 warning 然后刷选配置,使之立即生效: mysql> flush privileges; 接着退出mysql命令行模式。 mysql> quit; Bye ➢ 下一步便是把刚才我们改的mysqld.cnf文件,把刚才加入的那一行注释 或者删除:skip-grant-tables,保存退出。 ➢ 然后就可以用mysql -u root -p登录了。 ③ 远程连接mysql。 42 ➢ 默认安装的mysql没有外部访问权限,所以需要开启访问权限。 grant [权限] on [数据库名].[表名] to ['用户名']@['web服务器的ip地址'] identified by ['密码']; root@hadoop:~# mysql -uroot -proot mysql> grant all on *.* to 'root'@'%' identified by 'root'; Query OK, 0 rows affected, 1 warning (0.01 sec) mysql> flush privileges; Query OK, 0 rows affected (0.00 sec) ➢ root@hadoop:~ #vim /etc/mysql/mysql.conf.d/mysqld.cnf。 将bind-address=127.0.0.1注释掉。 ➢ 重启mysql。 service mysql restart。 3.3 修改Hive的元数据库为mysql ① 复制hive/conf/hive-default.xml.template为hive-site.xml。 cd /usr/hive/apache-hive-1.2.0-bin/conf/ cp hive-default.xml.template hive-site.xml ② 配置文件。 打开此文件,删除其中所有默认配置,配置如下配置信息: 43 然后观察需要删除的行数: num1,num2d。 发现要删除的行数为18行到3908行,输入:“18,3908 d”进行删除。 44 可以发现 ③ 将mysql的驱动复制到hive的lib目录下,否则启动会报出异常。 将驱动mysql-connector-java-5.1.38-bin.jar复制到/usr/hive/apache-hive-1.2.0-bin/lib目录下: cp/home/data/BigData/package/mysql-connector-java-5.1.38-bin.jar /usr/hive/apache-hive-1.2.0-bin/lib ④ 开启mysql的当前机器的root用户访问权限。 mysql -uroot -proot mysql>GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'root' WITH GRANT OPTION; mysql>GRANT ALL PRIVILEGES ON *.* TO 'root'@'hadoop' IDENTIFIED BY 'root' WITH GRANT OPTION; mysql>FLUSH PRIVILEGES; ⑤ 启动。 ➢ 启动Hadoop。 cd /usr/hadoop/hadoop-2.7.1/sbin start-all.sh ➢ 启动Mysql。 service mysql start ➢ 启动Hive。 cd /usr/hive/apache-hive-1.2.0-bin/bin ./hive 45 Hive实现单词统计 ① 本地构造数据hive.txt,内容如下: bigdata mapreduce is very good hive is very good hive is very good ➢ 将其复制到/home目录下: cp /home/data/BigData/data/hive.txt /home。 ② hive创建表,再将本地数据导入: ➢ 创建words表,字段名word,类型string: hive> create table words (word string)。 ➢ 将本地数据导入到hive数据库中: hive>load data local inpath '/home/hive.txt' into table words; ③ 利用MapReduce思想,先把每行数据转化为一个数组的形式。 hive> select split(word,' ') from words; OK [\"bigdata\"] [\"mapreduce\ [\"hive\ [\"hive\ Time taken: 0.898 seconds, Fetched: 4 row(s) ④ 然后将数组的每一个单词拆分出来,每行就只有一个单词。 hive> select explode(split(word,' ')) from words; ⑤ 组合语句写一个单词统计并排序,同时将统计出来的结果放在 wordcount表中。 hive > create table wordcount as > select word ,count(1) as c > from ( 46 > select explode(split(word,' ')) as word from words > ) t group by word > order by c desc; 发现hql底层正在进行MapReduce操作。 ⑥ 观察结果。 hive> select * from wordcount; OK very is 3 good hive 3 2 3 mapreduce 1 bigdata 1 Time taken: 0.197 seconds, Fetched: 6 row(s) 附加题 ➢ 题目: Mapreduce和Hive有什么区别? ✓ 答案: ① 运算资源消耗。 无论从时间,数据量,计算量上来看,一般情况下MapReduce都是优于或者等于hive的。MR的灵活性毋庸置疑。在转换到hive的过程中,会有一些为了实现某些场景的需求而不得不用多步hive来实现的时候。 ② 开发成本/维护成木。 亳无疑问,hive的开发成本是远低于MR的。如果能熟练的运用udf和transform会更加提高hive开发的效率。另外对于数据的操作也非常的直观,对于全世界程序员都喜闻乐见的语法的,继承也让它更加的容易上手。hive独47 有的分区管理,方便进行数据的管理。代码的管理也很方便,就是直接的文本。逻辑的修改和生效很方便。但是当出现异常错误的时候,hive的调试会比较麻烦。特别是在大的生产集群上面的时候。 ③ 底层相关性。 在使用hive以后,读取文件的时候,再也不用关心文件的格式,文件的分隔符,只要指定一次,hive就会保存好。相比来说方便了很多。 当侧重关心与业务相关的内容的时候,用hive会比较有优势。而在一些性能要求高,算法研究的时候,MR会更加适合。 48 因篇幅问题不能全部显示,请点此查看更多更全内容