排序是MapReduce框架中最重要的操作之一。

MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑.上是否需要。默认排序是按照字典顺序排序,且实现该排序的方法是快速排序

  • MapTask它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,溢写完毕后,它会对磁盘上所有文件进行归并排序。

  • ReduceTask 当所有数据拷贝完毕后,ReduceTask统-对内存和磁盘上的所有数据进行一次归并排序。

  1. 部分排序.

MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。

  1. 全排序

最终输出结果只有一个文件,且文件内部有序。实现方式是只设置- -个ReduceTask。但该方法在处理大型文件时效率极低,因为- -台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。

  1. 辅助排序: ( GroupingComparator分组)

在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部
字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。

  1. 二次排序

在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

WritableComparable

Bean对象如果作为Map输出的key时,需要实现WritableComparable接口并重写compareTo方法指定排序规则。

全排序

基于统计的播放时长案例的输出结果对总时长进行排序。实现全局排序只能设置一个ReduceTask!!

播放时长案例输出结果

1
2
3
4
5
6
7
00fdaf3   33180   33420   00fdaf3     66600
00wersa4 30689 35191 00wersa4 65880
0a0fe2 43085 44254 0a0fe2 87339
0ad0s7 31702 29183 0ad0s7 60885
0sfs01 31883 29101 0sfs01 60984
a00df6s 33239 36882 a00df6s 70121
adfd00fd5 30727 31491 adfd00fd5 62218

需求分析

如何设计map()方法输出的key,value

MR框架中shuffle阶段的排序是默认行为,不管你是否需要都会进行排序。

key:把所有字段封装成为一个bean对象,并且指定bean对象作为key输出,如果作为key输出,需要实现排序接口,指定自己的排序规则;

