多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
``` 对【Hadoop序列化对象 -> 自定义序列化对象 -> 案例:统计手机流量信息】 这个案例添加分区。 ``` 1. 需求:要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)。 2. 默认 partition 分区 ```java public class HashPartitioner<K, V> extends Partitioner<K, V> { public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } } ``` 默认分区是<ins>根据 key 的 hashCode 对 reduceTasks 个数取模得到的</ins>。用户没法控制哪个 key 存储到哪个分区。 3. 自定义 Partitioner 步骤 (1)自定义类继承 Partitioner,重写 getPartition()方法。 ```java package com.exa.mapreduce001.flow; import com.exa.mapreduce001.flow.FlowBean; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class ProvincePartitioner extends Partitioner<Text, FlowBean> { public int getPartition(Text text, FlowBean flowBean, int i) { String preNum = text.toString().substring(0, 3); int partition = 4; if ("136".equals(preNum)) { partition = 0; } if ("137".equals(preNum)) { partition = 1; } if ("138".equals(preNum)) { partition = 2; } if ("139".equals(preNum)) { partition = 3; } return partition; } } ``` (2)在 job 驱动中,设置自定义 Partitioner并指定reduceTask数量。 ```java package com.exa.mapreduce001.flow; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * Date: 2020/12/30 */ public class FlowsumDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1 获取配置信息,或者 job 对象实例 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2. 指定本程序的 jar 包所在的本地路径 job.setJarByClass(FlowsumDriver.class); // 3. 指定本业务 job 要使用的 mapper/Reducer 业务类 job.setMapperClass(FlowCountMapper.class); job.setReducerClass(FlowCountReducer.class); // 4. 指定 mapper 输出数据的 kv 类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); // 5. 指定最终输出的数据的 kv 类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // ===========添加分区和指定reduceTask数量========== job.setPartitionerClass(ProvincePartitioner.class); job.setNumReduceTasks(5); // 6. 指定 job 的输入/输出目录 // 输入目录要已经存在 FileInputFormat.setInputPaths(job, new Path("file:///E:\\hadoop\\input")); // 输出目录不能已经存在 FileOutputFormat.setOutputPath(job, new Path("file:///E:\\hadoop\\output")); // 7. 将 job 中配置的相关参数,以及 job 所用的 java 类所在的 jar 包,提交给 yarn 去运行 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } } ``` (3)得出如下5个分区文件 ``` E:\hadoop\output\part-r-00000 E:\hadoop\output\part-r-00001 E:\hadoop\output\part-r-00002 E:\hadoop\output\part-r-00003 E:\hadoop\output\part-r-00004 ``` **注意:** 如果 `reduceTask 的数量>getPartition 的结果数`,则会多产生几个空的输出文件 part-r-000xx; 如果 `1<reduceTask 的数量<getPartition` 的结果数,则有一部分分区数据无处安放,会 Exception; 如果 `reduceTask 的数量=1`,则不管 mapTask 端输出多少个分区文件,最终结果都交给这一个 reduceTask,最终也就只会产生一个结果文件 part-r-00000; 例如:假设自定义分区数为 5,则 (1)job.setNumReduceTasks(1);会正常运行,只不过会产生一个输出文件 (2)job.setNumReduceTasks(2);会报错 (3)job.setNumReduceTasks(6);大于 5,程序会正常运行,会产生空文件