
The Road to Learning Hadoop (22): MapReduce Input and Output


MapReduce Input

Anyone who has written an MR program knows that the arguments passed to the map method are one line of data read by the default input-reading component.

1. Who does the reading? Who calls this map method?

Looking at the source of Mapper.java, we can see that the run method is what calls the map method.

/**
 * Find out who calls the run method.
 *
 * There is a component called MapTask, and it has a corresponding method
 * that calls mapper.run(context);
 *
 * context.nextKeyValue() ====== lineRecordReader.nextKeyValue();
 */
public void run(Context context) throws IOException, InterruptedException {

    /**
     * Called once when each mapTask is initialized.
     */
    setup(context);
    try {

        /**
         * Every time the input-reading component reads one line, it hands that
         * line to the map method for one invocation.
         *
         * context.nextKeyValue() does two things:
         *
         * 1. It reads one key-value pair into the two corresponding fields of the context object.
         * 2. Its return value is not the key-value pair itself, but a boolean indicating
         *    whether a key-value pair was actually read.
         *
         * context.getCurrentKey()   ==== key
         * context.getCurrentValue() ==== value
         *
         * The implementation ultimately relies on LineRecordReader at the bottom of the stack.
         *
         * Your nextKeyValue implementation must eventually return false, otherwise this loop never ends.
         */
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }

    } finally {

        /**
         * Executed after this mapTask has finished processing all the data in its split.
         */
        cleanup(context);
    }
}

Four important context methods are involved here (a minimal usage sketch follows this list):

1. context.nextKeyValue(); // reads the next record; its return value is not the key-value pair itself, but a boolean indicating whether a record was read

2. context.getCurrentKey(); // returns the key that context.nextKeyValue() just read

3. context.getCurrentValue(); // returns the value that context.nextKeyValue() just read

4. context.write(key, value); // emits the mapper's output key-value pairs
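
To see how these four methods show up in everyday user code, here is a minimal word-count style Mapper sketch (the class name and tokenization are illustrative, not part of the original post). The framework's run method drives it exactly as shown above: setup once, map once per record delivered by nextKeyValue, then cleanup.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative example: the framework calls setup() once, then map() once per
// record read by context.nextKeyValue(), and finally cleanup().
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final Text word = new Text();
    private final IntWritable one = new IntWritable(1);

    @Override
    protected void setup(Context context) {
        // Runs once per map task, before any record is processed.
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // key   = byte offset of the line (from LineRecordReader)
        // value = the line itself
        for (String token : value.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, one);   // the fourth important method: emit output
            }
        }
    }

    @Override
    protected void cleanup(Context context) {
        // Runs once per map task, after the whole split has been processed.
    }
}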

2. Who calls the run method? Where does the context argument come from, and what is it?

Both questions share one answer: once we find who calls the run method, we also know who constructs the argument named context and passes it in.

At the top of the chain, mapper.run(context) is invoked by a MapTask instance.

Looking at the source of MapTask.java:

@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
        throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;

    if (isMapTask()) {
        // If there are no reducers then there won't be any sort. Hence the map
        // phase will govern the entire attempt's progress.
        if (conf.getNumReduceTasks() == 0) {
            mapPhase = getProgress().addPhase("map", 1.0f);
        } else {
            // If there are reducers then the entire attempt's progress will be
            // split between the map phase (67%) and the sort phase (33%).
            mapPhase = getProgress().addPhase("map", 0.667f);
            sortPhase = getProgress().addPhase("sort", 0.333f);
        }
    }
    TaskReporter reporter = startReporter(umbilical);

    boolean useNewApi = job.getUseNewMapper();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
        runJobCleanupTask(umbilical, reporter);
        return;
    }
    if (jobSetup) {
        runJobSetupTask(umbilical, reporter);
        return;
    }
    if (taskCleanup) {
        runTaskCleanupTask(umbilical, reporter);
        return;
    }

    /**
     * The core of the run method: the new-API branch.
     */
    if (useNewApi) {
        /**
         * job           - the JobConf object
         * splitMetaInfo - the split information
         * umbilical     - the task-to-parent communication protocol
         * reporter      - an object holding the various counters
         */
        runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
        runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }

    done(umbilical, reporter);
}

This gives the following pseudocode for the new-API call chain:

mapTask.run() {
    runNewMapper() {
        mapper.run(mapperContext);
    }
}

3. Looking at the runNewMapper method

This method is also in MapTask.java:

/**
 * This is the core of the call chain:
 *
 *     mapper.run(context);
 *
 * @param job
 * @param splitIndex
 * @param umbilical
 * @param reporter
 * @throws IOException
 * @throws ClassNotFoundException
 * @throws InterruptedException
 */
@SuppressWarnings("unchecked")
private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runNewMapper(final JobConf job, final TaskSplitIndex splitIndex,
        final TaskUmbilicalProtocol umbilical, TaskReporter reporter)
        throws IOException, ClassNotFoundException, InterruptedException {
    // make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(
            job, getTaskID(), reporter);
    // make a mapper
    org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE> mapper = (org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>) ReflectionUtils
            .newInstance(taskContext.getMapperClass(), job);

    /**
     * inputFormat.createRecordReader() === the real RecordReader
     *
     * inputFormat is an instance of the TextInputFormat class.
     *
     * The createRecordReader method of TextInputFormat returns an instance of LineRecordReader.
     */
    // make the input format
    org.apache.hadoop.mapreduce.InputFormat<INKEY, INVALUE> inputFormat =
            (org.apache.hadoop.mapreduce.InputFormat<INKEY, INVALUE>)
            ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);

    // rebuild the input split
    org.apache.hadoop.mapreduce.InputSplit split = null;
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset());
    LOG.info("Processing split: " + split);

    /**
     * The NewTrackingRecordReader class must have these three methods:
     *
     *     nextKeyValue
     *     getCurrentKey
     *     getCurrentValue
     *
     * Their implementations delegate to the three methods of the same names on the
     * RecordReader returned by inputFormat.createRecordReader().
     *
     * Default InputFormat:  TextInputFormat
     * Default RecordReader: LineRecordReader
     *
     * So ultimately the three methods of NewTrackingRecordReader are backed by the
     * three methods of the same names in the LineRecordReader class.
     */
    org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> input =
            new NewTrackingRecordReader<INKEY, INVALUE>(
                    split, inputFormat, reporter, taskContext);

    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

    /**
     * Declare an output object used to write out the mapper's key-value pairs.
     */
    org.apache.hadoop.mapreduce.RecordWriter output = null;
    // get an output object
    if (job.getNumReduceTasks() == 0) {

        /**
         * NewDirectOutputCollector writes output directly; this class must have a write method.
         */
        output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {

        /**
         * There is a reducer phase, so:
         *
         * 1. We can be sure sorting will happen.
         * 2. Can we be sure a Partitioner will be used? Not necessarily; logically it
         *    may be considered to have no effect.
         *
         * The NewOutputCollector class must also have a write method.
         */
        output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

    /**
     * The mapContext object must contain the three read methods.
     *
     * This answers the question raised when we first looked at the source:
     *
     * Question: who calls the constructor of the MapContextImpl class?
     *
     * mapContext is an instance of MapContextImpl, whose read methods are backed by
     * input === NewTrackingRecordReader.
     *
     * What we can be sure of:
     *
     * 1. The mapContext object must have a write method.
     * 2. Looking at the body of MapContextImpl, there is in fact no write method there.
     *
     * Resolution:
     *
     * mapContext.write actually comes from the parent class of MapContextImpl.
     *
     * At the very bottom, write is: output.write();
     */
    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext =
            new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(
                    job, getTaskID(), input, output, committer, reporter, split);

    /**
     * The mapperContext object must contain these three methods:
     *
     *     nextKeyValue
     *     getCurrentKey
     *     getCurrentValue
     *
     * Its concrete implementation relies on new Context(context), where context = mapContext.
     *
     * Conclusion: the mapContext object must contain the three methods above,
     * and it also has a method named write(key, value).
     */
    org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context mapperContext =
            new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>()
                    .getMapContext(mapContext);

    try {

        input.initialize(split, mapperContext);

        /**
         * The entry point responsible for driving the whole mapTask execution.
         *
         * How such a method is structured:
         *
         * 1. The key call comes at the end, or inside the try block.
         * 2. Almost all the other code does one of two things: logging / rounding out
         *    the workflow, or preparing the arguments for the core call.
         */
        mapper.run(mapperContext);

        mapPhase.complete();
        setPhase(TaskStatus.Phase.SORT);
        statusUpdate(umbilical);
        input.close();
        input = null;
        output.close(mapperContext);
        output = null;

    } finally {
        closeQuietly(input);
        closeQuietly(output, mapperContext);
    }
}
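
As the comments in runNewMapper note, taskContext.getInputFormatClass() returns TextInputFormat unless the job configures something else, and TextInputFormat.createRecordReader() hands back a LineRecordReader. As a hedged illustration (the driver class name and paths are made up, and mapper/reducer setup is omitted for brevity), this is roughly how a driver would swap in a different built-in InputFormat:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Minimal driver skeleton, for illustration only.
public class InputFormatDemoDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "input-format-demo");
        job.setJarByClass(InputFormatDemoDriver.class);

        // If nothing is set, getInputFormatClass() returns TextInputFormat, whose
        // createRecordReader() hands back a LineRecordReader
        // (key = LongWritable byte offset, value = Text line).
        // Setting a different class here changes which RecordReader
        // runNewMapper ends up delegating to:
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Whatever class is set here is exactly what ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job) instantiates in the listing above.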

What we can be certain of: mapperContext must have the four important methods described above. Keep tracing mapperContext upward through the code:

/**
 * The mapperContext object must contain these three methods:
 *
 *     nextKeyValue
 *     getCurrentKey
 *     getCurrentValue
 *
 * Its concrete implementation relies on new Context(context), where context = mapContext.
 *
 * Conclusion: the mapContext object must contain the three methods above,
 * and it also has a method named write(key, value).
 */
org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context mapperContext =
        new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>()
                .getMapContext(mapContext);

Looking at WrappedMapper.java:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.map;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.security.Credentials;

/**
 * A {@link Mapper} which wraps a given one to allow custom
 * {@link Mapper.Context} implementations.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

    /**
     * Get a wrapped {@link Mapper.Context} for custom implementations.
     *
     * @param mapContext
     *            <code>MapContext</code> to be wrapped
     * @return a wrapped <code>Mapper.Context</code> for custom implementations
     */
    public Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context getMapContext(
            MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {
        return new Context(mapContext);
    }

    @InterfaceStability.Evolving
    public class Context extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context {

        protected MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext;

        public Context(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {
            this.mapContext = mapContext;
        }

        /**
         * Get the input split for this map.
         */
        public InputSplit getInputSplit() {
            return mapContext.getInputSplit();
        }

        @Override
        public KEYIN getCurrentKey() throws IOException, InterruptedException {
            return mapContext.getCurrentKey();
        }

        @Override
        public VALUEIN getCurrentValue() throws IOException, InterruptedException {
            return mapContext.getCurrentValue();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return mapContext.nextKeyValue();
        }

        @Override
        public Counter getCounter(Enum<?> counterName) {
            return mapContext.getCounter(counterName);
        }

        @Override
        public Counter getCounter(String groupName, String counterName) {
            return mapContext.getCounter(groupName, counterName);
        }

        @Override
        public OutputCommitter getOutputCommitter() {
            return mapContext.getOutputCommitter();
        }

        @Override
        public void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException {
            mapContext.write(key, value);
        }

        @Override
        public String getStatus() {
            return mapContext.getStatus();
        }

        @Override
        public TaskAttemptID getTaskAttemptID() {
            return mapContext.getTaskAttemptID();
        }

        @Override
        public void setStatus(String msg) {
            mapContext.setStatus(msg);
        }

        @Override
        public Path[] getArchiveClassPaths() {
            return mapContext.getArchiveClassPaths();
        }

        @Override
        public String[] getArchiveTimestamps() {
            return mapContext.getArchiveTimestamps();
        }

        @Override
        public URI[] getCacheArchives() throws IOException {
            return mapContext.getCacheArchives();
        }

        @Override
        public URI[] getCacheFiles() throws IOException {
            return mapContext.getCacheFiles();
        }

        @Override
        public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass() throws ClassNotFoundException {
            return mapContext.getCombinerClass();
        }

        @Override
        public Configuration getConfiguration() {
            return mapContext.getConfiguration();
        }

        @Override
        public Path[] getFileClassPaths() {
            return mapContext.getFileClassPaths();
        }

        @Override
        public String[] getFileTimestamps() {
            return mapContext.getFileTimestamps();
        }

        @Override
        public RawComparator<?> getCombinerKeyGroupingComparator() {
            return mapContext.getCombinerKeyGroupingComparator();
        }

        @Override
        public RawComparator<?> getGroupingComparator() {
            return mapContext.getGroupingComparator();
        }

        @Override
        public Class<? extends InputFormat<?, ?>> getInputFormatClass() throws ClassNotFoundException {
            return mapContext.getInputFormatClass();
        }

        @Override
        public String getJar() {
            return mapContext.getJar();
        }

        @Override
        public JobID getJobID() {
            return mapContext.getJobID();
        }

        @Override
        public String getJobName() {
            return mapContext.getJobName();
        }

        @Override
        public boolean getJobSetupCleanupNeeded() {
            return mapContext.getJobSetupCleanupNeeded();
        }

        @Override
        public boolean getTaskCleanupNeeded() {
            return mapContext.getTaskCleanupNeeded();
        }

        @Override
        public Path[] getLocalCacheArchives() throws IOException {
            return mapContext.getLocalCacheArchives();
        }

        @Override
        public Path[] getLocalCacheFiles() throws IOException {
            return mapContext.getLocalCacheFiles();
        }

        @Override
        public Class<?> getMapOutputKeyClass() {
            return mapContext.getMapOutputKeyClass();
        }

        @Override
        public Class<?> getMapOutputValueClass() {
            return mapContext.getMapOutputValueClass();
        }

        @Override
        public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass() throws ClassNotFoundException {
            return mapContext.getMapperClass();
        }

        @Override
        public int getMaxMapAttempts() {
            return mapContext.getMaxMapAttempts();
        }

        @Override
        public int getMaxReduceAttempts() {
            return mapContext.getMaxReduceAttempts();
        }

        @Override
        public int getNumReduceTasks() {
            return mapContext.getNumReduceTasks();
        }

        @Override
        public Class<? extends OutputFormat<?, ?>> getOutputFormatClass() throws ClassNotFoundException {
            return mapContext.getOutputFormatClass();
        }

        @Override
        public Class<?> getOutputKeyClass() {
            return mapContext.getOutputKeyClass();
        }

        @Override
        public Class<?> getOutputValueClass() {
            return mapContext.getOutputValueClass();
        }

        @Override
        public Class<? extends Partitioner<?, ?>> getPartitionerClass() throws ClassNotFoundException {
            return mapContext.getPartitionerClass();
        }

        @Override
        public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass() throws ClassNotFoundException {
            return mapContext.getReducerClass();
        }

        @Override
        public RawComparator<?> getSortComparator() {
            return mapContext.getSortComparator();
        }

        @Override
        public boolean getSymlink() {
            return mapContext.getSymlink();
        }

        @Override
        public Path getWorkingDirectory() throws IOException {
            return mapContext.getWorkingDirectory();
        }

        @Override
        public void progress() {
            mapContext.progress();
        }

        @Override
        public boolean getProfileEnabled() {
            return mapContext.getProfileEnabled();
        }

        @Override
        public String getProfileParams() {
            return mapContext.getProfileParams();
        }

        @Override
        public IntegerRanges getProfileTaskRange(boolean isMap) {
            return mapContext.getProfileTaskRange(isMap);
        }

        @Override
        public String getUser() {
            return mapContext.getUser();
        }

        @Override
        public Credentials getCredentials() {
            return mapContext.getCredentials();
        }

        @Override
        public float getProgress() {
            return mapContext.getProgress();
        }
    }
}

This class does indeed contain the four important methods, and each of them simply delegates to mapContext. Keep tracing upward to find where mapContext comes from:

/**
 * The mapContext object must contain the three read methods.
 *
 * This answers the question raised when we first looked at the source:
 *
 * Question: who calls the constructor of the MapContextImpl class?
 *
 * mapContext is an instance of MapContextImpl, whose read methods are backed by
 * input === NewTrackingRecordReader.
 *
 * What we can be sure of:
 *
 * 1. The mapContext object must have a write method.
 * 2. Looking at the body of MapContextImpl, there is in fact no write method there.
 *
 * Resolution:
 *
 * mapContext.write actually comes from the parent class of MapContextImpl.
 *
 * At the very bottom, write is: output.write();
 */
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext =
        new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(
                job, getTaskID(), input, output, committer, reporter, split);

So mapContext is an instance of the MapContextImpl class.

To pin this down further:

mapContext = new MapContextImpl(input);

mapContext.nextKeyValue() {
    // input wraps the reader created by inputFormat.createRecordReader(),
    // which by default is a LineRecordReader
    LineRecordReader real = inputFormat.createRecordReader(...);
    real.nextKeyValue();
}

Looking at the MapContextImpl.java source:

public class MapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends TaskInputOutputContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

    private RecordReader<KEYIN, VALUEIN> reader;
    private InputSplit split;

    public MapContextImpl(Configuration conf,
            TaskAttemptID taskid,
            RecordReader<KEYIN, VALUEIN> reader,
            RecordWriter<KEYOUT, VALUEOUT> writer,
            OutputCommitter committer,
            StatusReporter reporter,
            InputSplit split) {

        // call the parent-class constructor via super
        super(conf, taskid, writer, committer, reporter);

        this.reader = reader;
        this.split = split;
    }

    /**
     * Get the input split for this map.
     */
    public InputSplit getInputSplit() {
        return split;
    }

    // ...

    @Override
    public KEYIN getCurrentKey() throws IOException, InterruptedException {
        return reader.getCurrentKey();
    }

    @Override
    public VALUEIN getCurrentValue() throws IOException, InterruptedException {
        return reader.getCurrentValue();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        return reader.nextKeyValue();
    }

}
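
Putting the chain together: mapper.run → mapperContext (WrappedMapper.Context) → mapContext (MapContextImpl) → input (NewTrackingRecordReader) → the RecordReader created by the InputFormat, which by default is LineRecordReader. To make that last link concrete, here is a hedged, minimal sketch of a custom InputFormat and RecordReader (the class names are illustrative, not from the original post); its three methods are exactly what nextKeyValue / getCurrentKey / getCurrentValue ultimately call, and the false return from nextKeyValue is what ends the while loop in Mapper.run.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// Illustrative InputFormat: createRecordReader() is what runNewMapper calls
// (via NewTrackingRecordReader) to obtain the real reader.
public class UpperCaseTextInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        return new UpperCaseLineRecordReader();
    }

    // Illustrative RecordReader: wraps the stock LineRecordReader and upper-cases each line.
    public static class UpperCaseLineRecordReader extends RecordReader<LongWritable, Text> {

        private final LineRecordReader delegate = new LineRecordReader();
        private final Text value = new Text();

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            delegate.initialize(split, context);   // reached from input.initialize(split, mapperContext)
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            // Returning false here is what terminates the while loop in Mapper.run.
            if (!delegate.nextKeyValue()) {
                return false;
            }
            value.set(delegate.getCurrentValue().toString().toUpperCase());
            return true;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return delegate.getCurrentKey();       // byte offset of the current line
        }

        @Override
        public Text getCurrentValue() {
            return value;                          // the transformed line
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return delegate.getProgress();
        }

        @Override
        public void close() throws IOException {
            delegate.close();
        }
    }
}

A driver would enable it with job.setInputFormatClass(UpperCaseTextInputFormat.class), just as in the earlier configuration sketch; everything else in the call chain above stays the same.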