🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
[TOC] # 结构 一个完整的mapreduce程序在分布式运行时有三类实例进程: 1. MRAppMaster:负责整个程序的过程调度及状态协调 2. mapTask:负责map阶段的整个数据处理流程 3. ReduceTask:负责reduce阶段的整个数据处理流程 # MR程序运行流程 流程示意图 ![](https://box.kancloud.cn/37d285204772d869353f544b75392842_1297x812.png) 流程解析 1. 一个mr程序启动的时候,最先启动的是MRAppMaster,MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程 2. maptask进程启动之后,根据给定的数据切片范围进行数据处理,主体流程为: * 利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV对 * 将输入KV对传递给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对收集到缓存 * 将缓存中的KV对按照K分区排序后不断溢写到磁盘文件 3. MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据范围(数据分区) 4. Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储 # MapTask并行度决定机制 maptask的并行度决定map阶段的任务处理并发度,进而影响到整个job的处理速度 那么,mapTask并行实例是否越多越好呢?其并行度又是如何决定呢? ## mapTask并行度的决定机制 一个job的map阶段并行度由客户端在提交job时决定 而客户端对map阶段并行度的规划的基本逻辑为: 将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个split),然后每一个split分配一个mapTask并行实例处理 这段逻辑及形成的切片规划描述文件,由FileInputFormat实现类的getSplits()方法完成,其过程如下图: ![](https://box.kancloud.cn/ec6de6d312f5aa5027e1db35cbc767fd_1319x837.png) ## FileInputFormat切片机制 1. 切片定义在InputFormat类中的getSplit()方法 2. FileInputFormat中默认的切片机制: * 简单地按照文件的内容长度进行切片 * 切片大小,默认等于block大小 * 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片 比如待处理数据有两个文件: ~~~ file1.txt 320M file2.txt 10M ~~~ 经过FileInputFormat的切片机制运算后,形成的切片信息如下: ~~~ file1.txt.split1-- 0~128 file1.txt.split2-- 128~256 file1.txt.split3-- 256~320 file2.txt.split1-- 0~10M ~~~ 3. FileInputFormat中切片的大小的参数配置 通过分析源码,在FileInputFormat中,计算切片大小的逻辑: `Math.max(minSize, Math.min(maxSize, blockSize));` 切片主要由这几个值来运算决定 ~~~ minsize:默认值:1 配置参数: mapreduce.input.fileinputformat.split.minsize maxsize:默认值:Long.MAXValue 配置参数:mapreduce.input.fileinputformat.split.maxsize blocksize ~~~ 因此,默认情况下,切片大小=blocksize maxsize(切片最大值): 参数如果调得比blocksize小,则会让切片变小,而且就等于配置的这个参数的值 minsize (切片最小值): 参数调的比blockSize大,则可以让切片变得比blocksize还大 **但是,不论怎么调参数,都不能让多个小文件“划入”一个split** 选择并发数的影响因素: 运算节点的硬件配置 运算任务的类型:CPU密集型还是IO密集型 运算任务的数据量 ## map并行度的经验之谈 如果硬件配置为`2*12core + 64G`,恰当的map并行度是大约每个节点20-100个map,最好每个map的执行时间至少一分钟。 * 如果job的每个map或者 reduce task的运行时间都只有30-40秒钟,那么就减少该job的map或者reduce数,每一个task(map|reduce)的setup和加入到调度器中进行调度,这个中间的过程可能都要花费几秒钟,所以如果每个task都非常快就跑完了,就会在task的开始和结束的时候浪费太多的时间。 配置task的JVM重用可以改善该问题: (`mapred.job.reuse.jvm.num.tasks`,默认是1,表示一个JVM上最多可以顺序执行的task 数目(属于同一个Job)是1。也就是说一个task启一个JVM) * 如果input的文件非常的大,比如1TB,可以考虑将hdfs上的每个block size设大,比如设成256MB或者512MB # ReduceTask并行度的决定 reducetask的并行度同样影响整个job的执行并发度和执行效率,但与maptask的并发数由切片数决定不同,Reducetask数量的决定是可以直接手动设置: ~~~ //默认值是1,手动设置为4 job.setNumReduceTasks(4); ~~~ 如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜 注意: reducetask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个reducetask 尽量不要运行太多的reduce task。对大多数job来说,最好reduce的个数最多和集群中的reduce持平,或者比集群的 reduce slots小。这个对于小集群而言,尤其重要。 # mapreduce实践 ## mapreduce编程规范 编程规范 1. 用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端) 2. Mapper的输入数据是KV对的形式(KV的类型可自定义) 3. Mapper的输出数据是KV对的形式(KV的类型可自定义) 4. Mapper中的业务逻辑写在map()方法中 5. map()方法(maptask进程)对每一个<K,V>调用一次 6. Reducer的输入数据类型对应Mapper的输出数据类型,也是KV 7. Reducer的业务逻辑写在reduce()方法中 8. Reducetask进程对每一组相同k的<k,v>组调用一次reduce()方法 9. 用户自定义的Mapper和Reducer都要继承各自的父类 10. 整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象 ## mapreduce程序运行模式 ### 本地运行模式 1. mapreduce程序是被提交给LocalJobRunner在本地以单进程的形式运行 2. 而处理的数据及输出结果可以在本地文件系统,也可以在hdfs上 3. 怎样实现本地运行?写一个程序,不要带集群的配置文件(本质是你的mr程序的conf中是否有mapreduce.framework.name=local以及yarn.resourcemanager.hostname参数) 4. 本地模式非常便于进行业务逻辑的debug,只要在eclipse中打断点即可 如果在windows下想运行本地模式来测试程序逻辑,需要在windows中配置环境变量: ~~~ %HADOOP_HOME% = d:/hadoop-2.6.1 %PATH% = %HADOOP_HOME%\bin ~~~ 并且要将`d:/hadoop-2.6.1`的lib和bin目录替换成windows平台编译的版本 ### 集群运行模式 1. 将mapreduce程序提交给yarn集群resourcemanager,分发到很多的节点上并发执行 2. 处理的数据和输出结果应该位于hdfs文件系统 3. 提交集群的实现步骤: A. 将程序打成JAR包,然后在集群的任意一个节点上用hadoop命令启动 ~~~ $ hadoop jar wordcount.jar cn.bigdata.mrsimple.WordCountDriver inputpath outputpath ~~~ B. 直接在linux的eclipse中运行main方法 (项目中要带参数:`mapreduce.framework.name=yarn`以及yarn的两个基本配置) C. 如果要在windows的eclipse中提交job给集群,则要修改YarnRunner类 mapreduce程序在集群中运行时的大体流程: ![](https://box.kancloud.cn/a27d33a9fb9feaf5dbfc86dcae046e88_1312x571.png) 附:在windows平台上访问hadoop时改变自身身份标识的方法之二: 在代码中设置配置或者 ![](https://box.kancloud.cn/881e618811fda2202f4ca43211c46dd0_1087x712.png) ### MAPREDUCE中的Combiner Combiner的使用要非常谨慎 因为combiner在mapreduce过程中可能调用也可能不调用,可能调一次也可能调多次 所以:combiner使用的原则是:有或没有都不能影响业务逻辑 1. combiner是MR程序中Mapper和Reducer之外的一种组件 2. combiner组件的父类就是Reducer 3. combiner和reducer的区别在于运行的位置: Combiner是在每一个maptask所在的节点运行 Reducer是接收全局所有Mapper的输出结果; 4. combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量 具体实现步骤: 1. 自定义一个combiner继承Reducer,重写reduce方法 2. 在job中设置: job.setCombinerClass(CustomCombiner.class) 5. combiner能够应用的前提是不能影响最终的业务逻辑,而且,combiner的输出kv应该跟reducer的输入kv类型要对应起来 # MAPREDUCE原理篇 ## mapreduce的shuffle机制 概述: * mapreduce中,map阶段处理的数据如何传递给reduce阶段,是mapreduce框架中最关键的一个流程,这个流程就叫shuffle; * shuffle: 洗牌、发牌——(核心机制:数据分区,排序,缓存); * 具体来说:就是将maptask输出的处理结果数据,分发给reducetask,并在分发的过程中,对数据按key进行了分区和排序; ## 主要流程: Shuffle缓存流程: ![](https://box.kancloud.cn/c0df47e8d217ebe66c7bd4b89b7a30e9_1302x599.png) shuffle是MR处理流程中的一个过程,它的每一个处理步骤是分散在各个map task和reduce task节点上完成的,整体来看,分为3个操作: 1. 分区partition 2. Sort根据key排序 3. Combiner进行局部value的合并 ## 详细流程 1. maptask收集我们的map()方法输出的kv对,放到内存缓冲区中 2. 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件 3. 多个溢出文件会被合并成大的溢出文件 4. 在溢出过程中,及合并的过程中,都要调用partitoner进行分组和针对key进行排序 5. reducetask根据自己的分区号,去各个maptask机器上取相应的结果分区数据 6. reducetask会取到同一个分区的来自不同maptask的结果文件,reducetask会将这些文件再进行合并(归并排序) 7. 合并成大文件后,shuffle的过程也就结束了,后面进入reducetask的逻辑运算过程(从文件中取出一个一个的键值对group,调用用户自定义的reduce()方法) Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快 缓冲区的大小可以通过参数调整, 参数:`io.sort.mb` 默认100M map的环形缓冲区有一个阈值,默认0.8,可以通过`io.sort.spill.percent`设置,通常设置为0.6 ## 详细流程示意图 ![](https://box.kancloud.cn/7079e2b94d3657baec01b87ce45309b3_1804x953.png) ![](https://box.kancloud.cn/0b495fdf2dba131c0045866bd67eb10a_1799x939.png) # MAPREDUCE中的序列化 ## 概述 Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系。。。。),不便于在网络中高效传输; 所以,hadoop自己开发了一套序列化机制(Writable),精简,高效 ## Jdk序列化和MR序列化之间的比较 简单代码验证两种序列化机制的差别: ~~~ public class TestSeri { public static void main(String[] args) throws Exception { //定义两个ByteArrayOutputStream,用来接收不同序列化机制的序列化结果 ByteArrayOutputStream ba = new ByteArrayOutputStream(); ByteArrayOutputStream ba2 = new ByteArrayOutputStream(); //定义两个DataOutputStream,用于将普通对象进行jdk标准序列化 DataOutputStream dout = new DataOutputStream(ba); DataOutputStream dout2 = new DataOutputStream(ba2); ObjectOutputStream obout = new ObjectOutputStream(dout2); //定义两个bean,作为序列化的源对象 ItemBeanSer itemBeanSer = new ItemBeanSer(1000L, 89.9f); ItemBean itemBean = new ItemBean(1000L, 89.9f); //用于比较String类型和Text类型的序列化差别 Text atext = new Text("a"); // atext.write(dout); itemBean.write(dout); byte[] byteArray = ba.toByteArray(); //比较序列化结果 System.out.println(byteArray.length); for (byte b : byteArray) { System.out.print(b); System.out.print(":"); } System.out.println("-----------------------"); String astr = "a"; // dout2.writeUTF(astr); obout.writeObject(itemBeanSer); byte[] byteArray2 = ba2.toByteArray(); System.out.println(byteArray2.length); for (byte b : byteArray2) { System.out.print(b); System.out.print(":"); } } } ~~~ ## 自定义对象实现MR中的序列化接口 如果需要将自定义的bean放在key中传输,则还需要实现comparable接口,因为mapreduce框中的shuffle过程一定会对key进行排序,此时,自定义的bean实现的接口应该是: `public class FlowBean implements WritableComparable<FlowBean> ` 需要自己实现的方法是: ~~~ /** * 反序列化的方法,反序列化时,从流中读取到的各个字段的顺序应该与序列化时写出去的顺序保持一致 */ @Override public void readFields(DataInput in) throws IOException { upflow = in.readLong(); dflow = in.readLong(); sumflow = in.readLong(); } /** * 序列化的方法 */ @Override public void write(DataOutput out) throws IOException { out.writeLong(upflow); out.writeLong(dflow); //可以考虑不序列化总流量,因为总流量是可以通过上行流量和下行流量计算出来的 out.writeLong(sumflow); } @Override public int compareTo(FlowBean o) { //实现按照sumflow的大小倒序排序 return sumflow>o.getSumflow()?-1:1; } ~~~ # MapReduce与YARN ## YARN概述 Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而mapreduce等运算程序则相当于运行于操作系统之上的应用程序 ## YARN的重要概念 1. yarn并不清楚用户提交的程序的运行机制 2. yarn只提供运算资源的调度(用户程序向yarn申请资源,yarn就负责分配资源) 3. yarn中的主管角色叫ResourceManager 4. yarn中具体提供运算资源的角色叫NodeManager 5. 这样一来,yarn其实就与运行的用户程序完全解耦,就意味着yarn上可以运行各种类型的分布式运算程序(mapreduce只是其中的一种),比如mapreduce、storm程序,spark程序,tez …… 6. 所以,spark、storm等运算框架都可以整合在yarn上运行,只要他们各自的框架中有符合yarn规范的资源请求机制即可 7. Yarn就成为一个通用的资源调度平台,从此,企业中以前存在的各种运算集群都可以整合在一个物理集群上,提高资源利用率,方便数据共享 ## Yarn中运行运算程序的示例 mapreduce程序的调度过程,如下图 ![](https://box.kancloud.cn/20d257f3ebf92cd809542a606a7810dd_1445x639.png) # reduce阶段 1. reduceTask分区数据中,有一个mapTask处理完成之后,就开始从mapTask本地磁盘拉取输出数据.拉取任务是多线程的,默认是5,可根据`mpred.reduce.parallel.copys`设置 2. reduceTask复制的数据,首先保存在内存(输入缓冲区)中,由`mpred.job.shuffe.input.buffer.percent`属性控制,指定可用做输入缓冲区的堆内存的百分比 3. 在输入缓冲区溢出之前,会对数据进行合并,由groupingComparator规定合并数据规则.如果有指定的combiner组件,会在此时调用,以减少溢出时写入磁盘的数据量 4. 如果溢出文件过多,会根据合并因子逐步合并少数几个文件.(并不会合并为1个,reduce的输入数据最后一次合并,可来自内存)