logo头像
Snippet 博客主题

Hadoop学习之路(二十五)MapReduce的API使用(二)

**Hadoop学习之路(二十五)MapReduce的API使用(二)** <Excerpt in index | 首页摘要>

​ Hadoop学习之路(二十五)MapReduce的API使用(二)

<The rest of contents | 余下全文>

学生成绩—增强版

数据信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
 1 computer,huangxiaoming,85,86,41,75,93,42,85
2 computer,xuzheng,54,52,86,91,42
3 computer,huangbo,85,42,96,38
4 english,zhaobenshan,54,52,86,91,42,85,75
5 english,liuyifei,85,41,75,21,85,96,14
6 algorithm,liuyifei,75,85,62,48,54,96,15
7 computer,huangjiaju,85,75,86,85,85
8 english,liuyifei,76,95,86,74,68,74,48
9 english,huangdatou,48,58,67,86,15,33,85
10 algorithm,huanglei,76,95,86,74,68,74,48
11 algorithm,huangjiaju,85,75,86,85,85,74,86
12 computer,huangdatou,48,58,67,86,15,33,85
13 english,zhouqi,85,86,41,75,93,42,85,75,55,47,22
14 english,huangbo,85,42,96,38,55,47,22
15 algorithm,liutao,85,75,85,99,66
16 computer,huangzitao,85,86,41,75,93,42,85
17 math,wangbaoqiang,85,86,41,75,93,42,85
18 computer,liujialing,85,41,75,21,85,96,14,74,86
19 computer,liuyifei,75,85,62,48,54,96,15
20 computer,liutao,85,75,85,99,66,88,75,91
21 computer,huanglei,76,95,86,74,68,74,48
22 english,liujialing,75,85,62,48,54,96,15
23 math,huanglei,76,95,86,74,68,74,48
24 math,huangjiaju,85,75,86,85,85,74,86
25 math,liutao,48,58,67,86,15,33,85
26 english,huanglei,85,75,85,99,66,88,75,91
27 math,xuzheng,54,52,86,91,42,85,75
28 math,huangxiaoming,85,75,85,99,66,88,75,91
29 math,liujialing,85,86,41,75,93,42,85,75
30 english,huangxiaoming,85,86,41,75,93,42,85
31 algorithm,huangdatou,48,58,67,86,15,33,85
32 algorithm,huangzitao,85,86,41,75,93,42,85,75

数据解释

每行数据的字段个数不固定:
第一个字段是课程名称,共有四门课程:computer、math、english、algorithm;
第二个字段是学生姓名,其后是该学生每次考试的分数。

统计需求

1、统计每门课程的参考人数和课程平均分

2、统计每门课程参考学生的平均分,并且按课程存入不同的结果文件,要求一门课程一个结果文件,并且按平均分从高到低排序,分数保留一位小数

3、求出每门课程参考学生成绩最高的学生的信息:课程,姓名和平均分

第一题