在博客MapReduce编程规范及示例编写中maven工程wordcount里编写案例。

  • 创建SpeakBean对象,Bean对象实现WritableComparable接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    package com.lagou.mr.sort;

    import org.apache.hadoop.io.WritableComparable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.Objects;

    //因为这个类的实例对象要作为map输出的key,所以要实现writablecomparalbe接口
    public class SpeakBean implements WritableComparable<SpeakBean> {
    //定义属性
    private Long selfDrutation;//自有内容播放时长
    private Long thirdPartDuration;//第三方内容播放时长
    private String deviceId;//设备id
    private Long sumDuration;//总时长

    //准备构造方法
    public SpeakBean() {
    }

    public SpeakBean(Long selfDrutation, Long thirdPartDuration, String deviceId, Long sumDuration) {
    this.selfDrutation = selfDrutation;
    this.thirdPartDuration = thirdPartDuration;
    this.deviceId = deviceId;
    this.sumDuration = sumDuration;
    }

    public Long getSelfDrutation() {
    return selfDrutation;
    }

    public void setSelfDrutation(Long selfDrutation) {
    this.selfDrutation = selfDrutation;
    }

    public Long getThirdPartDuration() {
    return thirdPartDuration;
    }

    public void setThirdPartDuration(Long thirdPartDuration) {
    this.thirdPartDuration = thirdPartDuration;
    }

    public String getDeviceId() {
    return deviceId;
    }

    public void setDeviceId(String deviceId) {
    this.deviceId = deviceId;
    }

    public Long getSumDuration() {
    return sumDuration;
    }

    public void setSumDuration(Long sumDuration) {
    this.sumDuration = sumDuration;
    }

    //序列化方法
    @Override
    public void write(DataOutput out) throws IOException {
    out.writeLong(selfDrutation);
    out.writeLong(thirdPartDuration);
    out.writeUTF(deviceId);
    out.writeLong(sumDuration);
    }

    //反序列化方法
    @Override
    public void readFields(DataInput in) throws IOException {
    this.selfDrutation = in.readLong();
    this.thirdPartDuration = in.readLong();
    this.deviceId = in.readUTF();
    this.sumDuration = in.readLong();
    }

    //指定排序规则,我们希望按照总时长进行排序
    @Override
    public int compareTo(SpeakBean o) { //返回值三种:0:相等 1:小于 -1:大于
    System.out.println("compareTo 方法执行了。。。");
    //指定按照bean对象的总时长字段的值进行比较
    if (this.sumDuration > o.sumDuration) {
    return -1;
    } else if (this.sumDuration < o.sumDuration) {
    return 1;
    } else {
    return 0; //加入第二个判断条件,二次排序
    }
    }

    @Override
    public boolean equals(Object o) {
    System.out.println("equals方法执行了。。。");
    return super.equals(o);
    }

    @Override
    public int hashCode() {
    return Objects.hash(getSelfDrutation(), getThirdPartDuration(), getDeviceId(), getSumDuration());
    }

    @Override
    public String toString() {
    return selfDrutation +
    "\t" + thirdPartDuration +
    "\t" + deviceId + '\t' +
    sumDuration
    ;
    }
    }
  • 编写Mapper类,名称为SortMapper。

    1. 读取结果文件,按照制表符进行切分
    2. 解析出相应字段封装为SpeakBean
    3. SpeakBean实现WritableComparable接口重写compareTo方法
    4. map()方法输出kv;key–>SpeakBean,value–>NullWritable.get()
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    package com.lagou.mr.sort;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.awt.image.BandCombineOp;
    import java.io.IOException;

    public class SortMapper extends Mapper<LongWritable, Text, SpeakBean, NullWritable> {
    final SpeakBean bean = new SpeakBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    //1 读取一行文本,转为字符串,切分
    final String[] fields = value.toString().split("\t");
    //2 解析出各个字段封装成SpeakBean对象
    bean.setDeviceId(fields[0]);
    bean.setSelfDrutation(Long.parseLong(fields[1]));
    bean.setThirdPartDuration(Long.parseLong(fields[2]));
    bean.setSumDuration(Long.parseLong(fields[4]));
    //3 SpeakBean作为key输出
    context.write(bean, NullWritable.get());
    }
    }
  • 编写Reducer类,名称为SortReducer。

    1. 循环遍历输出
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    package com.lagou.mr.sort;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class SortReducer extends Reducer<SpeakBean, NullWritable, SpeakBean, NullWritable> {
    //reduce方法的调用是相同key的value组成一个集合调用一次
    /*
    java中如何判断两个对象是否相等?
    根据equals方法,比较还是地址值
    */
    @Override
    protected void reduce(SpeakBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
    //讨论按照总流量排序这件事情,还需要在reduce端处理吗?因为之前已经利用mr的shuffle对数据进行了排序
    //为了避免前面compareTo方法导致总流量相等被当成对象相等,而合并了key,所以遍历values获取每个key(bean对象)
    for (NullWritable value : values) { //遍历value同时,key也会随着遍历。
    context.write(key, value);
    }
    }
    }
  • 编写Driver类,名称为SortDriver。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.lagou.mr.sort;

import com.lagou.mr.wc.WordCountDriver;
import com.lagou.mr.wc.WordCountMapper;
import com.lagou.mr.wc.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SortDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
/*
1. 获取配置文件对象,获取job对象实例
2. 指定程序jar的本地路径
3. 指定Mapper/Reducer类
4. 指定Mapper输出的kv数据类型
5. 指定最终输出的kv数据类型
6. 指定job处理的原始数据路径
7. 指定job输出结果路径
8. 提交作业
*/
//1. 获取配置文件对象,获取job对象实例
final Configuration conf = new Configuration();

final Job job = Job.getInstance(conf, "SortDriver");
//2. 指定程序jar的本地路径
job.setJarByClass(SortDriver.class);
//3. 指定Mapper/Reducer类
job.setMapperClass(SortMapper.class);
job.setReducerClass(SortReducer.class);
//4. 指定Mapper输出的kv数据类型
job.setMapOutputKeyClass(SpeakBean.class);
job.setMapOutputValueClass(NullWritable.class);
//5. 指定最终输出的kv数据类型
job.setOutputKeyClass(SpeakBean.class);
job.setOutputValueClass(NullWritable.class);

