1 Preparation
- Requirement
Both HDFS and MapReduce lose efficiency when handling large numbers of small files, yet in practice such workloads are hard to avoid, so a suitable solution is needed.
Test data
Analysis
Small-file optimization comes down to the following approaches:
a) merge small files or small batches of data into large files at collection time, before uploading to HDFS
b) before the business processing starts, run a MapReduce program on HDFS to merge the small files
c) during MapReduce processing itself, use a CombineFileInputFormat (e.g. CombineTextInputFormat) to improve efficiency; see the sketch after this list
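For reference, approach c) needs no custom classes. A minimal driver-side sketch, assuming Hadoop 2.x's `CombineTextInputFormat`; the 4 MB split cap is illustrative:

```java
// Pack many small files into a few splits instead of one split per file.
job.setInputFormatClass(CombineTextInputFormat.class);
// Illustrative upper bound on the data packed into one split (4 MB).
CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);
```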
2 Implementation
Note: this section implements approach b) above.
- Core mechanism of the program:
a custom InputFormat;
a rewritten RecordReader that reads one complete file at a time and wraps it as a KV pair;
SequenceFileOutputFormat as the output format, so the merged result is written as a single SequenceFile.
- The code follows.
a) Custom InputFormat
```java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    // Each small file must be read as a single record, so splitting is disabled.
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}
```
b) Custom RecordReader
```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    // Reads the entire file into a single BytesWritable value on the first
    // call; note that the whole file must fit in memory.
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // Nothing to close: the stream is closed inside nextKeyValue().
    }
}
```
c) Define the MapReduce processing flow
```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SmallFilesToSequenceFileConverter extends Configured implements Tool {

    static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

        private Text filenameKey;

        // Use the path of the file behind the current split as the output key.
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            filenameKey = new Text(path.toString());
        }

        @Override
        protected void map(NullWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(filenameKey, value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: combinefiles <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "combine small files to sequencefile");
        job.setJarByClass(SmallFilesToSequenceFileConverter.class);
        // Read each small file as one (NullWritable, BytesWritable) record...
        job.setInputFormatClass(WholeFileInputFormat.class);
        // ...and write (file name, file content) pairs into one SequenceFile.
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setMapperClass(SequenceFileMapper.class);
        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        SequenceFileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), args);
        System.exit(exitCode);
    }
}
```
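To spot-check the merged result, the SequenceFile can be read back. A minimal sketch (class name and argument handling are illustrative, not part of the original):

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Prints each original file name and its size from the merged SequenceFile.
public class SeqFileDump {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        SequenceFile.Reader reader = new SequenceFile.Reader(
                conf, SequenceFile.Reader.file(new Path(args[0])));
        try {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            while (reader.next(key, value)) {
                System.out.println(key + "\t" + value.getLength() + " bytes");
            }
        } finally {
            reader.close();
        }
    }
}
```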
Case 2: MR in practice: log enhancement (custom OutputFormat)
1 Preparation
- Requirement
Some raw logs need to be enhanced and parsed. The flow:
a) read records from the raw log files
b) for each record, look up a URL field in an external knowledge base and append the retrieved information to the raw log record
c) if the enhancement succeeds, write the record to the enhanced-results directory; if it fails, extract the URL field from the raw record and write it to the to-crawl-list directory
Test data
Analysis
The crux of the program is to emit two kinds of results to different directories from a single MapReduce job, depending on the data. This kind of flexible output routing can be achieved with a custom OutputFormat.
2 Implementation
Implementation points:
a) accessing an external resource (the knowledge base) from within MapReduce
b) a custom OutputFormat with a rewritten RecordWriter, specifically its output method write()
The code follows.
a) Utility that loads the knowledge base from the database
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;

public class DBLoader {

    // Loads the url -> content knowledge base into the given map.
    public static void dbLoader(HashMap<String, String> ruleMap) {
        Connection conn = null;
        Statement st = null;
        ResultSet res = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection(
                    "jdbc:mysql://hdp-node01:3306/urlknowledge", "root", "root");
            st = conn.createStatement();
            res = st.executeQuery("select url,content from urlcontent");
            while (res.next()) {
                ruleMap.put(res.getString(1), res.getString(2));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (res != null) {
                    res.close();
                }
                if (st != null) {
                    st.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Quick standalone check: print how many rules were loaded.
    public static void main(String[] args) {
        HashMap<String, String> map = new HashMap<String, String>();
        DBLoader.dbLoader(map);
        System.out.println(map.size());
    }
}
```
b) Custom OutputFormat
```java
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogEnhancerOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSystem fs = FileSystem.get(context.getConfiguration());
        // NOTE: the two output paths are fixed, so this listing assumes a single
        // task writes the output (see the variation sketch after this listing).
        Path enhancePath = new Path("hdfs://hdp-node01:9000/flow/enhancelog/enhanced.log");
        Path toCrawlPath = new Path("hdfs://hdp-node01:9000/flow/tocrawl/tocrawl.log");
        FSDataOutputStream enhanceOut = fs.create(enhancePath);
        FSDataOutputStream toCrawlOut = fs.create(toCrawlPath);
        return new MyRecordWriter(enhanceOut, toCrawlOut);
    }

    static class MyRecordWriter extends RecordWriter<Text, NullWritable> {

        FSDataOutputStream enhanceOut = null;
        FSDataOutputStream toCrawlOut = null;

        public MyRecordWriter(FSDataOutputStream enhanceOut, FSDataOutputStream toCrawlOut) {
            this.enhanceOut = enhanceOut;
            this.toCrawlOut = toCrawlOut;
        }

        // Route each record to one of the two output files based on its content.
        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            if (key.toString().contains("tocrawl")) {
                toCrawlOut.write(key.toString().getBytes());
            } else {
                enhanceOut.write(key.toString().getBytes());
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (toCrawlOut != null) {
                toCrawlOut.close();
            }
            if (enhanceOut != null) {
                enhanceOut.close();
            }
        }
    }
}
```
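Because getRecordWriter() always creates the same two fixed files, two tasks running in parallel would clash on them. One variation, a sketch rather than part of the original, keeps the two target directories but embeds the task id in the file names:

```java
// Variation sketch: unique file names per task, same two directories.
String taskId = context.getTaskAttemptID().getTaskID().toString();
Path enhancePath = new Path("hdfs://hdp-node01:9000/flow/enhancelog/" + taskId + ".log");
Path toCrawlPath = new Path("hdfs://hdp-node01:9000/flow/tocrawl/" + taskId + ".log");
```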
c) Develop the MapReduce processing flow
```java
import java.io.IOException;
import java.util.HashMap;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogEnhancer {

    static class LogEnhancerMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        HashMap<String, String> knowledgeMap = new HashMap<String, String>();

        // Load the knowledge base once per map task, not once per record.
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            DBLoader.dbLoader(knowledgeMap);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            try {
                String url = fields[26];
                String content = knowledgeMap.get(url);
                String result = "";
                if (null == content) {
                    // Lookup failed: emit the URL tagged for the to-crawl list.
                    result = url + "\t" + "tocrawl" + "\n";
                } else {
                    // Lookup succeeded: append the knowledge-base content to the raw line.
                    result = line + "\t" + content + "\n";
                }
                context.write(new Text(result), NullWritable.get());
            } catch (Exception e) {
                // Malformed lines (fewer than 27 fields) are skipped silently.
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(LogEnhancer.class);
        job.setMapperClass(LogEnhancerMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Use the custom output format so records land in two different directories.
        job.setOutputFormatClass(LogEnhancerOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
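With this mapper, every record reaching LogEnhancerOutputFormat has one of two shapes: `<original line>\t<knowledge content>` after a successful lookup, or `<url>\ttocrawl` after a miss, which is what MyRecordWriter's contains("tocrawl") test routes on. Note that an enhanced line that happens to contain the string "tocrawl" would be misrouted; the marker-based routing is a simplification.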
Case 3: MR in practice: TopN (custom GroupingComparator)
1 Preparation
- Requirement and test data
Given the following order data:
| Order id      | Product id | Amount |
|---------------|------------|--------|
| Order_0000001 | Pdt_01     | 222.8  |
| Order_0000001 | Pdt_05     | 25.8   |
| Order_0000002 | Pdt_03     | 522.8  |
| Order_0000002 | Pdt_04     | 122.4  |
| Order_0000002 | Pdt_05     | 722.4  |
| Order_0000003 | Pdt_01     | 222.8  |
We need to find the single transaction with the largest amount within each order.
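For the sample data above, the expected result (in OrderBean's `order id \t amount` output format, across the reducer output files) is:

```
Order_0000001	222.8
Order_0000002	722.4
Order_0000003	222.8
```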
Analysis
a) Use a bean of "order id + amount" as the map output key: partition by order id and sort by amount in descending order, so that all records of one order reach the same reducer, largest amount first.
b) On the reduce side, use a GroupingComparator to group KV pairs with the same order id into one reduce() call; the first key in each group is then the maximum.
2 Implementation
a) Custom GroupingComparator
```java
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class ItemidGroupingComparator extends WritableComparator {

    protected ItemidGroupingComparator() {
        // true: tell WritableComparator to create OrderBean instances to compare.
        super(OrderBean.class, true);
    }

    // Compare only the order id, so that all beans of one order fall into
    // the same reduce() group regardless of their amount.
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean abean = (OrderBean) a;
        OrderBean bbean = (OrderBean) b;
        return abean.getItemid().compareTo(bbean.getItemid());
    }
}
```
b) Define the order-info bean
```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean> {

    // NOTE: despite the name, itemid carries the order id in this example.
    private Text itemid;
    private DoubleWritable amount;

    public OrderBean() {
    }

    public OrderBean(Text itemid, DoubleWritable amount) {
        set(itemid, amount);
    }

    public void set(Text itemid, DoubleWritable amount) {
        this.itemid = itemid;
        this.amount = amount;
    }

    public Text getItemid() {
        return itemid;
    }

    public DoubleWritable getAmount() {
        return amount;
    }

    // Sort by order id first, then by amount in descending order,
    // so the largest amount of each order comes first.
    @Override
    public int compareTo(OrderBean o) {
        int cmp = this.itemid.compareTo(o.getItemid());
        if (cmp == 0) {
            cmp = -this.amount.compareTo(o.getAmount());
        }
        return cmp;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(itemid.toString());
        out.writeDouble(amount.get());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.itemid = new Text(in.readUTF());
        this.amount = new DoubleWritable(in.readDouble());
    }

    @Override
    public String toString() {
        return itemid.toString() + "\t" + amount.get();
    }
}
```
c) Write the MapReduce processing flow
```java
import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SecondarySort {

    static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

        OrderBean bean = new OrderBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            // Column 0 is the order id; column 2 is the amount
            // (column 1, the product id, is not needed for the result).
            bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2])));
            context.write(bean, NullWritable.get());
        }
    }

    static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

        // Thanks to the grouping comparator, reduce() is called once per order,
        // and the incoming key is the first (largest-amount) bean of the group.
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SecondarySort.class);
        job.setMapperClass(SecondarySortMapper.class);
        job.setReducerClass(SecondarySortReducer.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Group keys by order id and route them to reducers by order id as well.
        job.setGroupingComparatorClass(ItemidGroupingComparator.class);
        job.setPartitionerClass(ItemIdPartitioner.class);
        job.setNumReduceTasks(3);
        job.waitForCompletion(true);
    }
}
```
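The driver references an ItemIdPartitioner whose listing the original omits. Judging from the analysis ("partition by order id"), a minimal sketch could be:

```java
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Minimal sketch (listing missing from the original): send every record of
// one order to the same reducer by hashing the order id.
public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable> {

    @Override
    public int getPartition(OrderBean key, NullWritable value, int numPartitions) {
        // Mask the sign bit so the partition index is non-negative.
        return (key.getItemid().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```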