logo头像
Snippet 博客主题

Hadoop学习之路(十三)MapReduce的初识

** Hadoop学习之路(十三)MapReduce的初识:** <Excerpt in index | 首页摘要>

​ Hadoop学习之路(十三)MapReduce的初识

<The rest of contents | 余下全文>

MapReduce是什么

首先让我们来重温一下 hadoop 的四大组件:

HDFS:分布式存储系统

MapReduce:分布式计算系统

YARN:hadoop 的资源调度系统

Common:以上三大组件的底层支撑组件,主要提供基础工具包和 RPC 框架等

MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析应用” 的核心框架

MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布 式运算程序,并发运行在一个 Hadoop 集群上

为什么需要 MapReduce

1、海量数据在单机上处理因为硬件资源限制,无法胜任

2、而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度

3、引入 MapReduce 框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将 分布式计算中的复杂性交由框架来处理

设想一个海量数据场景下的数据计算需求:

单机版:磁盘受限,内存受限,计算能力受限
分布式版:1、 数据存储的问题,hadoop 提供了 hdfs 解决了数据存储这个问题2、 运算逻辑至少要分为两个阶段,先并发计算(map),然后汇总(reduce)结果3、 这两个阶段的计算如何启动?如何协调?4、 运算程序到底怎么执行?数据找程序还是程序找数据?5、 如何分配两个阶段的多个运算任务?6、 如何管理任务的执行过程中间状态,如何容错?7、 如何监控?8、 出错如何处理?抛异常?重试?

  可见在程序由单机版扩成分布式版时,会引入大量的复杂工作。为了提高开发效率,可以将 分布式程序中的公共功能封装成框架,让开发人员可以将精力集中于业务逻辑。

  Hadoop 当中的 MapReduce 就是这样的一个分布式程序运算框架,它把大量分布式程序都会 涉及的到的内容都封装进了,让用户只用专注自己的业务逻辑代码的开发。它对应以上问题 的整体结构如下:

MRAppMaster:MapReduce Application Master,分配任务,协调任务的运行

MapTask:阶段并发任,负责 mapper 阶段的任务处理 YARNChild

ReduceTask:阶段汇总任务,负责 reducer 阶段的任务处理 YARNChild

MapReduce做什么

img

简单地讲,MapReduce可以做大数据处理。所谓大数据处理,即以价值为导向,对大数据加工、挖掘和优化等各种处理。

  MapReduce擅长处理大数据,它为什么具有这种能力呢?这可由MapReduce的设计思想发觉。MapReduce的思想就是“分而治之”。

  (1)Mapper负责“分”,即把复杂的任务分解为若干个“简单的任务”来处理。“简单的任务”包含三层含义:一是数据或计算的规模相对原任务要大大缩小;二是就近计算原则,即任务会分配到存放着所需数据的节点上进行计算;三是这些小任务可以并行计算,彼此间几乎没有依赖关系。

  (2)Reducer负责对map阶段的结果进行汇总。至于需要多少个Reducer,用户可以根据具体问题,通过在mapred-site.xml配置文件里设置参数mapred.reduce.tasks的值,缺省值为1。

MapReduce 程序运行演示

  在 MapReduce 组件里,官方给我们提供了一些样例程序,其中非常有名的就是 wordcount 和 pi 程序。这些 MapReduce 程序的代码都在 hadoop-mapreduce-examples-2.7.5.jar 包里,这 个 jar 包在 hadoop 安装目录下的/share/hadoop/mapreduce/目录里 下面我们使用 hadoop 命令来试跑例子程序,看看运行效果

MapReduce 示例 pi 的程序

1
2
3
4
[hadoop@hadoop1 ~]$ cd apps/hadoop-2.7.5/share/hadoop/mapreduce/
[hadoop@hadoop1 mapreduce]$ pwd
/home/hadoop/apps/hadoop-2.7.5/share/hadoop/mapreduce
[hadoop@hadoop1 mapreduce]$ hadoop jar hadoop-mapreduce-examples-2.7.5.jar pi 5 5

img

MapReduce 示例 wordcount 的程序

1
[hadoop@hadoop1 mapreduce]$ hadoop jar hadoop-mapreduce-examples-2.7.5.jar wordcount /wc/input1/ /wc/output1/

img

查看结果

1
[hadoop@hadoop1 mapreduce]$ hadoop fs -cat /wc/output1/part-r-00000

img

其他程序

那除了这两个程序以外,还有没有官方提供的其他程序呢,还有就是它们的源码在哪里呢?