//指定reduceTask的数量,默认是1个
job.setNumReduceTasks(1);
//6. 指定job处理的原始数据路径
//import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
//import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
FileInputFormat.setInputPaths(job, new Path("E:\\speak\\out")); //指定读取数据的原始路径
//7. 指定job输出结果路径
FileOutputFormat.setOutputPath(job, new Path("e:\\speak\\sortout")); //指定结果数据输出路径

//8. 提交作业
final boolean flag = job.waitForCompletion(true);
//jvm退出:正常退出0,非0值则是错误退出
System.exit(flag ? 0 : 1);
}
}

总结

  1. 自定义对象作为Map的key输出时,需要实现WritableComparable接口,排序:重写compareTo()方法,序列以及反序列化方法
  2. 再次理解reduce()方法的参数;reduce()方法是map输出的kv中key相同的kv中的v组成一个集合调用一次reduce()方法,选择遍历values得到所有的key.
  3. 默认reduceTask数量是1个;
  4. 对于全局排序需要保证只有一个reduceTask!!

分区排序(默认的分区规则,区内有序)

GroupingComparator

GroupingComparator是mapreduce当中reduce端的一个功能组件,主要的作用是决定哪些数据作为一组,调用一次reduce的逻辑,默认是每个不同的key,作为多个不同的组,每个组调用一次reduce逻辑,我们可以自定义GroupingComparator实现不同的key作为同一个组,调用一次reduce逻辑。

  • 需求,需要求出每一个订单中成交金额最大的一笔交易。原始数据如下图

  • 实现步骤

在博客MapReduce编程规范及示例编写中maven工程wordcount里编写案例。

  1. 编写OrderBean类

OrderBean定义两个字段,一个字段是orderId,第二个字段是金额(注意金额一定要使用Double或者DoubleWritable类型,否则没法按照金额顺序排序)。排序规则指定为先按照订单Id排序,订单Id相等再按照金额降序排!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package com.lagou.mr.group;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {

private String orderId;//订单id
private Double price;//金额


public OrderBean(String orderId, Double price) {
this.orderId = orderId;
this.price = price;
}

public OrderBean() {
}

public String getOrderId() {
return orderId;
}

public void setOrderId(String orderId) {
this.orderId = orderId;
}

public Double getPrice() {
return price;
}

public void setPrice(Double price) {
this.price = price;
}

//指定排序规则,先按照订单id比较再按照金额比较,按照金额降序排
@Override
public int compareTo(OrderBean o) {
// int res = this.orderId.compareTo(o.getOrderId()); //0 1 -1
// if (res == 0) {
// //订单id相同,比较金额
// res = - this.price.compareTo(o.getPrice());
//
// }

int res = - this.price.compareTo(o.getPrice());
System.out.println(res);
return res;
}

//序列化
@Override
public void write(DataOutput out) throws IOException {

out.writeUTF(orderId);
out.writeDouble(price);
}

//反序列化
@Override
public void readFields(DataInput in) throws IOException {
this.orderId = in.readUTF();
this.price = in.readDouble();
}

//重写toString()

@Override
public String toString() {
return orderId + '\t' +
price
;
}
}
  1. 自定义分区器,编写CustomPartitioner类。

保证ID相同的订单去往同个分区最终去往同一个Reduce中