MRAvgScore1.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
  1 /**
2 * 需求:统计每门课程的参考人数和课程平均分
3 * */
4 public class MRAvgScore1 {
5
6 public static void main(String[] args) throws Exception {
7
8 Configuration conf1 = new Configuration();
9 Configuration conf2 = new Configuration();
10
11 Job job1 = Job.getInstance(conf1);
12 Job job2 = Job.getInstance(conf2);
13
14 job1.setJarByClass(MRAvgScore1.class);
15 job1.setMapperClass(AvgScoreMapper1.class);
16 //job.setReducerClass(MFReducer.class);
17
18 job1.setOutputKeyClass(Text.class);
19 job1.setOutputValueClass(DoubleWritable.class);
20
21 Path inputPath1 = new Path("D:\\MR\\hw\\work3\\input");
22 Path outputPath1 = new Path("D:\\MR\\hw\\work3\\output_hw1_1");
23
24 FileInputFormat.setInputPaths(job1, inputPath1);
25 FileOutputFormat.setOutputPath(job1, outputPath1);
26
27
28 job2.setMapperClass(AvgScoreMapper2.class);
29 job2.setReducerClass(AvgScoreReducer2.class);
30
31 job2.setOutputKeyClass(Text.class);
32 job2.setOutputValueClass(DoubleWritable.class);
33
34 Path inputPath2 = new Path("D:\\MR\\hw\\work3\\output_hw1_1");
35 Path outputPath2 = new Path("D:\\MR\\hw\\work3\\output_hw1_end");
36
37 FileInputFormat.setInputPaths(job2, inputPath2);
38 FileOutputFormat.setOutputPath(job2, outputPath2);
39
40 JobControl control = new JobControl("AvgScore");
41
42 ControlledJob aJob = new ControlledJob(job1.getConfiguration());
43 ControlledJob bJob = new ControlledJob(job2.getConfiguration());
44
45 bJob.addDependingJob(aJob);
46
47 control.addJob(aJob);
48 control.addJob(bJob);
49
50 Thread thread = new Thread(control);
51 thread.start();
52
53 while(!control.allFinished()) {
54 thread.sleep(1000);
55 }
56 System.exit(0);
57
58 }
59
60
61
62 /**
63 * 数据类型:computer,huangxiaoming,85,86,41,75,93,42,85
64 *
65 * 需求:统计每门课程的参考人数和课程平均分
66 *
67 * 分析:以课程名称+姓名作为key,以平均分数作为value
68 * */
69 public static class AvgScoreMapper1 extends Mapper<LongWritable, Text, Text, DoubleWritable>{
70
71 @Override
72 protected void map(LongWritable key, Text value,Context context)
73 throws IOException, InterruptedException {
74
75 String[] splits = value.toString().split(",");
76 //拼接成要输出的key
77 String outKey = splits[0]+"\t"+splits[1];
78 int length = splits.length;
79 int sum = 0;
80 //求出成绩的总和
81 for(int i=2;i<length;i++) {
82 sum += Integer.parseInt(splits[i]);
83 }
84 //求出平均分
85 double outValue = sum / (length - 2);
86
87 context.write(new Text(outKey), new DoubleWritable(outValue));
88
89 }
90
91 }
92
93 /**
94 * 对第一次MapReduce输出的结果进一步计算,第一步输出结果样式为
95 * math huangjiaju 82.0
96 * math huanglei 74.0
97 * math huangxiaoming 83.0
98 * math liujialing 72.0
99 * math liutao 56.0
100 * math wangbaoqiang 72.0
101 * math xuzheng 69.0
102 *
103 * 需求:统计每门课程的参考人数和课程平均分
104 * 分析:以课程名称作为key,以分数作为value进行 输出
105 *
106 * */
107 public static class AvgScoreMapper2 extends Mapper<LongWritable, Text, Text, DoubleWritable>{
108
109 @Override
110 protected void map(LongWritable key, Text value,Context context)
111 throws IOException, InterruptedException {
112
113 String[] splits = value.toString().split("\t");
114 String outKey = splits[0];
115 String outValue = splits[2];
116
117 context.write(new Text(outKey), new DoubleWritable(Double.parseDouble(outValue)));
118 }
119
120 }
121
122 /**
123 * 针对同一门课程,对values进行遍历计数,看看有多少人参加了考试,并计算出平均成绩
124 * */
125 public static class AvgScoreReducer2 extends Reducer<Text, DoubleWritable, Text, Text>{
126
127 @Override
128 protected void reduce(Text key, Iterable<DoubleWritable> values,
129 Context context) throws IOException, InterruptedException {
130
131 int count = 0;
132 double sum = 0;
133 for(DoubleWritable value : values) {
134 count++;
135 sum += value.get();
136 }
137
138 double avg = sum / count;
139 String outValue = count + "\t" + avg;
140 context.write(key, new Text(outValue));
141 }
142
143 }
144
145
146 }

第二题

