MapTask运行机制

  • 首先,读取数据组件InputFormat(默认TextInputFormat)会通过getSplits方法对输入目录中文件进行逻辑切片规划得到splits,有多少个split就对应启动多少个MapTask。split与block的对应关系默认是一对一。

  • 将输入文件切分为splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n作为分隔符,读取一行数据,返回<key,value>。Key表示每行首字符偏移值,value表示这一行文本内容。

  • 读取split返回<key,value>,进入用户自己继承的Mapper类中,执行用户重写的map函数。RecordReader读取一行这里调用一次。

  • map逻辑完之后,将map的每条结果通过context.write进行collect数据收集。在collect中,会先对其进行分区处理,默认使用HashPartitioner。

    MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。

  • 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。

    环形缓冲区其实是一个数组,数组中存放着key、value的序列化数据和key、value的元数据信息,包括partition、key的起始位置、value的起始位置以及value的长度。环形结构是一个抽象概念。

    缓冲区是有大小限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spillpercent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Maptask的输出结果还可以往剩下的20MB内存中写,互不影响。

  • 当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为!

    如果job设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。

    那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。

  • 合并溢写文件:每次溢写会在磁盘上生成一个临时文件(写之前判断是否有combiner),如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。

  • MapTask的一些配置,可以访问官方文档查看。


注意:MapTask阶段所有的排序都是针对map输出kv的key进行排序!

MapTask的并行度

  • MapTask并行度思考

    MapTask的并行度决定Map阶段的任务处理并发度,从而影响到整个Job的处理速度。

    思考:MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?

  • MapTask并行度决定机制

    数据块:Block是HDFS物理上把数据分成一块一块。

    切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。**一个分片(split)对应一个MapTask任务。**分片大小默认等于block大小。splitSize=blockSize!blockSize=128M。

问题:a文件300M,b文件100M,两个文件都存入hdfs,并作为某个mr任务的输入数据,现在我们请问下当下这个mr任务的split,以及MapTask的并行度是多少?

切片的计算方式:按照文件逐个计算。

a文件:0-128M,128-256M,256-300M

b文件:0-100M

总共是4个split,MapTask并行度=4!

注意:在大数据分布式计算框架中,移动计算也不要移动数据,移动数据的成本很高,移动计算比较简单。

切片机制源码阅读




MapTask并行度是不是越多越好呢?

答案不是,如果一个文件仅仅比128M大一点点也被当成一个split来对待,而不是多个split。

MR框架在并行运算的同时也会消耗更多资源,并行度越高资源消耗也越高,假设129M文件分为两个分片,一个是128M,一个是1M;对于1M的切片的Maptask来说,太浪费资源。

129M的文件在Hdfs存储的时候会不会切成两块?

HDFS对文件存储时时按照128M切分数据块,无论多出来多少都会另存一块。HDFS上传大小为129M的文件,实际存储为两个数据块,一个128M,另一个1M。

ReduceTask工作机制


Reduce大致分为copy、sort、reduce三个阶段,重点在前两个阶段。copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据
进行merge。待数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行finalMerge操作,纯粹的sort阶段。完成之后就是reduce阶段,调用用户定义的reduce函数进行处理。

  • Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求
    maptask获取属于自己的文件。

  • Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。

  • 合并排序。把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。

  • 对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。

ReduceTask并行度

ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:

1
2
// 默认值是1,手动设置为4
job.setNumReduceTasks(4);

注意事项:

  • ReduceTask=0,表示没有Reduce阶段,输出文件数和MapTask数量保持一致;

  • ReduceTask数量不设置默认就是一个,输出文件数量为1个;

  • 如果数据分布不均匀,可能在Reduce阶段产生数据倾斜,即某个reduceTask处理的数据量远远大于其他节点;

Shuffle机制

map阶段处理的数据如何传递给reduce阶段,是MapReduce框架中最关键的一个流程,这个流程就叫shuffle。

shuffle: 洗牌、发牌——(核心机制:数据分区,排序,分组,combine,合并等过程)。


MapReduce的分区与reduceTask的数量

在MapReduce中,通过我们指定分区,会将同一个分区的数据发送到同一个reduce当中进行处理(默认是key相同去往同个分区),例如我们为了数据的统计,我们可以把一批类似的数据发送到同一个reduce当中去,在同一个reduce当中统计相同类型的数据。

如何才能保证相同key的数据去往同个reduce呢?只需要保证相同key的数据分发到同个分区即可。结合以上原理分析我们知道MR程序shuffle机制默认就是这种规则!!

  • 分区源码

翻阅源码验证以上规则,MR程序默认使用的HashPartitioner,保证了相同的key去往同个分区!!

  • 自定义分区

实际生产中需求变化多端,默认分区规则往往不能满足需求,需要结合业务逻辑来灵活控制分区规则以及分区数量!!

如何制定自己需要的分区规则?

  1. 自定义类继承Partitioner,重写getPartition()方法

  2. 在Driver驱动中,指定使用自定义Partitioner

  3. 在Driver驱动中,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask数量。

