1-MapReduce入门

1 MapReduce是什么

2004年,谷歌发表了一篇名为《MapReduce》的论文,主要介绍了如何在分布式的存储系统上对数据进行高效率的计算。2005年,Nutch团队使用Java语言实现了这个技术,并命名为MapReduce。时至今日,MapReduce是Apache Hadoop的核心模块之一,是运行在HDFS上的分布式运算程序的编程框架,用于大规模数据集(大于1TB)的并行运算。其中的概念,”Map(映射)”和”Reduce(归约)”,是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。

《阅读资料》

  • 在过去的很长一段时间里,CPU的性能都会遵循”摩尔定律“,在性能上每隔18个月左右就是提高一倍。那个时候,不需要对程序做任何改变,仅仅通过使用更优秀的CPU,就可以进行性能提升。但是现在,在CPU性能提升的道路上,人类已经到达了制作工艺的瓶颈,因此,我们不能再把希望寄托在性能更高的CPU身上了。
  • 现在这个时候,大规模数据存储在分布式文件系统上,人们也开始采用分布式并行编程来提高程序的性能。分布式程序运行在大规模计算机集群上,集群是大量的廉价服务器,可以并行执行大规模数据处理任务,这样就获得了海量的计算能力
  • 分布式并行编程比传统的程序有明显的区别,它运行在大量计算机构成的集群上,可以充分利用集群的并行处理能力;同时,通过向集群中增加新的计算节点,就可以很容易的实现集群计算能力的扩展。

2 为什么学习MapReduce

MapReduce主要解决的是分布式文件存储系统上,数据的分布式计算的问题。如果我们将所有的需要处理的数据移动到一个节点上进行处理,那么只是在数据传输的过程中就得消耗大量的时间,而且还可能在一台节点存不下这大量的数据。就算是能够存储下,也能够接受数据移动所带来的时间消耗,集群中其他节点的计算资源也都是在闲置的,不能高效率的利用集群。

因此我们就需要进行分布式的计算,将计算程序分发给不同的节点。在每一个节点上处理自己节点的数据,最后将每一个节点的数据处理结果汇总在一起。而在分布式计算的过程中会遇到很多的分布式计算的细节问题,这些问题都是需要开发人员去考虑的。那么如何去解决这些问题呢?

MapReduce是一个开源的、分布式的计算框架,封装了分布式计算程序的实现细节,使得开发人员不需要了解分布式计算底层实现的情况下,就可以去开发一个分布式的计算程序。开发人员只需要将重心放在业务逻辑的实现即可,不需要关注分布式开发的底层细节。因此,对于开发人员来说,可以简化不少的工作量,提交程序开发的效率!

3 优缺点

3.1 优点

  • 易于编程

    1
    它简单的实现一些接口,就可以完成一个分布式程序,这个程序可以分布到大量的廉价的pc机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特性使的MapReduce编程变得非常流行。
  • 良好的扩展性

    1
    当你的计算资源得不到满足的时候,你可以简单的通过增加机器来扩展它的计算能力
  • 高容错性

    1
    MapReduce的设计初衷就是使程序能够部署在廉价的pc机器上,这就要求它具有很高的容错性。比如一个机器挂了,它可以把上面的计算任务转移到另一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由hadoop内部完成的。
  • 适合PB级以上海量数据的离线处理

3.2 缺点

  • 不适合做实时计算

    1
    MapReduce无法做到像Mysql那样做到毫秒或者秒级的返回结果
  • 不适合做流式计算

    1
    流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能流态变化。这是MR自身的设计特点决定了数据源必须是静态的。
  • 不适合DAG(有向图)计算

    1
    多个应用程序存在依赖关系,后一个应用程序的输入为前一个应用程序的输出,在这种情况下,MapReduce并不是不能做,而是使用后每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常低下。

4 核心思想

  1. MapReduce设计的一个理念是“计算向数据靠拢”(移动计算),而不是“数据向计算靠拢”(移动数据)

  2. 将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,移动到有数据存储的集群节点上,一是可以减少节点间的数据移动开销。二是在存储节点上可以并行计算,大大提高计算效率问题。

    1
    因为移动数据需要大量的网络传输开销,尤其是在大规模数据环境下,这种开销尤为惊人,所以移动计算要比移动数据更加经济。
  3. MapReduce一个完整的运算分为Map和Reduce两个部分。Map会处理本节点的原始数据,产生的数据会临时存储到本地磁盘。Reduce会跨节点fetch属于自己的数据,并进行处理,产生的数据会存储到HDFS上。

《阅读资料》

