基本序列化类型往往不能满足所有需求,比如在Hadoop框架内部传递一个自定义bean对象,那么该对象就需要实现Writable序列化接口。
实现Writable序列化步骤
必须实现Writable接口。
反序列化时,需要反射调用空参构造函数,所以必须有空参构造。
1 2 3 public CustomBean() { super(); }
重写序列化方法。
1 2 3 4 @Override public void write(DataOutput out) throws IOException { .... }
重写反序列化方法。
1 2 3 4 @Override public void readFields(DataInput in) throws IOException { .... }
反序列化的字段顺序和序列化字段的顺序必须完全一致。
方便展示结果数据,需要重写bean对象的toString()方法,可以自定义分隔符。
如果自定义Bean对象需要放在Mapper输出KV中的K,则该对象还需实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序!
1 2 3 4 5 @Override public int compareTo(CustomBean o) { // 自定义排序规则 return this.num > o.getNum() ? -1 : 1; }
Writable接口案例
需求
统计每台智能音箱设备内容播放时长。
原始日志格式如下
1 2 3 4 5 6 7 8 9 10 11 001 001577c3 kar_890809 120.196.100.99 1116 954 200 002 001577c3 kar_890809 120.196.100.99 1116 954 200 003 001577c3 kar_890809 120.196.100.99 1116 954 200 004 001577c3 kar_890809 120.196.100.99 1116 954 200 005 001577c3 kar_890809 120.196.100.99 1116 954 200 006 001577c3 kar_890809 120.196.100.99 1116 954 200 007 001577c3 kar_890809 120.196.100.99 1116 954 200 008 001577c3 kar_890809 120.196.100.99 1116 954 200 009 001577c3 kar_890809 120.196.100.99 1116 954 200 010 001577c3 kar_890809 120.196.100.99 1116 954 200 日志id 设备id appkey(合作硬件厂商) 网络ip 自有内容时长(秒) 第三方内容时长(秒) 网络状态码
输出结果
1 2 001577c3 11160 9540 20700 设备id 自有内容时长(秒) 第三方内容时长(秒) 网络状态码
mr编程总结:
map()方法输出的kv以及具体类型如何确定?
mr中map()方法输出的kv中的key如果是相同key则会去往同个reduce调用reduce方法。
reduce()方法中输入参数kv,key:map()方法输出的某个key,而value:value是一个集合,这个集合中的数据都来自map的输出的kv,而且是k相同的所有kv对的value集合在一起。
编写MapReduce程序
在下一篇博客MapReduce编程规范及示例编写中创建的maven工程中编写,或按照相同的pom文件和log4j.properties创建新的maven工程。
创建SpeakBean对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 package com.lagou.mr.speak; import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; //这个类型是map输出kv中value的类型,需要实现writable序列化接口 public class SpeakBean implements Writable { //定义属性 private Long selfDuration;//自有内容时长 private Long thirdPartDuration;//第三方内容时长 private String deviceId;//设备id private Long sumDuration;//总时长 public Long getSelfDuration() { return selfDuration; } public void setSelfDuration(Long selfDuration) { this.selfDuration = selfDuration; } public Long getThirdPartDuration() { return thirdPartDuration; } public void setThirdPartDuration(Long thirdPartDuration) { this.thirdPartDuration = thirdPartDuration; } public String getDeviceId() { return deviceId; } public void setDeviceId(String deviceId) { this.deviceId = deviceId; } public Long getSumDuration() { return sumDuration; } public void setSumDuration(Long sumDuration) { this.sumDuration = sumDuration; } //准备一个空参构造 public SpeakBean() { } //有参构造 public SpeakBean(Long selfDuration, Long thirdPartDuration, String deviceId) { this.selfDuration = selfDuration; this.thirdPartDuration = thirdPartDuration; this.deviceId = deviceId; this.sumDuration = this.selfDuration + this.thirdPartDuration; } //反序列化的字段顺序和序列化字段的顺序必须完全一致 //序列化方法:就是把内容输出到网络或者文本中 @Override public void write(DataOutput out) throws IOException { out.writeLong(selfDuration); out.writeLong(thirdPartDuration); out.writeUTF(deviceId); out.writeLong(sumDuration); } //反序列化方法 @Override public void readFields(DataInput in) throws IOException { this.selfDuration = in.readLong();//自由时长 this.thirdPartDuration = in.readLong();//第三方时长 this.deviceId = in.readUTF();//设备id this.sumDuration = in.readLong();//总时长 } //为了方便观察数据,重写toString()方法 @Override public String toString() { return selfDuration + "\t" + thirdPartDuration + "\t" + deviceId + "\t" + sumDuration; } }
编写Mapper类,名称为SpeakMapper。
读取一行文本数据,按照制表符切分。
抽取出自由内容时长,第三方内容时长,设备id。
输出:key–>设备id,value封装一个bean对象,bean对象携带自由时长,第三方内容时长,设备id。
自定义bean对象作为value输出,需要实现writeable序列化接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package com.lagou.mr.speak; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; //四个参数:分为两对kv //第一对kv:map输入参数的kv类型;k-->一行文本偏移量,v-->一行文本内容 //第二对kv:map输出参数kv类型;k-->map输出的key类型,v:map输出的value类型 public class SpeakMapper extends Mapper<LongWritable, Text, Text, SpeakBean> { /* 1 转换接收到的text数据为String 2 按照制表符进行切分;得到自有内容时长,第三方内容时长,设备id,封装为SpeakBean 3 直接输出:k-->设备id,value-->speakbean */ Text device_id = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1 转换接收到的text数据为String final String line = value.toString(); //2 按照制表符进行切分;得到自有内容时长,第三方内容时长,设备id,封装为SpeakBean final String[] fields = line.split("\t"); //自有内容时长 String selfDuration = fields[fields.length - 3]; //第三方内容时长 String thirdPartDuration = fields[fields.length - 2]; //设备id String deviceId = fields[1]; final SpeakBean bean = new SpeakBean(Long.parseLong(selfDuration), Long.parseLong(thirdPartDuration), deviceId); //3 直接输出:k-->设备id,value-->speakbean device_id.set(deviceId); context.write(device_id, bean); } }
编写Reducer类,名称为SpeakReducer。
在reduce方法中直接遍历迭代器,累加时长然后输出即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package com.lagou.mr.speak; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class SpeakReducer extends Reducer<Text, SpeakBean, Text, SpeakBean> { @Override protected void reduce(Text key, Iterable<SpeakBean> values, Context context) throws IOException, InterruptedException { //定义时长累加的初始值 Long self_duration = 0L; Long third_part_duration = 0L; //reduce方法的key:map输出的某一个key //reduce方法的value:map输出的kv对中相同key的value组成的一个集合 //reduce 逻辑:遍历迭代器累加时长即可 for (SpeakBean bean : values) { final Long selfDuration = bean.getSelfDuration(); final Long thirdPartDuration = bean.getThirdPartDuration(); self_duration += selfDuration; third_part_duration += thirdPartDuration; } //输出,封装成一个bean对象输出 final SpeakBean bean = new SpeakBean(self_duration, third_part_duration, key.toString()); context.write(key, bean); } }
编写Driver类,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package com.lagou.mr.speak; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class SpeakDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { final Configuration conf = new Configuration(); final Job job = Job.getInstance(conf, "speakDriver"); //设置jar包本地路径 job.setJarByClass(SpeakDriver.class); //使用的mapper和reducer job.setMapperClass(SpeakMapper.class); job.setReducerClass(SpeakReducer.class); //map的输出kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(SpeakBean.class); //设置reduce输出 job.setOutputKeyClass(Text.class); job.setOutputValueClass(SpeakBean.class); //读取的数据路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //提交任务 final boolean flag = job.waitForCompletion(true); System.exit(flag ? 0 : 1); } }
mr编程技巧总结