MRAvgScore2.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
 1 public class MRAvgScore2 {
2
3 public static void main(String[] args) throws Exception {
4
5 Configuration conf = new Configuration();
6
7 Job job = Job.getInstance(conf);
8
9 job.setJarByClass(MRAvgScore2.class);
10 job.setMapperClass(ScoreMapper3.class);
11 job.setReducerClass(ScoreReducer3.class);
12
13 job.setOutputKeyClass(StudentBean.class);
14 job.setOutputValueClass(NullWritable.class);
15
16 job.setPartitionerClass(CoursePartitioner.class);
17 job.setNumReduceTasks(4);
18
19 Path inputPath = new Path("D:\\MR\\hw\\work3\\output_hw1_1");
20 Path outputPath = new Path("D:\\MR\\hw\\work3\\output_hw2_1");
21
22 FileInputFormat.setInputPaths(job, inputPath);
23 FileOutputFormat.setOutputPath(job, outputPath);
24 boolean isDone = job.waitForCompletion(true);
25 System.exit(isDone ? 0 : 1);
26 }
27
28
29 public static class ScoreMapper3 extends Mapper<LongWritable, Text, StudentBean, NullWritable>{
30
31 @Override
32 protected void map(LongWritable key, Text value,Context context)
33 throws IOException, InterruptedException {
34
35 String[] splits = value.toString().split("\t");
36
37 double score = Double.parseDouble(splits[2]);
38 DecimalFormat df = new DecimalFormat("#.0");
39 df.format(score);
40
41 StudentBean student = new StudentBean(splits[0],splits[1],score);
42
43 context.write(student, NullWritable.get());
44
45 }
46
47 }
48
49 public static class ScoreReducer3 extends Reducer<StudentBean, NullWritable, StudentBean, NullWritable>{
50
51 @Override
52 protected void reduce(StudentBean key, Iterable<NullWritable> values,Context context)
53 throws IOException, InterruptedException {
54
55 for(NullWritable nvl : values){
56 context.write(key, nvl);
57 }
58
59 }
60 }
61 }

StudentBean.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
 1 public class StudentBean implements WritableComparable<StudentBean>{
2 private String course;
3 private String name;
4 private double avgScore;
5
6 public String getCourse() {
7 return course;
8 }
9 public void setCourse(String course) {
10 this.course = course;
11 }
12 public String getName() {
13 return name;
14 }
15 public void setName(String name) {
16 this.name = name;
17 }
18 public double getavgScore() {
19 return avgScore;
20 }
21 public void setavgScore(double avgScore) {
22 this.avgScore = avgScore;
23 }
24 public StudentBean(String course, String name, double avgScore) {
25 super();
26 this.course = course;
27 this.name = name;
28 this.avgScore = avgScore;
29 }
30 public StudentBean() {
31 super();
32 }
33
34 @Override
35 public String toString() {
36 return course + "\t" + name + "\t" + avgScore;
37 }
38 @Override
39 public void readFields(DataInput in) throws IOException {
40 course = in.readUTF();
41 name = in.readUTF();
42 avgScore = in.readDouble();
43 }
44 @Override
45 public void write(DataOutput out) throws IOException {
46 out.writeUTF(course);
47 out.writeUTF(name);
48 out.writeDouble(avgScore);
49 }
50 @Override
51 public int compareTo(StudentBean stu) {
52 double diffent = this.avgScore - stu.avgScore;
53 if(diffent == 0) {
54 return 0;
55 }else {
56 return diffent > 0 ? -1 : 1;
57 }
58 }
59
60
61 }

第三题