Hadoop的MapReduce核心技术起源于谷歌在2004年发表的关于MapReduce系统的论文介绍。论文中有这么一句话:Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages。这句话提到了MapReduce思想来源,大致意思是,MapReduce的灵感来源于函数式语言(比如Lisp)中的内置函数map(映射)和reduce(规约)。

简单来说,在函数式语言里,map表示对一个列表(List)中的每个元素做计算,reduce表示对一个列表中的每个元素做迭代计算。它们具体的计算是通过传入的函数来实现的,map和reduce提供的是计算的框架。我们想一下,reduce既然能做迭代计算,那就表示列表中的元素是相关的(比如我想对列表中的所有元素做相加求和,那么列表中至少都应该是数值吧)。而map是对列表中每个元素做单独处理的,这表示列表中可以是杂乱无章的数据。这样看来,就有点联系了。在MapReduce里,Map处理的是原始数据,自然是杂乱无章的,每条数据之间互相没有关系;到了Reduce阶段,数据是以key后面跟着若干个value来组织的,这些value有相关性,至少它们都在一个key下面,于是就符合函数式语言里map和reduce的基本思想了。

5 阶段分类

MapReduce的程序在运行的过程中,一般分为两个阶段: Map阶段和Reduce阶段

5.1 第一阶段:Map

第一阶段,也称之为Map阶段。这个阶段会有若干个MapTask实例,完全并行运行,互不相干。每个MapTask会读取分析一个InputSplit(输入分片,简称分片)对应的原始数据。计算的结果数据会临时保存到所在节点的本地磁盘里。

该阶段的编程模型中会有一个map函数需要开发人员重写,map函数的输入是一个<key,value>对,map函数的输出也是一个<key,value>对,key和value的类型需要开发人员指定。参考下图:

5.2 第二阶段:Reduce

第二阶段,也称为Reduce阶段。这个阶段会有若干个ReduceTask实例并发运行,互不相干。但是他们的数据依赖于上一个阶段的所有maptask并发实例的输出。一个ReudceTask会从多个MapTask运行节点上fetch自己要处理的分区数据。经过处理后,输出到HDFS上。

该阶段的编程模型中有一个reduce函数需要开发人员重写,reduce函数的输入也是一个<key,value>对,reduce函数的输出也是一个<key,value>对。这里要强调的是,reduce的输入其实就是map的输出,只不过map的输出经过shuffle技术后变成了<key,List<Value>>而已。参考下图:

注意: MapReduce编程模型只能包含一个map阶段和一个reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。

6 编程规范

用户编写MapReduce程序的时候,需要设计至少三个类: Mapper、Reducer、Driver(用于提交MR的任务)

6.1 Mapper

  1. 自定义类,继承 org.apache.hadoop.mapreduce.Mapper 类型
  2. 定义K1,V1,K2,V2的泛型(K1,V1是Mapper的输入数据类型,K2,V2是Mapper的输出数据类型)
  3. 重写map方法(处理逻辑)

注意: Map方法,每一个键值对都会调用一次。

6.2 Reducer

  1. 自定义类,继承 org.apache.hadoop.mapreduce.Reducer 类型
  2. 定义K2,V2,K3,V3的泛型(K2,V2是Reducer的输入数据类型,K3,V3是Reducer的输出数据类型)
  3. 重写reduce方法的处理逻辑

注意: reduce方法,默认按key分组,每一组都调用一次。

6.3 Driver

MapReduce的程序,需要进行执行之前的属性配置与任务的提交,这些操作都需要在Driver类中来完成。

7 WordCount案例演示

7.1 案例需求

给定一个路径,统计这个路径下所有的文件中的每一个单词出现的次数。

7.2 测试数据

1
2
3
4
5
6
7
8
9
10
11
12
a.txt
hello qianfeng hello 1999 hello beijing hello
world hello hello java good

b.txt
hello xisanqi hello bingbing
hello chenchen hello
ACMilan hello china

c.txt
hello hadoop hello java hello storm hello spark hello redis hello zookeeper
hello hive hello hbase hello flume

7.3 pom文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.3.1</version>
</dependency>
</dependencies>

7.4 定义Mapper类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
* Map阶段,每当读取到一行的数据,都会触发这个方法
* 注意: LongWritable、Text、IntWritable都是Hadoop中的序列化的数据类型,相当于Java中的long、String、int类型。这部分内容在后续的序列化部分讲解。
*
* @param key 读取到数据的行偏移量,表示这一行的数据的首字符在这个数据块中是第几个字符
* @param value 读取到的一行的数据
* @param context 操作上下文
*/
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 1. 获取读取到的每一行的数据
String line = value.toString();
// 2. 将每一行的数据进行切割,得到每一个单词
String[] words = line.split("\\s+");
// 3. 将每一个单词与数字1拼接成键值对,写出
for (String word : words) {
context.write(new Text(word), new IntWritable(1));
}
}
}

