今天抽时间又重新整体复习了下mapreduce在各个阶段的执行过程,感觉每一次学习都会有更大的收获,今天就把我学习到的东西全都和大家一起分享出来,方便自己梳理知识和记忆的同时,也希望能给其他小伙伴带来收获!每天进步一点点!

mapreduce分为几个阶段(各个阶段做了什么)(1)

一.什么是MapReduce

MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)",是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。

MapReduce的核心思想是“分而治之,先分后合”,即将一个大的、复杂的工作或任务,拆分成多个小的任务,并行处理,最终进行合并==。适用于大量复杂的、时效性不高的任务处理场景(大规模离线数据处理场景)。其中Map阶段就是分,Reduce阶段是合。

map:以一条记录为单位做映射~!主要做映射,变换,过滤,1进N出(进来1条记录出去N条记录)

reduce:以一组(相同Key数据为一组)为单位做计算~!主要做分解,缩小,归纳,一组进N出(进来一组记录出去N条记录)

map与reduce之间的衔接:键值对(Key,Value)

Key,Value):由map映射实现,键值对的键划分数据分组

mapreduce分为几个阶段(各个阶段做了什么)(2)

mapreduce简单的过程

二.MapReduce运行过程

1.MapReduce大致运行过程

mapreduce分为几个阶段(各个阶段做了什么)(3)

MapReduce的大致运行过程

MapReduce分为Map阶段和Reduce阶段,从上图中可以看出所有Map任务完成后才可以进行Reduce阶段,Reduce的输入来自于Map的输出,两者之间是有依赖的,不是并行关系。

3个split(分片)是可以并行运行的,代表3个map任务,一个切片对应一个map计算,默认情况下,以HDFS的一个块(block)的大小(默认128M)为一个分片。

那么思考一下,为什么不直接用一个block直接对应一个map计算?而是要在加一个split呢?

这就是在软件工程学当中经常提到的加一层解耦,block块的大小是固定的,split的大小默认情况下等于block块的大小,当然我们也是可以根据自己的需求来设置split的大小,可以大于块大小,也可以小于块大小,这样就可以满足未来不同项目组的不同需求了,用起来相当灵活了。

可以看出,map的个数(并行度)是由split的数量决定的,每个split对应一个map计算,默认情况下,一个split就是一个块,split是可以复用block偏移量,大小,location信息的,split可以根据block的location相关信息来找到对应副本的机器,然后map程序可以移动到相应的机器上进行计算,很好的实现了计算向数据移动的语义。

接下来介绍下分组和分区的概念:

分组:将相同key的value进行合并,key相等的话,将分到同一个组里面

分区:决定当前的key交给哪个reduce进行处理,默认情况下,根据key的hash值,对reduce个数取余

Reduce的阶段也可以设置多个reduce任务来计算,一个分区由一个reduce任务处理,每个reduce任务可以计算同一分组或多组中的数据。也就是说一个分区包含一个组或多个组中的数据。相同组的数据是不可切割开的,如果切开了,计算结果是不准确的。如果一个reduce任务中有多个组,那么需要开启多个线程来处理不同组中的数据,

reduce的个数(并行度)由人来决定,可以根据不同需求自己手动设置。

2.MapReduce一个map,reduce阶段的详细过程

mapreduce分为几个阶段(各个阶段做了什么)(4)

一个MapReduce的详细过程

如上图所示,MapReduce整体分为Map端任务和Reduce端任务,整个MapReduce程序又分为如下5个阶段。

input

首先读取HDFS上的文件,分出若干个分片(split),split会将文件内容格式化为记录,以记录为单位调用map方法,每一条记录都会调用一次map方法。

map

经过map方法后,map的输出映射为key,value,key,value会参与分区计算,拿着key算出分区号(根据key的hash值,对reduce个数取余),最终map输出key,value,分区(partition)。

shuffle:即为洗牌(包括Map端shuffle和Reduce端shuffle)***重要阶段****

maptask的输出是一个文件(中间数据集),存在本地的文件系统中,也就是说中间数据集并不是存放到HDFS中的。假设这个文件中有50000条记录,那么如何将这50000条记录放到本地文件系统中呢?是要每产出一条就写一条吗?内存进程和磁盘多次进行切换,这样速度会很慢,此时就会出现一个环形内存缓冲区(默认大小为100M)

1.map端的shuffle

(1)溢写(spill

每个map处理之后的结果将会进入环形缓冲区。内存缓冲区是有大小限制的,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为spill,中文可理解为溢写。溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。比例默认是0.8,也就是当缓冲区的数据值已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。map task的输出结果还可以往剩下的20MB内存中写,互不影响。

(2)排序

当溢写线程启动后,需要对这80MB空间内的分区,key做排序(sort)。需要做一个2次排序,分区有序,且分区内key有序,保证未来相同一组key会相邻的排在一起。

(3)往磁盘溢写数据

当环形缓冲区达到阈值80%,开始将分区排序后的数据溢写磁盘变成文件,每次溢写会在磁盘上生成一个溢写文件,如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个溢写文件存在。最终会产生多个小文件。

(4)merge合并

将spill生成的小文件进行合并,将多个溢写文件合并到一个文件,上面的每个小文件都是内部有序,外部无序的,所以在合并时做一个归并排序来达到合并后的文件是有序的。

map task结束,通过appmaster,appmaster通过reduce过来拉取数据。

2.reduce的shuffle:

(1)copy过程

简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过http方式请求map task所在的TaskTracker获取map task的输出文件。因为map task早已结束,这些文件就归TaskTracker管理在本地磁盘中

(2)merge合并

将每个map task的结果中属于自己分区的数据进行合并,同样做归并排序,使得相同分区的数据的key是有序的copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle使用

reduce

将合并后的数据f直接输入到reduce函数,进行相关计算,产出最终结果。

output

将最终结果存放到HDFS上。

分析了MapReduce的运行过程,可以发现MapReduce都是反复地执行排序,合并操作,现在终于明白了有些人为什么会说:排序是hadoop的灵魂

,