自定义分区案例

需求:按照不同的appkey把记录输出到不同的分区中。

原始日志格式

1
2
010     001577c3   kar_890809             120.196.100.99    1116              954                   200
日志id 设备id appkey(合作硬件厂商) 网络ip 自有内容时长(秒) 第三方内容时长(秒) 网络状态码

输出结果

根据appkey把不同厂商的日志数据分别输出到不同的文件中

需求分析

面对业务需求,结合mr的特点,来设计map输出的kv,以及reduce输出的kv数据。一个ReduceTask对应一个输出文件,因为在shuffle机制中每个reduceTask拉取的都是某一个分区的数据,一个分区对应一个输出文件。结合appkey的前缀相同的特点,同时不能使用默认分区规则,而是使用自定义分区器,只要appkey前缀相同则数据进入同个分区。

在博客MapReduce编程规范及示例编写中maven工程wordcount里编写案例。

  • 创建PartitionBean对象

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    package com.lagou.mr.partition;

    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class PartitionBean implements Writable {
    //准备一个空参构造
    public PartitionBean() {
    }

    public PartitionBean(String id, String deviceId, String appkey, String ip, Long selfDuration, Long thirdPartDuration, String status) {
    this.id = id;
    this.deviceId = deviceId;
    this.appkey = appkey;
    this.ip = ip;
    this.selfDuration = selfDuration;
    this.thirdPartDuration = thirdPartDuration;
    this.status = status;
    }

    //序列化方法
    @Override
    public void write(DataOutput out) throws IOException {
    out.writeUTF(id);
    out.writeUTF(deviceId);
    out.writeUTF(appkey);
    out.writeUTF(ip);
    out.writeLong(selfDuration);
    out.writeLong(thirdPartDuration);
    out.writeUTF(status);

    }

    //反序列化方法 要求序列化与反序列化字段顺序要保持一致
    @Override
    public void readFields(DataInput in) throws IOException {
    this.id = in.readUTF();
    this.deviceId = in.readUTF();
    this.appkey = in.readUTF();
    this.ip = in.readUTF();
    this.selfDuration = in.readLong();
    this.thirdPartDuration = in.readLong();
    this.status = in.readUTF();

    }

    //定义属性
    private String id;//日志id
    private String deviceId;//设备id
    private String appkey;//appkey厂商id
    private String ip;//ip地址
    private Long selfDuration;//自有内容播放时长
    private Long thirdPartDuration;//第三方内容时长
    private String status;//状态码

    public String getId() {
    return id;
    }

    public void setId(String id) {
    this.id = id;
    }

    public String getDeviceId() {
    return deviceId;
    }

    public void setDeviceId(String deviceId) {
    this.deviceId = deviceId;
    }

    public String getAppkey() {
    return appkey;
    }

    public void setAppkey(String appkey) {
    this.appkey = appkey;
    }

    public String getIp() {
    return ip;
    }

    public void setIp(String ip) {
    this.ip = ip;
    }

    public Long getSelfDuration() {
    return selfDuration;
    }

    public void setSelfDuration(Long selfDuration) {
    this.selfDuration = selfDuration;
    }

    public Long getThirdPartDuration() {
    return thirdPartDuration;
    }

    public void setThirdPartDuration(Long thirdPartDuration) {
    this.thirdPartDuration = thirdPartDuration;
    }

    public String getStatus() {
    return status;
    }

    public void setStatus(String status) {
    this.status = status;
    }

    //方便文本中的数据易于观察,重写toString()方法
    @Override
    public String toString() {
    return id + '\t' +
    "\t" + deviceId + '\t' + appkey + '\t' +
    ip + '\t' +
    selfDuration +
    "\t" + thirdPartDuration +
    "\t" + status;
    }
    }
  • 编写Mapper类,名称为PartitionMapper。

    1. 读取一行文本,按照制表符切分
    2. 解析出appkey字段,其余数据封装为PartitionBean对象(实现序列化Writable接口)
    3. 设计map()输出的kv,key–>appkey(依靠该字段完成分区),PartitionBean对象作为Value输出
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    package com.lagou.mr.partition;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    /*
    1. 读取一行文本,按照制表符切分

    2. 解析出appkey字段,其余数据封装为PartitionBean对象(实现序列化Writable接口)

    3. 设计map()输出的kv,key-->appkey(依靠该字段完成分区),PartitionBean对象作为Value输出
    */
    public class PartitionMapper extends Mapper<LongWritable, Text, Text, PartitionBean> {
    final PartitionBean bean = new PartitionBean();
    final Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    final String[] fields = value.toString().split("\t");
    String appkey = fields[2];

    bean.setId(fields[0]);
    bean.setDeviceId(fields[1]);
    bean.setAppkey(fields[2]);
    bean.setIp(fields[3]);
    bean.setSelfDuration(Long.parseLong(fields[4]));
    bean.setThirdPartDuration(Long.parseLong(fields[5]));
    bean.setStatus(fields[6]);

    k.set(appkey);
    context.write(k, bean); //shuffle开始时会根据k的hashcode值进行分区,但是结合我们自己的业务,默认hash分区方式不能满足需求
    }
    }
  • 编写Reducer类,名称为PartitionReducer。

    1. reduce()正常输出即可,无需进行聚合操作
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    package com.lagou.mr.partition;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    //reduce输入类型:Text,PartitionBean,输出:Text,PartitionBean
    public class PartitionReducer extends Reducer<Text, PartitionBean, Text, PartitionBean> {
    @Override
    protected void reduce(Text key, Iterable<PartitionBean> values, Context context) throws IOException, InterruptedException {
    //无需聚合运算,只需要进行输出即可
    for (PartitionBean bean : values) {
    context.write(key, bean);
    }
    }
    }
  • 编写Partitioner类,名称为CustomPartitioner。

    1. 自定义分区器,实现按照appkey字段的前缀来区分所属分区
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    package com.lagou.mr.partition;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    //Partitioner分区器的泛型是map输出的kv类型
    public class CustomPartitioner extends Partitioner<Text, PartitionBean> {
    @Override
    public int getPartition(Text text, PartitionBean partitionBean, int numPartitions) {
    int partition = 0;

    if (text.toString().equals("kar")) {
    //只需要保证满足此if条件的数据获得同个分区编号集合
    partition = 0;
    } else if (text.toString().equals("pandora")) {
    partition = 1;
    } else {
    partition = 2;
    }
    return partition;
    }
    }
  • 编写Driver类,名称为PartitionDriver。

    1. 在原先设置job属性的同时增加设置使用自定义分区器
    2. 注意设置ReduceTask的数量(与分区数量保持一致)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    package com.lagou.mr.partition;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class PartitionDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

    //1 获取配置文件
    final Configuration conf = new Configuration();
    //2 获取job实例
    final Job job = Job.getInstance(conf);
    //3 设置任务相关参数
    job.setJarByClass(PartitionDriver.class);
    job.setMapperClass(PartitionMapper.class);
    job.setReducerClass(PartitionReducer.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(PartitionBean.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(PartitionBean.class);

    // 4 设置使用自定义分区器
    job.setPartitionerClass(CustomPartitioner.class);
    //5 指定reducetask的数量与分区数量保持一致,分区数量是3
    job.setNumReduceTasks(3); //reducetask不设置默认是1个
    //job.setNumReduceTasks(5);
    //job.setNumReduceTasks(2);
    // 6 指定输入和输出数据路径
    FileInputFormat.setInputPaths(job, new Path("e:/speak.data"));
    FileOutputFormat.setOutputPath(job, new Path("e:/parition/out"));
    // 7 提交任务
    final boolean flag = job.waitForCompletion(true);

    System.exit(flag ? 0 : 1);
    }
    }

