Requirements

We have some order comment data, shown below. The ninth field of each record indicates the rating: 0 = good review, 1 = neutral review, 2 = bad review.

300 东西很不错,物流也很快 \N 1 106 131******33 0 2019-02-06 19:10:13
301 还行,洗完有点干,不知道怎么回事 \N 1 106 136******44 0 2019-03-22 14:16:41
302 还可以吧,保质期短,感觉貌似更天然些 \N 1 106 134******34 0 2019-04-10 13:40:06
303 还可以吧,保质期短,感觉貌似更天然些 \N 1 105 134******33 0 2019-01-15 14:40:21
304 还没用,,不知道效果怎么样 \N 1 105 137******66 0 2019-02-28 18:55:43
305 刚收到,还没用,用后再追评!不过,听朋友说好用,才买的! \N 1 105 138******60 0 2019-03-13 19:10:09
306 一般,感觉用着不是很好,可能我头发太干了 \N 1 105 132******44 0 2019-04-09 10:35:49
307 非常好用,之前买了10支,这次又买了10支,不错,会继续支持! \N 1 103 131******33 0 2019-01-15 13:10:46
308 喜欢茶树油的 \N 1 103 135******33 0 2019-02-08 14:35:09
309 好像比其他的强一些,继续使用中 \N 1 103 133******99 0 2019-03-14 19:55:36
310 感觉洗后头发很干净,头皮有一定改善。 \N 1 103 138******44 0 2019-04-09 22:55:59
311 从出生到现在一直都是惠氏 现在宝宝两周半了 \N 1 157 那***情 0 2017-12-01 06:05:30
312 口感不错,孩子很喜欢。推荐。 \N 1 157 w***4 0 2017-12-12 08:35:06
313 价格优惠,日期新鲜,包装完好!发货速度快,非常喜欢!还有赠品! \N 1 157 j***0 0 2019-01-09 22:55:41

There are a large number of small files like the one above. The data must be classified by rating (good, neutral, bad) and written to different output directories, sorted in descending order by time.

Analysis

  1. Use a custom InputFormat to merge the small files

  2. Use a custom Partitioner to partition the data by rating

  3. Use a custom OutputFormat to write the data to multiple directories

Development

Merging the small files

Build this example in the wordcount Maven project from the blog post MapReduce编程规范及示例编写, and create a step1 package.

  1. Write the MergeInputFormat class
package com.lagou.mr.comment.step1;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

// Custom InputFormat that reads many small files and merges them into one SequenceFile.
// The SequenceFile stores kv pairs: key = file path + file name, value = the entire file content.
public class MergeInputFormat extends FileInputFormat<Text, BytesWritable> {
    // Override whether a file can be split
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // For this requirement the files must not be split: one split is exactly one file
        return false;
    }

    // The RecordReader is the object that actually reads the data
    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        MergeRecordReader recordReader = new MergeRecordReader();
        // Call the RecordReader's initialize method
        recordReader.initialize(split, context);
        return recordReader;
    }
}
  2. Write the MergeRecordReader class
package com.lagou.mr.comment.step1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

// Reads the data: consumes the whole file in one call and emits it as a single kv pair
public class MergeRecordReader extends RecordReader<Text, BytesWritable> {
    private FileSplit split;
    // Hadoop configuration object
    private Configuration conf;
    // Member variables holding the key and value
    private Text key = new Text();
    private BytesWritable value = new BytesWritable();

    // Initialization: promote the split and the context configuration to fields
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.split = (FileSplit) split;
        conf = context.getConfiguration();
    }

    private boolean flag = true;

    // Reads the data
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // The current split only needs to be read once, because one read consumes the entire file.
        if (flag) {
            // Prepare a byte array to hold the data; its size is the length of the split
            byte[] content = new byte[(int) split.getLength()];
            // Get the path of the split
            final Path path = split.getPath();
            // Get the file system object
            final FileSystem fs = path.getFileSystem(conf);
            // Open an input stream
            final FSDataInputStream fis = fs.open(path);
            // Read the data into the byte array
            IOUtils.readFully(fis, content, 0, content.length);
            // Populate the key and value
            key.set(path.toString());
            value.set(content, 0, content.length);
            IOUtils.closeStream(fis);
            // Flip the flag so the file is not read again
            flag = false;
            return true;
        }
        return false;
    }

    // Return the current key
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    // Return the current value
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    // Report progress
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    // Release resources
    @Override
    public void close() throws IOException {

    }
}
  3. Write the MergeMapper class
package com.lagou.mr.comment.step1;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Text: the path + name of one file; BytesWritable: the content of that file
public class MergeMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}
  4. Write the MergeReducer class
package com.lagou.mr.comment.step1;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class MergeReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        // Write out the value (the file content); taking the first element is enough, there is only one per key
        context.write(key, values.iterator().next());
    }
}
  5. Write the MergeDriver class
package com.lagou.mr.comment.step1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;

public class MergeDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the configuration object and the job instance
        final Configuration conf = new Configuration();
        final Job job = Job.getInstance(conf, "MergeDriver");
        // 2. Specify the local path of the program jar
        job.setJarByClass(MergeDriver.class);
        // 3. Specify the Mapper/Reducer classes
        job.setMapperClass(MergeMapper.class);
        job.setReducerClass(MergeReducer.class);
        // 4. Specify the Mapper output kv types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        // 5. Specify the final output kv types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        // 6. Read with the custom InputFormat and write a SequenceFile as output
        job.setInputFormatClass(MergeInputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("E:\\bigdatafile\\input\\step1"));
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        // 7. Specify the job output path
        FileOutputFormat.setOutputPath(job, new Path("E:\\bigdatafile\\output\\step1"));
        // 8. Submit the job
        final boolean flag = job.waitForCompletion(true);
        // JVM exit code: 0 for success, non-zero for failure
        System.exit(flag ? 0 : 1);
    }
}
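To sanity-check the merge output, the short standalone sketch below (the class name MergeOutputCheck and the part-file name are illustrative assumptions, not part of the case) opens the SequenceFile written by step1 and prints each key, which is the original file path, together with the size of the stored content.

package com.lagou.mr.comment.step1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Standalone check: read the merged SequenceFile and print each key (original file path)
// together with the number of bytes stored as its value.
public class MergeOutputCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumes the default reduce output file name under the step1 output directory
        Path path = new Path("E:\\bigdatafile\\output\\step1\\part-r-00000");
        try (SequenceFile.Reader reader =
                     new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            while (reader.next(key, value)) {
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        }
    }
}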

Partitioning, sorting, and multi-directory output

Build this part in the same wordcount Maven project from the blog post MapReduce编程规范及示例编写, and create a step2 package.

  1. Write the CommentMapper class
package com.lagou.mr.comment.step2;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Input kv: read with SequenceFileInputFormat, so key is Text and value is BytesWritable
// (these are exactly the types the SequenceFile was generated with)
public class CommentMapper extends Mapper<Text, BytesWritable, CommentBean, NullWritable> {
    // key: the file name; value: the complete content of one file
    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
        // Split the file content into lines (use getLength() so trailing buffer bytes are excluded)
        String str = new String(value.getBytes(), 0, value.getLength());
        String[] lines = str.split("\n");
        for (String line : lines) {
            CommentBean commentBean = parseStrToCommentBean(line);
            if (null != commentBean) {
                context.write(commentBean, NullWritable.get());
            }
        }
    }

    // Split a line and wrap it in a CommentBean object
    public CommentBean parseStrToCommentBean(String line) {
        if (StringUtils.isNotBlank(line)) {
            // Split each line on tabs
            String[] fields = line.split("\t");
            if (fields.length >= 9) {
                return new CommentBean(fields[0], fields[1], fields[2], Integer.parseInt(fields[3]), fields[4], fields[5], fields[6], Integer.parseInt(fields[7]), fields[8]);
            } else {
                return null;
            }
        }
        return null;
    }
}
  2. Write the CommentBean class
package com.lagou.mr.comment.step2;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CommentBean implements WritableComparable<CommentBean> {
    private String orderId;
    private String comment;
    private String commentExt;
    private int goodsNum;
    private String phoneNum;
    private String userName;
    private String address;
    private int commentStatus;
    private String commentTime;

    @Override
    public String toString() {
        return orderId + "\t" + comment + "\t" + commentExt + "\t" + goodsNum + "\t" + phoneNum + "\t" + userName + "\t" + address + "\t" + commentStatus + "\t" + commentTime;
    }

    // No-argument constructor
    public CommentBean() {

    }

    public CommentBean(String orderId, String comment, String commentExt, int goodsNum, String phoneNum, String userName, String address, int commentStatus, String commentTime) {
        this.orderId = orderId;
        this.comment = comment;
        this.commentExt = commentExt;
        this.goodsNum = goodsNum;
        this.phoneNum = phoneNum;
        this.userName = userName;
        this.address = address;
        this.commentStatus = commentStatus;
        this.commentTime = commentTime;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getComment() {
        return comment;
    }

    public void setComment(String comment) {
        this.comment = comment;
    }

    public String getCommentExt() {
        return commentExt;
    }

    public void setCommentExt(String commentExt) {
        this.commentExt = commentExt;
    }

    public int getGoodsNum() {
        return goodsNum;
    }

    public void setGoodsNum(int goodsNum) {
        this.goodsNum = goodsNum;
    }

    public String getPhoneNum() {
        return phoneNum;
    }

    public void setPhoneNum(String phoneNum) {
        this.phoneNum = phoneNum;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public int getCommentStatus() {
        return commentStatus;
    }

    public void setCommentStatus(int commentStatus) {
        this.commentStatus = commentStatus;
    }

    public String getCommentTime() {
        return commentTime;
    }

    public void setCommentTime(String commentTime) {
        this.commentTime = commentTime;
    }

    // Sort rule: descending by time. Lexicographic comparison of the zero-padded
    // "yyyy-MM-dd HH:mm:ss" strings matches chronological order, so comparing the
    // other object's time against this one yields newest-first ordering.
    @Override
    public int compareTo(CommentBean o) {
        return o.getCommentTime().compareTo(this.commentTime);
    }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeUTF(comment);
        out.writeUTF(commentExt);
        out.writeInt(goodsNum);
        out.writeUTF(phoneNum);
        out.writeUTF(userName);
        out.writeUTF(address);
        out.writeInt(commentStatus);
        out.writeUTF(commentTime);
    }

    // Deserialization
    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.comment = in.readUTF();
        this.commentExt = in.readUTF();
        this.goodsNum = in.readInt();
        this.phoneNum = in.readUTF();
        this.userName = in.readUTF();
        this.address = in.readUTF();
        this.commentStatus = in.readInt();
        this.commentTime = in.readUTF();
    }
}
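To make the descending order concrete, here is a small standalone check (the class name CompareToCheck and the sample field values are illustrative only, not part of the case): the bean with the later timestamp compares as smaller and therefore sorts first.

package com.lagou.mr.comment.step2;

// Illustrative only: verify that CommentBean sorts newest-first.
public class CompareToCheck {
    public static void main(String[] args) {
        CommentBean earlier = new CommentBean("307", "c1", "\\N", 1, "131******33", "u1", "addr", 0, "2019-01-15 13:10:46");
        CommentBean later = new CommentBean("310", "c2", "\\N", 1, "138******44", "u2", "addr", 0, "2019-04-09 22:55:59");
        System.out.println(later.compareTo(earlier));  // negative: the later comment sorts first
        System.out.println(earlier.compareTo(later));  // positive
    }
}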
  3. Write the CommentPartitioner class
package com.lagou.mr.comment.step2;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class CommentPartitioner extends Partitioner<CommentBean, NullWritable> {
    @Override
    public int getPartition(CommentBean commentBean, NullWritable nullWritable, int numPartitions) {
        // return (commentBean.getCommentStatus() & Integer.MAX_VALUE) % numPartitions;
        return commentBean.getCommentStatus(); // 0, 1, 2 map directly to the partition numbers
    }
}
  4. Write the CommentOutputFormat class
package com.lagou.mr.comment.step2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

// Parameterized with the final output kv types
public class CommentOutputFormat extends FileOutputFormat<CommentBean, NullWritable> {
    // Returns the object responsible for writing the data out
    @Override
    public RecordWriter<CommentBean, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        FileSystem fs = FileSystem.get(conf);
        // Get the output path specified in the Driver; partition 0 is good, 1 is neutral, 2 is bad
        String outputDir = conf.get("mapreduce.output.fileoutputformat.outputdir");
        FSDataOutputStream goodOut = null;
        FSDataOutputStream commonOut = null;
        FSDataOutputStream badOut = null;
        // Create the output file and stream based on the partition handled by this reduce task
        int id = job.getTaskAttemptID().getTaskID().getId(); // the partition number handled by the current reduce task
        if (id == 0) {
            // Good reviews
            goodOut = fs.create(new Path(outputDir + "\\good\\good.log"));
        } else if (id == 1) {
            // Neutral reviews
            commonOut = fs.create(new Path(outputDir + "\\common\\common.log"));
        } else {
            // Bad reviews
            badOut = fs.create(new Path(outputDir + "\\bad\\bad.log"));
        }
        return new CommentRecorderWrtier(goodOut, commonOut, badOut);
    }
}
  5. Write the CommentRecorderWrtier class
package com.lagou.mr.comment.step2;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class CommentRecorderWrtier extends RecordWriter<CommentBean, NullWritable> {
    // Output streams used to write the data
    private FSDataOutputStream goodOut;
    private FSDataOutputStream commonOut;
    private FSDataOutputStream badOut;

    public CommentRecorderWrtier(FSDataOutputStream goodOut, FSDataOutputStream commonOut, FSDataOutputStream badOut) {
        this.goodOut = goodOut;
        this.commonOut = commonOut;
        this.badOut = badOut;
    }

    // Write each record to a different directory depending on its rating
    @Override
    public void write(CommentBean key, NullWritable value) throws IOException, InterruptedException {
        int commentStatus = key.getCommentStatus();
        String beanStr = key.toString();
        if (commentStatus == 0) {
            goodOut.write(beanStr.getBytes());
            goodOut.write("\n".getBytes());
            goodOut.flush();
        } else if (commentStatus == 1) {
            commonOut.write(beanStr.getBytes());
            commonOut.write("\n".getBytes());
            commonOut.flush();
        } else {
            badOut.write(beanStr.getBytes());
            badOut.write("\n".getBytes());
            badOut.flush();
        }
    }

    // Release resources
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(goodOut);
        IOUtils.closeStream(commonOut);
        IOUtils.closeStream(badOut);
    }
}
  6. Write the CommentReducer class
package com.lagou.mr.comment.step2;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class CommentReducer extends Reducer<CommentBean, NullWritable, CommentBean, NullWritable> {
    @Override
    protected void reduce(CommentBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Iterate over the values and write out the key. The key is a reused object:
        // each time the next value is fetched, the contents of the key object change with it.
        for (NullWritable value : values) {
            context.write(key, value);
        }
    }
}
  7. Write the CommentDriver class
package com.lagou.mr.comment.step2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CommentDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "CommentDriver");
        job.setJarByClass(CommentDriver.class);
        job.setMapperClass(CommentMapper.class);
        job.setReducerClass(CommentReducer.class);
        job.setMapOutputKeyClass(CommentBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(CommentBean.class);
        job.setOutputValueClass(NullWritable.class);
        job.setPartitionerClass(CommentPartitioner.class);
        // Specify the InputFormat
        job.setInputFormatClass(SequenceFileInputFormat.class);
        // Specify the custom OutputFormat
        job.setOutputFormatClass(CommentOutputFormat.class);
        // Specify the input and output paths
        FileInputFormat.setInputPaths(job, new Path("E:\\bigdatafile\\output\\step1"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\bigdatafile\\output\\step2"));
        // Specify the number of reduce tasks (one per partition)
        job.setNumReduceTasks(3);
        boolean b = job.waitForCompletion(true);
        if (b) {
            System.exit(0);
        }
    }
}

Program tuning

Pre-merging

Build this part in the same wordcount Maven project from the blog post MapReduce编程规范及示例编写, and create a step3 package.

  1. Write the CombineMapper class
package com.lagou.mr.comment.step3;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class CombineMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(NullWritable.get(), value);
    }
}
  2. Write the CombineDriver class
package com.lagou.mr.comment.step3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CombineDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "CombineDriver");
        job.setJarByClass(CombineDriver.class);
        job.setMapperClass(CombineMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        // Use CombineTextInputFormat so many small files are packed into few splits (4 MB max per split)
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 1024 * 1024 * 4);
        // Specify the input and output paths
        FileInputFormat.setInputPaths(job, new Path("E:\\bigdatafile\\input\\step1"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\bigdatafile\\output\\merge-out"));
        // Specify the number of reduce tasks
        job.setNumReduceTasks(3);
        boolean b = job.waitForCompletion(true);
        if (b) {
            System.exit(0);
        }
    }
}

Output compression

Modify MergeDriver in the step1 package of the wordcount Maven project from the blog post MapReduce编程规范及示例编写.

// Add the following to MergeDriver (also import org.apache.hadoop.io.SequenceFile
// and org.apache.hadoop.io.compress.DefaultCodec):
// Compression for the SequenceFile output
// SnappyCodec is not supported in the local environment, so the default codec is used instead
//SequenceFileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);
SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
// Compression type: record compression
SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.RECORD);
// Compression type: block compression
//SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
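To confirm the settings took effect, the sketch below (the class name CompressionCheck and the part-file name are illustrative assumptions) reads the compression metadata from the SequenceFile header of the step1 output. Because the codec is recorded in that header, SequenceFileInputFormat in step2 can read the file without any extra decompression code.

package com.lagou.mr.comment.step1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

// Standalone check: print the compression metadata stored in the SequenceFile header.
public class CompressionCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("E:\\bigdatafile\\output\\step1\\part-r-00000");
        try (SequenceFile.Reader reader =
                     new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
            System.out.println("compressed:       " + reader.isCompressed());
            System.out.println("block compressed: " + reader.isBlockCompressed());
            System.out.println("codec:            " + reader.getCompressionCodec());
        }
    }
}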

Note: after these optimizations, run the driver of each package in the order step3 -> step1 -> step2. The compression added above is handled transparently: step2 decompresses the output automatically as long as the codec is recognized, so no extra handling is needed.