6 - Practical Cases

Case 1: Merging Small Files with MapReduce (Custom InputFormat)

1 Preparation

  1. Requirement

Both HDFS and MapReduce lose efficiency when dealing with large numbers of small files, yet in practice such scenarios are hard to avoid, so a corresponding solution is needed.

  2. Test data

  3. Analysis

Small-file optimization essentially comes down to the following approaches:

a) At data-collection time, merge small files or small batches of data into larger files before uploading them to HDFS.

b) Before business processing, run a MapReduce job on HDFS to merge the small files.

c) At MapReduce processing time, use CombineFileInputFormat (e.g. CombineTextInputFormat) to improve efficiency; a minimal sketch follows this list.
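Approach c) is only mentioned in passing above, so here is a minimal sketch of it, not part of the original article: the driver class name, paths, and the 4 MB maximum split size are illustrative assumptions, and the mapper is just the identity Mapper.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CombineSmallFilesDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "combine small files");
        job.setJarByClass(CombineSmallFilesDriver.class);
        job.setMapperClass(Mapper.class); // identity mapper, for illustration only
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        // Pack many small files into a few splits of at most ~4 MB each,
        // so far fewer map tasks are launched than with the default TextInputFormat
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}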

2 Implementation

Note: this section implements approach b) above.

  1. Core mechanism of the program:

Define a custom InputFormat.

Override the RecordReader so that each call reads one complete file and wraps it as a single key-value pair.

Use SequenceFileOutputFormat on the output side to write the merged file.

  2. The code is as follows

a) Custom InputFormat

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    // Mark every small file as non-splittable, so each file becomes exactly one key-value pair
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}

b) Custom RecordReader

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    // Read the whole file into one BytesWritable on the first call; report "no more records" afterwards
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // nothing to close; the input stream is closed in nextKeyValue()
    }
}

c) The MapReduce driver

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SmallFilesToSequenceFileConverter extends Configured implements Tool {

    static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
        private Text filenameKey;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Use the full path of the current small file as the output key
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            filenameKey = new Text(path.toString());
        }

        @Override
        protected void map(NullWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(filenameKey, value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: combinefiles <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "combine small files to sequencefile");
        // Plug in the custom InputFormat so each small file is read as one record
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setMapperClass(SequenceFileMapper.class);
        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), args);
        System.exit(exitCode);
    }
}
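The job writes a single SequenceFile whose keys are the original file paths and whose values are the file contents. To verify the merge, the output can be read back; the following sketch is not part of the original article, and the part-r-00000 file name assumes the job's default single reducer.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Prints every original file name stored in the merged SequenceFile and the size of its contents
public class SequenceFileDump {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]); // e.g. <out>/part-r-00000
        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
        try {
            Text key = new Text();                     // original file path
            BytesWritable value = new BytesWritable(); // original file contents
            while (reader.next(key, value)) {
                System.out.println(key + "\t" + value.getLength() + " bytes");
            }
        } finally {
            reader.close();
        }
    }
}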

Case 2: Routing Output to Different Directories with MapReduce (Custom OutputFormat)

1 Preparation

  1. Requirement
Some raw logs need enrichment ("enhancement") processing. The flow is:

a) Read records from the raw log files.

b) For each record, take a URL field, look it up in an external knowledge base, and append the retrieved information to the raw record.

c) If the enrichment succeeds, write the record to the enhanced-results directory; if it fails, extract the URL field from the raw record and write it to the to-crawl list directory.

  2. Test data

  3. Analysis

The key point is that a single MapReduce job must write two kinds of results to different directories depending on the data. This kind of flexible output requirement can be met with a custom OutputFormat.


2 Implementation

  1. Implementation key points

    a) Accessing an external resource from within MapReduce

    b) Defining a custom OutputFormat, overriding its RecordWriter, and overriding the write() method that actually emits the data

  2. The code is as follows

    a) Utility class that loads data from the database

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;

public class DBLoader {

    // Load the url -> content knowledge base from MySQL into the given map
    public static void dbLoader(HashMap<String, String> ruleMap) {
        Connection conn = null;
        Statement st = null;
        ResultSet res = null;

        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://hdp-node01:3306/urlknowledge", "root", "root");
            st = conn.createStatement();
            res = st.executeQuery("select url,content from urlcontent");
            while (res.next()) {
                ruleMap.put(res.getString(1), res.getString(2));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (res != null) {
                    res.close();
                }
                if (st != null) {
                    st.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Simple smoke test: load the knowledge base and print how many entries were read
    public static void main(String[] args) {
        HashMap<String, String> map = new HashMap<String, String>();
        DBLoader.dbLoader(map);
        System.out.println(map.size());
    }
}

b) Custom OutputFormat

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogEnhancerOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {

        FileSystem fs = FileSystem.get(context.getConfiguration());
        // Note: the output paths are hard-coded here for simplicity; if more than one task
        // wrote concurrently they would have to be made unique per task
        Path enhancePath = new Path("hdfs://hdp-node01:9000/flow/enhancelog/enhanced.log");
        Path toCrawlPath = new Path("hdfs://hdp-node01:9000/flow/tocrawl/tocrawl.log");

        FSDataOutputStream enhanceOut = fs.create(enhancePath);
        FSDataOutputStream toCrawlOut = fs.create(toCrawlPath);

        return new MyRecordWriter(enhanceOut, toCrawlOut);
    }

    static class MyRecordWriter extends RecordWriter<Text, NullWritable> {

        FSDataOutputStream enhanceOut = null;
        FSDataOutputStream toCrawlOut = null;

        public MyRecordWriter(FSDataOutputStream enhanceOut, FSDataOutputStream toCrawlOut) {
            this.enhanceOut = enhanceOut;
            this.toCrawlOut = toCrawlOut;
        }

        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            // Route each record to its destination on HDFS: lines tagged with "tocrawl"
            // go to the to-crawl list output stream, everything else to the enhanced log
            if (key.toString().contains("tocrawl")) {
                toCrawlOut.write(key.toString().getBytes());
            } else {
                enhanceOut.write(key.toString().getBytes());
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (toCrawlOut != null) {
                toCrawlOut.close();
            }
            if (enhanceOut != null) {
                enhanceOut.close();
            }
        }
    }
}

c) The MapReduce job

import java.io.IOException;
import java.util.HashMap;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogEnhancer {

    static class LogEnhancerMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        HashMap<String, String> knowledgeMap = new HashMap<String, String>();

        /**
         * A map task calls setup() once before processing any records; use that hook
         * to load the external knowledge base into the memory of the machine running the task.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            DBLoader.dbLoader(knowledgeMap);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");

            try {
                String url = fields[26];

                // Look up the URL of this log line in the knowledge base
                String content = knowledgeMap.get(url);

                // Build one of two kinds of output depending on whether the lookup succeeded
                String result = "";
                if (null == content) {
                    // Goes to the to-crawl list
                    result = url + "\t" + "tocrawl\n";
                } else {
                    // Goes to the enhanced log
                    result = line + "\t" + content + "\n";
                }

                context.write(new Text(result), NullWritable.get());
            } catch (Exception e) {
                // Malformed lines (e.g. too few fields) are silently skipped
            }
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(LogEnhancer.class);
        job.setMapperClass(LogEnhancerMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Register the custom output format with the job
        job.setOutputFormatClass(LogEnhancerOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Even though we use a custom OutputFormat, it extends FileOutputFormat,
        // which still writes a _SUCCESS marker, so an output directory must be specified.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
        System.exit(0);
    }
}

Case 3: Top-N with MapReduce (Custom GroupingComparator)

1 Preparation

  1. Requirement and test data

Given the following order data:

Order ID        Product ID   Amount
Order_0000001   Pdt_01       222.8
Order_0000001   Pdt_05       25.8
Order_0000002   Pdt_03       522.8
Order_0000002   Pdt_04       122.4
Order_0000002   Pdt_05       722.4
Order_0000003   Pdt_01       222.8

we need to find, for each order, the transaction with the largest amount. For the sample above, the expected result is 222.8 for Order_0000001, 722.4 for Order_0000002, and 222.8 for Order_0000003.

  2. Analysis

    a) Use a composite of the order id and the transaction amount as the map output key. The map output can then be partitioned by order id and sorted by amount before it reaches the reducers.

    b) On the reduce side, use a GroupingComparator to group key-value pairs with the same order id; with amounts sorted in descending order, the first key of each group is the maximum.

2 Implementation

a) Custom GroupingComparator

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class ItemidGroupingComparator extends WritableComparator {

    protected ItemidGroupingComparator() {
        // Register OrderBean and let WritableComparator create instances for comparison
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean abean = (OrderBean) a;
        OrderBean bbean = (OrderBean) b;

        // Treat beans with the same item_id as equal, so they are grouped together
        return abean.getItemid().compareTo(bbean.getItemid());
    }
}

b) The order bean

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean> {
    private Text itemid;
    private DoubleWritable amount;

    public OrderBean() {
    }

    public OrderBean(Text itemid, DoubleWritable amount) {
        set(itemid, amount);
    }

    public void set(Text itemid, DoubleWritable amount) {
        this.itemid = itemid;
        this.amount = amount;
    }

    public Text getItemid() {
        return itemid;
    }

    public DoubleWritable getAmount() {
        return amount;
    }

    // Sort by item id ascending, and within the same item id by amount descending,
    // so the largest transaction of each order comes first
    @Override
    public int compareTo(OrderBean o) {
        int cmp = this.itemid.compareTo(o.getItemid());
        if (cmp == 0) {
            cmp = -this.amount.compareTo(o.getAmount());
        }
        return cmp;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(itemid.toString());
        out.writeDouble(amount.get());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        String readUTF = in.readUTF();
        double readDouble = in.readDouble();

        this.itemid = new Text(readUTF);
        this.amount = new DoubleWritable(readDouble);
    }

    @Override
    public String toString() {
        return itemid.toString() + "\t" + amount.get();
    }
}

c) The MapReduce job

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SecondarySort {

    static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

        OrderBean bean = new OrderBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");

            // NOTE: this assumes a tab-separated input of (order id, amount);
            // for the three-column sample shown above the amount would be fields[2]
            bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));

            context.write(bean, NullWritable.get());
        }
    }

    static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

        // With the GroupingComparator in place, the kv pairs arriving here look like:
        // <1001 87.6>,null <1001 76.5>,null ...
        // The reduce() key parameter is the key of the first pair in the group, e.g. <1001 87.6>,
        // so writing just that key emits the largest transaction of each order.
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(SecondarySort.class);

        job.setMapperClass(SecondarySortMapper.class);
        job.setReducerClass(SecondarySortReducer.class);

        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // GroupingComparator used during the shuffle to group beans with the same order id
        job.setGroupingComparatorClass(ItemidGroupingComparator.class);
        // Partitioner used during the shuffle (a sketch follows below)
        job.setPartitionerClass(ItemIdPartitioner.class);

        job.setNumReduceTasks(3);

        job.waitForCompletion(true);
    }
}
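The driver above registers an ItemIdPartitioner that the article does not list. A minimal sketch, assuming it simply hashes the order id carried inside OrderBean so that all records of one order reach the same reducer:

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable> {
    @Override
    public int getPartition(OrderBean key, NullWritable value, int numReduceTasks) {
        // Hash only the order id, ignoring the amount, so that beans with the same
        // order id always land in the same partition
        return (key.getItemid().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

Without such a partitioner, the default HashPartitioner would hash the whole OrderBean object, so beans of the same order that differ only in amount could be sent to different reducers and the per-order grouping would break.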
