MapReduce执行过程及运行原理详解

Hadoop 潘老师 4个月前 (08-06) 385 ℃ (0) 扫码查看

MapReduce执行过程及运行原理是非常复杂的,理解起来也极其困难,这也是许多同学在学习Hadoop过程中比较困惑的地方,今天潘老师来带大家从头到尾详细地梳理一下MapReduce执行的4个阶段,分别为Split阶段—> Map阶段 —> Shuffle阶段 —> Reduce阶段

在正式进入详解之前,我们先通过一个简单的WordCount单词统计案例的执行过程来大致熟悉下这几个阶段的执行过程及其含义。

如果你想查看WordCount单词统计案例代码可以参考:Hadoop经典入门案例实战-WordCount单词统计代码实现及原理分析

首先,有这样一个简单的需求,假设有一个文件,里面有三行共9个单词(如图),现在需要是统计文件中每个单词出现的次数,那么MapReduce执行过程如下图所示:
MapReduce执行过程及运行原理详解
从图中我们可以看出,它的执行流程是这样的:
图中主要分为Split、Map、Shuffle和Reduce 的4个阶段,每个阶段作用如下:
1)Split:由于输入文件太大,MapReduce会对其进行分割,大文件会被切分成多份,图中假设被切为3份,每一行代表一份。
2)Map:解析出每一行中的每个单词,并在后面记上数字1,表示此单词出现了一次。
3)Shuffle:将每一份中的相同的单词分组到一起,并默认按照字母顺序进行升序排序。
4)Reduce:将每一组中的单词出现的次数进行累加求和。
最后将求和好的数据输出HDFS文件系统中。

我相信,大部分同学看完上面的案例,应该多MapReduce大致执行过程有了初步的了解,不过这里面还有很多细节没有细化,接下来我们对每个阶段进行详细讲解。先上一张MapReduce执行过程图,后面许多讲解都会根据这张图来说明:
MapReduce执行过程及运行原理详解

1、输入分片的概念

所谓输入分片,并不是真的把原来的一个大文件,比如说10MB的文件,切分成10个1MB的小文件,这里的分片不是物理分片,而是逻辑分片,所谓逻辑分片就是根据文件的字节索引进行分割,比如0~1MB位置定义为第一个分片,1MB~2MB定义为为第二个分片,依次类推…..而原来的大文件还是原来的大文件,不会受到影响,因此,输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组。
2、分片数量与Map Task数量的关系?

Map Task的个数等于split的个数。我们知道,mapreduce在处理大文件的时候,会根据一定的规则,把大文件划分成多个分片,这样能够提高map的并行度。 划分出来的就是InputSplit,每个map处理一个InputSplit,因此,有多少个InputSplit,就有多少个map task。
3、由谁来划分分片?

主要是InputFormat类来负责划分Split。InputFormat类有2个重要的作用:
1)将输入的数据切分为多个逻辑上的InputSplit,其中每一个InputSplit作为一个map的输入。
2)提供一个RecordReader,用于将InputSplit的内容转换为可以作为map输入的k,v键值对。
FileInputFormat是InputFormat的子类,是使用比较广泛的类,输入格式如果是hdfs上的文件,基本上用的都是FileInputFormat的子类,如TextInputFormat用来处理普通的文件,SequceFileInputFormat用来处理Sequce格式文件。 FileInputFormat类中的getSplits(JobContext job)方法是划分split的主要逻辑。
4、分片的大小由谁来决定?

每个输入分片的大小是固定的,默认情况下,输入片(InputSplit)的大小与数据块(Block)的大小是相同的。Hadoop 2.x默认的block大小是128MB,Hadoop 1.x默认的block大小是64MB,可以在hdfs-site.xml中设置dfs.block.size,注意单位是byte。
分片大小范围可以在mapred-site.xml中设置,最小分片大小:mapred.min.split.size ,最大分片大小:mapred.max.split.size,minSplitSize大小默认为1B,maxSplitSize大小默认为Long.MAX_VALUE = 9223372036854775807

那么分片到底是多大呢?可以通过如下公式计算:

minSize=max{minSplitSize,mapred.min.split.size} 
maxSize=mapred.max.split.size
splitSize=max{minSize,min{maxSize,blockSize}}

在我们没有设置分片的范围的时候,也就是默认情况下,分片大小是由block块大小决定的,和它的大小一样。比如把一个258MB的文件上传到HDFS上,假设block块大小是128MB,那么它就会被分成三个block块,与之对应产生三个split,所以最终会产生三个map task。我又发现了另一个问题,第三个block块里存的文件大小只有2MB,而它的block块大小是128MB,那它实际占用Linux file system的多大空间?答案是实际的文件大小即2M,而非一个块的大小。

