[TOC]

# Prepare the data

~~~
Order_0000001,pd001,222.8
Order_0000001,pd005,25.8
Order_0000002,pd005,325.8
Order_0000002,pd003,522.8
Order_0000002,pd004,122.4
Order_0000003,pd001,222.8
Order_0000003,pd001,322.8
~~~

![](https://box.kancloud.cn/3cb59971cd2977453d2fb5ea5f490ba2_1874x1106.png)

Each record holds an order id, a product id, and a transaction amount. The goal is to extract the top-1 and, more generally, the top-N records by amount for each order, which requires grouping:

1. Use the order id together with the amount as the map output key, so that all records can be partitioned by order id and sorted by amount (descending) on their way to the reducer.
2. On the reduce side, a GroupingComparator gathers the key-value pairs with the same order id into one group; the first record in each group is then the maximum.

# Top-1 code

**OrderBean**

~~~
package com.top;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {
    private Text itemid;
    private DoubleWritable amount;

    public OrderBean() {
    }

    public OrderBean(Text itemid, DoubleWritable amount) {
        set(itemid, amount);
    }

    public void set(Text itemid, DoubleWritable amount) {
        this.itemid = itemid;
        this.amount = amount;
    }

    public Text getItemid() {
        return itemid;
    }

    public DoubleWritable getAmount() {
        return amount;
    }

    @Override
    public int compareTo(OrderBean o) {
        // compare the order ids first
        int cmp = this.itemid.compareTo(o.getItemid());
        // if the order ids are equal, compare the amounts
        if (cmp == 0) {
            // the minus sign makes the sort descending
            cmp = -this.amount.compareTo(o.getAmount());
        }
        return cmp;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(itemid.toString());
        out.writeDouble(amount.get());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // the fields must be read back in the same order they were written
        String readUTF = in.readUTF();
        double readDouble = in.readDouble();
        this.itemid = new Text(readUTF);
        this.amount = new DoubleWritable(readDouble);
    }

    @Override
    public String toString() {
        return "OrderBean{" +
                "itemid=" + itemid +
                ", amount=" + amount +
                '}';
    }
}
~~~

**ItemIdPartitioner**

~~~
package com.top;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable> {
    @Override
    public int getPartition(OrderBean key, NullWritable nullWritable, int numPartitions) {
        // same scheme as HashPartitioner in the framework source: beans with
        // the same order id are guaranteed to land in the same partition
        return (key.getItemid().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
~~~

**ItemidGroupingComparator**

~~~
package com.top;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class ItemidGroupingComparator extends WritableComparator {

    protected ItemidGroupingComparator() {
        // the super call is required; pass in the key class you want to compare
        super(OrderBean.class, true);
    }

    // the framework passes in two of the keys defined above, i.e. two OrderBeans
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // cast them back to OrderBean
        OrderBean abean = (OrderBean) a;
        OrderBean bbean = (OrderBean) b;

        // compare only the ids: beans with the same order id fall into the same group
        return abean.getItemid().compareTo(bbean.getItemid());
    }
}
~~~
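To see how the two comparators divide the work, here is a minimal standalone check (my own sketch, not part of the original tutorial; the class name `ComparatorCheck` is made up):

~~~
package com.top;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;

public class ComparatorCheck {
    public static void main(String[] args) {
        OrderBean a = new OrderBean(new Text("Order_0000002"), new DoubleWritable(325.8));
        OrderBean b = new OrderBean(new Text("Order_0000002"), new DoubleWritable(522.8));

        // Sort comparator: the ids are equal, so the amounts are compared descending;
        // a (325.8) sorts after b (522.8), so this prints a positive number
        System.out.println("sort:  " + a.compareTo(b));

        // Grouping comparator: only the ids are compared, so this prints 0,
        // meaning the framework would put both beans into one reduce group
        ItemidGroupingComparator grouping = new ItemidGroupingComparator();
        System.out.println("group: " + grouping.compare(a, b));
    }
}
~~~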
**TopOne**

~~~
package com.top;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

public class TopOne {

    public static class TopOneMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
        OrderBean bean = new OrderBean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = StringUtils.split(line, ',');

            bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2])));

            context.write(bean, NullWritable.get());
        }
    }

    public static class TopOneReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // within a group the keys arrive sorted by amount descending,
            // so the first key is the maximum for that order
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(TopOne.class);

        job.setMapperClass(TopOneMapper.class);
        job.setReducerClass(TopOneReducer.class);

        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/index/input"));

        // delete the output folder if it already exists
        Path out = new Path("/Users/jdxia/Desktop/website/hdfs/index/output/");
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(out)) {
            fileSystem.delete(out, true);
        }

        // tell the framework where to write the results
        FileOutputFormat.setOutputPath(job, out);

        // register the GroupingComparator
        job.setGroupingComparatorClass(ItemidGroupingComparator.class);
        job.setPartitionerClass(ItemIdPartitioner.class);
        job.setNumReduceTasks(1);

        job.waitForCompletion(true);
    }
}
~~~

# Top-N code

Add this to the bean:

~~~
@Override
public boolean equals(Object o) {
    // two beans are equal if they belong to the same order
    OrderBean bean = (OrderBean) o;
    return bean.getItemid().equals(this.itemid);
}
~~~
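Note that Java's contract requires `hashCode()` to stay consistent with `equals()`, and the original only overrides `equals`. A matching `hashCode` (my addition, kept consistent with the id-only equality above) would be:

~~~
@Override
public int hashCode() {
    // must agree with equals(), which compares only the itemid
    return itemid.hashCode();
}
~~~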
Modify the driver class:

~~~
package com.top;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

public class TopN {

    static class TopNMapper extends Mapper<LongWritable, Text, OrderBean, OrderBean> {
        OrderBean v = new OrderBean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = StringUtils.split(line, ',');

            v.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2])));

            // the bean serves as both key and value: the key drives the
            // partition/sort/group logic, the value carries the full record
            context.write(v, v);
        }
    }

    static class TopNReducer extends Reducer<OrderBean, OrderBean, NullWritable, OrderBean> {
        int topn = 1;
        int count = 0;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            topn = Integer.parseInt(conf.get("topn"));
        }

        @Override
        protected void reduce(OrderBean key, Iterable<OrderBean> values, Context context) throws IOException, InterruptedException {
            count = 0;
            // the values in a group arrive sorted by amount descending,
            // so the first topn of them are the top-N for this order
            for (OrderBean bean : values) {
                if ((count++) == topn) {
                    return;
                }
                context.write(NullWritable.get(), bean);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        // to read the threshold from a config file instead:
        // conf.addResource("userconfig.xml");
        // System.out.println(conf.get("top.n"));

        // here we simply hard-code top 2
        conf.set("topn", "2");

        Job job = Job.getInstance(conf);
        job.setJarByClass(TopN.class);

        job.setMapperClass(TopNMapper.class);
        job.setReducerClass(TopNReducer.class);

        // map output and reduce output have different types here,
        // so they must be declared separately
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(OrderBean.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(OrderBean.class);

        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/index/input"));

        // delete the output folder if it already exists
        Path out = new Path("/Users/jdxia/Desktop/website/hdfs/index/output/");
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(out)) {
            fileSystem.delete(out, true);
        }

        // tell the framework where to write the results
        FileOutputFormat.setOutputPath(job, out);

        // register the GroupingComparator
        job.setGroupingComparatorClass(ItemidGroupingComparator.class);
        job.setPartitionerClass(ItemIdPartitioner.class);
        job.setNumReduceTasks(1);

        job.waitForCompletion(true);
    }
}
~~~
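For reference, with the sample input above and `topn` set to 2, the job should produce something like the following (worked out by hand from the sample data; the format comes from `OrderBean.toString()`, and the `NullWritable` key is omitted by TextOutputFormat):

~~~
OrderBean{itemid=Order_0000001, amount=222.8}
OrderBean{itemid=Order_0000001, amount=25.8}
OrderBean{itemid=Order_0000002, amount=522.8}
OrderBean{itemid=Order_0000002, amount=325.8}
OrderBean{itemid=Order_0000003, amount=322.8}
OrderBean{itemid=Order_0000003, amount=222.8}
~~~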