MapReduce原理分析
MapTask运行机制
-
首先,读取数据组件InputFormat(默认TextInputFormat)会通过getSplits方法对输入目录中文件进行逻辑切片规划得到splits,有多少个split就对应启动多少个MapTask。split与block的对应关系默认是一对一。
-
将输入文件切分为splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n作为分隔符,读取一行数据,返回<key,value>。Key表示每行首字符偏移值,value表示这一行文本内容。
-
读取split返回<key,value>,进入用户自己继承的Mapper类中,执行用户重写的map函数。RecordReader读取一行这里调用一次。
-
map逻辑完之后,将map的每条结果通过context.write进行collect数据收集。在collect中,会先对其进行分区处理,默认使用HashPartitioner。
MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。
-
接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。
环形缓冲区其实是一个数组,数组中存放着key、value的序列化数据和key、value的元数据信息,包括partition、key的起始位置、value的起始位置以及value的长度。环形结构是一个抽象概念。
缓冲区是有大小限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spillpercent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Maptask的输出结果还可以往剩下的20MB内存中写,互不影响。
-
当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为!
如果job设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。
那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。
-
合并溢写文件:每次溢写会在磁盘上生成一个临时文件(写之前判断是否有combiner),如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。
-
MapTask的一些配置,可以访问官方文档查看。
注意:MapTask阶段所有的排序都是针对map输出kv的key进行排序!
MapTask的并行度
-
MapTask并行度思考
MapTask的并行度决定Map阶段的任务处理并发度,从而影响到整个Job的处理速度。
思考:MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?
-
MapTask并行度决定机制
数据块:Block是HDFS物理上把数据分成一块一块。
切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。**一个分片(split)对应一个MapTask任务。**分片大小默认等于block大小。splitSize=blockSize!blockSize=128M。
问题:a文件300M,b文件100M,两个文件都存入hdfs,并作为某个mr任务的输入数据,现在我们请问下当下这个mr任务的split,以及MapTask的并行度是多少?
切片的计算方式:按照文件逐个计算。
a文件:0-128M,128-256M,256-300M
b文件:0-100M
总共是4个split,MapTask并行度=4!
注意:在大数据分布式计算框架中,移动计算也不要移动数据,移动数据的成本很高,移动计算比较简单。
切片机制源码阅读
MapTask并行度是不是越多越好呢?
答案不是,如果一个文件仅仅比128M大一点点也被当成一个split来对待,而不是多个split。
MR框架在并行运算的同时也会消耗更多资源,并行度越高资源消耗也越高,假设129M文件分为两个分片,一个是128M,一个是1M;对于1M的切片的Maptask来说,太浪费资源。
129M的文件在Hdfs存储的时候会不会切成两块?
HDFS对文件存储时时按照128M切分数据块,无论多出来多少都会另存一块。HDFS上传大小为129M的文件,实际存储为两个数据块,一个128M,另一个1M。
ReduceTask工作机制
Reduce大致分为copy、sort、reduce三个阶段,重点在前两个阶段。copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据
进行merge。待数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行finalMerge操作,纯粹的sort阶段。完成之后就是reduce阶段,调用用户定义的reduce函数进行处理。
-
Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求
maptask获取属于自己的文件。 -
Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。
-
合并排序。把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。
-
对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。
ReduceTask并行度
ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:
1 | // 默认值是1,手动设置为4 |
注意事项:
-
ReduceTask=0,表示没有Reduce阶段,输出文件数和MapTask数量保持一致;
-
ReduceTask数量不设置默认就是一个,输出文件数量为1个;
-
如果数据分布不均匀,可能在Reduce阶段产生数据倾斜,即某个reduceTask处理的数据量远远大于其他节点;
Shuffle机制
map阶段处理的数据如何传递给reduce阶段,是MapReduce框架中最关键的一个流程,这个流程就叫shuffle。
shuffle: 洗牌、发牌——(核心机制:数据分区,排序,分组,combine,合并等过程)。
MapReduce的分区与reduceTask的数量
在MapReduce中,通过我们指定分区,会将同一个分区的数据发送到同一个reduce当中进行处理(默认是key相同去往同个分区),例如我们为了数据的统计,我们可以把一批类似的数据发送到同一个reduce当中去,在同一个reduce当中统计相同类型的数据。
如何才能保证相同key的数据去往同个reduce呢?只需要保证相同key的数据分发到同个分区即可。结合以上原理分析我们知道MR程序shuffle机制默认就是这种规则!!
- 分区源码
翻阅源码验证以上规则,MR程序默认使用的HashPartitioner,保证了相同的key去往同个分区!!
- 自定义分区
实际生产中需求变化多端,默认分区规则往往不能满足需求,需要结合业务逻辑来灵活控制分区规则以及分区数量!!
如何制定自己需要的分区规则?
-
自定义类继承Partitioner,重写getPartition()方法
-
在Driver驱动中,指定使用自定义Partitioner
-
在Driver驱动中,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask数量。
自定义分区案例
需求:按照不同的appkey把记录输出到不同的分区中。
原始日志格式
1 | 010 001577c3 kar_890809 120.196.100.99 1116 954 200 |
输出结果
根据appkey把不同厂商的日志数据分别输出到不同的文件中
需求分析
面对业务需求,结合mr的特点,来设计map输出的kv,以及reduce输出的kv数据。一个ReduceTask对应一个输出文件,因为在shuffle机制中每个reduceTask拉取的都是某一个分区的数据,一个分区对应一个输出文件。结合appkey的前缀相同的特点,同时不能使用默认分区规则,而是使用自定义分区器,只要appkey前缀相同则数据进入同个分区。
在博客MapReduce编程规范及示例编写中maven工程wordcount里编写案例。
-
创建PartitionBean对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125package com.lagou.mr.partition;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class PartitionBean implements Writable {
//准备一个空参构造
public PartitionBean() {
}
public PartitionBean(String id, String deviceId, String appkey, String ip, Long selfDuration, Long thirdPartDuration, String status) {
this.id = id;
this.deviceId = deviceId;
this.appkey = appkey;
this.ip = ip;
this.selfDuration = selfDuration;
this.thirdPartDuration = thirdPartDuration;
this.status = status;
}
//序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(id);
out.writeUTF(deviceId);
out.writeUTF(appkey);
out.writeUTF(ip);
out.writeLong(selfDuration);
out.writeLong(thirdPartDuration);
out.writeUTF(status);
}
//反序列化方法 要求序列化与反序列化字段顺序要保持一致
@Override
public void readFields(DataInput in) throws IOException {
this.id = in.readUTF();
this.deviceId = in.readUTF();
this.appkey = in.readUTF();
this.ip = in.readUTF();
this.selfDuration = in.readLong();
this.thirdPartDuration = in.readLong();
this.status = in.readUTF();
}
//定义属性
private String id;//日志id
private String deviceId;//设备id
private String appkey;//appkey厂商id
private String ip;//ip地址
private Long selfDuration;//自有内容播放时长
private Long thirdPartDuration;//第三方内容时长
private String status;//状态码
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public String getAppkey() {
return appkey;
}
public void setAppkey(String appkey) {
this.appkey = appkey;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public Long getSelfDuration() {
return selfDuration;
}
public void setSelfDuration(Long selfDuration) {
this.selfDuration = selfDuration;
}
public Long getThirdPartDuration() {
return thirdPartDuration;
}
public void setThirdPartDuration(Long thirdPartDuration) {
this.thirdPartDuration = thirdPartDuration;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
//方便文本中的数据易于观察,重写toString()方法
@Override
public String toString() {
return id + '\t' +
"\t" + deviceId + '\t' + appkey + '\t' +
ip + '\t' +
selfDuration +
"\t" + thirdPartDuration +
"\t" + status;
}
} -
编写Mapper类,名称为PartitionMapper。
- 读取一行文本,按照制表符切分
- 解析出appkey字段,其余数据封装为PartitionBean对象(实现序列化Writable接口)
- 设计map()输出的kv,key–>appkey(依靠该字段完成分区),PartitionBean对象作为Value输出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36package com.lagou.mr.partition;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/*
1. 读取一行文本,按照制表符切分
2. 解析出appkey字段,其余数据封装为PartitionBean对象(实现序列化Writable接口)
3. 设计map()输出的kv,key-->appkey(依靠该字段完成分区),PartitionBean对象作为Value输出
*/
public class PartitionMapper extends Mapper<LongWritable, Text, Text, PartitionBean> {
final PartitionBean bean = new PartitionBean();
final Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
final String[] fields = value.toString().split("\t");
String appkey = fields[2];
bean.setId(fields[0]);
bean.setDeviceId(fields[1]);
bean.setAppkey(fields[2]);
bean.setIp(fields[3]);
bean.setSelfDuration(Long.parseLong(fields[4]));
bean.setThirdPartDuration(Long.parseLong(fields[5]));
bean.setStatus(fields[6]);
k.set(appkey);
context.write(k, bean); //shuffle开始时会根据k的hashcode值进行分区,但是结合我们自己的业务,默认hash分区方式不能满足需求
}
} -
编写Reducer类,名称为PartitionReducer。
- reduce()正常输出即可,无需进行聚合操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17package com.lagou.mr.partition;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
//reduce输入类型:Text,PartitionBean,输出:Text,PartitionBean
public class PartitionReducer extends Reducer<Text, PartitionBean, Text, PartitionBean> {
@Override
protected void reduce(Text key, Iterable<PartitionBean> values, Context context) throws IOException, InterruptedException {
//无需聚合运算,只需要进行输出即可
for (PartitionBean bean : values) {
context.write(key, bean);
}
}
} -
编写Partitioner类,名称为CustomPartitioner。
- 自定义分区器,实现按照appkey字段的前缀来区分所属分区
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22package com.lagou.mr.partition;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
//Partitioner分区器的泛型是map输出的kv类型
public class CustomPartitioner extends Partitioner<Text, PartitionBean> {
@Override
public int getPartition(Text text, PartitionBean partitionBean, int numPartitions) {
int partition = 0;
if (text.toString().equals("kar")) {
//只需要保证满足此if条件的数据获得同个分区编号集合
partition = 0;
} else if (text.toString().equals("pandora")) {
partition = 1;
} else {
partition = 2;
}
return partition;
}
} -
编写Driver类,名称为PartitionDriver。
- 在原先设置job属性的同时增加设置使用自定义分区器
- 注意设置ReduceTask的数量(与分区数量保持一致)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43package com.lagou.mr.partition;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class PartitionDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1 获取配置文件
final Configuration conf = new Configuration();
//2 获取job实例
final Job job = Job.getInstance(conf);
//3 设置任务相关参数
job.setJarByClass(PartitionDriver.class);
job.setMapperClass(PartitionMapper.class);
job.setReducerClass(PartitionReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(PartitionBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(PartitionBean.class);
// 4 设置使用自定义分区器
job.setPartitionerClass(CustomPartitioner.class);
//5 指定reducetask的数量与分区数量保持一致,分区数量是3
job.setNumReduceTasks(3); //reducetask不设置默认是1个
//job.setNumReduceTasks(5);
//job.setNumReduceTasks(2);
// 6 指定输入和输出数据路径
FileInputFormat.setInputPaths(job, new Path("e:/speak.data"));
FileOutputFormat.setOutputPath(job, new Path("e:/parition/out"));
// 7 提交任务
final boolean flag = job.waitForCompletion(true);
System.exit(flag ? 0 : 1);
}
}
总结
- 自定义分区器时最好保证分区数量与reduceTask数量保持一致;
- 如果分区数量不止1个,但是reduceTask数量1个,此时只会输出一个文件。
- 如果reduceTask数量大于分区数量,但是输出多个空文件
- 如果reduceTask数量小于分区数量,有可能会报错。
MapReduce中的Combiner
combiner运行机制:
- Combiner是MR程序中Mapper和Reducer之外的一种组件
- Combiner组件的父类就是Reducer
- Combiner和reducer的区别在于运行的位置
- Combiner是在每一个maptask所在的节点运行;
- Combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量。
- Combiner能够应用的前提是不能影响最终的业务逻辑,此外,Combiner的输出kv应该跟reducer的输入kv类型要对应起来。
举例说明:
假设一个计算平均值的MR任务.
Map阶段,2个MapTask,MapTask1输出数据:10,5,15 如果使用Combiner:(10+5+15)/3=10;MapTask2输出数据:2,6 如果使用Combiner:(2+6)/2=4。
Reduce阶段汇总(10+4)/2=7,而正确结果应该是(10+5+15+2+6)/5=7.6。
-
自定义Combiner实现步骤
自定义一个Combiner继承Reducer,重写Reduce方法。
在驱动(Driver)设置使用Combiner(默认是不适用Combiner组件)。
在博客MapReduce编程规范及示例编写中maven工程wordcount,添加WordCountCombiner类。
1 | package com.lagou.mr.wc; |
在驱动WordCountDriver(Driver)设置使用Combiner。
1 | job.setCombinerClass(WordCountCombiner.class); |
验证结果
如果直接使用WordCountReducer作为Combiner使用是否可以?
直接使用Reducer作为Combiner组件来使用是可以的!!