5、默认分片大小与Block分块大小是相同的原因?

hadoop在存储有输入数据(HDFS中的数据)的节点上运行map任务,可以获得高性能,这就是所谓的数据本地化。所以最佳分片的大小应该与HDFS上的块大小一样,因为如果分片跨越2个数据块,对于任何一个HDFS节点(Hadoop系统保证一个块存储在一个datanode上,基本不可能同时存储这2个数据块),分片中的另外一块数据就需要通过网络传输到map任务节点,与使用本地数据运行map任务相比,效率则更低!优点就是可以实现分块优化,减少网络传输数据,使用本地数据运行map任务。

每个Mapper任务是一个java进程,它会读取HDFS文件中自己对应的输入切片,将切片中记录按照一定的规则解析成很多的键值对,有个默认规则是把每一行文本内容解析成键值对,这里的“键”是每一行的起始位置(单位是字节),“值”是本行的文本内容,也就是我们图中定义为<k1,v1>。然后经过我们重写的map方法处理,针对解析出来的每一个键值对<k1,v1>,都分别调用一次map方法,如果有1000个键值对,就会调用1000次map方法,每一次调用map方法会输出零个或者多个键值对,这里输出的键值对,也就是我们图中定义的,比如我们单词统计案例中其中的一个<k1,v1>为<0,'Dear Bear River'>,经一次map处理后变为3个<k2,v2>,即<'Dear',1>、<'Bear',1>和<'River',1>。

Shuffle阶段也称为洗牌阶段,该阶段是将输出的<k2,v2>传给Shuffle(洗牌),Shuffle完成对数据的分区、排序和合并等操作。Shuffle过程包含在Map和Reduce两端,即Map shuffle和Reduce shuffle, Shuffle描述着数据从map task流向reduce task的这段过程,具体过程如图:
MapReduce执行过程及运行原理详解

1)每个输入分片会让一个 Map 任务来处理,默认情况下,以 HDFS 的一个块的大小(默认为 128MB)为一个分片。Map 函数开始产生输出时,并不是简单地把数据写到磁盘中,因为频繁的磁盘操作会导致性能严重下降。它的处理过程是把数据首先写到内存中的一个缓冲区, 并做一些预排序,以提升效率。

2)每个 Map 任务都有一个用来写入输出数据的循环内存缓冲区(默认大小为 128MB),当缓冲区中的数据量达到一个特定阈值(默认是 80%)时,系统将会启动一个后台线程,把缓冲 区中的内容写到磁盘中(即 Spill 阶段)。在写磁盘过程中,Map 输出继续被写到缓冲区中,但如果在此期间缓冲区被填满,那么 Map 任务就会阻塞直到写磁盘过程完成。

3)在写入磁盘之前,线程首先根据reduce任务的数目将数据划分为相同数目的分区(Partition),也就是一个reduce任务对应一个分区的数据。这样做是为了避免有些reduce任务分配到大量数据,而有些reduce任务却分到很少数据,甚至没有分到数据的尴尬局面。其实分区就是对数据进行hash的过程。然后对每个分区中的数据进行排序,如果此时设置了Combiner,将排序后的结果进行Combine操作,这样做的目的是让尽可能少的数据写入到磁盘。

4)当map任务输出最后一个记录时,可能会有很多的溢出文件,这时需要将这些文件合并。合并的过程中会不断地进行排序和Combine操作,目的有两个:①尽量减少每次写入磁盘的数据量。②尽量减少下一复制阶段网络传输的数据量。最后合并成了一个已分区且已排序的文件。为了减少网络传输的数据量,这里可以将数据压缩,只要将mapred.compress.map.out设置为true就可以了。

补充Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。
在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。合并完之后数据(以key和value的形式存在,及<k2,v2>)会基于Partition被发送到不同的Reduce上,Reduce会从不同的Map上取得属于自己的数据并写入磁盘,完成merge操作减少文件数量,并调用Reduce程序,最终Output完成输出

5)溢出写文件归并完毕后,Map 任务将删除所有的临时溢出写文件,并告知 TaskTracker 任务已完成,只要其中一个 Map 任务完成,Reduce 任务就会开始复制它的输出(Copy 阶段)。

6)Map 任务的输出文件放置在运行 Map 任务的 TaskTracker 的本地磁盘上,它是运行 Reduce 任务的 TaskTracker 所需要的输入数据。