我们打开 mapreduce 的源码工程,里面有一个 hadoop-mapreduce-project 项目:

img

里面有一个例子程序的子项目:hadoop-mapreduce-examples

img

其中 src 是例子程序源码目录,pom.xml 是该项目的 maven 管理配置文件,我们打开该文件, 找到第 127 行,它告诉了我们例子程序的主程序入口:

img

img

找到src\main\java\org\apache\hadoop\examples目录

img

打开主入口程序,看源代码:

img

找到这一步,我们就能知道其实 wordcount 程序的实际程序就是 WordCount.class,这就是我 们想要找的例子程序的源码。

WordCount.java源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.hadoop.examples;
19
20 import java.io.IOException;
21 import java.util.StringTokenizer;
22
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.fs.Path;
25 import org.apache.hadoop.io.IntWritable;
26 import org.apache.hadoop.io.Text;
27 import org.apache.hadoop.mapreduce.Job;
28 import org.apache.hadoop.mapreduce.Mapper;
29 import org.apache.hadoop.mapreduce.Reducer;
30 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
31 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
32 import org.apache.hadoop.util.GenericOptionsParser;
33
34 public class WordCount {
35
36 public static class TokenizerMapper
37 extends Mapper<Object, Text, Text, IntWritable>{
38
39 private final static IntWritable one = new IntWritable(1);
40 private Text word = new Text();
41
42 public void map(Object key, Text value, Context context
43 ) throws IOException, InterruptedException {
44 StringTokenizer itr = new StringTokenizer(value.toString());
45 while (itr.hasMoreTokens()) {
46 word.set(itr.nextToken());
47 context.write(word, one);
48 }
49 }
50 }
51
52 public static class IntSumReducer
53 extends Reducer<Text,IntWritable,Text,IntWritable> {
54 private IntWritable result = new IntWritable();
55
56 public void reduce(Text key, Iterable<IntWritable> values,
57 Context context
58 ) throws IOException, InterruptedException {
59 int sum = 0;
60 for (IntWritable val : values) {
61 sum += val.get();
62 }
63 result.set(sum);
64 context.write(key, result);
65 }
66 }
67
68 public static void main(String[] args) throws Exception {
69 Configuration conf = new Configuration();
70 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
71 if (otherArgs.length < 2) {
72 System.err.println("Usage: wordcount <in> [<in>...] <out>");
73 System.exit(2);
74 }
75 Job job = Job.getInstance(conf, "word count");
76 job.setJarByClass(WordCount.class);
77 job.setMapperClass(TokenizerMapper.class);
78 job.setCombinerClass(IntSumReducer.class);
79 job.setReducerClass(IntSumReducer.class);
80 job.setOutputKeyClass(Text.class);
81 job.setOutputValueClass(IntWritable.class);
82 for (int i = 0; i < otherArgs.length - 1; ++i) {
83 FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
84 }
85 FileOutputFormat.setOutputPath(job,
86 new Path(otherArgs[otherArgs.length - 1]));
87 System.exit(job.waitForCompletion(true) ? 0 : 1);
88 }
89 }

MapReduce 示例程序编写及编码规范

上一步,我们查看了 WordCount 这个 MapReduce 程序的源码编写,可以得出几点结论:

1、 该程序有一个 main 方法,来启动任务的运行,其中 job 对象就存储了该程序运行的必要 信息,比如指定 Mapper 类和 Reducer 类 job.setMapperClass(TokenizerMapper.class); job.setReducerClass(IntSumReducer.class);

2、 该程序中的 TokenizerMapper 类继承了 Mapper 类

3、 该程序中的 IntSumReducer 类继承了 Reducer 类

总结:MapReduce 程序的业务编码分为两个大部分,一部分配置程序的运行信息,一部分 编写该 MapReduce 程序的业务逻辑,并且业务逻辑的 map 阶段和 reduce 阶段的代码分别继 承 Mapper 类和 Reducer 类

编写自己的 Wordcount 程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
  1 package com.ghgj.mapreduce.wc.demo;
