基本序列化类型往往不能满足所有需求,比如在Hadoop框架内部传递一个自定义bean对象,那么该对象就需要实现Writable序列化接口。

实现Writable序列化步骤

  1. 必须实现Writable接口。

  2. 反序列化时,需要反射调用空参构造函数,所以必须有空参构造。

1
2
3
public CustomBean() {
super();
}
  1. 重写序列化方法。
1
2
3
4
@Override
public void write(DataOutput out) throws IOException {
....
}
  1. 重写反序列化方法。
1
2
3
4
@Override
public void readFields(DataInput in) throws IOException {
....
}
  1. 反序列化的字段顺序和序列化字段的顺序必须完全一致。

  2. 方便展示结果数据,需要重写bean对象的toString()方法,可以自定义分隔符。

  3. 如果自定义Bean对象需要放在Mapper输出KV中的K,则该对象还需实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序!

1
2
3
4
5
@Override
public int compareTo(CustomBean o) {
// 自定义排序规则
return this.num > o.getNum() ? -1 : 1;
}

Writable接口案例

需求

统计每台智能音箱设备内容播放时长。

原始日志格式如下

1
2
3
4
5
6
7
8
9
10
11
001     001577c3   kar_890809             120.196.100.99    1116              954                   200
002 001577c3 kar_890809 120.196.100.99 1116 954 200
003 001577c3 kar_890809 120.196.100.99 1116 954 200
004 001577c3 kar_890809 120.196.100.99 1116 954 200
005 001577c3 kar_890809 120.196.100.99 1116 954 200
006 001577c3 kar_890809 120.196.100.99 1116 954 200
007 001577c3 kar_890809 120.196.100.99 1116 954 200
008 001577c3 kar_890809 120.196.100.99 1116 954 200
009 001577c3 kar_890809 120.196.100.99 1116 954 200
010 001577c3 kar_890809 120.196.100.99 1116 954 200
日志id 设备id appkey(合作硬件厂商) 网络ip 自有内容时长(秒) 第三方内容时长(秒) 网络状态码

输出结果

1
2
001577c3    11160                9540                   20700
设备id 自有内容时长(秒) 第三方内容时长(秒) 网络状态码

mr编程总结:

map()方法输出的kv以及具体类型如何确定?

mr中map()方法输出的kv中的key如果是相同key则会去往同个reduce调用reduce方法。

reduce()方法中输入参数kv,key:map()方法输出的某个key,而value:value是一个集合,这个集合中的数据都来自map的输出的kv,而且是k相同的所有kv对的value集合在一起。

编写MapReduce程序

在下一篇博客MapReduce编程规范及示例编写中创建的maven工程中编写,或按照相同的pom文件和log4j.properties创建新的maven工程。

  1. 创建SpeakBean对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package com.lagou.mr.speak;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

//这个类型是map输出kv中value的类型,需要实现writable序列化接口
public class SpeakBean implements Writable {
//定义属性
private Long selfDuration;//自有内容时长
private Long thirdPartDuration;//第三方内容时长
private String deviceId;//设备id
private Long sumDuration;//总时长

public Long getSelfDuration() {
return selfDuration;
}

public void setSelfDuration(Long selfDuration) {
this.selfDuration = selfDuration;
}

public Long getThirdPartDuration() {
return thirdPartDuration;
}

public void setThirdPartDuration(Long thirdPartDuration) {
this.thirdPartDuration = thirdPartDuration;
}

public String getDeviceId() {
return deviceId;
}

public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}

public Long getSumDuration() {
return sumDuration;
}

public void setSumDuration(Long sumDuration) {
this.sumDuration = sumDuration;
}

//准备一个空参构造
public SpeakBean() {
}

//有参构造
public SpeakBean(Long selfDuration, Long thirdPartDuration, String deviceId) {
this.selfDuration = selfDuration;
this.thirdPartDuration = thirdPartDuration;
this.deviceId = deviceId;
this.sumDuration = this.selfDuration + this.thirdPartDuration;
}

//反序列化的字段顺序和序列化字段的顺序必须完全一致
//序列化方法:就是把内容输出到网络或者文本中
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(selfDuration);
out.writeLong(thirdPartDuration);
out.writeUTF(deviceId);
out.writeLong(sumDuration);
}

