企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] # 1. 序列化注意事项 (1)必须实现 Writable 接口。 (2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造。 ```java public FlowBean() { super(); } ``` (3)重写序列化方法 ```java @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } ``` (4)重写反序列化方法 ```java @Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } ``` (5)**注意反序列化的字段顺序和序列化的字段顺序完全一致** (6)要想把结果显示在文件中,需要重写 toString(),可用`\t`分开。 (7)如果需要将自定义的 bean 放在 key 中传输,则还需要实现 comparable接口,实现 compareTo()方法,因为 MapReduce 框中的 shuffle 过程一定会对 Key进行排序。 ```java @Override public int compareTo(FlowBean o) { return this.sumFlow > o.getSumFlow() ? -1 : 1; } ``` <br/> # 2. 案例:统计手机流量信息 **1. 需求** 给定的文件中是手机流量信息,统计每一个手机号耗费的总上行流量、下行流量、总流量。 <br/> **2. 案例数据`phone_data.txt`** 输入数据格式: ```txt 1368544993057 13568795243 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200 手机号码 上行流量 下行流量 ``` 数据中每个字段之间的分隔符为`\t`,要求输出数据格式: ``` 13568795243 1116 954 2070 手机号码 上行流量 下行流量 总流量 ``` **3. 分析** Map 阶段: (1)读取文件中的每一行数据,按`\t`切分出字段。 (2)提取出手机号、上行流量、下行流量。 (3)以手机号为 key,bean 对象为 value 输出,即 context.write(手机号,Bean)。 Reduce 阶段: (1)按照手机号累加求和算出上行流量和下行流量得到总流量。 (2)实现自定义的 bean 来封装流量信息,并将 bean 作为输出的 key 来传输。 (3)MapRedduce 程序在处理数据的过程中会对数据排序(map 输出的 k/v 在传输到 reduce 之前会排序),排序的依据是 map 输出的 key。 <br/> **4. 编写mapreduce程序** (1)编写流量统计的 FlowBean 对象 ```java package com.exa.mapreduce001.flow; import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * Date: 2020/12/30 */ public class FlowBean implements Writable { //定义相关属性 private long upFlow; private long downFlow; private long sumFlow; public FlowBean() { } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } //序列化方法 public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } //反序列化方法 public void readFields(DataInput in) throws IOException { this.upFlow = in.readLong(); this.downFlow = in.readLong(); this.sumFlow = in.readLong(); } //set方法 public void set(long upFlow, long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } } ``` (2)编写 FlowCountMapper ```java package com.exa.mapreduce001.flow; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * Date: 2020/12/30 * * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> * KEYIN:输入的key * VALUEIN:输入的value * KEYOUT:输出的key * VALUEOUT:输出的value * * 1368544993057 13568795243 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200 */ public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> { Text k = new Text(); FlowBean v = new FlowBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1368544993057 13568795243 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200 String[] split = value.toString().split("\t"); String phone = split[1]; long upFlow = Long.parseLong(split[split.length - 3]); long downFlow = Long.parseLong(split[split.length - 2]); k.set(phone); v.set(upFlow, downFlow); context.write(k, v); } } ``` (3)编写 FlowCountReducer ```java package com.exa.mapreduce001.flow; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * Date: 2020/12/30 * * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> */ public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> { FlowBean v = new FlowBean(); @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long sum_upFlow = 0; long sum_downFlow = 0; for (FlowBean flowBean : values) { sum_upFlow += flowBean.getUpFlow(); sum_downFlow += flowBean.getDownFlow(); } v.set(sum_upFlow, sum_downFlow); context.write(key, v); } } ``` (4)编写驱动类 ```java package com.exa.mapreduce001.flow; import com.exa.mapreduce001.wordcount.WordCountReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * Date: 2020/12/30 */ public class FlowsumDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1 获取配置信息,或者 job 对象实例 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2. 指定本程序的 jar 包所在的本地路径 job.setJarByClass(FlowsumDriver.class); // 3. 指定本业务 job 要使用的 mapper/Reducer 业务类 job.setMapperClass(FlowCountMapper.class); job.setReducerClass(FlowCountReducer.class); // 4. 指定 mapper 输出数据的 kv 类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); // 5. 指定最终输出的数据的 kv 类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // 6. 指定 job 的输入/输出目录 // 输入目录要已经存在 FileInputFormat.setInputPaths(job, new Path("file:///E:\\hadoop\\input")); // 输出目录不能已经存在 FileOutputFormat.setOutputPath(job, new Path("file:///E:\\hadoop\\output")); // 7. 将 job 中配置的相关参数,以及 job 所用的 java 类所在的 jar 包,提交给 yarn 去运行 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } } ``` <br/> **5. 得出的结果如下** *`E:\hadoop\output\part-r-00000`* ```txt 13480253104 180 180 360 13502468823 7335 110349 117684 13560436666 3597 25635 29232 13560439658 2034 5892 7926 ```