[TOC]
# 准备数据
flow.txt
~~~
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
~~~
是手机号码跟后面的上下行流量
我们要统计每个手机号码后面的流量
# 代码
**FlowBean**
~~~
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements WritableComparable<FlowBean> {
private long upFlow;
private long downFlow;
private long sumFlow;
//序列化框架在反序列化的时候创建对象的实例会去调用我们的无参构造函数
public FlowBean() {
}
public FlowBean(long upFlow, long downFlow, long sumFlow) {
super();
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = sumFlow;
}
public FlowBean(long upFlow, long downFlow) {
super();
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}
public void set(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}
//序列化的方法
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
//反序列化的方法
//注意:字段的反序列化的顺序跟序列化的顺序必须保持一致
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
/**
* 这里进行我们自定义比较大小的规则
* 在reduce中会进行自动排序
*/
@Override
public int compareTo(FlowBean o) {
return (int) (o.getSumFlow() - this.getSumFlow());
}
//getter和setter方法
~~~
流量求和类
里面包含map,reduce,还有
~~~
package com.folwsum;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
public class FlowSum {
//在kv中传输我们自定义的对象是可以的,不过必须要实现hadoop的序列化机制,也就是implement writable
//输入的LongWritable,Text
//输出 Text,FlowBean
public static class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
Text k = new Text();
FlowBean v = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//将抽取到的每一行数据进行字段的切分
String line = value.toString();
String[] fields = StringUtils.split(line, ' ');
//抽取我们业务所需要的字段,
String phoneNum = fields[1];
//取上下行流量
long upFlow = Long.parseLong(fields[fields.length -3]);
long downFlow = Long.parseLong(fields[fields.length -2]);
k.set(phoneNum);
v.set(upFlow, downFlow);
//赋值一次就序列化出去了,不会数据都是一致的
context.write(k, v);
}
}
public static class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
FlowBean v = new FlowBean();
//这里reduce方法接收到的key就是某一组<手机号,bean><手机号,bean><手机号,bean>当中一个的手机号
//这里的reduce方法接收到的value就是这一组kv对中的所有bean的一个迭代器
//reduce会把手机号码归类
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
long upFlowCount = 0;
long downFlowCount = 0;
for (FlowBean bean : values) {
upFlowCount += bean.getUpFlow();
downFlowCount += bean.getDownFlow();
}
v.set(upFlowCount, downFlowCount);
context.write(key, v);
}
}
//job驱动
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//上面这样写,不好,换了路径又要重新写,我们改为用他的类加载器加载他自己
job.setJarByClass(FlowSum.class);
//告诉框架,我们程序所用的mapper类和reduce类是什么
job.setMapperClass(FlowSumMapper.class);
job.setReducerClass(FlowSumReducer.class);
//告诉框架我们程序输出的类型,
// 如果map阶段和最终输出结果是一样的,这2行可以不写
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//告诉框架,我们程序使用的数据读取组件,结果输出所用的组件是什么
//TextInputFormat是mapreduce程序中内置的一种读取数据组件,准备的叫做读取文本的输入组件
//程序默认的输出组件就是TextOutputFormat,下面那个可以注释
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//告诉框架,我们要处理的数据文件在那个路径下
FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/input/"));
//告诉框架我们的处理结果要输出到什么地方
FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/output/"));
//这边不用submit,因为一提交就和我这个没关系了,我这就断开了就看不见了
// job.submit();
//提交后,然后等待服务器端返回值,看是不是true
boolean res = job.waitForCompletion(true);
//设置成功就退出码为0
System.exit(res ? 0 : 1);
}
}
~~~
# 按总量排序需求
MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前,会排序),排序的依据是map输出的key
所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到key中,让key实现接口:WritableComparable
然后重写key的compareTo方法
排序默认是按照字典排序
自定义排序,他自己他自定义的每个类里面都有compareTo方法
比如LongWritable
或者一些类实现或继承了一些比较接口
如果是我们自己定义的类呢?
如下
![](https://box.kancloud.cn/df7eb02fcce7fecb56fbe7e500199823_1258x680.png)
然后代码
~~~
package com.folwsum;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
// 实现流量汇总并按照流量大小的倒序排序
public class FlowSumSort {
public static class FlowSumSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
FlowBean k = new FlowBean();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//将抽取到的每一行数据进行字段的切分
String line = value.toString();
String[] fields = StringUtils.split(line, ' ');
//抽取我们业务所需要的字段
String phoNum = fields[0];
long upFlow = Long.parseLong(fields[1]);
long downFlow = Long.parseLong(fields[2]);
k.set(upFlow, downFlow);
v.set(phoNum);
//赋值一次就序列化出去了,不会数据都是一致的
context.write(k, v);
}
}
public static class FlowSumSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
@Override
protected void reduce(FlowBean bean, Iterable<Text> PhoneNum, Context context) throws IOException, InterruptedException {
//这边写的时候会自动排序的
context.write(PhoneNum.iterator().next(), bean);
}
}
public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FlowSumSort.class);
//告诉程序,我们的程序所用的mapper类和reducer类是什么
job.setMapperClass(FlowSumSortMapper.class);
job.setReducerClass(FlowSumSortReducer.class);
//告诉框架,我们程序输出的数据类型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//告诉框架,我们程序使用的数据读取组件 结果输出所用的组件是什么
//TextInputFormat是mapreduce程序中内置的一种读取数据组件 准确的说 叫做 读取文本文件的输入组件
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//告诉框架,我们要处理的数据文件在那个路劲下
FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/input/"));
//告诉框架,我们的处理结果要输出到什么地方
FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/output/"));
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
~~~
# 一次性完成,统计和排序
我们看下继承的reduce
![](https://box.kancloud.cn/acb5180a8191603e0b6892998eaf1603_1656x826.png)
reduce需要调用run方法,run方法中不仅执行了reduce最后还执行了cleanup
因为map不断的提交给reduce,reduce排序好了就要写,但是这时候一旦写到文件中,后面再来任务,再写的话,就不能和前面一起排序了
所以我们写到一个treeMap中,然后在cleanup中做treeMap做排序
代码主要把继承reduce中的那个类改了下
~~~
package com.folwsum;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
public class OneStepFlowSumSort {
public static class OneStepFlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
Text k = new Text();
FlowBean v = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//将读取到的每一行数据进行字段的切分
String line = value.toString();
String[] fields = StringUtils.split(line, ' ');
//抽取我们业务所需要的字段
String phoneNum = fields[1];
long upFlow = Long.parseLong(fields[fields.length - 3]);
long downFlow = Long.parseLong(fields[fields.length - 2]);
k.set(phoneNum);
v.set(upFlow, downFlow);
context.write(k, v);
}
}
public static class OneStepFlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
//在这里进行reduce端的局部缓存TreeMap
TreeMap<FlowBean,Text> treeMap = new TreeMap<FlowBean, Text>();
//这里reduce方法接收到的key就是某一组《a手机号,bean》《a手机号,bean》 《b手机号,bean》《b手机号,bean》当中的第一个手机号
//这里reduce方法接收到的values就是这一组kv对中的所以bean的一个迭代器
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
long upFlowCount = 0;
long downFlowCount = 0;
for(FlowBean bean : values){
upFlowCount += bean.getUpFlow();
downFlowCount += bean.getDownFlow();
}
FlowBean sumbean = new FlowBean();
sumbean.set(upFlowCount, downFlowCount);
Text text = new Text(key.toString());
treeMap.put(sumbean, text);
}
//这里进行我们全局的最终输出
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
Set<Map.Entry<FlowBean,Text>> entrySet = treeMap.entrySet();
for(Map.Entry<FlowBean,Text> ent :entrySet){
context.write(ent.getValue(), ent.getKey());
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(OneStepFlowSumSort.class);
//告诉程序,我们的程序所用的mapper类和reducer类是什么
job.setMapperClass(OneStepFlowSumMapper.class);
job.setReducerClass(OneStepFlowSumReducer.class);
//告诉框架,我们程序输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//告诉框架,我们程序使用的数据读取组件 结果输出所用的组件是什么
//TextInputFormat是mapreduce程序中内置的一种读取数据组件 准确的说 叫做 读取文本文件的输入组件
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//告诉框架,我们要处理的数据文件在那个路劲下
FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/input/"));
//告诉框架,我们的处理结果要输出到什么地方
FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/output/"));
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
}
~~~
# 对不同的手机号码分成不同文件
默认的分区规则
![](https://box.kancloud.cn/5a3c3c0506d330fbe7231416bbe2cd3c_1626x838.png)
## 分区类
Mapreduce中会将map输出的kv对,按照相同key分组,然后分发给不同的reducetask
默认的分发规则为:根据key的`hashcode%reducetask`数来分发
所以:如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件Partitioner
自定义一个CustomPartitioner继承抽象类:Partitioner
然后在job对象中,设置自定义`partitioner: job.setPartitionerClass(CustomPartitioner.class)`
我们需要继承Partitioner这个分区类,来实现我们自己的分区
~~~
package com.folwsum;
import org.apache.hadoop.mapreduce.Partitioner;
import java.util.HashMap;
public class ProvivcePartitioner extends Partitioner {
private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();
static{
//我们这边就提前设置手机号码对应的分区
provinceMap.put("135", 0);
provinceMap.put("136", 1);
provinceMap.put("137", 2);
provinceMap.put("138", 3);
provinceMap.put("139", 4);
}
@Override
public int getPartition(Object o, Object o2, int numPartitions) {
//根据手机的前3位,进行取他的值,就是上面定义的
Integer code = provinceMap.get(o.toString().substring(0, 3));
if(code != null){
return code;
}
//没有取到就分到5去
return 5;
}
}
~~~
## 任务类
主要是main方法里面的
~~~
package com.folwsum;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
public class FlowSumProvince {
public static class ProvinceFlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
Text k = new Text();
FlowBean v = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//将读取到的每一行数据进行字段的切分
String line = value.toString();
String[] fields = StringUtils.split(line, ' ');
//抽取我们业务所需要的字段
String phoneNum = fields[1];
long upFlow = Long.parseLong(fields[fields.length - 3]);
long downFlow = Long.parseLong(fields[fields.length - 2]);
k.set(phoneNum);
v.set(upFlow, downFlow);
context.write(k, v);
}
}
public static class ProvinceFlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
long upFlowCount = 0;
long downFlowCount = 0;
for(FlowBean bean : values){
upFlowCount += bean.getUpFlow();
downFlowCount += bean.getDownFlow();
}
FlowBean sumbean = new FlowBean();
sumbean.set(upFlowCount, downFlowCount);
context.write(key, sumbean);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FlowSumProvince.class);
//告诉程序,我们的程序所用的mapper类和reducer类是什么
job.setMapperClass(ProvinceFlowSumMapper.class);
job.setReducerClass(ProvinceFlowSumReducer.class);
//告诉框架,我们程序输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//设置我们的shuffer的分区组件使用我们自定义的组件
job.setPartitionerClass(ProvivcePartitioner.class);
//这里设置我们的reduce task个数 默认是一个partition分区对应一个reduce task 输出文件也是一对一
//如果我们的Reduce task个数 < partition分区数 就会报错Illegal partition
//如果我们的Reduce task个数 > partition分区数 不会报错,会有空文件产生
//如果我们的Reduce task个数 = 1 partitoner组件就无效了 不存在分区的结果
//这边设置为6,因为没有匹配到的就到第5个
job.setNumReduceTasks(6);
//告诉框架,我们程序使用的数据读取组件 结果输出所用的组件是什么
//TextInputFormat是mapreduce程序中内置的一种读取数据组件 准确的说 叫做 读取文本文件的输入组件
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//告诉框架,我们要处理的数据文件在那个路劲下
FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/input"));
//如果有这个文件夹就删除
Path out = new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/output");
FileSystem fileSystem = FileSystem.get(conf);
if (fileSystem.exists(out)) {
fileSystem.delete(out, true);
}
//告诉框架,我们的处理结果要输出到什么地方
FileOutputFormat.setOutputPath(job, out);
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
~~~
- linux
- 常用命令
- 高级文本命令
- 面试题
- redis
- String
- list
- hash
- set
- sortedSet
- 案例-推荐
- java高级特性
- 多线程
- 实现线程的三种方式
- 同步关键词
- 读写锁
- 锁的相关概念
- 多线程的join
- 有三个线程T1 T2 T3,保证顺序执行
- java五种线程池
- 守护线程与普通线程
- ThreadLocal
- BlockingQueue消息队列
- JMS
- 反射
- volatile
- jvm
- IO
- nio
- netty
- netty简介
- 案例一发送字符串
- 案例二发送对象
- 轻量级RPC开发
- 简介
- spring(IOC/AOP)
- spring初始化顺序
- 通过ApplicationContextAware加载Spring上下文
- InitializingBean的作用
- 结论
- 自定义注解
- zk在框架中的应用
- hadoop
- 简介
- hadoop集群搭建
- hadoop单机安装
- HDFS简介
- hdfs基本操作
- hdfs环境搭建
- 常见问题汇总
- hdfs客户端操作
- mapreduce工作机制
- 案列-单词统计
- 局部聚合Combiner
- 案列-流量统计(分区,排序,比较)
- 案列-倒排索引
- 案例-共同好友
- 案列-join算法实现
- 案例-求topN(分组)
- 自定义inputFormat
- 自定义outputFormat
- 框架运算全流程
- mapreduce的优化方案
- HA机制
- Hive
- 安装
- DDL操作
- 创建表
- 修改表
- DML操作
- Load
- insert
- select
- join操作
- 严格模式
- 数据类型
- shell参数
- 函数
- 内置运算符
- 内置函数
- 自定义函数
- Transform实现
- 特殊分割符处理
- 案例
- 级联求和accumulate
- flume
- 简介
- 安装
- 常用的组件
- 拦截器
- 案例
- 采集目录到HDFS
- 采集文件到HDFS
- 多个agent串联
- 日志采集和汇总
- 自定义拦截器
- 高可用配置
- 使用注意
- sqoop
- 安装
- 数据导入
- 导入数据到HDFS
- 导入关系表到HIVE
- 导入表数据子集
- 增量导入
- 数据导出
- 作业
- 原理
- azkaban
- 简介
- 安装
- 案例
- 简介
- command类型单一job
- command类型多job工作流flow
- HDFS操作任务
- mapreduce任务
- hive脚本任务
- hbase
- 简介
- 安装
- 命令行
- 基本CURD
- 过滤器查询
- 系统架构
- 物理存储
- 寻址机制
- 读写过程
- Region管理
- master工作机制
- 建表高级属性
- 与mapreduce结合
- 协处理器
- 点击流平台开发
- 简介
- storm
- 简介
- 安装
- 集群启动及任务过程分析
- 单词统计
- 并行度
- ACK容错机制
- ACK简介