总结

  1. 自定义分区器时最好保证分区数量与reduceTask数量保持一致;
  2. 如果分区数量不止1个,但是reduceTask数量1个,此时只会输出一个文件。
  3. 如果reduceTask数量大于分区数量,但是输出多个空文件
  4. 如果reduceTask数量小于分区数量,有可能会报错。

MapReduce中的Combiner

combiner运行机制:

  1. Combiner是MR程序中Mapper和Reducer之外的一种组件
  2. Combiner组件的父类就是Reducer
  3. Combiner和reducer的区别在于运行的位置
  4. Combiner是在每一个maptask所在的节点运行;
  5. Combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量。
  6. Combiner能够应用的前提是不能影响最终的业务逻辑,此外,Combiner的输出kv应该跟reducer的输入kv类型要对应起来。

举例说明:

假设一个计算平均值的MR任务.

Map阶段,2个MapTask,MapTask1输出数据:10,5,15 如果使用Combiner:(10+5+15)/3=10;MapTask2输出数据:2,6 如果使用Combiner:(2+6)/2=4。

Reduce阶段汇总(10+4)/2=7,而正确结果应该是(10+5+15+2+6)/5=7.6。

  • 自定义Combiner实现步骤

    自定义一个Combiner继承Reducer,重写Reduce方法。

    在驱动(Driver)设置使用Combiner(默认是不适用Combiner组件)。

在博客MapReduce编程规范及示例编写中maven工程wordcount,添加WordCountCombiner类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.lagou.mr.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

//combiner组件的输入和输出类型与map()方法保持一致
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
final IntWritable total = new IntWritable();

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int num = 0;
//进行局部汇总,逻辑是与reduce方法保持一致
for (IntWritable value : values) {
final int i = value.get();
num += i;
}
total.set(num);
//输出单词,累加结果
context.write(key, total);
}
}

在驱动WordCountDriver(Driver)设置使用Combiner。

1
job.setCombinerClass(WordCountCombiner.class);

验证结果

如果直接使用WordCountReducer作为Combiner使用是否可以?

直接使用Reducer作为Combiner组件来使用是可以的!!