1 概述
这是MapReduce的一种优化策略:通过压缩编码对mapper或者reducer的输出进行压缩,以减少磁盘IO,提高MR程序运行速度(但相应增加了cpu运算负担)
1 2 3 4 5
| 1) MapReduce支持将map输出的结果或者reduce输出的结果进行压缩,以减少网络IO或最终输出数据的体积 2) 压缩特性运用得当能提高性能,但运用不当也可能降低性能 3) 基本原则: -运算密集型的job,少用压缩 -IO密集型的job,多用压缩
|
2 MR支持的压缩编码

考虑Hadoop应用处理的数据集比较大,因此需要借助压缩。下面是按照效率从高到低排列的
1 2 3 4
| (1)使用容器格式文件,例如:顺序文件、RCFile、Avro数据格式支持压缩和切分文件。另外在配合使用一些快速压缩工具,例如:LZO、LZ4或者Snappy. (2)使用支持切分压缩格式,例如bzip2 (3)在应用中将文件切分成块,对每块进行任意格式压缩。这种情况确保压缩后的数据接近HDFS块大小。 (4)存储未压缩文件,以原始文件存储。
|
3 Reducer输出压缩
在配置参数或在代码中都可以设置reduce的输出压缩
- 在配置参数中设置
1 2 3 4 5
| 压缩属性:mapreduce.output.fileoutputformat.compress 设置为true 压缩格式属性:mapreduce.output.fileoutputformat.compress.codec 类库有:org.apache.hadoop.io.compress.DefaultCodec .deflate org.apache.hadoop.io.compress.GzipCodec .gz 压缩类型属性:mapreduce.output.fileoutputformat.compress.type=RECORD
|
- 也可以在代码中用
1
| conf.set(“mapreduce.output.fileoutputformat.compress”,”true”)
|
- 在代码中设置
1 2 3
| Job job = Job.getInstance(conf); FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, (Class<? extends CompressionCodec>) Class.forName(""));
|
4 Mapper输出压缩
在配置参数或在代码中都可以设置reduce的输出压缩
- 在配置参数中设置
1 2 3
| mapreduce.map.output.compress=true
mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
|
- 在代码中设置
1 2 3
| conf.set(“mapreduce.map.output.compress”,” true”) conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true); conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);
|
5 压缩文件的读取
Hadoop自带的InputFormat类内置支持压缩文件的读取,比如TextInputformat类,在其initialize方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath();
final FileSystem fs = file.getFileSystem(job); fileIn = fs.open(file); CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file); if (null!=codec) { isCompressedInput = true; decompressor = CodecPool.getDecompressor(codec); if (codec instanceof SplittableCompressionCodec) { final SplitCompressionInputStream cIn = ((SplittableCompressionCodec)codec).createInputStream( fileIn, decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK); in = new CompressedSplitLineReader(cIn, job, this.recordDelimiterBytes); start = cIn.getAdjustedStart(); end = cIn.getAdjustedEnd(); filePosition = cIn; } else { in = new SplitLineReader(codec.createInputStream(fileIn, decompressor), job, this.recordDelimiterBytes); filePosition = fileIn; } } else { fileIn.seek(start); in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes); filePosition = fileIn; } }
|