7.5 定义Reducer类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
* 经过Shuffle阶段的处理,Map阶段写出的所有键值对按照Key进行了分组,并将所有的值都存入一个集合中。形成了 <K2, Iterable<V2>>的键值对。每一个Key都会触发一次这个方法。
*
* @param key K2,在这个需求中就是每一个单词
* @param values V2,一个单词出现的所有的次数
* @param context 操作上下文
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 遍历这个Key对应的所有的次数,累加到一起即可得到单词出现的总次数
// 1. 定义一个变量,用来记录总的次数
int times = 0;
// 2. 遍历每一个次数
for (IntWritable value : values) {
// 3. 累加次数,因为value是IntWritable类型的,因此需要使用get获取到包装的整型的值
times += value.get();
}
// 4. 将最终的结果写出
context.write(key, new IntWritable(times));
}
}

7.6 定义Driver类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1. 获取配置信息
// 加载默认的配置,即core-default.xml、hdfs-default.xml、mapred-default.xml和yarn-default.xml中的配置信息
// 然后读取项目中的core-site.xml、hdfs-site.xml、mapred-site.xml和yarn-site.xml中配置的信息,更新某些属性的值
Configuration conf = new Configuration();
// 2. 如果需要继续更新某些属性的值,可以在代码中更新
conf.set("fs.defaultFS", "hdfs://192.168.10.101:9820");
// 3. 创建Job对象
Job job = Job.getInstance(conf);

// 4. 设置Mapper类型
job.setMapperClass(WordCountMapper.class);
// 5. 设置Reducer类型
job.setReducerClass(WordCountReducer.class);
// 6. 设置驱动类型
job.setJarByClass(WordCountDriver.class);

// 7. 设置Map阶段输出的键值对类型
// 如果Map阶段输出的键值对类型与Reduce阶段输出的键值对类型相同,则可以省略这个设置
// 例如: 现在的Map阶段输出是<Text, IntWritable>类型的,与Reduce阶段的数据类型相同,因此可以省略不写
// job.setMapOutputKeyClass(Text.class);
// job.setMapOutputValueClass(IntWritable.class);

// 8. 设置Reduce阶段输出的键值对类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// 9. 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 10. 提交Job
System.exit(job.waitForCompletion(true) ? 0 : -1);
}
}

7.7 打jar包

  1. 在本地创建好测试数据,将其上传到HDFS

    1
    hdfs dfs -put ~/input /wordcount
  2. 检查YARN是否开启,如果没有开启,将其开启

    1
    start-yarn.sh
  3. 执行程序

    1
    2
    3
    # hadoop jar jar包名称 驱动类全名 输入路径 输出路径
    # 注意,输出路径不能存在,必须由程序自己创建。如果输出路径存在,会出现异常
    hadoop jar xxxxx.jar com.qianfeng.wordcount.WordCount.Driver /wordcount/input /wordcount/output

7.8 查看结果

1
hdfs dfs -cat /wordcount/output/*

8 Partitioner组件

8.1 介绍

我们已经实现了一个WordCount的案例,统计了一个目录下所有的文件中每一个单词出现的次数,并将计算结果输出到了一个文件中存储起来。但是有时候我们的需求是需要将处理的结果存储在多个文件中,这时应该怎么做呢?

首先我们需要知道一个前提条件,在MapReduce的程序中,最终生成多少个文件是由ReduceTask的数量来决定的。在上述的案例中我们最终生成了一个文件,是因为只有一个ReduceTask。那么如果需要最终生成两个结果文件呢?我只需要将ReduceTask的数量设置为2即可。这样就带来了另外一个问题: 某一个单词的统计结果,到底应该存放在哪个文件中呢?

在MapReduce的程序中,一个MapTask处理一个分片的数据(后文会讲解),而一个ReduceTask是用来处理一个分区的数据的。因此我如果需要明确某一个单词的统计结果到底存放在哪个文件中,只需要设置好分区即可。而Partitioner就是一个分区器组件,来实现数据的分区操作的。默认的分区器是HashPartitioner,如果我们需要实现自己的分区逻辑,就需要自定义分区器了。

HashPartitioner的逻辑:

  • 分区器计算每一个Key的HashCode
  • 将计算后的HashCode % ReduceTask的数量,得到一个分区的编号

8.2 自定义Partitioner

需求: 将单词的统计结果存入三个文件: a-i开头的存一个文件、j-q开头的存一个文件、其他的存一个文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class WordCountPartitioner extends Partitioner<Text, IntWritable> {
/**
* 计算每一个键值对所对应的分区号,分区号从0开始
* @param text 键
* @param intWritable 值
* @param i 总的ReduceTask的数量
* @return 该键值对对应的分区
*/
@Override
public int getPartition(Text text, IntWritable intWritable, int i) {
// 需求是将键值对按照首字母分为三个分区: a-i, j-q, 其他
// 1. 获取首字母
char firstLetter = text.toString().charAt(0);
// 2. 判断范围
if (firstLetter >= 'a' && firstLetter <= 'i') {
return 0;
} else if (firstLetter >= 'j' && firstLetter <= 'q') {
return 1;
}
return 2;
}
}

