企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
/*
 * Top-K by summed count, implemented as a Hadoop MapReduce job.
 *
 * Sample input (one "key count" pair per line):
 *
 * file1.txt
 *   aa 8
 *   bb 5
 *   cc 3
 *   dd 10
 *   ee 7
 *   ff 10
 *   gg 22
 *   hh 33
 *   aa 12
 *   bb 12
 *   cc 8
 *   kk 9
 *   tt 13
 *
 * file2.txt
 *   aaa 8
 *   bbb 5
 *   ccc 3
 *   ddd 10
 *   eee 7
 *   fff 10
 *   ggg 22
 *   es 33
 *   aaa 12
 *   bbb 12
 *   ccc 8
 *   kkt 9
 *   tta 13
 *
 * Analysis: the keys of the two files are different, but a key can occur more
 * than once inside a single file, so the counts of every key have to be merged
 * (summed) first and only then ranked.
 */
package mapreduce;

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * MapReduce job that sums the counts of every key and writes the K keys with
 * the largest totals.
 *
 * <p>The mapper emits one {@code (key, count)} pair per input line, and the
 * single reducer sums the counts of each key while retaining only the K best
 * totals, which it writes out when it is closed.
 */
public class TopK {

    /** Default number of entries to keep when {@link #K_CONF_KEY} is not set. */
    public static final Integer K = 10;

    /** Configuration key that overrides {@link #K}, e.g. {@code -D topk.k=5}. */
    public static final String K_CONF_KEY = "topk.k";

    private static final String DEFAULT_HADOOP_HOME = "D:\\hadoop2.7.6";
    private static final String[] DEFAULT_INPUTS = {
        "D:\\input3\\file1.txt",
        "D:\\input3\\file2.txt",
    };
    private static final String DEFAULT_OUTPUT = "D:\\topoutput3";

    /**
     * Parses {@code "key count"} lines and emits {@code (key, count)}.
     *
     * <p>Blank lines are ignored. Lines with fewer than two fields or with a
     * non-integer second field are skipped and counted in the
     * {@code TopK / MALFORMED_LINES} job counter instead of failing the task.
     */
    public static class MapClass extends Mapper<LongWritable, Text, Text, IntWritable> {

        private static final Pattern WHITESPACE = Pattern.compile("\\s+");
        private static final String COUNTER_GROUP = "TopK";
        private static final String MALFORMED_COUNTER = "MALFORMED_LINES";

        // Reused for every record; context.write serializes them immediately.
        private final Text outKey = new Text();
        private final IntWritable outCount = new IntWritable();

        /**
         * Emits the first field of the line as key and the second as count.
         *
         * @param key byte offset of the line (unused)
         * @param value the raw input line
         * @param context sink for the emitted pair and the malformed-line counter
         * @throws IOException if writing the pair fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Trim first: a leading blank would otherwise yield an empty first field.
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return;
            }

            String[] fields = WHITESPACE.split(line);
            if (fields.length < 2) {
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }

            int count;
            try {
                count = Integer.parseInt(fields[1]);
            } catch (NumberFormatException e) {
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }

            outKey.set(fields[0]);
            outCount.set(count);
            context.write(outKey, outCount);
        }
    }

    /**
     * Sums the counts of each key and keeps the K keys with the largest sums.
     *
     * <p>Keys are grouped by their sum so that keys with equal sums do not
     * overwrite each other (in the sample data both {@code dd} and {@code ff}
     * total 10). When more than K keys are held, one key is evicted from the
     * lowest sum; among keys tied on that sum the lexicographically greatest is
     * evicted, which keeps the selection deterministic.
     *
     * <p>Nothing is written during {@code reduce}; the retained entries are
     * written in {@code cleanup}, ordered by ascending sum and, within one sum,
     * by ascending key.
     */
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        /** Retained keys, bucketed by their summed count. */
        private final TreeMap<Integer, TreeSet<String>> keysBySum =
                new TreeMap<Integer, TreeSet<String>>();

        /** Total number of keys currently held across all buckets. */
        private int retained;

        /** Maximum number of keys to hold; read from the job configuration. */
        private int limit = K;

        /**
         * Reads the entry limit from {@link #K_CONF_KEY}, defaulting to {@link #K}.
         *
         * @param context task context providing the job configuration
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            limit = context.getConfiguration().getInt(K_CONF_KEY, K);
        }

        /**
         * Sums all counts of {@code key} and records the total as a candidate.
         *
         * @param key the key being reduced
         * @param value all counts emitted for {@code key}
         * @param context task context (not written to here)
         */
        @Override
        public void reduce(Text key, Iterable<IntWritable> value, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable num : value) {
                sum += num.get();
            }

            TreeSet<String> bucket = keysBySum.get(sum);
            if (bucket == null) {
                bucket = new TreeSet<String>();
                keysBySum.put(sum, bucket);
            }
            // Copy the key: Hadoop reuses the Text instance between calls.
            if (bucket.add(key.toString())) {
                retained++;
            }

            if (retained > limit) {
                evictLowest();
            }
        }

        /** Removes one key from the bucket with the smallest sum. */
        private void evictLowest() {
            Map.Entry<Integer, TreeSet<String>> lowest = keysBySum.firstEntry();
            TreeSet<String> bucket = lowest.getValue();
            bucket.pollLast();
            if (bucket.isEmpty()) {
                keysBySum.remove(lowest.getKey());
            }
            retained--;
        }

        /**
         * Writes the retained {@code (key, sum)} pairs, smallest sum first.
         *
         * @param context sink for the final pairs
         * @throws IOException if writing a pair fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            Text outKey = new Text();
            IntWritable outSum = new IntWritable();
            for (Map.Entry<Integer, TreeSet<String>> entry : keysBySum.entrySet()) {
                outSum.set(entry.getKey());
                for (String name : entry.getValue()) {
                    outKey.set(name);
                    context.write(outKey, outSum);
                }
            }
        }
    }

    /**
     * Configures and runs the job, then exits with status 0 on success and 1 on
     * failure.
     *
     * <p>Usage: {@code TopK [generic options] [<input>... <output>]}. With no
     * paths the original sample locations are used. Generic Hadoop options such
     * as {@code -D topk.k=5} are honoured.
     *
     * @param args optional generic options followed by input paths and one output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        // Local Windows convenience; do not override a value supplied by the caller.
        if (System.getProperty("hadoop.home.dir") == null) {
            System.setProperty("hadoop.home.dir", DEFAULT_HADOOP_HOME);
        }

        Configuration conf = new Configuration();
        String[] paths = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (paths.length == 1) {
            System.err.println("Usage: TopK [generic options] [<input>... <output>]");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "topk");
        job.setJarByClass(TopK.class);

        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        // Each reducer keeps its own top K, so a global ranking needs exactly one.
        job.setNumReduceTasks(1);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        if (paths.length >= 2) {
            int last = paths.length - 1;
            for (int i = 0; i < last; i++) {
                FileInputFormat.addInputPath(job, new Path(paths[i]));
            }
            FileOutputFormat.setOutputPath(job, new Path(paths[last]));
        } else {
            for (String input : DEFAULT_INPUTS) {
                FileInputFormat.addInputPath(job, new Path(input));
            }
            FileOutputFormat.setOutputPath(job, new Path(DEFAULT_OUTPUT));
        }

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}