//反序列化方法
@Override
public void readFields(DataInput in) throws IOException {
this.selfDuration = in.readLong();//自由时长
this.thirdPartDuration = in.readLong();//第三方时长
this.deviceId = in.readUTF();//设备id
this.sumDuration = in.readLong();//总时长
}

//为了方便观察数据,重写toString()方法
@Override
public String toString() {
return
selfDuration +
"\t" + thirdPartDuration +
"\t" + deviceId + "\t" + sumDuration;
}
}
  1. 编写Mapper类,名称为SpeakMapper。

    • 读取一行文本数据,按照制表符切分。
    • 抽取出自由内容时长,第三方内容时长,设备id。
    • 输出:key–>设备id,value封装一个bean对象,bean对象携带自由时长,第三方内容时长,设备id。
    • 自定义bean对象作为value输出,需要实现writeable序列化接口。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.lagou.mr.speak;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

//四个参数:分为两对kv
//第一对kv:map输入参数的kv类型;k-->一行文本偏移量,v-->一行文本内容
//第二对kv:map输出参数kv类型;k-->map输出的key类型,v:map输出的value类型
public class SpeakMapper extends Mapper<LongWritable, Text, Text, SpeakBean> {
/*
1 转换接收到的text数据为String
2 按照制表符进行切分;得到自有内容时长,第三方内容时长,设备id,封装为SpeakBean
3 直接输出:k-->设备id,value-->speakbean
*/
Text device_id = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1 转换接收到的text数据为String
final String line = value.toString();
//2 按照制表符进行切分;得到自有内容时长,第三方内容时长,设备id,封装为SpeakBean
final String[] fields = line.split("\t");
//自有内容时长
String selfDuration = fields[fields.length - 3];
//第三方内容时长
String thirdPartDuration = fields[fields.length - 2];
//设备id
String deviceId = fields[1];
final SpeakBean bean = new SpeakBean(Long.parseLong(selfDuration), Long.parseLong(thirdPartDuration), deviceId);
//3 直接输出:k-->设备id,value-->speakbean
device_id.set(deviceId);
context.write(device_id, bean);
}
}
  1. 编写Reducer类,名称为SpeakReducer。

    • 在reduce方法中直接遍历迭代器,累加时长然后输出即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.lagou.mr.speak;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SpeakReducer extends Reducer<Text, SpeakBean, Text, SpeakBean> {
@Override
protected void reduce(Text key, Iterable<SpeakBean> values, Context context) throws IOException, InterruptedException {
//定义时长累加的初始值
Long self_duration = 0L;
Long third_part_duration = 0L;

//reduce方法的key:map输出的某一个key
//reduce方法的value:map输出的kv对中相同key的value组成的一个集合
//reduce 逻辑:遍历迭代器累加时长即可
for (SpeakBean bean : values) {
final Long selfDuration = bean.getSelfDuration();
final Long thirdPartDuration = bean.getThirdPartDuration();
self_duration += selfDuration;
third_part_duration += thirdPartDuration;
}
//输出,封装成一个bean对象输出
final SpeakBean bean = new SpeakBean(self_duration, third_part_duration, key.toString());
context.write(key, bean);
}
}
  1. 编写Driver类,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.lagou.mr.speak;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SpeakDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
final Configuration conf = new Configuration();
final Job job = Job.getInstance(conf, "speakDriver");
//设置jar包本地路径
job.setJarByClass(SpeakDriver.class);
//使用的mapper和reducer
job.setMapperClass(SpeakMapper.class);
job.setReducerClass(SpeakReducer.class);
//map的输出kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(SpeakBean.class);
//设置reduce输出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(SpeakBean.class);
//读取的数据路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//提交任务
final boolean flag = job.waitForCompletion(true);
System.exit(flag ? 0 : 1);
}
}

mr编程技巧总结

  • 结合业务设计Map输出的key和v,利用key相同则去往同一个reduce的特点!

  • map()方法中获取到只是一行文本数据尽量不做聚合运算。

  • reduce()方法的参数要清楚含义。