1 Why Serialize
In class-based programming languages, the data we work with is encapsulated in objects that live in memory. Sometimes, however, we want to store such an object directly on disk, or send it over the network. How? We serialize the object into a byte sequence of 0s and 1s; that byte sequence can then be written to disk or transmitted over the network. When the object is needed again, we read the byte sequence from disk, or receive it over the network, and deserialize it back into the object we need.
2 Hadoop vs. Java Serialization

Hadoop involves transferring large amounts of data (network I/O), for example inter-process communication (the RPC protocol) and the fetch operation of a ReduceTask. Network bandwidth is extremely scarce, so an efficient serialization mechanism is indispensable.
Java's built-in serialization (Serializable) is a heavyweight framework: a serialized object carries a lot of extra information (checksums, headers, the inheritance hierarchy, ...). That data is not what we need, and it gets in the way of efficient network transfer.
Because data serialized for communication and RPC calls between the nodes of a Hadoop cluster must serialize quickly, stay small, and use little bandwidth, a lean and efficient serialization mechanism (Writable) was developed specifically for Hadoop. It is required to have the following properties:
- Compact: a compact format makes full use of network bandwidth, the scarcest resource in a data center.
- Fast: inter-process communication forms the backbone of a distributed system, so the performance overhead of serializing and deserializing must be kept to a minimum.
- Extensible: protocols change as requirements evolve, so new protocols will be introduced between clients and servers over time; the serialization format must be able to carry the messages of those new protocols.
- Interoperable: clients and servers written in different languages must be able to talk to each other.
Note that the keys and values of a MapReduce job must all be serializable. A key additionally serves as the sort field, so a key type must also provide a comparison interface: WritableComparable.
3 Common Built-in Types
| Java data type | Hadoop serializable data type | Meaning |
| --- | --- | --- |
| byte | ByteWritable | byte |
| short | ShortWritable | short integer |
| int | IntWritable | integer |
| long | LongWritable | long integer |
| float | FloatWritable | single-precision float |
| double | DoubleWritable | double-precision float |
| boolean | BooleanWritable | boolean |
| String | Text | string |
| array | ArrayWritable | array |
| Map | MapWritable | map |
| null | NullWritable | null |
NullWritable is a special Writable whose serialized length is zero: it neither reads bytes from nor writes bytes to the stream. It serves as a placeholder. For example, in MapReduce, if you do not need the key or the value, you can declare its type as NullWritable.

The singleton instance is obtained by calling NullWritable.get().
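A minimal sketch of that placeholder usage (the mapper class here is illustrative, not part of the original text): a mapper that only cares about the line content and discards the key.

```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Hypothetical mapper that keeps only the line content and discards the key.
public class LineOnlyMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // NullWritable.get() returns the shared singleton; it serializes to zero bytes.
        context.write(NullWritable.get(), value);
    }
}
```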
4 The Serialization Interface Writable

4.1 The Writable interface

```java
public interface Writable {
    // Serialize this object's fields to the output stream.
    void write(DataOutput out) throws IOException;

    // Deserialize this object's fields from the input stream.
    void readFields(DataInput in) throws IOException;
}
```
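To make the contract concrete, here is a minimal round-trip sketch (not from the original text): it serializes an IntWritable to a byte array with write(), then rebuilds it with readFields().

```java
import org.apache.hadoop.io.IntWritable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RoundTripDemo {
    public static void main(String[] args) throws IOException {
        // write(): object -> bytes
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        new IntWritable(42).write(new DataOutputStream(baos));
        byte[] bytes = baos.toByteArray();   // exactly 4 bytes for an int

        // readFields(): bytes -> object
        IntWritable copy = new IntWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
        System.out.println(copy.get());      // 42
    }
}
```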
4.2 The WritableComparable interface

```java
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
```
5 Custom Serializable Types

5.1 Why define your own

Hadoop provides many classes that already support Writable serialization, e.g. Text, IntWritable, LongWritable. But these cannot cover every need; often we have to define classes of our own. If a custom data type is to be used as K2/V2 or K3/V3, it must implement the serialization interface Writable. If the custom type is used as a key, it must additionally be comparable, i.e. also implement the Comparable interface, or simply implement WritableComparable directly.
5.2 How to define a custom serializable type

Define your own class and implement the serialization interface:
```java
package com.qianfeng.mr.writable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

public class TextPair implements WritableComparable<TextPair> {
    private Text name;
    private Text info;

    // A no-argument constructor is required: the framework creates the object
    // reflectively and then calls readFields() to populate it.
    public TextPair() {
        name = new Text();
        info = new Text();
    }

    public TextPair(Text name, Text info) {
        this.name = name;
        this.info = info;
    }

    public TextPair(String name, String info) {
        this.name = new Text(name);
        this.info = new Text(info);
    }

    public Text getName() { return name; }
    public void setName(Text name) { this.name = name; }
    public Text getInfo() { return info; }
    public void setInfo(Text info) { this.info = info; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        TextPair textPair = (TextPair) o;
        return Objects.equals(name, textPair.name)
                && Objects.equals(info, textPair.info);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, info);
    }

    @Override
    public String toString() {
        return "[" + name.toString() + "," + info.toString() + "]";
    }

    // Serialize the two fields in a fixed order.
    @Override
    public void write(DataOutput out) throws IOException {
        name.write(out);
        info.write(out);
    }

    // Deserialize in exactly the same order as write().
    @Override
    public void readFields(DataInput in) throws IOException {
        name.readFields(in);
        info.readFields(in);
    }

    // Compare by name first, then by info, so TextPair can serve as a key.
    @Override
    public int compareTo(TextPair o) {
        int cmp = name.compareTo(o.name);
        return cmp != 0 ? cmp : info.compareTo(o.info);
    }
}
```
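A quick, illustrative sanity check of the ordering (the values here are made up):

```java
TextPair a = new TextPair("zhang", "info1");
TextPair b = new TextPair("zhang", "info2");
System.out.println(a.compareTo(b) < 0); // true: names tie, so "info1" < "info2" decides
```

Comparing name first and falling back to info yields a total order, which the shuffle's sort phase relies on when TextPair is used as a key.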
5.3 Testing the serialized size

```java
package com.qianfeng.mr.writable;

import org.apache.hadoop.io.Writable;

import java.io.*;

public class TestDemo {
    public static void main(String[] args) throws IOException {
        // Java serialization: write the whole object with ObjectOutputStream.
        StudentSer s1 = new StudentSer("小张", 23);
        ObjectOutputStream oos = new ObjectOutputStream(
                new FileOutputStream("D:/studentser.txt"));
        oos.writeObject(s1);
        oos.close();

        // Hadoop serialization: write only the raw field data via Writable.write().
        StudentWri s2 = new StudentWri("小张", 23);
        DataOutputStream dos = new DataOutputStream(
                new FileOutputStream("D:/studentwri.txt"));
        s2.write(dos);
        dos.close();
    }
}

class StudentSer implements Serializable {
    String name;
    int age;

    public StudentSer(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}

class StudentWri implements Writable {
    String name;
    int age;

    public StudentWri(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(age);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        age = in.readInt();
    }
}
```
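Comparing the two files afterwards makes the point of this section: studentwri.txt contains only what write() emitted, writeUTF's 2-byte length prefix plus the UTF-8 bytes of the name, followed by a 4-byte int, roughly a dozen bytes in total. studentser.txt additionally carries the Java serialization stream header and the full class descriptor (class name, serialVersionUID, field metadata) and comes out several times larger; the exact size depends on the JDK.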
6 Phone Traffic Statistics Example

6.1 Requirements analysis

Requirement: read the phone-traffic record on each line of a file and compute, for every phone number, the total upstream traffic, total downstream traffic, and total traffic.
Analysis:

Each line of input contains the phone number, upstream traffic, and downstream traffic to be aggregated. However, MapReduce requires both the Map phase and the Reduce phase to output key-value pairs, so we define a custom data type that encapsulates the traffic information.

Class: PhoneFlowBean

Fields: phone number phoneNumber, upstream traffic upFlow, downstream traffic downFlow

The Map phase reads each line and packs its upstream and downstream traffic into a PhoneFlowBean object.

K2: the phone number

V2: the PhoneFlowBean object

The Reduce phase aggregates the information carried by every PhoneFlowBean belonging to the same phone number; an illustrative input line is shown below.
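For orientation, here is an illustrative input line. The exact column layout of HTTP_20130313143750.dat is assumed rather than given, chosen to match how the Mapper below indexes its columns (phone number in the second column, upstream and downstream traffic in the third- and second-to-last columns):

```text
1	13736230513	192.196.100.1	www.example.com	2481	24681	200
```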
6.2 PhoneFlowBean

```java
package flow;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

public class PhoneFlowBean implements Writable {
    private String phoneNumber;
    private int upFlow;
    private int downFlow;

    // Required no-argument constructor for deserialization.
    public PhoneFlowBean() {
    }

    public PhoneFlowBean(String phoneNumber, int upFlow, int downFlow) {
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.phoneNumber);
        dataOutput.writeInt(this.upFlow);
        dataOutput.writeInt(this.downFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // Read the fields in the same order they were written.
        this.phoneNumber = dataInput.readUTF();
        this.upFlow = dataInput.readInt();
        this.downFlow = dataInput.readInt();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        PhoneFlowBean that = (PhoneFlowBean) o;
        return upFlow == that.upFlow
                && downFlow == that.downFlow
                && Objects.equals(phoneNumber, that.phoneNumber);
    }

    @Override
    public int hashCode() {
        return Objects.hash(phoneNumber, upFlow, downFlow);
    }

    public String getPhoneNumber() { return phoneNumber; }
    public void setPhoneNumber(String phoneNumber) { this.phoneNumber = phoneNumber; }
    public int getUpFlow() { return upFlow; }
    public void setUpFlow(int upFlow) { this.upFlow = upFlow; }
    public int getDownFlow() { return downFlow; }
    public void setDownFlow(int downFlow) { this.downFlow = downFlow; }
}
```
6.3 Mapper

```java
package flow;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PhoneFlowMapper extends Mapper<LongWritable, Text, Text, PhoneFlowBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the line on whitespace.
        String line = value.toString();
        String[] parts = line.split("\\s+");

        // The phone number is the second column; upstream and downstream
        // traffic are the third- and second-to-last columns.
        String phoneNumber = parts[1];
        int upFlow = Integer.parseInt(parts[parts.length - 3]);
        int downFlow = Integer.parseInt(parts[parts.length - 2]);

        PhoneFlowBean bean = new PhoneFlowBean(phoneNumber, upFlow, downFlow);
        context.write(new Text(phoneNumber), bean);
    }
}
```
6.4 Reducer

```java
package flow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PhoneFlowReducer extends Reducer<Text, PhoneFlowBean, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<PhoneFlowBean> values, Context context)
            throws IOException, InterruptedException {
        // Sum the upstream and downstream traffic of every record for this phone number.
        int sumUpFlow = 0, sumDownFlow = 0;
        for (PhoneFlowBean bean : values) {
            sumUpFlow += bean.getUpFlow();
            sumDownFlow += bean.getDownFlow();
        }
        int sumFlow = sumUpFlow + sumDownFlow;

        String flowDesc = String.format("Total upstream: %d, total downstream: %d, total: %d",
                sumUpFlow, sumDownFlow, sumFlow);
        context.write(key, new Text(flowDesc));
    }
}
```
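One Hadoop-specific caveat: the framework typically reuses a single PhoneFlowBean instance across iterations of the values iterable, refilling it via readFields() each time. Reading the primitive fields inside the loop, as above, is safe; collecting the bean references into a list for later use would not be, since every element would end up pointing at the same, last-filled object.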
6.5 Driver

```java
package flow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class PhoneFlowDriver {
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        // Run in local mode against the local file system.
        Configuration configuration = new Configuration();
        configuration.set("mapreduce.framework.name", "local");
        configuration.set("fs.defaultFS", "file:///");

        Job job = Job.getInstance(configuration);
        job.setMapperClass(PhoneFlowMapper.class);
        job.setReducerClass(PhoneFlowReducer.class);
        job.setJarByClass(PhoneFlowDriver.class);

        // The map output value (PhoneFlowBean) differs from the final output
        // value (Text), so both pairs of types must be declared.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PhoneFlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job,
                new Path("file/in/phoneFlow/HTTP_20130313143750.dat"));

        // Delete the output directory if it already exists; otherwise the job fails.
        Path outPath = new Path("file/out/phoneFlow");
        FileSystem fileSystem = FileSystem.get(configuration);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
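If the job completes successfully, the results land in file/out/phoneFlow/part-r-00000, one line per phone number in the format produced by the Reducer. An illustrative output line (the number and values are made up):

```text
13736230513	Total upstream: 2481, total downstream: 24681, total: 27162
```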