🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
[TOC] # 分析 求出哪些人两两之间有共同好友,及他俩的共同好友都是谁 数据准备 ~~~ A:B,C,D,F,E,O B:A,C,E,K C:F,A,D,I D:A,E,F,L E:B,C,D,M,L F:A,B,C,D,E,O,M G:A,C,D,E,F H:A,C,D,E,O I:A,O J:B,O K:A,C,D L:D,E,F M:E,F,G O:A,H,I,J ~~~ ![](https://box.kancloud.cn/ad11e0d73b3095f37a8e7470b51903d1_270x61.png) 分析下 ~~~ 比如前面是用户,后面是好友,那我们第一次就把好友开始统计,从冒号后面开始统计第一个输出: 把好友标在前面,用户放在后面(map阶段) b -a c -a d -a a -b c -b 然后把他们聚合,因为这样是有重复的(reduce阶段) 把第一个当做key,key相同的,其余的当做迭代的value 第一个输出: b -> a e j c ->a b e f h ------------------------- 对上面的结果进行每行两两组合(map阶段) 后面的2个两两组合(注意写之前要排序) 不然a-b和b-a会认为不同,map写的之前要排序下,都变成a-b 第二个MR: a-e b a-j b e-j b a-b c a-e c 然后把他们聚合(reduce阶段) 比如 a-e b c d a-m e f ~~~ 因为他是基于已经存在的单向好友关系的,反过来再找好友就是双向的 然后不断集合和排序,排序主要是防止A-B,B-A出现,两两组合 # 代码 ## 第一步 ~~~ package com.Commonfriends; import com.index.IndexStepTwo; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; public class CommonFriendsStepOne { public static class CommonFriendsStepOneMapper extends Mapper<LongWritable, Text, Text, Text> { //比如前面是用户,后面是好友,那我们第一次就把好友开始统计,从冒号后面开始统计第一个输出: //把好友标在前面,用户放在后面 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] splits = line.split(":"); String person = splits[0]; String[] friends = splits[1].split(","); for (String fString : friends) { context.write(new Text(fString), new Text(person)); } } } //然后把他们聚合 public static class CommonFriendsStepOneReducer extends Reducer<Text, Text, Text, Text> { @Override protected void reduce(Text friend, Iterable<Text> person, Context context) throws IOException, InterruptedException { StringBuffer sBuffer = new StringBuffer(); for (Text pText : person) { sBuffer.append(pText).append("-"); } context.write(friend,new Text(sBuffer.toString())); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(); job.setJarByClass(CommonFriendsStepOne.class); //告诉程序,我们的程序所用的mapper类和reducer类是什么 job.setMapperClass(CommonFriendsStepOneMapper.class); job.setReducerClass(CommonFriendsStepOneReducer.class); //告诉框架,我们程序输出的数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //告诉框架,我们程序使用的数据读取组件 结果输出所用的组件是什么 //TextInputFormat是mapreduce程序中内置的一种读取数据组件 准确的说 叫做 读取文本文件的输入组件 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); //告诉框架,我们要处理的数据文件在那个路劲下 FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/index/input/")); //如果有这个文件夹就删除 Path out = new Path("/Users/jdxia/Desktop/website/hdfs/index/output/"); FileSystem fileSystem = FileSystem.get(conf); if (fileSystem.exists(out)) { fileSystem.delete(out, true); } //告诉框架,我们的处理结果要输出到什么地方 FileOutputFormat.setOutputPath(job, out); boolean res = job.waitForCompletion(true); System.exit(res ? 0 : 1); } } ~~~ ## 第二步 其他要把第一步的结果,放到input下 ~~~ package com.Commonfriends; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; import java.util.Arrays; public class CommonFriendsStepTwo { /** * A I-K-C-B-G-F-H-O-D- B A-F-J-E- C A-E-B-H-F-G-K- * */ public static class CommonFriendsStepTwoMapper extends Mapper<LongWritable, Text, Text, Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] splits = line.split(" "); String friend = splits[0]; String[] persons = splits[1].split("-"); Arrays.sort(persons); for (int i = 0; i < persons.length - 1; i++) { for (int j = i + 1; j < persons.length; j++) { context.write(new Text(persons[i] + "-" + persons[j]), new Text(friend)); } } } } public static class CommonFriendsStepTwoReducer extends Reducer<Text,Text,Text,Text> { @Override protected void reduce(Text person_pair, Iterable<Text> friends, Context context) throws IOException, InterruptedException { StringBuffer sBuffer = new StringBuffer(); for (Text fText: friends) { sBuffer.append(fText).append(" "); } context.write(person_pair, new Text(sBuffer.toString())); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(); job.setJarByClass(CommonFriendsStepTwo.class); //告诉程序,我们的程序所用的mapper类和reducer类是什么 job.setMapperClass(CommonFriendsStepTwoMapper.class); job.setReducerClass(CommonFriendsStepTwoReducer.class); //告诉框架,我们程序输出的数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //告诉框架,我们程序使用的数据读取组件 结果输出所用的组件是什么 //TextInputFormat是mapreduce程序中内置的一种读取数据组件 准确的说 叫做 读取文本文件的输入组件 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); //告诉框架,我们要处理的数据文件在那个路径下 FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/index/input/")); //如果有这个文件夹就删除 Path out = new Path("/Users/jdxia/Desktop/website/hdfs/index/output/"); FileSystem fileSystem = FileSystem.get(conf); if (fileSystem.exists(out)) { fileSystem.delete(out, true); } //告诉框架,我们的处理结果要输出到什么地方 FileOutputFormat.setOutputPath(job, out); boolean res = job.waitForCompletion(true); System.exit(res ? 0 : 1); } } ~~~