### The access.log log file
* Second field: phone number
* Third field from the end: upstream traffic
* Second field from the end: downstream traffic
~~~
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
~~~
> Requirement: for each phone number, compute the sum of its upstream traffic, the sum of its downstream traffic, and the total traffic (upstream sum + downstream sum).
### Access.java
The Access bean carries the phone number, upstream traffic, downstream traffic, and total traffic:
~~~
private String phone; // phone number
private long up;      // upstream traffic
private long down;    // downstream traffic
private long sum;     // total traffic (up + down)
~~~
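The snippet above only lists the fields. Because Access travels between the Mapper and Reducer as a value type, it has to implement Hadoop's Writable interface; a minimal sketch of what the full class could look like (the constructors, getters, and toString shown here are assumptions inferred from how the Mapper and Reducer use the class):
~~~
package com.bizzbee.bigdata.hadoop.mr.access;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Access implements Writable {

    private String phone;
    private long up;
    private long down;
    private long sum;

    // Hadoop needs a no-arg constructor to instantiate the object during deserialization
    public Access() {
    }

    public Access(String phone, long up, long down) {
        this.phone = phone;
        this.up = up;
        this.down = down;
        this.sum = up + down;
    }

    public long getUp() {
        return up;
    }

    public long getDown() {
        return down;
    }

    // the field order written here must match the order read in readFields
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone);
        out.writeLong(up);
        out.writeLong(down);
        out.writeLong(sum);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.phone = in.readUTF();
        this.up = in.readLong();
        this.down = in.readLong();
        this.sum = in.readLong();
    }

    @Override
    public String toString() {
        return phone + "," + up + "," + down + "," + sum;
    }
}
~~~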
> Since we are summing: group the records by phone number, then add up the upstream and downstream traffic for each phone number.
### Mapper: split out the phone number, upstream traffic, and downstream traffic
Emit the phone number as the key and an Access object as the value.
~~~
package com.bizzbee.bigdata.hadoop.mr.access;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class AccessMapper extends Mapper<LongWritable, Text, Text, Access> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // emit the phone number as the key and an Access object as the value

        // split the line of input on the tab delimiter
        String[] lines = value.toString().split("\t");

        String phone = lines[1]; // the phone number is the second field
        long up = Long.parseLong(lines[lines.length - 3]);   // upstream traffic: third field from the end
        long down = Long.parseLong(lines[lines.length - 2]); // downstream traffic: second field from the end

        context.write(new Text(phone), new Access(phone, up, down));
    }
}
~~~
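For example, for the first sample line above the mapper emits the key `13726230503` with an Access carrying up = 2481 and down = 24681.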
### Reducer: (13726238888, <Access, Access>)
~~~
package com.bizzbee.bigdata.hadoop.mr.access;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class AccessReducer extends Reducer<Text, Access, NullWritable, Access> {

    /*
     * key: the phone number
     * values: all Access objects emitted for that phone number
     */
    @Override
    protected void reduce(Text key, Iterable<Access> values, Context context) throws IOException, InterruptedException {
        long ups = 0;
        long downs = 0;
        for (Access access : values) {
            ups += access.getUp();
            downs += access.getDown();
        }

        // the phone number is already inside the Access object, so NullWritable is enough for the key
        context.write(NullWritable.get(), new Access(key.toString(), ups, downs));
    }
}
~~~
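Because the output key is NullWritable, the default TextOutputFormat skips the key and writes only the value's toString() on each line, so every output record is just the serialized Access (phone, up, down, sum).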
### Partitioner
* The Partitioner decides which reduce task each map task's output record is sent to.
* Default implementation: the hash of the key, modulo the number of reduce tasks (see the sketch below).
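The default behavior lives in Hadoop's HashPartitioner; its getPartition is essentially:
~~~
// org.apache.hadoop.mapreduce.lib.partition.HashPartitioner (simplified)
public int getPartition(K key, V value, int numReduceTasks) {
    // mask off the sign bit so the result is never negative, then take the modulo
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
~~~
The custom AccessPartitioner below overrides this and routes records by phone-number prefix instead: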
~~~
package com.bizzbee.bigdata.hadoop.mr.access;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class AccessPartitioner extends Partitioner<Text, Access> {

    /*
     * Text: the phone number
     */
    @Override
    public int getPartition(Text phone, Access access, int numPartitions) {
        // route phone numbers with different prefixes to different output files
        if (phone.toString().startsWith("13")) {
            return 0;
        } else if (phone.toString().startsWith("15")) {
            return 1;
        } else {
            return 2;
        }
    }
}
~~~
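Note that this partitioner produces three partitions (0, 1, 2), so the driver must set the number of reduce tasks to 3: with fewer reduce tasks the job fails with an "Illegal partition" error as soon as a record is routed to a non-existent partition, and with more reduce tasks the extra output files are simply empty.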
### Driver
~~~
package com.bizzbee.bigdata.hadoop.mr.access;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class AccessLocalApp {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration);

        job.setJarByClass(AccessLocalApp.class);

        job.setMapperClass(AccessMapper.class);
        job.setReducerClass(AccessReducer.class);

        // set the custom partitioning rule
        job.setPartitionerClass(AccessPartitioner.class);
        // set the number of reduce tasks (must be 3 to match AccessPartitioner)
        job.setNumReduceTasks(3);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Access.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Access.class);

        // FileInputFormat.setInputPaths(job, new Path(args[0]));
        // FileOutputFormat.setOutputPath(job, new Path(args[1]));

        FileInputFormat.setInputPaths(job, new Path("/Users/bizzbee/IdeaProjects/hadooptrainv2/input/access.log"));
        FileOutputFormat.setOutputPath(job, new Path("/Users/bizzbee/IdeaProjects/hadooptrainv2/output/access"));

        job.waitForCompletion(true);
    }
}
~~~
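One practical caveat: FileOutputFormat aborts the job with a FileAlreadyExistsException if the output directory already exists. A small sketch (not part of the original driver, just one way to handle reruns; it needs an extra import of org.apache.hadoop.fs.FileSystem) that clears the directory before submitting the job:
~~~
// delete the output directory if it already exists, so the job can be rerun
FileSystem fs = FileSystem.get(configuration);
Path outputPath = new Path("/Users/bizzbee/IdeaProjects/hadooptrainv2/output/access");
if (fs.exists(outputPath)) {
    fs.delete(outputPath, true); // true = delete recursively
}
FileOutputFormat.setOutputPath(job, outputPath);
~~~
With three reduce tasks, a successful run leaves three part files in the output directory (part-r-00000 through part-r-00002), one per partition.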