logo头像
Snippet 博客主题

Hadoop学习之路(二十八)MapReduce的API使用(五)

** Hadoop学习之路(二十八)MapReduce的API使用(五):** <Excerpt in index | 首页摘要>

​ Hadoop学习之路(二十八)MapReduce的API使用(五)

<The rest of contents | 余下全文>

求所有两两用户之间的共同好友

数据格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J,K

以上是数据:
A:B,C,D,F,E,O
表示:B,C,D,E,F,O是A用户的好友。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
  1 public class SharedFriend {
2 /*
3 第一阶段的map函数主要完成以下任务
4 1.遍历原始文件中每行<所有朋友>信息
5 2.遍历“朋友”集合,以每个“朋友”为键,原来的“人”为值 即输出<朋友,人>
6 */
7 static class SharedFriendMapper01 extends Mapper<LongWritable, Text, Text, Text>{
8 @Override
9 protected void map(LongWritable key, Text value,Context context)
10 throws IOException, InterruptedException {
11 String line = value.toString();
12 String[] person_friends = line.split(":");
13 String person = person_friends[0];
14 String[] friends = person_friends[1].split(",");
15
16 for(String friend : friends){
17 context.write(new Text(friend), new Text(person));
18 }
19 }
20 }
21
22 /*
23 第一阶段的reduce函数主要完成以下任务
24 1.对所有传过来的<朋友,list(人)>进行拼接,输出<朋友,拥有这名朋友的所有人>
25 */
26 static class SharedFriendReducer01 extends Reducer<Text, Text, Text, Text>{
27 @Override
28 protected void reduce(Text key, Iterable<Text> values,Context context)
29 throws IOException, InterruptedException {
30 StringBuffer sb = new StringBuffer();
31 for(Text friend : values){
32 sb.append(friend.toString()).append(",");
33 }
34 sb.deleteCharAt(sb.length()-1);
35 context.write(key, new Text(sb.toString()));
36 }
37 }
38
39 /*
40 第二阶段的map函数主要完成以下任务
41 1.将上一阶段reduce输出的<朋友,拥有这名朋友的所有人>信息中的 “拥有这名朋友的所有人”进行排序 ,以防出现B-C C-B这样的重复
42 2.将 “拥有这名朋友的所有人”进行两两配对,并将配对后的字符串当做键,“朋友”当做值输出,即输出<人-人,共同朋友>
43 */
44 static class SharedFriendMapper02 extends Mapper<LongWritable, Text, Text, Text>{
45 @Override
46 protected void map(LongWritable key, Text value,Context context)
47 throws IOException, InterruptedException {
48 String line = value.toString();
49 String[] friend_persons = line.split("\t");
50 String friend = friend_persons[0];
51 String[] persons = friend_persons[1].split(",");
52 Arrays.sort(persons); //排序
53
54 //两两配对
55 for(int i=0;i<persons.length-1;i++){
56 for(int j=i+1;j<persons.length;j++){
57 context.write(new Text(persons[i]+"-"+persons[j]+":"), new Text(friend));
58 }
59 }
60 }
61 }
62
63 /*
64 第二阶段的reduce函数主要完成以下任务
65 1.<人-人,list(共同朋友)> 中的“共同好友”进行拼接 最后输出<人-人,两人的所有共同好友>
66 */
67 static class SharedFriendReducer02 extends Reducer<Text, Text, Text, Text>{
68 @Override
69 protected void reduce(Text key, Iterable<Text> values,Context context)
70 throws IOException, InterruptedException {
71 StringBuffer sb = new StringBuffer();
72 Set<String> set = new HashSet<String>();
73 for(Text friend : values){
74 if(!set.contains(friend.toString()))
75 set.add(friend.toString());
76 }
77 for(String friend : set){
78 sb.append(friend.toString()).append(",");
79 }
80 sb.deleteCharAt(sb.length()-1);
81
82 context.write(key, new Text(sb.toString()));
83 }
84 }
85
86 public static void main(String[] args)throws Exception {
87 Configuration conf = new Configuration();
88
89 //第一阶段
90 Job job1 = Job.getInstance(conf);
91 job1.setJarByClass(SharedFriend.class);
92 job1.setMapperClass(SharedFriendMapper01.class);
93 job1.setReducerClass(SharedFriendReducer01.class);
94
95 job1.setOutputKeyClass(Text.class);
96 job1.setOutputValueClass(Text.class);
97
98 FileInputFormat.setInputPaths(job1, new Path("H:/大数据/mapreduce/sharedfriend/input"));
99 FileOutputFormat.setOutputPath(job1, new Path("H:/大数据/mapreduce/sharedfriend/output"));
100
101 boolean res1 = job1.waitForCompletion(true);
102
103 //第二阶段
104 Job job2 = Job.getInstance(conf);
105 job2.setJarByClass(SharedFriend.class);
106 job2.setMapperClass(SharedFriendMapper02.class);
107 job2.setReducerClass(SharedFriendReducer02.class);
108
109 job2.setOutputKeyClass(Text.class);
110 job2.setOutputValueClass(Text.class);
111
112 FileInputFormat.setInputPaths(job2, new Path("H:/大数据/mapreduce/sharedfriend/output"));
113 FileOutputFormat.setOutputPath(job2, new Path("H:/大数据/mapreduce/sharedfriend/output01"));
114
115 boolean res2 = job2.waitForCompletion(true);
116
117 System.exit(res1?0:1);
118 }
119 }

第一阶段输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
 1 A    F,I,O,K,G,D,C,H,B
2 B E,J,F,A
3 C B,E,K,A,H,G,F
4 D H,C,G,F,E,A,K,L
5 E A,B,L,G,M,F,D,H
6 F C,M,L,A,D,G
7 G M
8 H O
9 I O,C
10 J O
11 K O,B
12 L D,E
13 M E,F
14 O A,H,I,J,F

第二阶段输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
 1 A-B    C,E
2 A-C D,F
3 A-D E,F
4 A-E C,B,D
5 A-F E,O,C,D,B
6 A-G F,C,E,D
7 A-H D,O,C,E
8 A-I O
9 A-J B,O
10 A-K C,D
11 A-L D,E,F
12 A-M E,F
13 B-C A
14 B-D A,E
15 B-E C
16 B-F A,C,E
17 B-G E,C,A
18 B-H A,E,C
19 B-I A
20 B-K A,C
21 B-L E
22 B-M E
23 B-O K,A
24 C-D F,A
25 C-E D
26 C-F D,A
27 C-G D,F,A
28 C-H D,A
29 C-I A
30 C-K A,D
31 C-L D,F
32 C-M F
33 C-O I,A
34 D-E L
35 D-F A,E
36 D-G F,A,E
37 D-H A,E
38 D-I A
39 D-K A
40 D-L F,E
41 D-M F,E
42 D-O A
43 E-F C,D,M,B
44 E-G C,D
45 E-H C,D
46 E-J B
47 E-K D,C
48 E-L D
49 F-G C,E,D,A
50 F-H D,O,A,E,C
51 F-I A,O
52 F-J O,B
53 F-K D,C,A
54 F-L D,E
55 F-M E
56 F-O A
57 G-H E,C,D,A
58 G-I A
59 G-K D,A,C
60 G-L F,E,D
61 G-M E,F
62 G-O A
63 H-I A,O
64 H-J O
65 H-K C,D,A
66 H-L D,E
67 H-M E
68 H-O A
69 I-J O
70 I-K A
71 I-O A
72 K-L D
73 K-O A
74 L-M F,E