[TOC]

# Writing different phone numbers to different files

This continues the previous example. The default partitioning rule (a sketch of its logic appears at the end of this section):

![](https://box.kancloud.cn/1b730c49dd2e0a7cd8e609328db5aba3_928x469.png)

## Partitioner class

~~~
package com.folwsum;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();

    static {
        // Register the phone-number prefixes and their partitions up front
        provinceMap.put("135", 0);
        provinceMap.put("136", 1);
        provinceMap.put("137", 2);
        provinceMap.put("138", 3);
        provinceMap.put("139", 4);
    }

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // Look up the partition for the first 3 digits of the phone number,
        // using the mapping defined above
        Integer code = provinceMap.get(key.toString().substring(0, 3));
        if (code != null) {
            return code;
        }
        // Unmatched prefixes all go to partition 5
        return 5;
    }
}
~~~

## Job class

The interesting part is the main method. (FlowBean is the bean from the previous example; a minimal sketch of it is given after the code below.)

~~~
package com.folwsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

public class FlowSumProvince {

    public static class ProvinceFlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        Text k = new Text();
        FlowBean v = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Split each input line into fields
            String line = value.toString();
            String[] fields = StringUtils.split(line, ' ');

            // Extract the fields our business logic needs
            String phoneNum = fields[1];
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long downFlow = Long.parseLong(fields[fields.length - 2]);

            k.set(phoneNum);
            v.set(upFlow, downFlow);

            context.write(k, v);
        }
    }

    public static class ProvinceFlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            long upFlowCount = 0;
            long downFlowCount = 0;

            // Sum the upstream and downstream traffic for this phone number
            for (FlowBean bean : values) {
                upFlowCount += bean.getUpFlow();
                downFlowCount += bean.getDownFlow();
            }

            FlowBean sumbean = new FlowBean();
            sumbean.set(upFlowCount, downFlowCount);
            context.write(key, sumbean);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSumProvince.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(ProvinceFlowSumMapper.class);
        job.setReducerClass(ProvinceFlowSumReducer.class);

        // Tell the framework the data types our program outputs
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Plug our custom partitioner into the shuffle
        job.setPartitionerClass(ProvincePartitioner.class);

        // Set the number of reduce tasks. By default one partition corresponds to
        // one reduce task, and each reduce task writes one output file.
        // If the number of reduce tasks < the number of partitions, the job fails with an "Illegal partition" error.
        // If the number of reduce tasks > the number of partitions, there is no error, but empty output files are produced.
        // If the number of reduce tasks = 1, the partitioner has no effect and no partitioning takes place.
        // We set it to 6 here because unmatched numbers go to partition 5.
        job.setNumReduceTasks(6);

        // Tell the framework which components to use for reading input and writing output.
        // TextInputFormat is an input component built into MapReduce; strictly speaking, it is the component for reading text files.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Tell the framework which path the input data lives under
        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/input"));

        // Delete the output directory if it already exists
        Path out = new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/output");
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(out)) {
            fileSystem.delete(out, true);
        }

        // Tell the framework where to write the results
        FileOutputFormat.setOutputPath(job, out);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
~~~
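FlowBean is the bean carried over from the previous example and is not repeated above. For completeness, here is a minimal sketch of what it has to provide, inferred from the calls in the mapper and reducer; the field names and the toString layout are assumptions, so use your own version from the previous section:

~~~
package com.folwsum;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Sketch of the bean from the previous example; field names are assumed
public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;

    // Hadoop instantiates Writables via reflection, so a no-arg constructor is required
    public FlowBean() {
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    // The serialization order here must match the deserialization order below
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
    }

    // TextOutputFormat writes the reducer's output value via toString()
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + (upFlow + downFlow);
    }
}
~~~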
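As for the default partitioning rule shown in the figure at the top: when no partitioner is set, Hadoop uses its built-in HashPartitioner, which just hashes the key modulo the number of reduce tasks. Its logic amounts to the sketch below (class name here is ours; the real class is org.apache.hadoop.mapreduce.lib.partition.HashPartitioner):

~~~
import org.apache.hadoop.mapreduce.Partitioner;

// Essentially what the built-in HashPartitioner does
public class DefaultPartitionRule<K, V> extends Partitioner<K, V> {

    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Mask off the sign bit so the result is non-negative,
        // then take it modulo the reduce task count
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
~~~

Under that rule, phone numbers with the same prefix are scattered across all output files according to their hash; the custom ProvincePartitioner above replaces it so that each prefix reliably lands in its own file.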