2
3 import java.io.IOException;
4
5 import org.apache.hadoop.conf.Configuration;
6 import org.apache.hadoop.fs.FileSystem;
7 import org.apache.hadoop.fs.Path;
8 import org.apache.hadoop.io.IntWritable;
9 import org.apache.hadoop.io.LongWritable;
10 import org.apache.hadoop.io.Text;
11 import org.apache.hadoop.mapreduce.Job;
12 import org.apache.hadoop.mapreduce.Mapper;
13 import org.apache.hadoop.mapreduce.Reducer;
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
16
17 /**
18 *
19 * 描述: MapReduce出入门:WordCount例子程序
20 */
21 public class WordCountMR {
22
23 /**
24 * 该main方法是该mapreduce程序运行的入口,其中用一个Job类对象来管理程序运行时所需要的很多参数:
25 * 比如,指定用哪个组件作为数据读取器、数据结果输出器 指定用哪个类作为map阶段的业务逻辑类,哪个类作为reduce阶段的业务逻辑类
26 * 指定wordcount job程序的jar包所在路径 .... 以及其他各种需要的参数
27 */
28 public static void main(String[] args) throws Exception {
29 // 指定hdfs相关的参数
30 Configuration conf = new Configuration();
31 // conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
32 System.setProperty("HADOOP_USER_NAME", "hadoop");
33
34 // 这是高可用的集群的配置文件。如果不是高可用集群,请自行替换配置文件
35 // conf.addResource("hdfs_config/core-site.xml");
36 // conf.addResource("hdfs_config/hdfs-site.xml");
37
38 // conf.set("mapreduce.framework.name", "yarn");
39 // conf.set("yarn.resourcemanager.hostname", "hadoop04");
40
41 // 通过Configuration对象获取Job对象,该job对象会组织所有的该MapReduce程序所有的各种组件
42 Job job = Job.getInstance(conf);
43
44 // 设置jar包所在路径
45 job.setJarByClass(WordCountMR.class);
46
47 // 指定mapper类和reducer类
48 job.setMapperClass(WordCountMapper.class);
49 job.setReducerClass(WordCountReducer.class);
50
51 // Mapper的输入key-value类型,由MapReduce框架决定
52 // 指定maptask的输出类型
53 job.setMapOutputKeyClass(Text.class);
54 job.setMapOutputValueClass(IntWritable.class);
55 // 假如 mapTask的输出key-value类型,跟reduceTask的输出key-value类型一致,那么,以上两句代码可以不用设置
56
57 // reduceTask的输入key-value类型 就是 mapTask的输出key-value类型。所以不需要指定
58 // 指定reducetask的输出类型
59 job.setOutputKeyClass(Text.class);
60 job.setOutputValueClass(IntWritable.class);
61
62 // 为job指定输入数据的组件和输出数据的组件,以下两个参数是默认的,所以不指定也是OK的
63 // job.setInputFormatClass(TextInputFormat.class);
64 // job.setOutputFormatClass(TextOutputFormat.class);
65
66 // 为该mapreduce程序制定默认的数据分区组件。默认是 HashPartitioner.class
67 // job.setPartitionerClass(HashPartitioner.class);
68
69 // 如果MapReduce程序在Eclipse中,运行,也可以读取Windows系统本地的文件系统中的数据
70 Path inputPath = new Path("D:\\bigdata\\wordcount\\input");
71 Path outputPath = new Path("D:\\bigdata\\wordcount\\output33");
72
73 // 设置该MapReduce程序的ReduceTask的个数
74 // job.setNumReduceTasks(3);
75
76 // 指定该mapreduce程序数据的输入和输出路径
77 // Path inputPath = new Path("/wordcount/input");
78 // Path outputPath = new Path("/wordcount/output");
79 // 该段代码是用来判断输出路径存在不存在,存在就删除,虽然方便操作,但请谨慎
80 FileSystem fs = FileSystem.get(conf);
81 if (fs.exists(outputPath)) {
82 fs.delete(outputPath, true);
83 }
84
85 // 设置wordcount程序的输入路径
86 FileInputFormat.setInputPaths(job, inputPath);
87 // 设置wordcount程序的输出路径
88 FileOutputFormat.setOutputPath(job, outputPath);
89
90 // job.submit();
91 // 最后提交任务(verbose布尔值 决定要不要将运行进度信息输出给用户)
92 boolean waitForCompletion = job.waitForCompletion(true);
93 System.exit(waitForCompletion ? 0 : 1);
94 }
95
96 /**
97 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
98 *
99 * KEYIN 是指框架读取到的数据的key的类型,在默认的InputFormat下,读到的key是一行文本的起始偏移量,所以key的类型是Long
100 * VALUEIN 是指框架读取到的数据的value的类型,在默认的InputFormat下,读到的value是一行文本的内容,所以value的类型是String
101 * KEYOUT 是指用户自定义逻辑方法返回的数据中key的类型,由用户业务逻辑决定,在此wordcount程序中,我们输出的key是单词,所以是String
102 * VALUEOUT 是指用户自定义逻辑方法返回的数据中value的类型,由用户业务逻辑决定,在此wordcount程序中,我们输出的value是单词的数量,所以是Integer
103 *
104 * 但是,String ,Long等jdk中自带的数据类型,在序列化时,效率比较低,hadoop为了提高序列化效率,自定义了一套序列化框架
105 * 所以,在hadoop的程序中,如果该数据需要进行序列化(写磁盘,或者网络传输),就一定要用实现了hadoop序列化框架的数据类型
106 *
107 * Long ----> LongWritable
108 * String ----> Text
109 * Integer ----> IntWritable
110 * Null ----> NullWritable
111 */
112 static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
113
114 /**
115 * LongWritable key : 该key就是value该行文本的在文件当中的起始偏移量
116 * Text value : 就是MapReduce框架默认的数据读取组件TextInputFormat读取文件当中的一行文本
117 */
118 @Override
119 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
120
121 // 切分单词
122 String[] words = value.toString().split(" ");
123 for (String word : words) {
124 // 每个单词计数一次,也就是把单词组织成<hello,1>这样的key-value对往外写出
125 context.write(new Text(word), new IntWritable(1));
126 }
127 }
128 }
129
130 /**
131 * 首先,和前面一样,Reducer类也有输入和输出,输入就是Map阶段的处理结果,输出就是Reduce最后的输出
132 * reducetask在调我们写的reduce方法,reducetask应该收到了前一阶段(map阶段)中所有maptask输出的数据中的一部分
133 * (数据的key.hashcode%reducetask数==本reductask号),所以reducetaks的输入类型必须和maptask的输出类型一样
134 *
135 * reducetask将这些收到kv数据拿来处理时,是这样调用我们的reduce方法的: 先将自己收到的所有的kv对按照k分组(根据k是否相同)
136 * 将某一组kv中的第一个kv中的k传给reduce方法的key变量,把这一组kv中所有的v用一个迭代器传给reduce方法的变量values
137 */
138 static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
139
140 /**
141 * Text key : mapTask输出的key值
142 * Iterable<IntWritable> values : key对应的value的集合(该key只是相同的一个key)
143 *
144 * reduce方法接收key值相同的一组key-value进行汇总计算
145 */
146 @Override
147 protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
148
149 // 结果汇总
150 int sum = 0;
151 for (IntWritable v : values) {
152 sum += v.get();
153 }
154 // 汇总的结果往外输出
155 context.write(key, new IntWritable(sum));
156 }
157 }
158 }

