2-Hadoop序列化机制

1 为什么要序列化

  • **序列化: **序列化是指将具有结构化的内存对象转为0和1组成的字节序列,以便进行网络传输或持久存储到设备的过程。

  • 反序列化: 反序列化指的是将字节序列转为内存中具有结构化的对象的过程。

​ 在基于类的编程语言中,我们说需要的数据都会被封装成对象,在内存中进行管理。可是有些时候,这样的对象,我们想直接存储到磁盘中,或者想进行网络传输,那么需要怎么做呢?需要将对象序列化成0和1组成的字节序列,字节序列就可以存储到磁盘中,或者进行网络传输了。当我们需要对象时,就可以读取磁盘上的字节序列,或者接收网络传输过来的字节序列,进行反序列化成我们需要的对象就可以了。

2 Hadoop和Java序列化的对比

hadoop会涉及到大量数据的传输(网络IO),比如进程之间的通信(RPC协议),reduceTask的fetch操作。而网络带宽是极其稀缺的,因此使用序列化机制迫不及待。

Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系……),这些数据不是我们需要的,也不便于在网络中高效传输.

基于Hadoop在集群之间进行通讯或者RPC调用的时候,数据的序列化要快,体积要小,占用带宽要小的需求,因此,专门为hadoop单独开发了一套精简高效的序列化机制(Writable)。此序列化机制要求具有以下特点:

  • 紧凑:紧凑的格式能让我们充分利用网络带宽,而带宽是数据中心最稀缺的资源
  • 快速:进程通信形成了分布式系统的骨架,所以需要尽量减少序列化和反序列化的性能开销,这是基本的;
  • 可扩展:协议为了满足新的需求变化,所以控制客户端和服务器过程中,需要直接引进相应的协议,这些是新协议,原序列化方式能支持新的协议报文;
  • 互操作:能支持不同语言写的客户端和服务端进行交互;

需要注意的是: MapReduce的key和value,都必须是可序列化的。而针对于key而言,是数据排序的关键字,因此还需要提供比较接口:WritableComparable

3 常用类型介绍

Java数据类型 Hadoop序列化的数据类型 释义
byte ByteWritable 字节类型
short ShortWritable 短整型
int IntWritable 整型
long LongWritable 长整型
float FloatWritable 单精度浮点型
double DoubleWritable 双精度浮点型
boolean BooleanWritable 布尔型
String Text 字符串
array ArrayWritable 数组
Map MapWritable Map
null NullWritable

NullWritable是Writable的一个特殊的类型,它的序列化长度为0。它并不从数据流中读取数据,也不写入数据。它充当占位符。例如: 在MapReduce中,如果你不需要使用键或值,就可以将键或值的类型声明为NullWritable类型。

通过调用NullWritable.get()方法可以获取到这个实例。

4 序列化接口Writable

4.1 Writable接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface Writable {
/**
* Serialize the fields of this object to <code>out</code>.
*
* @param out <code>DataOuput</code> to serialize this object into.
* @throws IOException
*/
void write(DataOutput out) throws IOException;

/**
* Deserialize the fields of this object from <code>in</code>.
*
* <p>For efficiency, implementations should attempt to re-use storage in the
* existing object where possible.</p>
*
* @param in <code>DataInput</code> to deseriablize this object from.
* @throws IOException
*/
void readFields(DataInput in) throws IOException;
}

4.2 WritableComparable接口

1
public interface WritableComparable<T> extends Writable, Comparable<T> { }

5 自定义序列化类型

5.1 为什么要自定义

Hadoop为我们提供了很多支持Writable序列化的类,例如: Text、IntWritable、LongWritable等。但是这些类并不能够满足我们所有的需求,很多时候我们是需要定义自己的类,满足自己的需求的。而如果想要把一个自定义的数据类型作为K2V2或者K3V3来使用,那么就必须要实现序列化的接口Writable。如果这个自定义的数据类型是一个Key的数据类型,则还需要在满足比较的条件,也就是再额外实现一个Comparable的接口,或者可以直接实现WritableComparable接口。

5.2 如何自定义序列化数据类型

​ 定义一个自定义的类型,实现序列化接口即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package com.qianfeng.mr.writable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

