1 MapReduce是什么 2004年,谷歌发表了一篇名为《MapReduce》的论文,主要介绍了如何在分布式的存储系统上对数据进行高效率的计算。2005年,Nutch团队使用Java语言实现了这个技术,并命名为MapReduce。时至今日,MapReduce是Apache Hadoop的核心模块之一,是运行在HDFS上的分布式运算程序的编程框架,用于大规模数据集(大于1TB)的并行运算。其中的概念,”Map(映射)”和”Reduce(归约)”,是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。
《阅读资料》
在过去的很长一段时间里,CPU的性能都会遵循”摩尔定律“,在性能上每隔18个月左右就是提高一倍。那个时候,不需要对程序做任何改变,仅仅通过使用更优秀的CPU,就可以进行性能提升。但是现在,在CPU性能提升的道路上,人类已经到达了制作工艺的瓶颈,因此,我们不能再把希望寄托在性能更高的CPU身上了。
现在这个时候,大规模数据存储在分布式文件系统上,人们也开始采用分布式并行编程来提高程序的性能。分布式程序运行在大规模计算机集群上,集群是大量的廉价服务器,可以并行执行大规模数据处理任务,这样就获得了海量的计算能力
分布式并行编程比传统的程序有明显的区别,它运行在大量计算机构成的集群上,可以充分利用集群的并行处理能力;同时,通过向集群中增加新的计算节点,就可以很容易的实现集群计算能力的扩展。
2 为什么学习MapReduce MapReduce主要解决的是分布式文件存储系统上,数据的分布式计算的问题。如果我们将所有的需要处理的数据移动到一个节点上进行处理,那么只是在数据传输的过程中就得消耗大量的时间,而且还可能在一台节点存不下这大量的数据。就算是能够存储下,也能够接受数据移动所带来的时间消耗,集群中其他节点的计算资源也都是在闲置的,不能高效率的利用集群。
因此我们就需要进行分布式的计算,将计算程序分发给不同的节点。在每一个节点上处理自己节点的数据,最后将每一个节点的数据处理结果汇总在一起。而在分布式计算的过程中会遇到很多的分布式计算的细节问题,这些问题都是需要开发人员去考虑的。那么如何去解决这些问题呢?
MapReduce是一个开源的、分布式的计算框架,封装了分布式计算程序的实现细节,使得开发人员不需要了解分布式计算底层实现的情况下,就可以去开发一个分布式的计算程序。开发人员只需要将重心放在业务逻辑的实现即可,不需要关注分布式开发的底层细节。因此,对于开发人员来说,可以简化不少的工作量,提交程序开发的效率!
3 优缺点 3.1 优点
易于编程
1 它简单的实现一些接口,就可以完成一个分布式程序,这个程序可以分布到大量的廉价的pc 机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特性使的MapReduce编程变得非常流行。
良好的扩展性
1 当你的计算资源得不到满足的时候,你可以简单的通过增加机器来扩展它的计算能力
高容错性
1 MapReduce 的设计初衷就是使程序能够部署在廉价的pc 机器上,这就要求它具有很高的容错性。比如一个机器挂了,它可以把上面的计算任务转移到另一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由hadoop内部完成的。
适合PB级以上海量数据的离线处理
3.2 缺点
不适合做实时计算
1 MapReduce无法做到像Mysql那样做到毫秒或者秒级的返回结果
不适合做流式计算
1 流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能流态变化。这是MR自身的设计特点决定了数据源必须是静态的。
不适合DAG(有向图)计算
1 多个应用程序存在依赖关系,后一个应用程序的输入为前一个应用程序的输出,在这种情况下,MapReduce并不是不能做,而是使用后每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常低下。
4 核心思想
MapReduce设计的一个理念是“计算向数据靠拢”(移动计算),而不是“数据向计算靠拢”(移动数据)
将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,移动到有数据存储的集群节点上,一是可以减少节点间的数据移动开销。二是在存储节点上可以并行计算,大大提高计算效率问题。
1 因为移动数据需要大量的网络传输开销,尤其是在大规模数据环境下,这种开销尤为惊人,所以移动计算要比移动数据更加经济。
MapReduce一个完整的运算分为Map和Reduce两个部分。Map会处理本节点的原始数据,产生的数据会临时存储到本地磁盘。Reduce会跨节点fetch属于自己的数据,并进行处理,产生的数据会存储到HDFS上。
《阅读资料》
Hadoop的MapReduce核心技术起源于谷歌在2004年发表的关于MapReduce系统的论文介绍。论文中有这么一句话:Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages。这句话提到了MapReduce思想来源,大致意思是,MapReduce的灵感来源于函数式语言(比如Lisp)中的内置函数map(映射)和reduce(规约)。
简单来说,在函数式语言里,map表示对一个列表(List)中的每个元素做计算,reduce表示对一个列表中的每个元素做迭代计算。它们具体的计算是通过传入的函数来实现的,map和reduce提供的是计算的框架。我们想一下,reduce既然能做迭代计算,那就表示列表中的元素是相关的(比如我想对列表中的所有元素做相加求和,那么列表中至少都应该是数值吧)。而map是对列表中每个元素做单独处理的,这表示列表中可以是杂乱无章的数据。这样看来,就有点联系了。在MapReduce里,Map处理的是原始数据,自然是杂乱无章的,每条数据之间互相没有关系;到了Reduce阶段,数据是以key后面跟着若干个value来组织的,这些value有相关性,至少它们都在一个key下面,于是就符合函数式语言里map和reduce的基本思想了。
5 阶段分类 MapReduce的程序在运行的过程中,一般分为两个阶段: Map阶段和Reduce阶段
5.1 第一阶段:Map 第一阶段,也称之为Map阶段。这个阶段会有若干个MapTask实例,完全并行运行,互不相干。每个MapTask会读取分析一个InputSplit(输入分片,简称分片)对应的原始数据。计算的结果数据会临时保存到所在节点的本地磁盘里。
该阶段的编程模型中会有一个map函数需要开发人员重写,map函数的输入是一个<key,value>对,map函数的输出也是一个<key,value>对,key和value的类型需要开发人员指定。参考下图:
5.2 第二阶段:Reduce 第二阶段,也称为Reduce阶段。这个阶段会有若干个ReduceTask实例并发运行,互不相干。但是他们的数据依赖于上一个阶段的所有maptask并发实例的输出。一个ReudceTask会从多个MapTask运行节点上fetch自己要处理的分区数据。经过处理后,输出到HDFS上。
该阶段的编程模型中有一个reduce函数需要开发人员重写,reduce函数的输入也是一个<key,value>对,reduce函数的输出也是一个<key,value>对。这里要强调的是,reduce的输入其实就是map的输出,只不过map的输出经过shuffle技术后变成了<key,List<Value>>而已。参考下图:
注意: MapReduce编程模型只能包含一个map阶段和一个reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。
6 编程规范 用户编写MapReduce程序的时候,需要设计至少三个类: Mapper、Reducer、Driver(用于提交MR的任务)
6.1 Mapper
自定义类,继承 org.apache.hadoop.mapreduce.Mapper 类型
定义K1,V1,K2,V2的泛型(K1,V1是Mapper的输入数据类型,K2,V2是Mapper的输出数据类型)
重写map方法(处理逻辑)
注意: Map方法,每一个键值对都会调用一次。
6.2 Reducer
自定义类,继承 org.apache.hadoop.mapreduce.Reducer 类型
定义K2,V2,K3,V3的泛型(K2,V2是Reducer的输入数据类型,K3,V3是Reducer的输出数据类型)
重写reduce方法的处理逻辑
注意: reduce方法,默认按key分组,每一组都调用一次。
6.3 Driver MapReduce的程序,需要进行执行之前的属性配置与任务的提交,这些操作都需要在Driver类中来完成。
7 WordCount案例演示 7.1 案例需求 给定一个路径,统计这个路径下所有的文件中的每一个单词出现的次数。
7.2 测试数据 1 2 3 4 5 6 7 8 9 10 11 12 a.txt hello qianfeng hello 1999 hello beijing hello world hello hello java goodb.txt hello xisanqi hello bingbing hello chenchen hello ACMilan hello china c.txt hello hadoop hello java hello storm hello spark hello redis hello zookeeper hello hive hello hbase hello flume
7.3 pom文件 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 <dependencies > <dependency > <groupId > org.apache.hadoop</groupId > <artifactId > hadoop-common</artifactId > <version > 3.3.1</version > </dependency > <dependency > <groupId > org.apache.hadoop</groupId > <artifactId > hadoop-client</artifactId > <version > 3.3.1</version > </dependency > <dependency > <groupId > org.apache.hadoop</groupId > <artifactId > hadoop-hdfs</artifactId > <version > 3.3.1</version > </dependency > </dependencies >
7.4 定义Mapper类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package wc;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class WordCountMapper extends Mapper <LongWritable, Text, Text, IntWritable> { @Override protected void map (LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] words = line.split("\\s+" ); for (String word : words) { context.write(new Text (word), new IntWritable (1 )); } } }
7.5 定义Reducer类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package wc;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class WordCountReducer extends Reducer <Text, IntWritable, Text, IntWritable> { @Override protected void reduce (Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int times = 0 ; for (IntWritable value : values) { times += value.get(); } context.write(key, new IntWritable (times)); } }
7.6 定义Driver类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 package wc;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class WordCountDriver { public static void main (String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration (); conf.set("fs.defaultFS" , "hdfs://192.168.10.101:9820" ); Job job = Job.getInstance(conf); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); job.setJarByClass(WordCountDriver.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path (args[0 ])); FileOutputFormat.setOutputPath(job, new Path (args[1 ])); System.exit(job.waitForCompletion(true ) ? 0 : -1 ); } }
7.7 打jar包
在本地创建好测试数据,将其上传到HDFS
1 hdfs dfs -put ~/input /wordcount
检查YARN是否开启,如果没有开启,将其开启
执行程序
1 2 3 # hadoop jar jar包名称 驱动类全名 输入路径 输出路径 # 注意,输出路径不能存在,必须由程序自己创建。如果输出路径存在,会出现异常 hadoop jar xxxxx.jar com.qianfeng.wordcount.WordCount.Driver /wordcount/input /wordcount/output
7.8 查看结果 1 hdfs dfs -cat /wordcount/output/*
8 Partitioner组件 8.1 介绍 我们已经实现了一个WordCount的案例,统计了一个目录下所有的文件中每一个单词出现的次数,并将计算结果输出到了一个文件中存储起来。但是有时候我们的需求是需要将处理的结果存储在多个文件中,这时应该怎么做呢?
首先我们需要知道一个前提条件,在MapReduce的程序中,最终生成多少个文件是由ReduceTask的数量来决定的。在上述的案例中我们最终生成了一个文件,是因为只有一个ReduceTask。那么如果需要最终生成两个结果文件呢?我只需要将ReduceTask的数量设置为2即可。这样就带来了另外一个问题: 某一个单词的统计结果,到底应该存放在哪个文件中呢?
在MapReduce的程序中,一个MapTask处理一个分片的数据(后文会讲解),而一个ReduceTask是用来处理一个分区的数据的。因此我如果需要明确某一个单词的统计结果到底存放在哪个文件中,只需要设置好分区即可。而Partitioner就是一个分区器组件,来实现数据的分区操作的。默认的分区器是HashPartitioner,如果我们需要实现自己的分区逻辑,就需要自定义分区器了。
HashPartitioner的逻辑:
分区器计算每一个Key的HashCode
将计算后的HashCode % ReduceTask的数量,得到一个分区的编号
8.2 自定义Partitioner 需求: 将单词的统计结果存入三个文件: a-i开头的存一个文件、j-q开头的存一个文件、其他的存一个文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package wc;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Partitioner;public class WordCountPartitioner extends Partitioner <Text, IntWritable> { @Override public int getPartition (Text text, IntWritable intWritable, int i) { char firstLetter = text.toString().charAt(0 ); if (firstLetter >= 'a' && firstLetter <= 'i' ) { return 0 ; } else if (firstLetter >= 'j' && firstLetter <= 'q' ) { return 1 ; } return 2 ; } }
8.3 应用分区器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 package wc;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class WordCountDriver { public static void main (String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration (); conf.set("fs.defaultFS" , "hdfs://192.168.10.101:9820" ); Job job = Job.getInstance(conf); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); job.setJarByClass(WordCountDriver.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path (args[0 ])); FileOutputFormat.setOutputPath(job, new Path (args[1 ])); job.setPartitionerClass(WordCountPartitioner.class); job.setNumReduceTasks(3 ); System.exit(job.waitForCompletion(true ) ? 0 : -1 ); } }
9 IDE运行MapReduce的模式 9.1 local模式测本地文件 原理:
将MapReduce的任务资源调度设置为local,不使用YARN进行资源调度。
将文件系统设置为本地文件系统,不使用HDFS
1 2 3 Configuration configuration = new Configuration (); configuration.set("mapreduce.framework.name" , "local" ); configuration.set("fs.defaultFS" , "file:///" );
9.2 local模式测集群文件 原理:
将MapReduce的任务资源调度设置为local,不使用YARN进行资源调度。
将文件系统设置为HDFS
1 2 3 Configuration configuration = new Configuration (); configuration.set("mapreduce.framework.name" , "local" ); configuration.set("fs.defaultFS" , "hdfs://qianfeng01:9820" );
9.3 YARN模式测集群 原理:
将MapReduce的任务资源调度设置为YARN
将文件系统设置为HDFS
1 2 3 4 Configuration configuration = new Configuration (); configuration.set("mapreduce.framework.name" , "yarn" ); configuration.set("fs.defaultFS" , "hdfs://qianfeng01:9820" ); configuration.set("mapreduce.app-submission.cross-platform" , "true" );
注意事项:
在上述的配置都完成后,将程序打jar包,然后将jar包添加到classpath,才可以运行程序。
在jar包上右键 -> Add As Library
9.4 打jar包传Linux运行 原理:
将写好的程序打jar包,上传到Linux
使用hadoop jar的命令去执行