MapReduce 程序编写规范

1、用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行 MR 程序的客户端)

2、Mapper 的输入数据是 KV 对的形式(KV 的类型可自定义)

3、Mapper 的输出数据是 KV 对的形式(KV 的类型可自定义)

4、Mapper 中的业务逻辑写在 map()方法中

5、map()方法(maptask 进程)对每一个<k,v>调用一次

6、Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV 对的形式

7、Reducer 的业务逻辑写在 reduce()方法中

8、Reducetask 进程对每一组相同 k 的<k,v>组调用一次 reduce()方法

9、用户自定义的 Mapper 和 Reducer 都要继承各自的父类

10、整个程序需要一个 Drvier 来进行提交,提交的是一个描述了各种必要信息的 job 对象

WordCount 的业务逻辑

1、 maptask 阶段处理每个数据分块的单词统计分析,思路是每遇到一个单词则把其转换成 一个 key-value 对,比如单词 hello,就转换成<’hello’,1>发送给 reducetask 去汇总

2、 reducetask 阶段将接受 maptask 的结果,来做汇总计数

MapReduce 运行方式及 Debug

集群运行模式

打 jar 包,提交任务到集群运行,适用:生产环境,不适用:测试,调试,开发

要点一:首先要把代码打成 jar 上传到 linux 服务器

要点二:用 hadoop jar 的命令去提交代码到 yarn 集群运行

要点三:处理的数据和输出结果应该位于 hdfs 文件系统

要点四:如果需要在 windows 中的 eclipse 当中直接提交 job 到集群,则需要修改 YarnRunner 类,这个比较复杂,不建议使用

本地运行模式

Eclipse 开发环境下本地运行,好处是方便调试和测试

直接在IDE环境中进行环境 : eclipse

1、直接运行在本地,读取本地数据

2、直接运行在本地,读取远程的文件系统的数据

3、直接在IDE中提交任务给YARN集群运行