/**
*
* 第一种方式: 可以直接实现WritableComparable接口
* 因为WritableComparable已经实现了Writable接口和Comparable接口
* 第二种方式:可以实现Writable接口和Comparable接口
*
* 注意:如果自定义的类型,会被作为key进行传输,那么必须要实现Comparable接口,因为底层会对key进行排序。
* 如果不作为key使用,只需要实现序列化接口Writable即可。
*/
public class TextPair implements WritableComparable {
private Text name;
private Text info;

public TextPair(){
name = new Text();
info = new Text();
}

public TextPair(Text name, Text info) {
this.name = name;
this.info = info;
}

/**
* 重载一个构造器
* @return
*/
public TextPair(String name,String info){
this.name = new Text(name);
this.info = new Text(info);
}

public Text getName() {
return name;
}

public void setName(Text name) {
this.name = name;
}

public Text getInfo() {
return info;
}

public void setInfo(Text info) {
this.info = info;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TextPair textPair = (TextPair) o;
return Objects.equals(name, textPair.name) &&
Objects.equals(info, textPair.info);
}

@Override
public int hashCode() {
return Objects.hash(name, info);
}

public String toString(){
return "["+name.toString()+","+info.toString()+"]";
}


/**
* 序列化方法,将属性序列化成字节序列
* @param out
* @throws IOException
*/
public void write(DataOutput out) throws IOException {
//将属性写到输出流程
name.write(out);
info.write(out);

//如果不是hadoop类型,比如是java类型
// out.writeUTF(name);
// out.writeUTF(info);
}


/**
* 反序列化方法,从流中读取字节序列进行反序列化。
* @param in
* @throws IOException
*/
public void readFields(DataInput in) throws IOException {
//要按照序列化的顺序进行反序列化
name.readFields(in);
info.readFields(in);
}


public int compareTo(Object o) {
return 0;
}
}

5.3 测试序列化后的体积

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package com.qianfeng.mr.writable;

import org.apache.hadoop.io.Writable;

import java.io.*;

/**
* @Description 测试:java类型和hadoop类型在序列化后的字节数量
*/
public class TestDemo {
public static void main(String[] args) throws IOException {
/**
* 序列化java对象
*/
StudentSer s1 = new StudentSer("小张",23);
ObjectOutputStream oos = new ObjectOutputStream(
new FileOutputStream("D:/studentser.txt"));

oos.writeObject(s1);
oos.close();

/**
* 序列化hadoop对象
*/
StudentWri s2 = new StudentWri("小张",23);
DataOutputStream dos = new DataOutputStream(
new FileOutputStream("D:/studentwri.txt"));
s2.write(dos);
dos.close();
}
}

/**
* java类型
*/
class StudentSer implements Serializable{
String name;
int age;

public StudentSer(String name, int age) {
this.name = name;
this.age = age;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}
}
/**
* hadoop类型
* */
class StudentWri implements Writable{
String name;
int age;
public StudentWri(String name,int age){
this.name = name;
this.age = age;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(name);
out.writeInt(age);
}

@Override
public void readFields(DataInput in) throws IOException {
name = in.readUTF();
age = in.readInt();
}
}

6 手机流量统计案例

6.1 手机流量的需求分析

需求: 读取文件中每一行的手机流量信息,统计每一个手机号码消费的总上行、总下行和总流量。

分析:

  • 对于每一行的统计信息来说,我们需要统计手机号码、上行流量和下行流量。但是MapReduce的程序设计要求我们在Map阶段和Reduce阶段输出的都是一个键值对。因此我们需要自定义数据类型,将手机流量信息封装起来。

    类: PhoneFlowBean

    属性: 手机号码phoneNumber,上行流量: upFlow,下行流量: downFlow

  • Map阶段读取每一行的数据,将上行和下行流量封装到一个PhoneFlowBean对象中。

    K2: 手机号码

    V2: PhoneFlowBean对象

  • Reduce阶段将每一个手机号码对应的PhoneFlowBean对象封装的信息汇总起来

6.2 PhoneFlowBean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package flow;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