1
2
3
4
5
6
7
8
9
10
11
12
13
package com.lagou.mr.group;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class CustomPartitioner extends Partitioner<OrderBean, NullWritable> {
@Override
public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numPartitions) {
//希望订单id相同的数据进入同个分区

return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
  1. 自定义CustomGroupingComparator

保证id相同的订单进入一个分组中,进入分组的数据已经是按照金额降序排序。reduce()方法取出第一个即是金额最高的交易

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.lagou.mr.group;

import com.sun.corba.se.impl.orb.ParserTable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CustomGroupingComparator extends WritableComparator {

public CustomGroupingComparator() {
super(OrderBean.class, true); //注册自定义的GroupingComparator接受OrderBean对象
}

//重写其中的compare方法,通过这个方法来让mr接受orderid相等则两个对象相等的规则,key相等

@Override
public int compare(WritableComparable a, WritableComparable b) { //a 和b是orderbean的对象
//比较两个对象的orderid
final OrderBean o1 = (OrderBean) a;
final OrderBean o2 = (OrderBean) b;
final int i = o1.getOrderId().compareTo(o2.getOrderId());
return i; // 0 1 -1
}
}
  1. 编写Mapper类,GroupMapper

读取一行文本数据,切分出每个字段;
订单id和金额封装为一个Bean对象,Bean对象的排序规则指定为先按照订单Id排序,订单Id 相等再按照金额降序排;
map()方法输出kv;key–>bean对象,value–>NullWritable.get();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.lagou.mr.group;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class GroupMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
OrderBean bean = new OrderBean();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
final String[] fields = value.toString().split("\t");
//订单id与jine封装为一个orderBean
bean.setOrderId(fields[0]);
bean.setPrice(Double.parseDouble(fields[2]));
context.write(bean, NullWritable.get());
}
}
  1. 编写Reducer类,GroupReducer

每个reduce()方法写出一组key的第一个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.lagou.mr.group;

import org.apache.avro.Schema;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class GroupReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

//key:reduce方法的key注意是一组相同key的kv的第一个key作为传入reduce方法的key,因为我们已经指定了排序的规则
//按照金额降序排列,则第一个key就是金额最大的交易数据
//value:一组相同key的kv对中v的集合
//对于如何判断key是否相同,自定义对象是需要我们指定一个规则,这个规则通过Groupingcomaprator来指定
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
//直接输出key就是金额最大的交易
context.write(key, NullWritable.get());
}
}
  1. 编写Driver类,GroupDriver
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.lagou.mr.group;

import com.lagou.mr.wc.WordCountDriver;
import com.lagou.mr.wc.WordCountMapper;
import com.lagou.mr.wc.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class GroupDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
/*
1. 获取配置文件对象,获取job对象实例
2. 指定程序jar的本地路径
3. 指定Mapper/Reducer类
4. 指定Mapper输出的kv数据类型
5. 指定最终输出的kv数据类型
6. 指定job处理的原始数据路径
7. 指定job输出结果路径
8. 提交作业
*/
// 1. 获取配置文件对象,获取job对象实例
final Configuration conf = new Configuration();

final Job job = Job.getInstance(conf, "GroupDriver");
// 2. 指定程序jar的本地路径
job.setJarByClass(GroupDriver.class);
// 3. 指定Mapper/Reducer类
job.setMapperClass(GroupMapper.class);
job.setReducerClass(GroupReducer.class);
// 4. 指定Mapper输出的kv数据类型
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
// 5. 指定最终输出的kv数据类型
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);

//指定分区器
job.setPartitionerClass(CustomPartitioner.class);
//指定使用groupingcomparator
job.setGroupingComparatorClass(CustomGroupingComparator.class);
FileInputFormat.setInputPaths(job, new Path("E:\\teach\\hadoop框架\\资料\\data\\GroupingComparator")); //指定读取数据的原始路径
// 7. 指定job输出结果路径
FileOutputFormat.setOutputPath(job, new Path("E:\\group\\out3")); //指定结果数据输出路径

//指定reducetask的数量,不要使用默认的一个,分区效果不明显
job.setNumReduceTasks(2);
// 8. 提交作业
final boolean flag = job.waitForCompletion(true);
//jvm退出:正常退出0,非0值则是错误退出
System.exit(flag ? 0 : 1);

}
}