### The access.log file

* 2nd field: phone number
* 3rd field from the end: upstream traffic
* 2nd field from the end: downstream traffic

~~~
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
~~~

> Requirement: for each phone number, compute the sum of upstream traffic, the sum of downstream traffic, and the total traffic (upstream sum + downstream sum).

### Access.java

Phone number, upstream traffic, downstream traffic, total traffic:

~~~
private String phone;
private long up;
private long down;
private long sum;
~~~

> Since we want per-phone sums: group by phone number, then add up the upstream and downstream traffic for each group.
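The snippet above only lists the fields. Because `Access` is shuffled between map and reduce tasks as a value, it must implement Hadoop's `Writable` interface and provide a no-arg constructor (Hadoop instantiates it via reflection). Here is a minimal sketch, assuming the `(phone, up, down)` constructor and the getters used by the Mapper and Reducer below; the comma-separated `toString()` format is my own choice, not dictated by the original:

~~~
package com.bizzbee.bigdata.hadoop.mr.access;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Access implements Writable {

    private String phone;
    private long up;
    private long down;
    private long sum;

    // Required: Hadoop creates Writable instances via reflection
    public Access() {}

    public Access(String phone, long up, long down) {
        this.phone = phone;
        this.up = up;
        this.down = down;
        this.sum = up + down; // total = upstream + downstream
    }

    // Serialize the fields in a fixed order
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone);
        out.writeLong(up);
        out.writeLong(down);
        out.writeLong(sum);
    }

    // Deserialize in exactly the same order as write()
    @Override
    public void readFields(DataInput in) throws IOException {
        this.phone = in.readUTF();
        this.up = in.readLong();
        this.down = in.readLong();
        this.sum = in.readLong();
    }

    public long getUp() { return up; }

    public long getDown() { return down; }

    // TextOutputFormat writes toString() as the output line
    @Override
    public String toString() {
        return phone + "," + up + "," + down + "," + sum;
    }
}
~~~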
### Mapper: split out the phone number, upstream traffic and downstream traffic

Write the phone number out as the key and an `Access` object as the value.

~~~
package com.bizzbee.bigdata.hadoop.mr.access;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class AccessMapper extends Mapper<LongWritable, Text, Text, Access> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the line on its tab separator
        String[] lines = value.toString().split("\t");

        String phone = lines[1]; // phone number is the 2nd field
        // Some records omit optional fields, so count from the end of the line:
        long up = Long.parseLong(lines[lines.length - 3]);   // upstream traffic: 3rd from the end
        long down = Long.parseLong(lines[lines.length - 2]); // downstream traffic: 2nd from the end

        // Emit the phone number as key and an Access object as value
        context.write(new Text(phone), new Access(phone, up, down));
    }
}
~~~

### Reducer: (13726238888, <Access, Access, ...>)

~~~
package com.bizzbee.bigdata.hadoop.mr.access;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class AccessReducer extends Reducer<Text, Access, NullWritable, Access> {

    /*
     * key: the phone number
     * values: all Access objects emitted for that phone
     */
    @Override
    protected void reduce(Text key, Iterable<Access> values, Context context) throws IOException, InterruptedException {
        long ups = 0;
        long downs = 0;
        for (Access access : values) {
            ups += access.getUp();
            downs += access.getDown();
        }

        // The phone number is already inside Access, so NullWritable suffices as the key
        context.write(NullWritable.get(), new Access(key.toString(), ups, downs));
    }
}
~~~

### Partitioner

* The Partitioner decides which reduce task each map output record is sent to
* Default implementation: the key's hash value modulo the number of reduce tasks, i.e. `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`

~~~
package com.bizzbee.bigdata.hadoop.mr.access;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class AccessPartitioner extends Partitioner<Text, Access> {

    /*
     * Text: the phone number
     */
    @Override
    public int getPartition(Text phone, Access access, int numPartitions) {
        // Send phone numbers with different prefixes to different output files
        if (phone.toString().startsWith("13")) {
            return 0;
        } else if (phone.toString().startsWith("15")) {
            return 1;
        } else {
            return 2;
        }
    }
}
~~~

### Driver

~~~
package com.bizzbee.bigdata.hadoop.mr.access;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class AccessLocalApp {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration);
        job.setJarByClass(AccessLocalApp.class);

        job.setMapperClass(AccessMapper.class);
        job.setReducerClass(AccessReducer.class);

        // Use the custom partitioning rule
        job.setPartitionerClass(AccessPartitioner.class);
        // The number of reduce tasks must cover the three partitions above
        job.setNumReduceTasks(3);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Access.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Access.class);

        // FileInputFormat.setInputPaths(job, new Path(args[0]));
        // FileOutputFormat.setOutputPath(job, new Path(args[1]));

        FileInputFormat.setInputPaths(job, new Path("/Users/bizzbee/IdeaProjects/hadooptrainv2/input/access.log"));
        FileOutputFormat.setOutputPath(job, new Path("/Users/bizzbee/IdeaProjects/hadooptrainv2/output/access"));

        job.waitForCompletion(true);
    }
}
~~~
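With `setNumReduceTasks(3)` and the custom partitioner, the output directory contains three part files, one per partition. Assuming the comma-separated `toString()` from the `Access` sketch above, the result for the sample log would look roughly like this (each phone appears only once in the sample, so every sum equals its single record):

~~~
# part-r-00000: phone numbers starting with "13"
13560439658,1116,954,2070
13719199419,240,0,240
13726230503,2481,24681,27162
13826544101,264,0,264
13926251106,240,0,240
13926435656,132,1512,1644

# part-r-00001: phone numbers starting with "15"
15920133257,3156,2936,6092

# part-r-00002: everything else
18211575961,1527,2106,3633
84138413,4116,1432,5548
~~~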