public class PhoneFlowBean implements Writable {
private String phoneNumber;
private int upFlow;
private int downFlow;

public PhoneFlowBean() {
}

public PhoneFlowBean(String phoneNumber, int upFlow, int downFlow) {
this.phoneNumber = phoneNumber;
this.upFlow = upFlow;
this.downFlow = downFlow;
}

@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(this.phoneNumber);
dataOutput.writeInt(this.upFlow);
dataOutput.writeInt(this.downFlow);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
this.phoneNumber = dataInput.readUTF();
this.upFlow = dataInput.readInt();
this.downFlow = dataInput.readInt();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PhoneFlowBean that = (PhoneFlowBean) o;
return upFlow == that.upFlow && downFlow == that.downFlow && Objects.equals(phoneNumber, that.phoneNumber);
}

@Override
public int hashCode() {
return Objects.hash(phoneNumber, upFlow, downFlow);
}

public String getPhoneNumber() {
return phoneNumber;
}

public void setPhoneNumber(String phoneNumber) {
this.phoneNumber = phoneNumber;
}

public int getUpFlow() {
return upFlow;
}

public void setUpFlow(int upFlow) {
this.upFlow = upFlow;
}

public int getDownFlow() {
return downFlow;
}

public void setDownFlow(int downFlow) {
this.downFlow = downFlow;
}
}

6.3 Mapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package flow;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PhoneFlowMapper extends Mapper<LongWritable, Text, Text, PhoneFlowBean> {
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, PhoneFlowBean>.Context context) throws IOException, InterruptedException {
// 1. 读取一行的数据
String line = value.toString();
// 2. 切分出每一个部分
String[] parts = line.split("\\s+");
// 3. 提取手机号码
String phoneNumber = parts[1];
// 4. 提取上行流量
int upFlow = Integer.parseInt(parts[parts.length - 3]);
// 5. 提取下行流量
int downFlow = Integer.parseInt(parts[parts.length - 2]);
// 6. 封装PhoneFlowBean对象
PhoneFlowBean bean = new PhoneFlowBean(phoneNumber, upFlow, downFlow);
// 7. 写出键值对K2V2
context.write(new Text(phoneNumber), bean);
}
}

6.4 Reducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package flow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PhoneFlowReducer extends Reducer<Text, PhoneFlowBean, Text, Text> {
@Override
protected void reduce(Text key, Iterable<PhoneFlowBean> values, Reducer<Text, PhoneFlowBean, Text, Text>.Context context) throws IOException, InterruptedException {
// 1. 定义变量,用来统计总上行流量、总下行流量
int sumUpFlow = 0, sumDownFlow = 0;
// 2. 遍历所有的流量统计信息,计算总流量
for (PhoneFlowBean bean : values) {
sumUpFlow += bean.getUpFlow();
sumDownFlow += bean.getDownFlow();
}
// 3. 计算总流量
int sumFlow = sumUpFlow + sumDownFlow;
// 4. 拼接结果字符串
String flowDesc = String.format("总上行流量: %d, 总下行流量: %d, 总流量: %d", sumUpFlow, sumDownFlow, sumFlow);
// 5. 写出结果
context.write(key, new Text(flowDesc));
}
}

6.5 Driver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package flow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class PhoneFlowDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1. 读取配置信息
Configuration configuration = new Configuration();
// 设置本地模式调本地文件
configuration.set("mapreduce.framework.name", "local");
configuration.set("fs.defaultFS", "file:///");

// 2. 创建Job
Job job = Job.getInstance(configuration);

// 3. 设置相关的Mapper和Reducer的类
job.setMapperClass(PhoneFlowMapper.class);
job.setReducerClass(PhoneFlowReducer.class);
job.setJarByClass(PhoneFlowDriver.class);

// 4. 设置输出的KV的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(PhoneFlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

// 5. 设置输入输出的路径
FileInputFormat.setInputPaths(job, new Path("file/in/phoneFlow/HTTP_20130313143750.dat"));

// 6. 检测输出路径是否存在,如果存在先删除之前的内容
Path outPath = new Path("file/out/phoneFlow");
FileSystem fileSystem = FileSystem.get(configuration);
if (fileSystem.exists(outPath)) {
fileSystem.delete(outPath, true);
}
FileOutputFormat.setOutputPath(job, outPath);

// 7. 提交任务
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

2-Hadoop序列化机制
http://www.zivjie.cn/2024/06/15/Hadoop/MapReduce/2-Hadoop序列化机制/
作者
Francis
发布于
2024年6月15日
许可协议