8.3 应用分区器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* @author 章鱼哥
* @company 北京千锋互联科技有限公司
*/
public class WordCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1. 获取配置信息
// 加载默认的配置,即core-default.xml、hdfs-default.xml、mapred-default.xml和yarn-default.xml中的配置信息
// 然后读取项目中的core-site.xml、hdfs-site.xml、mapred-site.xml和yarn-site.xml中配置的信息,更新某些属性的值
Configuration conf = new Configuration();
// 2. 如果需要继续更新某些属性的值,可以在代码中更新
conf.set("fs.defaultFS", "hdfs://192.168.10.101:9820");
// 3. 创建Job对象
Job job = Job.getInstance(conf);

// 4. 设置Mapper类型
job.setMapperClass(WordCountMapper.class);
// 5. 设置Reducer类型
job.setReducerClass(WordCountReducer.class);
// 6. 设置驱动类型
job.setJarByClass(WordCountDriver.class);

// 7. 设置Map阶段输出的键值对类型
// 如果Map阶段输出的键值对类型与Reduce阶段输出的键值对类型相同,则可以省略这个设置
// 例如: 现在的Map阶段输出是<Text, IntWritable>类型的,与Reduce阶段的数据类型相同,因此可以省略不写
// job.setMapOutputKeyClass(Text.class);
// job.setMapOutputValueClass(IntWritable.class);

// 8. 设置Reduce阶段输出的键值对类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// 9. 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 10. 设置分区器
job.setPartitionerClass(WordCountPartitioner.class);
// 11. 设置ReduceTask的数量
// ReduceTask的数量最好和分区的数量保持一致:
// 如果ReduceTask的数量多于分区的数量: 会出现多余的ReduceTask空占资源,不去处理任何的数据,浪费资源,且生成空的结果文件
// 如果ReduceTask的数量少于分区的数量: 会出现某个分区的数据暂时无法处理,需要等待某ReduceTask任务处理结束后再处理这个分区的数据,无法高效并发
job.setNumReduceTasks(3);

// 12. 提交Job
System.exit(job.waitForCompletion(true) ? 0 : -1);
}
}

9 IDE运行MapReduce的模式

9.1 local模式测本地文件

原理:

  • 将MapReduce的任务资源调度设置为local,不使用YARN进行资源调度。
  • 将文件系统设置为本地文件系统,不使用HDFS
1
2
3
Configuration configuration = new Configuration();
configuration.set("mapreduce.framework.name", "local"); // 设置为本地运行模式,任务不会在YARN上运行。
configuration.set("fs.defaultFS", "file:///"); // 设置为本地文件系统,不使用HDFS。

9.2 local模式测集群文件

原理:

  • 将MapReduce的任务资源调度设置为local,不使用YARN进行资源调度。
  • 将文件系统设置为HDFS
1
2
3
Configuration configuration = new Configuration();
configuration.set("mapreduce.framework.name", "local"); // 设置为本地运行模式,任务不会在YARN上运行。
configuration.set("fs.defaultFS", "hdfs://qianfeng01:9820"); // 设置为HDFS。

9.3 YARN模式测集群

原理:

  • 将MapReduce的任务资源调度设置为YARN
  • 将文件系统设置为HDFS
1
2
3
4
Configuration configuration = new Configuration();
configuration.set("mapreduce.framework.name", "yarn"); // 设置为本地运行模式,任务不会在YARN上运行。
configuration.set("fs.defaultFS", "hdfs://qianfeng01:9820"); // 设置为HDFS。
configuration.set("mapreduce.app-submission.cross-platform", "true"); // 跨平台任务提交打开。

注意事项:

在上述的配置都完成后,将程序打jar包,然后将jar包添加到classpath,才可以运行程序。

在jar包上右键 -> Add As Library

9.4 打jar包传Linux运行

原理:

  • 将写好的程序打jar包,上传到Linux
  • 使用hadoop jar的命令去执行

1-MapReduce入门
http://www.zivjie.cn/2024/06/15/Hadoop/MapReduce/1-MapReduce入门/
作者
Francis
发布于
2024年6月15日
许可协议