MRScore3.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
  1 public class MRScore3 {
2
3 public static void main(String[] args) throws Exception {
4
5 Configuration conf1 = new Configuration();
6 Configuration conf2 = new Configuration();
7
8 Job job1 = Job.getInstance(conf1);
9 Job job2 = Job.getInstance(conf2);
10
11 job1.setJarByClass(MRScore3.class);
12 job1.setMapperClass(MRMapper3_1.class);
13 //job.setReducerClass(ScoreReducer3.class);
14
15
16 job1.setMapOutputKeyClass(IntWritable.class);
17 job1.setMapOutputValueClass(StudentBean.class);
18 job1.setOutputKeyClass(IntWritable.class);
19 job1.setOutputValueClass(StudentBean.class);
20
21 job1.setPartitionerClass(CoursePartitioner2.class);
22
23 job1.setNumReduceTasks(4);
24
25 Path inputPath = new Path("D:\\MR\\hw\\work3\\input");
26 Path outputPath = new Path("D:\\MR\\hw\\work3\\output_hw3_1");
27
28 FileInputFormat.setInputPaths(job1, inputPath);
29 FileOutputFormat.setOutputPath(job1, outputPath);
30
31 job2.setMapperClass(MRMapper3_2.class);
32 job2.setReducerClass(MRReducer3_2.class);
33
34 job2.setMapOutputKeyClass(IntWritable.class);
35 job2.setMapOutputValueClass(StudentBean.class);
36 job2.setOutputKeyClass(StudentBean.class);
37 job2.setOutputValueClass(NullWritable.class);
38
39 Path inputPath2 = new Path("D:\\MR\\hw\\work3\\output_hw3_1");
40 Path outputPath2 = new Path("D:\\MR\\hw\\work3\\output_hw3_end");
41
42 FileInputFormat.setInputPaths(job2, inputPath2);
43 FileOutputFormat.setOutputPath(job2, outputPath2);
44
45 JobControl control = new JobControl("Score3");
46
47 ControlledJob aJob = new ControlledJob(job1.getConfiguration());
48 ControlledJob bJob = new ControlledJob(job2.getConfiguration());
49
50 bJob.addDependingJob(aJob);
51
52 control.addJob(aJob);
53 control.addJob(bJob);
54
55 Thread thread = new Thread(control);
56 thread.start();
57
58 while(!control.allFinished()) {
59 thread.sleep(1000);
60 }
61 System.exit(0);
62
63
64 }
65
66
67
68
69 public static class MRMapper3_1 extends Mapper<LongWritable, Text, IntWritable, StudentBean>{
70
71 StudentBean outKey = new StudentBean();
72 IntWritable outValue = new IntWritable();
73 List<String> scoreList = new ArrayList<>();
74
75 protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException {
76
77 scoreList.clear();
78 String[] splits = value.toString().split(",");
79 long sum = 0;
80
81 for(int i=2;i<splits.length;i++) {
82 scoreList.add(splits[i]);
83 sum += Long.parseLong(splits[i]);
84 }
85
86 Collections.sort(scoreList);
87 outValue.set(Integer.parseInt(scoreList.get(scoreList.size()-1)));
88
89 double avg = sum * 1.0/(splits.length-2);
90 outKey.setCourse(splits[0]);
91 outKey.setName(splits[1]);
92 outKey.setavgScore(avg);
93
94 context.write(outValue, outKey);
95
96 };
97 }
98
99
100
101 public static class MRMapper3_2 extends Mapper<LongWritable, Text,IntWritable, StudentBean >{
102
103 StudentBean outValue = new StudentBean();
104 IntWritable outKey = new IntWritable();
105
106 protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException {
107
108 String[] splits = value.toString().split("\t");
109 outKey.set(Integer.parseInt(splits[0]));
110
111 outValue.setCourse(splits[1]);
112 outValue.setName(splits[2]);
113 outValue.setavgScore(Double.parseDouble(splits[3]));
114
115 context.write(outKey, outValue);
116
117
118 };
119 }
120
121
122 public static class MRReducer3_2 extends Reducer<IntWritable, StudentBean, StudentBean, NullWritable>{
123
124 StudentBean outKey = new StudentBean();
125
126 @Override
127 protected void reduce(IntWritable key, Iterable<StudentBean> values,Context context)
128 throws IOException, InterruptedException {
129
130 int length = values.toString().length();
131
132 for(StudentBean value : values) {
133 outKey = value;
134 }
135
136 context.write(outKey, NullWritable.get());
137
138 }
139 }
140
141
142 }