🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
[TOC] # 准备数据 ~~~ Order_0000001,pd001,222.8 Order_0000001,pd005,25.8 Order_0000002,pd005,325.8 Order_0000002,pd003,522.8 Order_0000002,pd004,122.4 Order_0000003,pd001,222.8 Order_0000003,pd001,322.8 ~~~ ![](https://box.kancloud.cn/1c1e31009c16da314e7313be3331cb3a_1346x785.png) 他是记录订单编号,商品和成交金额 然后取出每个订单的top1和topN的数据 里面需要用到一个分组的 1. 利用“订单id和成交金额”作为key,可以将map阶段读取到的所有订单数据按照id分区,按照金额排序,发送到reduce 2. 在reduce端利用GroupingComparator将订单id相同的kv聚合成组,然后取第一个即是最大值 # 简介 每个map就负责自己这边的先按照订单id排序,订单id一样按照金额排序 然后reduce从map这边获取,按照key归类,每个key是每组订单,reducer这边直接输出就行了 输出的时候groupingComparator会进行分组输出 # top1代码 每个订单有多个商品,找出每笔订单中的最大的一个商品 ![](https://box.kancloud.cn/b12a7f6f6f00d26dbaadc80fe50cc97c_1206x575.png) **OrderBean** ~~~ package com.top; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class OrderBean implements WritableComparable<OrderBean> { private Text itemid; private DoubleWritable amount; public OrderBean() { } public OrderBean(Text itemid, DoubleWritable amount) { set(itemid, amount); } public void set(Text itemid, DoubleWritable amount) { this.itemid = itemid; this.amount = amount; } public Text getItemid() { return itemid; } public DoubleWritable getAmount() { return amount; } @Override public int compareTo(OrderBean o) { //比较他的订单id int cmp = this.itemid.compareTo(o.getItemid()); //如果订单id相同就比较金额 if (cmp == 0) { //-号表示倒序 cmp = -this.amount.compareTo(o.getAmount()); } return cmp; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(itemid.toString()); out.writeDouble(amount.get()); } @Override public void readFields(DataInput in) throws IOException { String readUTF = in.readUTF(); double readDouble = in.readDouble(); this.itemid = new Text(readUTF); this.amount = new DoubleWritable(readDouble); } @Override public String toString() { return "OrderBean{" + "itemid=" + itemid + ", amount=" + amount + '}'; } } ~~~ **ItemIdPartitioner** 自定义分区组件 **保证一个订单中的相同bean的id一定能分到同一个地方** ~~~ package com.top; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Partitioner; public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable> { @Override public int getPartition(OrderBean key, NullWritable nullWritable, int numPartitions) { //模拟源码中写的,保证一个订单中的相同bean的id一定能分到同一个地方 return (key.getItemid().hashCode() & Integer.MAX_VALUE) % numPartitions; } } ~~~ **ItemidGroupingComparator** ~~~ package com.top; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class ItemidGroupingComparator extends WritableComparator { protected ItemidGroupingComparator() { //一定要调用下super,里面放你要比较的对象 super(OrderBean.class, true); } //他会传入2个你上面的写的对象,比如这边是2个bean @Override public int compare(WritableComparable a, WritableComparable b) { //把这个bean强行转换下 OrderBean abean = (OrderBean) a; OrderBean bbean = (OrderBean) b; //取出这2个bean,如果这2个bean的id相比较是一样就放到一起比较 //会调用bean里面的比较出结果,负的就舍弃,在reduce的输出阶段 //reduce接收的时候都能接收,输出的时候谁要是小了就舍弃了 return abean.getItemid().compareTo(bbean.getItemid()); } } ~~~ **TopOne** ~~~ package com.top; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.StringUtils; import java.io.IOException; public class TopOne { public static class TopOneMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> { OrderBean bean = new OrderBean(); // Text itemid = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); //如果是空行就直接返回 if (line.equals("")) { return ;} String[] fields = StringUtils.split(line, ','); bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2]))); context.write(bean, NullWritable.get()); } } public static class TopOneReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> { @Override protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(TopOne.class); job.setMapperClass(TopOneMapper.class); job.setReducerClass(TopOneReducer.class); job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/index/input")); //如果有这个文件夹就删除 Path out = new Path("/Users/jdxia/Desktop/website/hdfs/index/output/"); FileSystem fileSystem = FileSystem.get(conf); if (fileSystem.exists(out)) { fileSystem.delete(out, true); } //告诉框架,我们的处理结果要输出到什么地方 FileOutputFormat.setOutputPath(job, out); //注册一个GroupingComparator job.setGroupingComparatorClass(ItemidGroupingComparator.class); //设置分区 job.setPartitionerClass(ItemIdPartitioner.class); //1个reduce任务 job.setNumReduceTasks(1); job.waitForCompletion(true); } } ~~~ # topN代码 bean中要添加 ~~~ @Override public boolean equals(Object o) { OrderBean bean = (OrderBean) o; return bean.getItemid().equals(this.itemid); } ~~~ 主类中修改 ~~~ package com.top; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.StringUtils; import java.io.IOException; public class TopN { static class TopNMapper extends Mapper<LongWritable, Text, OrderBean, OrderBean> { OrderBean v = new OrderBean(); Text k = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = StringUtils.split(line, ','); k.set(fields[0]); v.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2]))); context.write(v, v); } } static class TopNReducer extends Reducer<OrderBean, OrderBean, NullWritable, OrderBean> { int topn = 1; int count = 0; @Override protected void setup(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); //从驱动配置中取值 topn = Integer.parseInt(conf.get("topn")); } @Override protected void reduce(OrderBean key, Iterable<OrderBean> values, Context context) throws IOException, InterruptedException { count = 0; for (OrderBean bean : values) { //只给迭代topn次 if ((count++) == topn) { return; } context.write(NullWritable.get(), bean); } } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); // ָ如果要写配置文件就这样写 // conf.addResource("userconfig.xml"); // System.out.println(conf.get("top.n")); //代表要取top几 // 我这边就直接设置要求top2了 conf.set("topn", "2"); Job job = Job.getInstance(conf); job.setJarByClass(TopN.class); job.setMapperClass(TopNMapper.class); job.setReducerClass(TopNReducer.class); job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(OrderBean.class); FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/index/input")); //如果有这个文件夹就删除 Path out = new Path("/Users/jdxia/Desktop/website/hdfs/index/output/"); FileSystem fileSystem = FileSystem.get(conf); if (fileSystem.exists(out)) { fileSystem.delete(out, true); } //告诉框架,我们的处理结果要输出到什么地方 FileOutputFormat.setOutputPath(job, out); //注册一个GroupingComparator,用来比较 job.setGroupingComparatorClass(ItemidGroupingComparator.class); //设置分区分组 job.setPartitionerClass(ItemIdPartitioner.class); //1个reduce任务 job.setNumReduceTasks(1); job.waitForCompletion(true); } } ~~~ # 总结 top几是由reducer那边决定的,他决定要输出几个就是几个, 但是比如多个key到reducer这边,怎么分组是自定义groupingComparator的事情 分组完,按照自定义bean中的排序进行排的