
Hadoop Learning Path (20): Computing Top N with MapReduce


In Hadoop, sorting is the soul of MapReduce. Both MapTask and ReduceTask sort data by key; this is the default behavior of the MR framework, whether or not your business logic actually needs it.


Key Techniques

The MapReduce framework relies mainly on two kinds of sorting: quicksort, and a priority queue (PriorityQueue) backed by a heap.
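As a plain-Java illustration of the heap idea (a sketch of ours, not Hadoop internals): keep a min-heap of at most N elements; any incoming value larger than the heap root replaces it, so the heap always holds the N largest values seen so far.

```java
import java.util.Arrays;
import java.util.PriorityQueue;

public class TopNDemo {

    // Return the N largest values using a size-bounded min-heap:
    // the heap root is always the smallest current candidate, so any
    // incoming value larger than the root evicts it.
    static int[] topN(int[] values, int n) {
        PriorityQueue<Integer> heap = new PriorityQueue<>(n);
        for (int v : values) {
            if (heap.size() < n) {
                heap.offer(v);
            } else if (v > heap.peek()) {
                heap.poll();
                heap.offer(v);
            }
        }
        // Drain the heap smallest-first, filling the result backwards
        // so the final array is in descending order
        int[] result = new int[heap.size()];
        for (int i = result.length - 1; i >= 0; i--) {
            result[i] = heap.poll();
        }
        return result;
    }

    public static void main(String[] args) {
        // xuzheng's computer scores from the sample data below
        int[] top = topN(new int[]{54, 52, 86, 91, 42, 85, 75}, 3);
        System.out.println(Arrays.toString(top));  // prints [91, 86, 85]
    }
}
```

This is the same trade the framework makes: O(log N) per element instead of sorting everything, with memory bounded by N.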

Mapper Phase

Data written from the map output into the circular buffer is sorted (with a quicksort variant tuned for the MR framework), ordered first by partition and then by key. When the buffer reaches 80% capacity, the data is spilled to disk as IFile files. After the map finishes, the IFile files are sort-merged into one large file (using a heap-backed priority queue) from which the reducers pull their respective partitions.

Reducer Phase

Data fetched from the Mapper side is already partially sorted, so the Reduce Task only needs a single merge sort to make it fully ordered. To improve efficiency, Hadoop runs the sort and reduce phases in parallel: during the sort phase, the Reduce Task builds a min-heap over the files in memory and on disk, keeps an iterator pointing at the heap's root, and keeps advancing that iterator to hand records with equal keys to the reduce() function in order. Advancing the iterator is really just repeated heap maintenance (build heap → take root → re-heapify → take root ...), which lets sort and reduce proceed concurrently.
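The merge described above can be sketched in plain Java (our model, not the actual Reduce Task code): seed a min-heap with one iterator per sorted segment, then repeatedly take the root element, emit it, advance that segment's iterator, and re-heapify.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMergeDemo {

    // A heap entry holds one segment's current head plus the rest of that segment
    static class Entry implements Comparable<Entry> {
        int head;
        Iterator<Integer> rest;

        Entry(int head, Iterator<Integer> rest) {
            this.head = head;
            this.rest = rest;
        }

        @Override
        public int compareTo(Entry o) {
            return Integer.compare(head, o.head);
        }
    }

    // Merge several individually sorted lists into one sorted list.
    // Each poll/offer pair mirrors the "take root -> re-heapify" loop the
    // Reduce Task performs over its in-memory and on-disk segments.
    static List<Integer> merge(List<List<Integer>> sortedSegments) {
        PriorityQueue<Entry> heap = new PriorityQueue<>();
        for (List<Integer> seg : sortedSegments) {
            Iterator<Integer> it = seg.iterator();
            if (it.hasNext()) {
                heap.offer(new Entry(it.next(), it));
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Entry e = heap.poll();      // smallest head across all segments
            out.add(e.head);
            if (e.rest.hasNext()) {     // advance that segment and re-heapify
                heap.offer(new Entry(e.rest.next(), e.rest));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> segs = List.of(List.of(1, 4, 9), List.of(2, 3, 8), List.of(5, 7));
        System.out.println(merge(segs));  // prints [1, 2, 3, 4, 5, 7, 8, 9]
    }
}
```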

Grouped Top N Analysis

A common scenario in data processing: group table rows by some field, then pick the top few records within each group. For this grouped Top N problem, we will implement a solution with tools such as Hive and MapReduce.

Sample Data

computer,huangxiaoming,85,86,41,75,93,42,85
computer,xuzheng,54,52,86,91,42
computer,huangbo,85,42,96,38
english,zhaobenshan,54,52,86,91,42,85,75
english,liuyifei,85,41,75,21,85,96,14
algorithm,liuyifei,75,85,62,48,54,96,15
computer,huangjiaju,85,75,86,85,85
english,liuyifei,76,95,86,74,68,74,48
english,huangdatou,48,58,67,86,15,33,85
algorithm,huanglei,76,95,86,74,68,74,48
algorithm,huangjiaju,85,75,86,85,85,74,86
computer,huangdatou,48,58,67,86,15,33,85
english,zhouqi,85,86,41,75,93,42,85,75,55,47,22
english,huangbo,85,42,96,38,55,47,22
algorithm,liutao,85,75,85,99,66
computer,huangzitao,85,86,41,75,93,42,85
math,wangbaoqiang,85,86,41,75,93,42,85
computer,liujialing,85,41,75,21,85,96,14,74,86
computer,liuyifei,75,85,62,48,54,96,15
computer,liutao,85,75,85,99,66,88,75,91
computer,huanglei,76,95,86,74,68,74,48
english,liujialing,75,85,62,48,54,96,15
math,huanglei,76,95,86,74,68,74,48
math,huangjiaju,85,75,86,85,85,74,86
math,liutao,48,58,67,86,15,33,85
english,huanglei,85,75,85,99,66,88,75,91
math,xuzheng,54,52,86,91,42,85,75
math,huangxiaoming,85,75,85,99,66,88,75,91
math,liujialing,85,86,41,75,93,42,85,75
english,huangxiaoming,85,86,41,75,93,42,85
algorithm,huangdatou,48,58,67,86,15,33,85
algorithm,huangzitao,85,86,41,75,93,42,85,75

I. Data Description

The number of fields per line is not fixed:
the first field is the course name (four courses in total: computer, math, english, algorithm),
the second is the student's name, and the remaining fields are the scores from each exam.

II. Requirements
1. Count how many students took each course, and compute each course's average score.

2. Compute each student's average score per course and write the results to a separate output file per course (one file per course), sorted by average score from high to low, keeping one decimal place.

3. For each course, find the student with the highest average score: output course, name, and average score.
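Requirement 2 asks for one decimal place; the mapper code later mentions NumberFormat for exactly this purpose. A small standalone illustration of the averaging and rounding (the class and method names here are ours, not from the original code):

```java
import java.text.NumberFormat;
import java.util.Locale;

public class AvgFormatDemo {

    // Average a variable-length list of scores and keep one decimal place,
    // as requirement 2 asks. Locale.US pins '.' as the decimal separator.
    static String formatAvg(int[] scores) {
        int sum = 0;
        for (int s : scores) {
            sum += s;
        }
        double avg = 1D * sum / scores.length;  // 1D forces double division

        NumberFormat nf = NumberFormat.getInstance(Locale.US);
        nf.setMaximumFractionDigits(1);
        return nf.format(avg);
    }

    public static void main(String[] args) {
        // huangxiaoming's computer scores from the sample data
        System.out.println(formatAvg(new int[]{85, 86, 41, 75, 93, 42, 85}));  // prints 72.4
    }
}
```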

Problem 1

CourseScoreMR1.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CourseScoreMR1 {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);

        job.setJarByClass(CourseScoreMR1.class);
        job.setMapperClass(CourseScoreMR1Mapper.class);
        job.setReducerClass(CourseScoreMR1Reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path inputPath = new Path("E:\\bigdata\\cs\\input");
        Path outputPath = new Path("E:\\bigdata\\cs\\output_1");
        FileInputFormat.setInputPaths(job, inputPath);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    public static class CourseScoreMR1Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {

        /**
         * Each line holds the fields: course, name, score...
         *
         * value == algorithm,huangzitao,85,86,41,75,93,42,85,75
         *
         * Output key/value:
         *   key   : course
         *   value : avgScore
         *
         * APIs for formatting numbers: NumberFormat (and SimpleDateFormat for dates)
         */

        Text outKey = new Text();
        DoubleWritable outValue = new DoubleWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] split = value.toString().split(",");

            String course = split[0];

            int sum = 0;
            int count = 0;

            // Scores start at index 2; each line may carry a different number of them
            for (int i = 2; i < split.length; i++) {
                int tempScore = Integer.parseInt(split[i]);
                sum += tempScore;
                count++;
            }

            double avgScore = 1D * sum / count;

            outKey.set(course);
            outValue.set(avgScore);

            context.write(outKey, outValue);
        }
    }

    public static class CourseScoreMR1Reducer extends Reducer<Text, DoubleWritable, Text, Text> {

        Text outValue = new Text();

        /**
         * key    : course
         * values : the per-student averages, e.g. 98.7, 87.6, ...
         */
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {

            double sum = 0;
            int count = 0;

            for (DoubleWritable dw : values) {
                sum += dw.get();
                count++;
            }

            double lastAvgScore = sum / count;

            outValue.set(count + "\t" + lastAvgScore);

            context.write(key, outValue);
        }
    }
}

Problem 2

CourseScoreMR2.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.ghgj.mr.exercise.pojo.CourseScore;
import com.ghgj.mr.exercise.ptn.CSPartitioner;

public class CourseScoreMR2 {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);

        job.setJarByClass(CourseScoreMR2.class);
        job.setMapperClass(CourseScoreMR2Mapper.class);
        // No custom reducer needed: the default identity reducer merge-sorts
        // the CourseScore keys, so each partition's file comes out sorted
        // job.setReducerClass(CourseScoreMR2Reducer.class);

        job.setMapOutputKeyClass(CourseScore.class);
        job.setMapOutputValueClass(NullWritable.class);
        // job.setOutputKeyClass(CourseScore.class);
        // job.setOutputValueClass(NullWritable.class);

        // One partition, and therefore one output file, per course
        job.setPartitionerClass(CSPartitioner.class);
        job.setNumReduceTasks(4);

        Path inputPath = new Path("E:\\bigdata\\cs\\input");
        Path outputPath = new Path("E:\\bigdata\\cs\\output_2");
        FileInputFormat.setInputPaths(job, inputPath);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    public static class CourseScoreMR2Mapper extends Mapper<LongWritable, Text, CourseScore, NullWritable> {

        CourseScore cs = new CourseScore();

        /**
         * value = math,huangxiaoming,85,75,85,99,66,88,75,91
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] split = value.toString().split(",");

            String course = split[0];
            String name = split[1];

            int sum = 0;
            int count = 0;

            for (int i = 2; i < split.length; i++) {
                int tempScore = Integer.parseInt(split[i]);
                sum += tempScore;
                count++;
            }

            double avgScore = 1D * sum / count;

            cs.setCourse(course);
            cs.setName(name);
            cs.setScore(avgScore);

            context.write(cs, NullWritable.get());
        }
    }

    public static class CourseScoreMR2Reducer extends Reducer<CourseScore, NullWritable, CourseScore, NullWritable> {

        @Override
        protected void reduce(CourseScore key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // Intentionally empty; this class is left unset above
        }
    }
}

CSPartitioner.java

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

import com.ghgj.mr.exercise.pojo.CourseScore;

public class CSPartitioner extends Partitioner<CourseScore, NullWritable> {

    /**
     * Route each record to a reducer by course, so that every course
     * lands in its own output file.
     */
    @Override
    public int getPartition(CourseScore key, NullWritable value, int numPartitions) {

        String course = key.getCourse();
        if (course.equals("math")) {
            return 0;
        } else if (course.equals("english")) {
            return 1;
        } else if (course.equals("computer")) {
            return 2;
        } else {
            return 3;
        }
    }
}
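The CourseScore POJO imported by the jobs above is not shown in the original post. In the real job it must implement Hadoop's WritableComparable (adding write/readFields for serialization); the ordering that the sort depends on, course ascending and then average score descending, can be modeled in plain Java as follows (CourseScoreModel is our stand-in name, not the original class):

```java
public class CourseScoreModel implements Comparable<CourseScoreModel> {

    // Field names mirror the getters/setters the jobs above call:
    // setCourse, setName, setScore / getCourse
    private String course;
    private String name;
    private double score;

    public CourseScoreModel(String course, String name, double score) {
        this.course = course;
        this.name = name;
        this.score = score;
    }

    public String getCourse() { return course; }

    // Sort by course ascending, and within a course by score descending,
    // so the first record of each course group is that course's top student
    @Override
    public int compareTo(CourseScoreModel o) {
        int byCourse = this.course.compareTo(o.course);
        if (byCourse != 0) {
            return byCourse;
        }
        return Double.compare(o.score, this.score);
    }

    @Override
    public String toString() {
        return course + "\t" + name + "\t" + score;
    }
}
```

The Hadoop version additionally needs a default constructor and the write(DataOutput)/readFields(DataInput) pair so the framework can serialize keys between map and reduce.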

Problem 3

CourseScoreMR3.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.ghgj.mr.exercise.gc.CourseScoreGC;
import com.ghgj.mr.exercise.pojo.CourseScore;

public class CourseScoreMR3 {

    private static final int TOPN = 3;

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);

        job.setJarByClass(CourseScoreMR3.class);
        job.setMapperClass(CourseScoreMR3Mapper.class);
        job.setReducerClass(CourseScoreMR3Reducer.class);

        job.setMapOutputKeyClass(CourseScore.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(CourseScore.class);
        job.setOutputValueClass(NullWritable.class);

        // job.setPartitionerClass(CSPartitioner.class);
        // job.setNumReduceTasks(4);

        // Specify the grouping rule: all records of one course form one group
        job.setGroupingComparatorClass(CourseScoreGC.class);

        Path inputPath = new Path("E:\\bigdata\\cs\\input");
        Path outputPath = new Path("E:\\bigdata\\cs\\output_3_last");
        FileInputFormat.setInputPaths(job, inputPath);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    public static class CourseScoreMR3Mapper extends Mapper<LongWritable, Text, CourseScore, NullWritable> {

        CourseScore cs = new CourseScore();

        /**
         * value = math,huangxiaoming,85,75,85,99,66,88,75,91
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] split = value.toString().split(",");

            String course = split[0];
            String name = split[1];

            int sum = 0;
            int count = 0;

            for (int i = 2; i < split.length; i++) {
                int tempScore = Integer.parseInt(split[i]);
                sum += tempScore;
                count++;
            }

            double avgScore = 1D * sum / count;

            cs.setCourse(course);
            cs.setName(name);
            cs.setScore(avgScore);

            context.write(cs, NullWritable.get());
        }
    }

    public static class CourseScoreMR3Reducer extends Reducer<CourseScore, NullWritable, CourseScore, NullWritable> {

        /**
         * reduce() is called once per group of "equal" keys. With CourseScoreGC
         * installed, a group is all records of one course, already sorted by
         * average score in descending order.
         *
         * The values iterator can only be traversed once, and as it advances,
         * the key object is updated in step with the current value. Writing
         * `key` inside the loop therefore emits one record per student, and
         * stopping after TOPN records yields the top N per course (the first
         * record of each group is that course's highest-scoring student).
         */
        @Override
        protected void reduce(CourseScore key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

            int count = 0;

            for (NullWritable nvl : values) {
                context.write(key, nvl);
                if (++count == TOPN) {
                    return;
                }
            }
        }
    }
}

CourseScoreGC.java

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

import com.ghgj.mr.exercise.pojo.CourseScore;

/**
 * Grouping comparator: decides which map-output keys belong to the same
 * reduce group.
 */
public class CourseScoreGC extends WritableComparator {

    public CourseScoreGC() {
        super(CourseScore.class, true);
    }

    /**
     * compare() receives the two objects that serve as keys in the MR program.
     * A return value of 0 means the two keys belong to the same group and go
     * to the same reduce() call; any other value puts them in different groups.
     *
     * For our requirement, the grouping rule is the course field only.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {

        CourseScore cs1 = (CourseScore) a;
        CourseScore cs2 = (CourseScore) b;

        return cs1.getCourse().compareTo(cs2.getCourse());
    }
}