Sample input:
file1.txt
```
aa 8
bb 5
cc 3
dd 10
ee 7
ff 10
gg 22
hh 33
aa 12
bb 12
cc 8
kk 9
tt 13
```
file2.txt
```
aaa 8
bbb 5
ccc 3
ddd 10
eee 7
fff 10
ggg 22
es 33
aaa 12
bbb 12
ccc 8
kkt 9
tta 13
```
Analysis: the keys in the two files do not overlap, but within a single file the same key can appear more than once (for example, aa appears twice in file1.txt, with 8 and 12), so the values must first be merged and summed per key before the top K sums are selected.
Code implementation
```
package mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class TopK {

    public static final int K = 10;

    public static class MapClass extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is "<word> <count>"; emit (word, count).
            String[] fields = value.toString().split("\\s+");
            context.write(new Text(fields[0]), new IntWritable(Integer.parseInt(fields[1])));
        }
    }
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        // Top-K candidates, keyed by summed count (ascending). A list of words per
        // sum keeps words with equal sums from overwriting each other.
        private final TreeMap<Integer, List<String>> map = new TreeMap<Integer, List<String>>();
        private int count = 0;

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Merge duplicate keys by summing their counts.
            int sum = 0;
            for (IntWritable num : values) {
                sum += num.get();
            }
            List<String> bucket = map.get(sum);
            if (bucket == null) {
                bucket = new ArrayList<String>();
                map.put(sum, bucket);
            }
            bucket.add(key.toString());
            count++;
            // Keep only the K largest sums: drop one word belonging to the smallest sum.
            if (count > K) {
                List<String> smallest = map.firstEntry().getValue();
                smallest.remove(smallest.size() - 1);
                if (smallest.isEmpty()) {
                    map.remove(map.firstKey());
                }
                count--;
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit the surviving top-K words in ascending order of their sums.
            for (Map.Entry<Integer, List<String>> entry : map.entrySet()) {
                for (String word : entry.getValue()) {
                    context.write(new Text(word), new IntWritable(entry.getKey()));
                }
            }
        }
    }
    public static void main(String[] args) throws Exception {
        // Needed only when running locally on Windows; points to the Hadoop binaries.
        System.setProperty("hadoop.home.dir", "D:\\hadoop2.7.6");
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(TopK.class);
        job.setJobName("topk");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path("D:\\input3\\file1.txt"));
        FileInputFormat.addInputPath(job, new Path("D:\\input3\\file2.txt"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\topoutput3"));
        job.waitForCompletion(true);
    }
}
```
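To see the reducer's pruning logic in isolation, the sketch below replays the same TreeMap trick on plain Java data, without Hadoop. It is only a minimal illustration: the class name `TopKDemo` and the choice of K=5 are made up for readability, and the word/sum pairs are the merged counts from file1.txt.

```
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Standalone demo of the reducer's top-K pruning (no Hadoop required).
public class TopKDemo {
    public static void main(String[] args) {
        final int K = 5; // smaller than 10 so the pruning is visible
        // Merged sums from file1.txt, e.g. aa = 8 + 12 = 20.
        String[] words = {"aa", "bb", "cc", "dd", "ee", "ff", "gg", "hh", "kk", "tt"};
        int[] sums     = { 20,   17,   11,   10,    7,   10,   22,   33,    9,   13};

        TreeMap<Integer, List<String>> top = new TreeMap<Integer, List<String>>();
        int count = 0;
        for (int i = 0; i < words.length; i++) {
            List<String> bucket = top.get(sums[i]);
            if (bucket == null) {
                bucket = new ArrayList<String>();
                top.put(sums[i], bucket);
            }
            bucket.add(words[i]);
            count++;
            // Same rule as the reducer: once more than K words are held,
            // drop one word belonging to the smallest sum.
            if (count > K) {
                List<String> smallest = top.firstEntry().getValue();
                smallest.remove(smallest.size() - 1);
                if (smallest.isEmpty()) {
                    top.remove(top.firstKey());
                }
                count--;
            }
        }
        // Prints the K largest sums in ascending order.
        for (Map.Entry<Integer, List<String>> e : top.entrySet()) {
            for (String w : e.getValue()) {
                System.out.println(w + "\t" + e.getKey());
            }
        }
    }
}
```

Running it prints the five largest sums in ascending order: tt 13, bb 17, aa 20, gg 22, hh 33.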