Hadoop Learning Path (26): Using the MapReduce API (Part 3)

Data and Requirements

Data Format

movies.dat: 3884 records

1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action
10::GoldenEye (1995)::Action|Adventure|Thriller

users.dat: 6041 records

1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
4::M::45::7::02460
5::M::25::20::55455
6::F::50::9::55117
7::M::35::1::06810
8::M::25::12::11413
9::M::25::17::61614
10::F::35::1::95370

ratings.dat: 1000210 records

1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368

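Data Explanation

1. users.dat record format: 2::M::56::16::70072
Fields: UserID BigInt, Gender String, Age Int, Occupation String, Zipcode String
Meaning: user ID, gender, age, occupation, zip code

2. movies.dat record format: 2::Jumanji (1995)::Adventure|Children's|Fantasy
Fields: MovieID BigInt, Title String, Genres String
Meaning: movie ID, movie title, movie genres

3. ratings.dat record format: 1::1193::5::978300760
Fields: UserID BigInt, MovieID BigInt, Rating Double, Timestamped String
Meaning: user ID, movie ID, rating, rating timestamp

Joining the three files gives these fields (the join code further down emits them in a different order):
user ID, movie ID, rating, rating timestamp, gender, age, occupation, zip code, movie title, movie genres
userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType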

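Requirements

(1) Find the 10 movies with the most ratings, together with the rating count (movie title, rating count)
(2) Find the 10 highest-rated movies among male viewers and among female viewers separately (gender, movie title, rating)
(3) For the movie with movieid = 2116, find the average rating per age group (the data contains only 7 age values, so group by those 7) (age group, rating)
(4) For the woman who likes movies most (the one with the most ratings), find the average rating of the 10 movies she rated highest (user, movie title, rating)
(5) Find the 10 best movies from the year that has the most good movies (rating >= 4.0)
(6) Among movies released in 1997, find the 10 highest-rated Comedy movies
(7) Find the 5 highest-rated movies in each genre (genre, movie title, average rating)
(8) Find the highest-rated genre for each year (year, genre, rating)
(9) Find the highest-rated movie for each region and store the result in HDFS (region, movie title, rating)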

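Implementation

1. Find the 10 movies with the most ratings, together with the rating count (movie title, rating count)

Analysis: this problem involves two files, ratings.dat and movies.dat. Their sizes are very unequal (about a million rating rows against a few thousand movie rows), so a map-side join fits here: load the smaller file into memory first, then stream the larger one through the mapper.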
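The idea behind the map-side join can be tried without a cluster. The following is a minimal plain-Java sketch, not the job itself: the class `MapJoinSketch` and its method names are hypothetical, and it assumes both inputs use the `::`-separated formats described above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-alone illustration of a map-side (hash) join. */
final class MapJoinSketch {

    /** Loads the small table: movieId -> title, from movies.dat-style lines. */
    static Map<String, String> loadMovies(List<String> movieLines) {
        Map<String, String> titles = new HashMap<>();
        for (String line : movieLines) {
            String[] f = line.split("::");
            titles.put(f[0], f[1]);
        }
        return titles;
    }

    /** Streams the large table (ratings.dat-style lines) and counts ratings per title. */
    static Map<String, Integer> countRatings(Map<String, String> titles, List<String> ratingLines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : ratingLines) {
            String title = titles.get(line.split("::")[1]);
            if (title != null) {               // skip ratings for unknown movies
                counts.merge(title, 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

In the MapReduce version, `loadMovies` corresponds to the mapper's `setup` method and the per-title counting is done by the reducer.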

MovieMR1_1.java

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MovieMR1_1 {

        public static void main(String[] args) throws Exception {

            // Fall back to default HDFS paths when fewer than four arguments are given
            if (args.length < 4) {
                args = new String[4];
                args[0] = "/movie/input/";
                args[1] = "/movie/output/";
                args[2] = "/movie/cache/movies.dat";
                args[3] = "/movie/output_last/";
            }

            Configuration conf1 = new Configuration();
            conf1.set("fs.defaultFS", "hdfs://hadoop1:9000/");
            System.setProperty("HADOOP_USER_NAME", "hadoop");
            FileSystem fs1 = FileSystem.get(conf1);

            Job job1 = Job.getInstance(conf1);

            job1.setJarByClass(MovieMR1_1.class);

            job1.setMapperClass(MoviesMapJoinRatingsMapper1.class);
            job1.setReducerClass(MovieMR1Reducer1.class);

            job1.setMapOutputKeyClass(Text.class);
            job1.setMapOutputValueClass(IntWritable.class);

            job1.setOutputKeyClass(Text.class);
            job1.setOutputValueClass(IntWritable.class);

            // Cache a regular file into the working directory of each task node
            URI uri = new URI("hdfs://hadoop1:9000" + args[2]);
            System.out.println(uri);
            job1.addCacheFile(uri);

            Path inputPath1 = new Path(args[0]);
            Path outputPath1 = new Path(args[1]);
            if (fs1.exists(outputPath1)) {
                fs1.delete(outputPath1, true);
            }
            FileInputFormat.setInputPaths(job1, inputPath1);
            FileOutputFormat.setOutputPath(job1, outputPath1);

            boolean isDone = job1.waitForCompletion(true);
            System.exit(isDone ? 0 : 1);
        }

        public static class MoviesMapJoinRatingsMapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {

            // Holds the movies.dat data loaded into memory: movie ID -> "title \t genres"
            private static Map<String, String> movieMap = new HashMap<>();
            // key: movie title
            Text outKey = new Text();
            // value: rating
            IntWritable outValue = new IntWritable();

            /**
             * movies.dat: 1::Toy Story (1995)::Animation|Children's|Comedy
             *
             * Loads the small table (movies.dat) into memory ahead of time.
             */
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {

                // Deprecated in Hadoop 2.x, but still returns the local paths of the cached files
                Path[] localCacheFiles = context.getLocalCacheFiles();
                String strPath = localCacheFiles[0].toUri().toString();

                // try-with-resources closes the reader even if parsing fails
                try (BufferedReader br = new BufferedReader(new FileReader(strPath))) {
                    String readLine;
                    while ((readLine = br.readLine()) != null) {
                        String[] split = readLine.split("::");
                        String movieId = split[0];
                        String movieName = split[1];
                        String movieType = split[2];

                        movieMap.put(movieId, movieName + "\t" + movieType);
                    }
                }
            }

            /**
             * movies.dat:  1 :: Toy Story (1995) :: Animation|Children's|Comedy
             *              movie ID, movie title, movie genres
             *
             * ratings.dat: 1 :: 1193 :: 5 :: 978300760
             *              user ID, movie ID, rating, rating timestamp
             *
             * value: one line read from ratings.dat
             */
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {

                String[] split = value.toString().split("::");
                String movieId = split[1];
                String movieRate = split[2];

                // Look up the title and genres in memory by movie ID
                String movieNameAndType = movieMap.get(movieId);
                if (movieNameAndType == null) {
                    // No matching movie in movies.dat: skip this rating
                    return;
                }
                String movieName = movieNameAndType.split("\t")[0];

                outKey.set(movieName);
                outValue.set(Integer.parseInt(movieRate));

                context.write(outKey, outValue);
            }
        }

        public static class MovieMR1Reducer1 extends Reducer<Text, IntWritable, Text, IntWritable> {
            // Number of ratings for the current movie
            int count;
            // Output value: rating count
            IntWritable outValue = new IntWritable();

            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {

                count = 0;
                for (IntWritable value : values) {
                    count++;
                }

                outValue.set(count);
                context.write(key, outValue);
            }
        }
    }

MovieMR1_2.java

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MovieMR1_2 {

        public static void main(String[] args) throws Exception {
            if (args.length < 2) {
                args = new String[2];
                args[0] = "/movie/output/";
                args[1] = "/movie/output_last/";
            }

            Configuration conf1 = new Configuration();
            conf1.set("fs.defaultFS", "hdfs://hadoop1:9000/");
            System.setProperty("HADOOP_USER_NAME", "hadoop");
            FileSystem fs1 = FileSystem.get(conf1);

            Job job = Job.getInstance(conf1);

            job.setJarByClass(MovieMR1_2.class);

            job.setMapperClass(MoviesMapJoinRatingsMapper2.class);
            job.setReducerClass(MovieMR1Reducer2.class);

            job.setMapOutputKeyClass(MovieRating.class);
            job.setMapOutputValueClass(NullWritable.class);

            job.setOutputKeyClass(MovieRating.class);
            job.setOutputValueClass(NullWritable.class);

            Path inputPath1 = new Path(args[0]);
            Path outputPath1 = new Path(args[1]);
            if (fs1.exists(outputPath1)) {
                fs1.delete(outputPath1, true);
            }
            // Sort the output of the first job in descending order
            FileInputFormat.setInputPaths(job, inputPath1);
            FileOutputFormat.setOutputPath(job, outputPath1);

            boolean isDone = job.waitForCompletion(true);
            System.exit(isDone ? 0 : 1);
        }

        // The output key is the custom MovieRating object, which sorts in descending order of count
        public static class MoviesMapJoinRatingsMapper2 extends Mapper<LongWritable, Text, MovieRating, NullWritable> {

            MovieRating outKey = new MovieRating();

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Input line, for example: 'Night Mother (1986)    70
                String[] split = value.toString().split("\t");

                outKey.setCount(Integer.parseInt(split[1]));
                outKey.setMovieName(split[0]);

                context.write(outKey, NullWritable.get());
            }
        }

        // Keys arrive already sorted, so just emit the first 10 movies.
        // The running counter only gives a global top 10 with a single reducer (the default).
        public static class MovieMR1Reducer2 extends Reducer<MovieRating, NullWritable, MovieRating, NullWritable> {

            int count = 0;

            @Override
            protected void reduce(MovieRating key, Iterable<NullWritable> values, Context context)
                    throws IOException, InterruptedException {

                // Movies with equal counts compare as equal and share one reduce call;
                // the key object is refreshed on each iteration
                for (NullWritable value : values) {
                    count++;
                    if (count > 10) {
                        return;
                    }
                    context.write(key, value);
                }
            }
        }
    }

MovieRating.java

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    public class MovieRating implements WritableComparable<MovieRating> {
        private String movieName;
        private int count;

        public String getMovieName() {
            return movieName;
        }
        public void setMovieName(String movieName) {
            this.movieName = movieName;
        }
        public int getCount() {
            return count;
        }
        public void setCount(int count) {
            this.count = count;
        }

        // Hadoop needs a no-argument constructor to deserialize the key
        public MovieRating() {}

        public MovieRating(String movieName, int count) {
            this.movieName = movieName;
            this.count = count;
        }

        @Override
        public String toString() {
            return movieName + "\t" + count;
        }

        // Fields must be read in the same order in which write() emits them
        @Override
        public void readFields(DataInput in) throws IOException {
            movieName = in.readUTF();
            count = in.readInt();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(movieName);
            out.writeInt(count);
        }

        // Descending order of count; Integer.compare avoids overflow from subtraction
        @Override
        public int compareTo(MovieRating o) {
            return Integer.compare(o.count, this.count);
        }
    }
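The second job works because MovieRating sorts by count in descending order, so the first keys the reducer sees are already the top entries. Here is a small sketch of that ordering outside Hadoop. `CountEntry` and `TopNSketch` are hypothetical stand-ins for the real key class and job, and the sample counts in the usage note are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for MovieRating without the Hadoop serialization methods. */
final class CountEntry implements Comparable<CountEntry> {
    final String movieName;
    final int count;

    CountEntry(String movieName, int count) {
        this.movieName = movieName;
        this.count = count;
    }

    @Override
    public int compareTo(CountEntry o) {
        // Larger counts sort first
        return Integer.compare(o.count, this.count);
    }
}

final class TopNSketch {
    /** Sorts a copy of the entries and returns the titles of the first n. */
    static List<String> topN(List<CountEntry> entries, int n) {
        List<CountEntry> sorted = new ArrayList<>(entries);
        Collections.sort(sorted);
        List<String> names = new ArrayList<>();
        for (CountEntry e : sorted.subList(0, Math.min(n, sorted.size()))) {
            names.add(e.movieName);
        }
        return names;
    }
}
```

For example, with entries ("A", 5), ("B", 9), ("C", 7), calling `TopNSketch.topN(entries, 2)` returns B followed by C.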

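2. Find the 10 highest-rated movies among male viewers and among female viewers separately (gender, movie title, rating)

Analysis: this problem needs a join across all three tables. Load the two small tables into memory first, then perform the lookup while reading the ratings.

Joining the three tables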

MoviesThreeTableJoin.java

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /**
     * Joins the three tables.
     */
    public class MoviesThreeTableJoin {

        public static void main(String[] args) throws Exception {

            if (args.length < 4) {
                args = new String[4];
                args[0] = "/movie/input/";
                args[1] = "/movie/output2/";
                args[2] = "/movie/cache/movies.dat";
                args[3] = "/movie/cache/users.dat";
            }

            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://hadoop1:9000/");
            System.setProperty("HADOOP_USER_NAME", "hadoop");
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);

            job.setJarByClass(MoviesThreeTableJoin.class);
            job.setMapperClass(ThreeTableMapper.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            // Both small tables go into the distributed cache
            URI uriUsers = new URI("hdfs://hadoop1:9000" + args[3]);
            URI uriMovies = new URI("hdfs://hadoop1:9000" + args[2]);
            job.addCacheFile(uriUsers);
            job.addCacheFile(uriMovies);

            Path inputPath = new Path(args[0]);
            Path outputPath = new Path(args[1]);

            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }

            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, outputPath);

            boolean isDone = job.waitForCompletion(true);
            System.exit(isDone ? 0 : 1);
        }

        public static class ThreeTableMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

            // In-memory copies of movies.dat and users.dat
            private Map<String, String> moviesMap = new HashMap<>();
            private Map<String, String> usersMap = new HashMap<>();
            // Holds the fields of one line read from ratings.dat
            String[] ratings;

            Text outKey = new Text();

            @Override
            protected void setup(Context context) throws IOException, InterruptedException {

                Path[] paths = context.getLocalCacheFiles();

                for (Path path : paths) {
                    String name = path.toUri().getPath();
                    if (name.contains("movies.dat")) {
                        // Read movies.dat line by line
                        try (BufferedReader br = new BufferedReader(new FileReader(name))) {
                            String moviesLine;
                            while ((moviesLine = br.readLine()) != null) {
                                /*
                                 * Split the line on "::"
                                 * 2::Jumanji (1995)::Adventure|Children's|Fantasy
                                 * movie ID, movie title, movie genres
                                 *
                                 * The movie ID is the key, the rest is the value
                                 */
                                String[] split = moviesLine.split("::");
                                moviesMap.put(split[0], split[1] + "::" + split[2]);
                            }
                        }
                    } else if (name.contains("users.dat")) {
                        // Read users.dat line by line
                        try (BufferedReader br = new BufferedReader(new FileReader(name))) {
                            String usersLine;
                            while ((usersLine = br.readLine()) != null) {
                                /*
                                 * Split the line on "::"
                                 * 2::M::56::16::70072
                                 * user ID, gender, age, occupation, zip code
                                 *
                                 * The user ID is the key, the rest is the value
                                 */
                                String[] split = usersLine.split("::");
                                usersMap.put(split[0], split[1] + "::" + split[2] + "::" + split[3] + "::" + split[4]);
                            }
                        }
                    }
                }
            }

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {

                ratings = value.toString().split("::");
                // Use the movie ID and user ID to fetch the rest of the movie and user data
                String movies = moviesMap.get(ratings[1]);
                String users = usersMap.get(ratings[0]);
                if (movies == null || users == null) {
                    // Skip ratings that have no matching movie or user
                    return;
                }

                // Combine the three tables: rating fields, then movie fields, then user fields
                String threeTables = value.toString() + "::" + movies + "::" + users;
                outKey.set(threeTables);

                context.write(outKey, NullWritable.get());
            }
        }
    }

The data after joining the three tables looks like this:

1000::1023::5::975041651::Winnie the Pooh and the Blustery Day (1968)::Animation|Children's::F::25::6::90027
1000::1029::3::975041859::Dumbo (1941)::Animation|Children's|Musical::F::25::6::90027
1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027
1000::1104::5::975042421::Streetcar Named Desire, A (1951)::Drama::F::25::6::90027
1000::110::5::975040841::Braveheart (1995)::Action|Drama|War::F::25::6::90027
1000::1196::3::975040841::Star Wars: Episode V - The Empire Strikes Back (1980)::Action|Adventure|Drama|Sci-Fi|War::F::25::6::90027
1000::1198::5::975040841::Raiders of the Lost Ark (1981)::Action|Adventure::F::25::6::90027
1000::1200::4::975041125::Aliens (1986)::Action|Sci-Fi|Thriller|War::F::25::6::90027
1000::1201::5::975041025::Good, The Bad and The Ugly, The (1966)::Action|Western::F::25::6::90027
1000::1210::5::975040629::Star Wars: Episode VI - Return of the Jedi (1983)::Action|Adventure|Romance|Sci-Fi|War::F::25::6::90027

Field Explanation

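1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027

    Index  Field             Example
    0      user ID           1000
    1      movie ID          1036
    2      rating            4
    3      rating timestamp  975040964
    4      movie title       Die Hard (1988)
    5      movie genres      Action|Thriller
    6      gender            F
    7      age               25
    8      occupation        6
    9      zip code          90027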
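Since the later jobs all pick fields out of this joined line by index, it can help to see the index mapping written out once as code. This is a minimal sketch rather than anything the jobs use: `JoinedRecord` and its field names are hypothetical, and it assumes every line has exactly ten `::`-separated fields in the order shown above.

```java
/** Hypothetical holder for one joined line; field positions follow the joined output. */
final class JoinedRecord {
    final String userId, movieId, movieName, movieType, gender, occupation, zipcode;
    final int rate, age;
    final long ts;

    private JoinedRecord(String[] f) {
        userId = f[0];
        movieId = f[1];
        rate = Integer.parseInt(f[2]);
        ts = Long.parseLong(f[3]);
        movieName = f[4];
        movieType = f[5];
        gender = f[6];
        age = Integer.parseInt(f[7]);
        occupation = f[8];
        zipcode = f[9];   // kept as text so leading zeros survive
    }

    /** Parses one joined line, rejecting lines that do not have ten fields. */
    static JoinedRecord parse(String line) {
        String[] f = line.split("::");
        if (f.length != 10) {
            throw new IllegalArgumentException("expected 10 fields: " + line);
        }
        return new JoinedRecord(f);
    }
}
```

Parsing the sample line gives `movieName` "Die Hard (1988)", `rate` 4, `gender` "F" and `age` 25.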

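To find the 10 highest-rated movies among male viewers and among female viewers separately (gender, movie title, rating):

1. Group by gender and movie title: use movie title + gender as the key and the rating as the value, and compute the average rating.

2. Wrap gender + movie title + average rating in an object, group by gender, sort by rating in descending order, and output the top 10 of each group.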
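The two steps can be checked on a handful of lines before running them as jobs. The sketch below is a plain-Java approximation under stated assumptions: `GenderTopNSketch` and its methods are hypothetical names, input lines follow the ten-field joined format, and ties in the average are returned in no particular order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory version of the two-step plan. */
final class GenderTopNSketch {

    /** Step 1: average rating per "title \t gender" key. */
    static Map<String, Double> averageByTitleAndGender(List<String> joinedLines) {
        Map<String, double[]> acc = new HashMap<>();   // key -> {sum, count}
        for (String line : joinedLines) {
            String[] f = line.split("::");
            double[] a = acc.computeIfAbsent(f[4] + "\t" + f[6], k -> new double[2]);
            a[0] += Double.parseDouble(f[2]);
            a[1] += 1;
        }
        Map<String, Double> avg = new HashMap<>();
        for (Map.Entry<String, double[]> e : acc.entrySet()) {
            avg.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return avg;
    }

    /** Step 2: titles for one gender, ordered by average descending, first n only. */
    static List<String> topTitles(Map<String, Double> avg, String gender, int n) {
        List<Map.Entry<String, Double>> rows = new ArrayList<>();
        for (Map.Entry<String, Double> e : avg.entrySet()) {
            if (e.getKey().endsWith("\t" + gender)) {
                rows.add(e);
            }
        }
        rows.sort((a, b) -> Double.compare(b.getValue(), a.getValue()));
        List<String> titles = new ArrayList<>();
        for (Map.Entry<String, Double> e : rows.subList(0, Math.min(n, rows.size()))) {
            titles.add(e.getKey().split("\t")[0]);
        }
        return titles;
    }
}
```

One thing this makes visible: a movie with a single 5-star rating gets an average of 5.0 and ranks at the top, so a minimum rating count may be worth adding if that is not the intended result.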

Business logic: MoviesDemo2.java

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MoviesDemo2 {

        public static void main(String[] args) throws Exception {

            // No fs.defaultFS is set, so both jobs run against the local file system
            Configuration conf1 = new Configuration();
            Configuration conf2 = new Configuration();
            FileSystem fs1 = FileSystem.get(conf1);
            FileSystem fs2 = FileSystem.get(conf2);
            Job job1 = Job.getInstance(conf1);
            Job job2 = Job.getInstance(conf2);

            job1.setJarByClass(MoviesDemo2.class);
            job2.setJarByClass(MoviesDemo2.class);
            job1.setMapperClass(MoviesDemo2Mapper1.class);
            job2.setMapperClass(MoviesDemo2Mapper2.class);
            job1.setReducerClass(MoviesDemo2Reducer1.class);
            job2.setReducerClass(MoviesDemo2Reducer2.class);

            job1.setOutputKeyClass(Text.class);
            job1.setOutputValueClass(DoubleWritable.class);

            job2.setOutputKeyClass(MoviesSexBean.class);
            job2.setOutputValueClass(NullWritable.class);

            // Group job2's keys by gender only
            job2.setGroupingComparatorClass(MoviesSexGC.class);

            Path inputPath1 = new Path("D:\\MR\\hw\\movie\\output3he1");
            Path outputPath1 = new Path("D:\\MR\\hw\\movie\\output2_1");
            Path inputPath2 = new Path("D:\\MR\\hw\\movie\\output2_1");
            Path outputPath2 = new Path("D:\\MR\\hw\\movie\\output2_end");

            if (fs1.exists(outputPath1)) {
                fs1.delete(outputPath1, true);
            }
            if (fs2.exists(outputPath2)) {
                fs2.delete(outputPath2, true);
            }

            FileInputFormat.setInputPaths(job1, inputPath1);
            FileOutputFormat.setOutputPath(job1, outputPath1);

            FileInputFormat.setInputPaths(job2, inputPath2);
            FileOutputFormat.setOutputPath(job2, outputPath2);

            JobControl control = new JobControl("MoviesDemo2");

            ControlledJob aJob = new ControlledJob(job1.getConfiguration());
            ControlledJob bJob = new ControlledJob(job2.getConfiguration());

            // job2 starts only after job1 has succeeded
            bJob.addDependingJob(aJob);

            control.addJob(aJob);
            control.addJob(bJob);

            Thread thread = new Thread(control);
            thread.start();

            while (!control.allFinished()) {
                Thread.sleep(1000);
            }
            // Report failure through the exit code, then stop the control thread
            boolean succeeded = control.getFailedJobList().isEmpty();
            control.stop();
            System.exit(succeeded ? 0 : 1);
        }

        /**
         * Input: the output file of the three-table join.
         * Emits movie title + gender as the key and the rating as the value.
         *
         * 1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027
         *
         * user ID::movie ID::rating::rating timestamp::movie title::movie genres::gender::age::occupation::zip code
         */
        public static class MoviesDemo2Mapper1 extends Mapper<LongWritable, Text, Text, DoubleWritable> {

            Text outKey = new Text();
            DoubleWritable outValue = new DoubleWritable();

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {

                String[] split = value.toString().split("::");
                String strKey = split[4] + "\t" + split[6];
                String strValue = split[2];

                outKey.set(strKey);
                outValue.set(Double.parseDouble(strValue));

                context.write(outKey, outValue);
            }
        }

        /**
         * Computes the average rating for each movie title + gender key.
         */
        public static class MoviesDemo2Reducer1 extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

            DoubleWritable outValue = new DoubleWritable();

            @Override
            protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                    throws IOException, InterruptedException {

                int count = 0;
                double sum = 0;
                for (DoubleWritable value : values) {
                    count++;
                    sum += value.get();
                }
                double avg = sum / count;

                outValue.set(avg);
                context.write(key, outValue);
            }
        }

        /**
         * Wraps movie title + gender + average rating in an object that is
         * grouped by gender and sorted by rating in descending order.
         */
        public static class MoviesDemo2Mapper2 extends Mapper<LongWritable, Text, MoviesSexBean, NullWritable> {

            MoviesSexBean outKey = new MoviesSexBean();

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {

                // Input line: title \t gender \t average rating
                String[] split = value.toString().split("\t");
                outKey.setMovieName(split[0]);
                outKey.setSex(split[1]);
                outKey.setScore(Double.parseDouble(split[2]));

                context.write(outKey, NullWritable.get());
            }
        }

        /**
         * Emits the 10 best-rated movies for each gender.
         */
        public static class MoviesDemo2Reducer2 extends Reducer<MoviesSexBean, NullWritable, MoviesSexBean, NullWritable> {

            @Override
            protected void reduce(MoviesSexBean key, Iterable<NullWritable> values, Context context)
                    throws IOException, InterruptedException {

                // One call per gender; the key is refreshed as the values are iterated
                int count = 0;
                for (NullWritable nvl : values) {
                    count++;
                    context.write(key, NullWritable.get());
                    if (count == 10) {
                        return;
                    }
                }
            }
        }
    }

Key object: MoviesSexBean.java

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    public class MoviesSexBean implements WritableComparable<MoviesSexBean> {

        private String movieName;
        private String sex;
        private double score;

        public MoviesSexBean() {
        }
        public MoviesSexBean(String movieName, String sex, double score) {
            this.movieName = movieName;
            this.sex = sex;
            this.score = score;
        }
        public String getMovieName() {
            return movieName;
        }
        public void setMovieName(String movieName) {
            this.movieName = movieName;
        }
        public String getSex() {
            return sex;
        }
        public void setSex(String sex) {
            this.sex = sex;
        }
        public double getScore() {
            return score;
        }
        public void setScore(double score) {
            this.score = score;
        }
        @Override
        public String toString() {
            return movieName + "\t" + sex + "\t" + score;
        }
        // Fields must be read in the same order in which write() emits them
        @Override
        public void readFields(DataInput in) throws IOException {
            movieName = in.readUTF();
            sex = in.readUTF();
            score = in.readDouble();
        }
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(movieName);
            out.writeUTF(sex);
            out.writeDouble(score);
        }
        @Override
        public int compareTo(MoviesSexBean o) {
            // Gender in descending order ("M" before "F"), then score in descending order
            int result = o.getSex().compareTo(this.getSex());
            if (result != 0) {
                return result;
            }
            return Double.compare(o.getScore(), this.getScore());
        }
    }

Grouping comparator: MoviesSexGC.java

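    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    public class MoviesSexGC extends WritableComparator {

        public MoviesSexGC() {
            // true: let the comparator create MoviesSexBean instances for deserialization
            super(MoviesSexBean.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {

            MoviesSexBean msb1 = (MoviesSexBean) a;
            MoviesSexBean msb2 = (MoviesSexBean) b;

            // Keys with the same gender fall into the same reduce call
            return msb1.getSex().compareTo(msb2.getSex());
        }
    }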
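The sort order in MoviesSexBean.compareTo and the grouping rule in MoviesSexGC do different jobs: the first decides the order in which keys reach the reducer, the second decides where one reduce call ends and the next begins. The following plain-Java simulation is only a sketch of that interplay, with hypothetical names (`GroupingSketch`, `Row`, `SORT`, `GROUP`); it does not use or reproduce Hadoop's shuffle.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical simulation of sort comparator + grouping comparator. */
final class GroupingSketch {

    static final class Row {
        final String title, sex;
        final double score;
        Row(String title, String sex, double score) {
            this.title = title;
            this.sex = sex;
            this.score = score;
        }
    }

    // Sort order: gender descending, then score descending
    static final Comparator<Row> SORT = (a, b) -> {
        int bySex = b.sex.compareTo(a.sex);
        return bySex != 0 ? bySex : Double.compare(b.score, a.score);
    };

    // Grouping rule: rows with the same gender belong to one group
    static final Comparator<Row> GROUP = (a, b) -> a.sex.compareTo(b.sex);

    /** Walks the sorted rows and keeps the first n of every group. */
    static List<String> firstNPerGroup(List<Row> rows, int n) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(SORT);
        List<String> out = new ArrayList<>();
        Row groupHead = null;
        int taken = 0;
        for (Row r : sorted) {
            if (groupHead == null || GROUP.compare(groupHead, r) != 0) {
                groupHead = r;   // a new group starts here
                taken = 0;
            }
            if (taken < n) {
                out.add(r.sex + ":" + r.title);
                taken++;
            }
        }
        return out;
    }
}
```

Because rows are already ordered by score inside each gender, taking the first n of a group is the same as taking that gender's top n.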

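3. For the movie with movieid = 2116, find the average rating per age group (the data contains only 7 age values, so group by those 7) (age group, rating)

This job works on the joined three-table file produced in step 2.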
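The computation itself is a filter followed by a grouped average. As a rough sketch under assumptions (`AgeAverageSketch` is a hypothetical name, and the input lines are expected in the ten-field joined format with the movie ID at index 1, the rating at index 2 and the age at index 7):

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory version of "average rating per age group for one movie". */
final class AgeAverageSketch {

    /** Returns age value -> average rating for the given movie ID. */
    static Map<String, Double> averageByAge(List<String> joinedLines, String movieId) {
        Map<String, double[]> acc = new TreeMap<>();   // age -> {sum, count}
        for (String line : joinedLines) {
            String[] f = line.split("::");
            if (!movieId.equals(f[1])) {
                continue;                              // keep only the requested movie
            }
            double[] a = acc.computeIfAbsent(f[7], k -> new double[2]);
            a[0] += Double.parseDouble(f[2]);
            a[1] += 1;
        }
        Map<String, Double> avg = new TreeMap<>();
        for (Map.Entry<String, double[]> e : acc.entrySet()) {
            avg.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return avg;
    }
}
```

In the MapReduce job the filter lives in the mapper and the averaging in the reducer, so only the rows for movie 2116 are shuffled.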

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MovieDemo3 {

        public static void main(String[] args) throws Exception {

            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);

            job.setJarByClass(MovieDemo3.class);
            job.setMapperClass(MovieDemo3Mapper.class);
            job.setReducerClass(MovieDemo3Reducer.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);

            Path inputPath = new Path("D:\\MR\\hw\\movie\\3he1");
            Path outputPath = new Path("D:\\MR\\hw\\movie\\outpu3");

            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }

            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, outputPath);

            boolean isDone = job.waitForCompletion(true);
            System.exit(isDone ? 0 : 1);
        }

        /**
         * 1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027
         *
         * user ID::movie ID::rating::rating timestamp::movie title::movie genres::gender::age::occupation::zip code
         * 0        1         2       3                 4            5             6       7    8           9
         *
         * key: movie ID + movie title + age group
         * value: rating
         * Only rows for movieid = 2116 are emitted.
         */
        public static class MovieDemo3Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {

            Text outKey = new Text();
            DoubleWritable outValue = new DoubleWritable();

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {

                String[] split = value.toString().split("::");
                int movieID = Integer.parseInt(split[1]);

                if (movieID == 2116) {
                    String strKey = split[1] + "\t" + split[4] + "\t" + split[7];
                    String strValue = split[2];

                    outKey.set(strKey);
                    outValue.set(Double.parseDouble(strValue));

                    context.write(outKey, outValue);
                }
            }
        }

        /**
         * Averages the ratings emitted by the mapper for each key.
         */
        public static class MovieDemo3Reducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

            DoubleWritable outValue = new DoubleWritable();

            @Override
            protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                    throws IOException, InterruptedException {

                int count = 0;
                double sum = 0;

                for (DoubleWritable value : values) {
                    count++;
                    sum += value.get();
                }

                double avg = sum / count;

                outValue.set(avg);
                context.write(key, outValue);
            }
        }
    }

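4. For the woman who likes movies most (the one with the most ratings), find the average rating of the 10 movies she rated highest (user, movie title, rating)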

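1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027

    Index  Field             Example
    0      user ID           1000
    1      movie ID          1036
    2      rating            4
    3      rating timestamp  975040964
    4      movie title       Die Hard (1988)
    5      movie genres      Action|Thriller
    6      gender            F
    7      age               25
    8      occupation        6
    9      zip code          90027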

(1) Find the ID of the woman with the most ratings
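Before chaining two jobs for this, the logic can be sanity-checked in memory: count rows per user ID for female users only, then keep the largest. The sketch below uses a hypothetical class `MostActiveFemaleSketch` and assumes joined ten-field lines with the user ID at index 0 and the gender at index 6. When two users tie, it keeps whichever reached the maximum first.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory version of "female user with the most ratings". */
final class MostActiveFemaleSketch {

    /** Returns the user ID of the female user with the most rows, or null if there is none. */
    static String mostActiveFemale(List<String> joinedLines) {
        Map<String, Integer> counts = new HashMap<>();
        String best = null;
        for (String line : joinedLines) {
            String[] f = line.split("::");
            if (!"F".equals(f[6])) {
                continue;                       // only female users are counted
            }
            int c = counts.merge(f[0], 1, Integer::sum);
            if (best == null || c > counts.get(best)) {
                best = f[0];
            }
        }
        return best;
    }
}
```

The MapReduce version splits the same work in two: the first job produces the per-user counts, and the second sorts them in descending order and keeps the first record.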

  MoviesDemo4_1.java

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MoviesDemo4 {

        public static void main(String[] args) throws Exception {

            // Job 1: count the ratings of each female user
            Configuration conf1 = new Configuration();
            FileSystem fs1 = FileSystem.get(conf1);
            Job job1 = Job.getInstance(conf1);

            job1.setJarByClass(MoviesDemo4.class);
            job1.setMapperClass(MoviesDemo4Mapper1.class);
            job1.setReducerClass(MoviesDemo4Reducer1.class);

            job1.setMapOutputKeyClass(Text.class);
            job1.setMapOutputValueClass(Text.class);
            job1.setOutputKeyClass(Text.class);
            // The reducer emits IntWritable counts
            job1.setOutputValueClass(IntWritable.class);

            // Job 2: sort the counts in descending order and keep the first record.
            // Moviegoers is a custom key class that is not part of this listing.
            Configuration conf2 = new Configuration();
            FileSystem fs2 = FileSystem.get(conf2);
            Job job2 = Job.getInstance(conf2);

            job2.setJarByClass(MoviesDemo4.class);
            job2.setMapperClass(MoviesDemo4Mapper2.class);
            job2.setReducerClass(MoviesDemo4Reducer2.class);

            job2.setMapOutputKeyClass(Moviegoers.class);
            job2.setMapOutputValueClass(NullWritable.class);
            job2.setOutputKeyClass(Moviegoers.class);
            job2.setOutputValueClass(NullWritable.class);

            Path inputPath1 = new Path("D:\\MR\\hw\\movie\\3he1");
            Path outputPath1 = new Path("D:\\MR\\hw\\movie\\outpu4_1");

            if (fs1.exists(outputPath1)) {
                fs1.delete(outputPath1, true);
            }

            FileInputFormat.setInputPaths(job1, inputPath1);
            FileOutputFormat.setOutputPath(job1, outputPath1);

            Path inputPath2 = new Path("D:\\MR\\hw\\movie\\outpu4_1");
            Path outputPath2 = new Path("D:\\MR\\hw\\movie\\outpu4_2");

            if (fs2.exists(outputPath2)) {
                fs2.delete(outputPath2, true);
            }

            FileInputFormat.setInputPaths(job2, inputPath2);
            FileOutputFormat.setOutputPath(job2, outputPath2);

            JobControl control = new JobControl("MoviesDemo4");

            ControlledJob ajob = new ControlledJob(job1.getConfiguration());
            ControlledJob bjob = new ControlledJob(job2.getConfiguration());

            // job2 starts only after job1 has succeeded
            bjob.addDependingJob(ajob);

            control.addJob(ajob);
            control.addJob(bjob);

            Thread thread = new Thread(control);
            thread.start();

            while (!control.allFinished()) {
                Thread.sleep(1000);
            }
            // Report failure through the exit code, then stop the control thread
            boolean succeeded = control.getFailedJobList().isEmpty();
            control.stop();
            System.exit(succeeded ? 0 : 1);
        }

        /**
         * 1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027
         *
         * user ID::movie ID::rating::rating timestamp::movie title::movie genres::gender::age::occupation::zip code
         * 0        1         2       3                 4            5             6       7    8           9
         *
         * 1. key: user ID
         * 2. value: movie title + rating
         */
        public static class MoviesDemo4Mapper1 extends Mapper<LongWritable, Text, Text, Text> {

            Text outKey = new Text();
            Text outValue = new Text();

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {

                String[] split = value.toString().split("::");

                String strKey = split[0];
                String strValue = split[4] + "\t" + split[2];

                // Only female users are passed on
                if (split[6].equals("F")) {
                    outKey.set(strKey);
                    outValue.set(strValue);
                    context.write(outKey, outValue);
                }
            }
        }

        // Counts the total number of ratings for each female user
        public static class MoviesDemo4Reducer1 extends Reducer<Text, Text, Text, IntWritable> {

            IntWritable outValue = new IntWritable();

            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {

                int count = 0;
                for (Text value : values) {
                    count++;
                }
                outValue.set(count);
                context.write(key, outValue);
            }
        }

        // Sorts the output of the first MapReduce job in descending order
        public static class MoviesDemo4Mapper2 extends Mapper<LongWritable, Text, Moviegoers, NullWritable> {

            Moviegoers outKey = new Moviegoers();

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {

                // Input line: user ID \t rating count
                String[] split = value.toString().split("\t");

                outKey.setName(split[0]);
                outKey.setCount(Integer.parseInt(split[1]));
                context.write(outKey, NullWritable.get());
            }
        }

        // After sorting, keeps only the first record (the female user ID with the most ratings, and her count)
        public static class MoviesDemo4Reducer2 extends Reducer<Moviegoers, NullWritable, Moviegoers, NullWritable> {

            // Shared across reduce calls, so only the very first record is written
            int count = 0;

            @Override
            protected void reduce(Moviegoers key, Iterable<NullWritable> values, Context context)
                    throws IOException, InterruptedException {

                for (NullWritable nvl : values) {
                    count++;
                    if (count > 1) {
                        return;
                    }
                    context.write(key, nvl);
                }
            }
        }
    }