溢写阶段详情:
步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。

补充:combiner是可选操作,combiner其实也是一种reduce操作,因此我们看见WordCount类里是用reduce进行加载的。Combiner是一个本地化的reduce操作,它是map运算的后续操作,主要是在map计算出中间文件前做一个简单的合并重复key值的操作,例如我们对文件里的单词频率做统计,map计算时候如果碰到一个hadoop的单词就会记录为1,但是这篇文章里hadoop可能会出现n多次,那么map输出文件冗余就会很多,因此在reduce计算前对相同的key做一个合并操作,那么文件会变小,这样就提高了宽带的传输效率,毕竟hadoop计算力宽带资源往往是计算的瓶颈也是最为宝贵的资源,但是combiner操作是有风险的,使用它的原则是combiner的输入不会影响到reduce计算的最终输入,例如:如果计算只是求总数,最大值,最小值可以使用combiner,但是做平均值计算使用combiner的话,最终的reduce计算结果就会出错。

步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。

补充:在Hadoop这样的集群环境中,大部分map task与reduce task的执行是在不同的节点上。当然很多情况下Reduce执行时需要跨节点去拉取其它节点上的map task结果。如果集群正在运行的job有很多,那么task的正常执行对集群内部的网络资源消耗会很严重。这种网络消耗是正常的,我们不能限制,能做的就是最大化地减少不必要的消耗。还有在节点内,相比于内存,磁盘IO对job完成时间的影响也是可观的。

因为频繁的磁盘I/O操作会严重的降低效率,因此Map处理后的“中间结果”不会立马写入磁盘,而是优先存储到map节点的“环形内存缓冲区”。每个 Map 任务都有一个用来写入输出数据的循环内存缓冲区(默认大小为 128MB),当缓冲区中的数据量达到一个特定阈值(默认是 80%)时,系统将会启动一个后台线程,把缓冲 区中的内容写到磁盘中(即 Spill 阶段)。当溢写线程启动,会锁定这128*0.8 MB的内存,执行溢写过程,此时Map task的输出结果还可以往剩下的128*0.2 MB内存中写,互不影响,但如果在此期间缓冲区仍旧被填满,那么 Map 任务就会阻塞直到写磁盘过程完成。

溢写之前包括Partition(分区)、Sort(排序)和可选的Combiner(本地化reduce)。Partition和Sort都有默认实现,其中Partition分区默认按照“key的hash%reduce数量”进行分区,分区之后的数据会进入不同的Reduce,而Sort是默认key的compareTo方法进行排序的,我们也可以根据业务需求自行编写。

MapReduce执行过程及运行原理详解

1)Reduce 进程启动一些数据复制线程,请求 Map 任务所在的 TaskTracker 以获取输出文件,并且每个map传来的数据都是有序的(Copy 阶段)。

2)如果reduce端接受的数据量相当小,则直接存储在内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,表示用作此用途的堆空间的百分比),如果数据量超过了该缓冲区大小的一定比例(由mapred.job.shuffle.merge.percent决定),则对数据合并后溢写到磁盘中。

3)随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件,这样做是为了给后面的合并节省时间。其实不管在map端还是reduce端,MapReduce都是反复地执行排序,合并操作,可以说:排序是hadoop的灵魂。

4)合并的过程中会产生许多的中间文件(写入磁盘了),但MapReduce会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到reduce函数。

5)在输入到reduce函数之前,reducer接收到所有映射到这个reducer的map输出后,也是会对key排序,然后开始构造一个key对应的value迭代器。这时就要用到分组,存在默认分组,key相同则分为一组,也可以使用jobjob.setGroupingComparatorClass设置的自定义分组函数类分组,只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key(实际测试,通过后面的温度排序案例打印reduce端MyKey日志发现同一组却不同key,很奇怪,还没搞明白呢?)。

Reduce阶段就将分组好的<k2,{v2集合}>,进行聚合统计,和map函数一样也是程序员编写的,然后将结果输出到HDFS,每个reduce进程会对应一个输出文件,名称以part-开头。

说明:Reduce的默认数是1,也可自行设置。
一个程序可能只有一个Map任务却没有Reduce任务,也可能是多个MapReduce程序串接起来,比如把第一个MapReduce的输出作为第二个MapReduce的输入,第二个MapReduce的输出成为第三个MapReduce的输入,最终才能完成一个任务。


版权声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系潘老师进行处理。
喜欢 (1)
请潘老师喝杯Coffee吧!】
分享 (0)

您必须 微信登录 才能发表评论!

登录