🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
[TOC] # 准备数据 flow.txt ~~~ 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200 1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200 1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200 ~~~ 是手机号码跟后面的上下行流量 我们要统计每个手机号码后面的流量 # 代码 **FlowBean** ~~~ import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class FlowBean implements WritableComparable<FlowBean> { private long upFlow; private long downFlow; private long sumFlow; //序列化框架在反序列化的时候创建对象的实例会去调用我们的无参构造函数 public FlowBean() { } public FlowBean(long upFlow, long downFlow, long sumFlow) { super(); this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = sumFlow; } public FlowBean(long upFlow, long downFlow) { super(); this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } public void set(long upFlow, long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } //序列化的方法 @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } //反序列化的方法 //注意:字段的反序列化的顺序跟序列化的顺序必须保持一致 @Override public void readFields(DataInput in) throws IOException { this.upFlow = in.readLong(); this.downFlow = in.readLong(); this.sumFlow = in.readLong(); } @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } /** * 这里进行我们自定义比较大小的规则 * 在reduce中会进行自动排序 */ @Override public int compareTo(FlowBean o) { return (int) (o.getSumFlow() - this.getSumFlow()); } //getter和setter方法 ~~~ 流量求和类 里面包含map,reduce,还有 ~~~ package com.folwsum; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.StringUtils; import java.io.IOException; public class FlowSum { //在kv中传输我们自定义的对象是可以的,不过必须要实现hadoop的序列化机制,也就是implement writable //输入的LongWritable,Text //输出 Text,FlowBean public static class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> { Text k = new Text(); FlowBean v = new FlowBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //将抽取到的每一行数据进行字段的切分 String line = value.toString(); String[] fields = StringUtils.split(line, ' '); //抽取我们业务所需要的字段, String phoneNum = fields[1]; //取上下行流量 long upFlow = Long.parseLong(fields[fields.length -3]); long downFlow = Long.parseLong(fields[fields.length -2]); k.set(phoneNum); v.set(upFlow, downFlow); //赋值一次就序列化出去了,不会数据都是一致的 context.write(k, v); } } public static class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> { FlowBean v = new FlowBean(); //这里reduce方法接收到的key就是某一组<手机号,bean><手机号,bean><手机号,bean>当中一个的手机号 //这里的reduce方法接收到的value就是这一组kv对中的所有bean的一个迭代器 //reduce会把手机号码归类 @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long upFlowCount = 0; long downFlowCount = 0; for (FlowBean bean : values) { upFlowCount += bean.getUpFlow(); downFlowCount += bean.getDownFlow(); } v.set(upFlowCount, downFlowCount); context.write(key, v); } } //job驱动 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //上面这样写,不好,换了路径又要重新写,我们改为用他的类加载器加载他自己 job.setJarByClass(FlowSum.class); //告诉框架,我们程序所用的mapper类和reduce类是什么 job.setMapperClass(FlowSumMapper.class); job.setReducerClass(FlowSumReducer.class); //告诉框架我们程序输出的类型, // 如果map阶段和最终输出结果是一样的,这2行可以不写 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //告诉框架,我们程序使用的数据读取组件,结果输出所用的组件是什么 //TextInputFormat是mapreduce程序中内置的一种读取数据组件,准备的叫做读取文本的输入组件 //程序默认的输出组件就是TextOutputFormat,下面那个可以注释 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); //告诉框架,我们要处理的数据文件在那个路径下 FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/input/")); //告诉框架我们的处理结果要输出到什么地方 FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/output/")); //这边不用submit,因为一提交就和我这个没关系了,我这就断开了就看不见了 // job.submit(); //提交后,然后等待服务器端返回值,看是不是true boolean res = job.waitForCompletion(true); //设置成功就退出码为0 System.exit(res ? 0 : 1); } } ~~~ # 按总量排序需求 MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前,会排序),排序的依据是map输出的key 所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到key中,让key实现接口:WritableComparable 然后重写key的compareTo方法 排序默认是按照字典排序 自定义排序,他自己他自定义的每个类里面都有compareTo方法 比如LongWritable 或者一些类实现或继承了一些比较接口 如果是我们自己定义的类呢? 如下 ![](https://box.kancloud.cn/df7eb02fcce7fecb56fbe7e500199823_1258x680.png) 然后代码 ~~~ package com.folwsum; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.StringUtils; import java.io.IOException; // 实现流量汇总并按照流量大小的倒序排序 public class FlowSumSort { public static class FlowSumSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> { FlowBean k = new FlowBean(); Text v = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //将抽取到的每一行数据进行字段的切分 String line = value.toString(); String[] fields = StringUtils.split(line, ' '); //抽取我们业务所需要的字段 String phoNum = fields[0]; long upFlow = Long.parseLong(fields[1]); long downFlow = Long.parseLong(fields[2]); k.set(upFlow, downFlow); v.set(phoNum); //赋值一次就序列化出去了,不会数据都是一致的 context.write(k, v); } } public static class FlowSumSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> { @Override protected void reduce(FlowBean bean, Iterable<Text> PhoneNum, Context context) throws IOException, InterruptedException { //这边写的时候会自动排序的 context.write(PhoneNum.iterator().next(), bean); } } public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FlowSumSort.class); //告诉程序,我们的程序所用的mapper类和reducer类是什么 job.setMapperClass(FlowSumSortMapper.class); job.setReducerClass(FlowSumSortReducer.class); //告诉框架,我们程序输出的数据类型 job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //告诉框架,我们程序使用的数据读取组件 结果输出所用的组件是什么 //TextInputFormat是mapreduce程序中内置的一种读取数据组件 准确的说 叫做 读取文本文件的输入组件 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); //告诉框架,我们要处理的数据文件在那个路劲下 FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/input/")); //告诉框架,我们的处理结果要输出到什么地方 FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/output/")); boolean res = job.waitForCompletion(true); System.exit(res?0:1); } } ~~~ # 一次性完成,统计和排序 我们看下继承的reduce ![](https://box.kancloud.cn/acb5180a8191603e0b6892998eaf1603_1656x826.png) reduce需要调用run方法,run方法中不仅执行了reduce最后还执行了cleanup 因为map不断的提交给reduce,reduce排序好了就要写,但是这时候一旦写到文件中,后面再来任务,再写的话,就不能和前面一起排序了 所以我们写到一个treeMap中,然后在cleanup中做treeMap做排序 代码主要把继承reduce中的那个类改了下 ~~~ package com.folwsum; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.StringUtils; import java.io.IOException; import java.util.Map; import java.util.Set; import java.util.TreeMap; public class OneStepFlowSumSort { public static class OneStepFlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> { Text k = new Text(); FlowBean v = new FlowBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //将读取到的每一行数据进行字段的切分 String line = value.toString(); String[] fields = StringUtils.split(line, ' '); //抽取我们业务所需要的字段 String phoneNum = fields[1]; long upFlow = Long.parseLong(fields[fields.length - 3]); long downFlow = Long.parseLong(fields[fields.length - 2]); k.set(phoneNum); v.set(upFlow, downFlow); context.write(k, v); } } public static class OneStepFlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> { //在这里进行reduce端的局部缓存TreeMap TreeMap<FlowBean,Text> treeMap = new TreeMap<FlowBean, Text>(); //这里reduce方法接收到的key就是某一组《a手机号,bean》《a手机号,bean》 《b手机号,bean》《b手机号,bean》当中的第一个手机号 //这里reduce方法接收到的values就是这一组kv对中的所以bean的一个迭代器 @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long upFlowCount = 0; long downFlowCount = 0; for(FlowBean bean : values){ upFlowCount += bean.getUpFlow(); downFlowCount += bean.getDownFlow(); } FlowBean sumbean = new FlowBean(); sumbean.set(upFlowCount, downFlowCount); Text text = new Text(key.toString()); treeMap.put(sumbean, text); } //这里进行我们全局的最终输出 @Override protected void cleanup(Context context) throws IOException, InterruptedException { Set<Map.Entry<FlowBean,Text>> entrySet = treeMap.entrySet(); for(Map.Entry<FlowBean,Text> ent :entrySet){ context.write(ent.getValue(), ent.getKey()); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(OneStepFlowSumSort.class); //告诉程序,我们的程序所用的mapper类和reducer类是什么 job.setMapperClass(OneStepFlowSumMapper.class); job.setReducerClass(OneStepFlowSumReducer.class); //告诉框架,我们程序输出的数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //告诉框架,我们程序使用的数据读取组件 结果输出所用的组件是什么 //TextInputFormat是mapreduce程序中内置的一种读取数据组件 准确的说 叫做 读取文本文件的输入组件 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); //告诉框架,我们要处理的数据文件在那个路劲下 FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/input/")); //告诉框架,我们的处理结果要输出到什么地方 FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/output/")); boolean res = job.waitForCompletion(true); System.exit(res?0:1); } } } ~~~ # 对不同的手机号码分成不同文件 默认的分区规则 ![](https://box.kancloud.cn/5a3c3c0506d330fbe7231416bbe2cd3c_1626x838.png) ## 分区类 Mapreduce中会将map输出的kv对,按照相同key分组,然后分发给不同的reducetask 默认的分发规则为:根据key的`hashcode%reducetask`数来分发 所以:如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件Partitioner 自定义一个CustomPartitioner继承抽象类:Partitioner 然后在job对象中,设置自定义`partitioner: job.setPartitionerClass(CustomPartitioner.class)` 我们需要继承Partitioner这个分区类,来实现我们自己的分区 ~~~ package com.folwsum; import org.apache.hadoop.mapreduce.Partitioner; import java.util.HashMap; public class ProvivcePartitioner extends Partitioner { private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>(); static{ //我们这边就提前设置手机号码对应的分区 provinceMap.put("135", 0); provinceMap.put("136", 1); provinceMap.put("137", 2); provinceMap.put("138", 3); provinceMap.put("139", 4); } @Override public int getPartition(Object o, Object o2, int numPartitions) { //根据手机的前3位,进行取他的值,就是上面定义的 Integer code = provinceMap.get(o.toString().substring(0, 3)); if(code != null){ return code; } //没有取到就分到5去 return 5; } } ~~~ ## 任务类 主要是main方法里面的 ~~~ package com.folwsum; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.StringUtils; import java.io.IOException; public class FlowSumProvince { public static class ProvinceFlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> { Text k = new Text(); FlowBean v = new FlowBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //将读取到的每一行数据进行字段的切分 String line = value.toString(); String[] fields = StringUtils.split(line, ' '); //抽取我们业务所需要的字段 String phoneNum = fields[1]; long upFlow = Long.parseLong(fields[fields.length - 3]); long downFlow = Long.parseLong(fields[fields.length - 2]); k.set(phoneNum); v.set(upFlow, downFlow); context.write(k, v); } } public static class ProvinceFlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> { @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long upFlowCount = 0; long downFlowCount = 0; for(FlowBean bean : values){ upFlowCount += bean.getUpFlow(); downFlowCount += bean.getDownFlow(); } FlowBean sumbean = new FlowBean(); sumbean.set(upFlowCount, downFlowCount); context.write(key, sumbean); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FlowSumProvince.class); //告诉程序,我们的程序所用的mapper类和reducer类是什么 job.setMapperClass(ProvinceFlowSumMapper.class); job.setReducerClass(ProvinceFlowSumReducer.class); //告诉框架,我们程序输出的数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //设置我们的shuffer的分区组件使用我们自定义的组件 job.setPartitionerClass(ProvivcePartitioner.class); //这里设置我们的reduce task个数 默认是一个partition分区对应一个reduce task 输出文件也是一对一 //如果我们的Reduce task个数 < partition分区数 就会报错Illegal partition //如果我们的Reduce task个数 > partition分区数 不会报错,会有空文件产生 //如果我们的Reduce task个数 = 1 partitoner组件就无效了 不存在分区的结果 //这边设置为6,因为没有匹配到的就到第5个 job.setNumReduceTasks(6); //告诉框架,我们程序使用的数据读取组件 结果输出所用的组件是什么 //TextInputFormat是mapreduce程序中内置的一种读取数据组件 准确的说 叫做 读取文本文件的输入组件 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); //告诉框架,我们要处理的数据文件在那个路劲下 FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/input")); //如果有这个文件夹就删除 Path out = new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/output"); FileSystem fileSystem = FileSystem.get(conf); if (fileSystem.exists(out)) { fileSystem.delete(out, true); } //告诉框架,我们的处理结果要输出到什么地方 FileOutputFormat.setOutputPath(job, out); boolean res = job.waitForCompletion(true); System.exit(res?0:1); } } ~~~