## The MapReduce Programming Model
![](https://img.kancloud.cn/10/50/10504daab5da985afd79efd0eb6987f9_918x407.png)
> The job is split into a map phase and a reduce phase.
## MapReduce Execution Steps
![](https://img.kancloud.cn/d7/c0/d7c022a25de361c148150aaa8cf15baa_837x713.png)
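The flow in the figure can be imitated in plain Java before touching the Hadoop API. This is a minimal, self-contained sketch; the class name and the hard-coded input lines are made up for illustration:
~~~
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TwoPhaseSketch {
    public static void main(String[] args) {
        List<String> lines = List.of("hello world welcome", "hello welcome");

        // Map phase: turn every word of every line into a (word, 1) pair.
        List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                mapped.add(Map.entry(word, 1));
            }
        }

        // Shuffle: the framework groups the pairs by key between the two phases.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : mapped) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }

        // Reduce phase: sum the grouped values for each key.
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int one : entry.getValue()) {
                sum += one;
            }
            System.out.println(entry.getKey() + "\t" + sum); // hello 2, welcome 2, world 1
        }
    }
}
~~~
Hadoop performs the same three steps, except that the map and reduce functions run distributed across the cluster and the shuffle happens over the network.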
## Writing a Custom Mapper
~~~
package com.bizzbee.bigdata.hadoop.mr.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
 * Mapper is a generic class: Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 *
 * KEYIN:   type of the key the map task reads (the byte offset of the line) -> long
 * VALUEIN: type of the value the map task reads (the line itself)           -> string
 *
 * Word count:
 *   hello world welcome
 *   hello welcome
 *
 * output --> (word, 1)
 *
 * KEYOUT:   output key type   -> string
 * VALUEOUT: output value type -> integer
 *
 * Long, String and Integer are plain Java types; Hadoop defines its own
 * counterparts (LongWritable, Text, IntWritable) that handle
 * serialization and deserialization.
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /*
     * Override the map method.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split each line on the tab character.
        String[] words = value.toString().split("\t");
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
~~~
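The comment above notes that Hadoop replaces Long/String/Integer with its own serializable types. A minimal sketch of what that means in practice (the class name is made up for illustration; the `IntWritable` API is the real one):
~~~
import org.apache.hadoop.io.IntWritable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WritableDemo {
    public static void main(String[] args) throws IOException {
        // A Writable serializes itself with write() and restores itself
        // with readFields(); this compact, reusable binary format is why
        // Hadoop uses Writables instead of plain Java types.
        IntWritable original = new IntWritable(42);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        IntWritable restored = new IntWritable();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(restored.get()); // prints 42
    }
}
~~~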
## Writing a Custom Reducer
~~~
package com.bizzbee.bigdata.hadoop.mr.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/*
 * Input: (hello,1)(hello,1)
 *        (welcome,1)
 *
 * Map output is shuffled to the reducers so that all pairs with the
 * same key end up on the same reducer.
 *
 * reduce1: (hello,1)(hello,1)(hello,1) ==> (hello, <1,1,1>)
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        Iterator<IntWritable> iterator = values.iterator();
        while (iterator.hasNext()) {
            IntWritable value = iterator.next();
            // get() unwraps the IntWritable back to a plain int.
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
~~~
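Given the (word, 1) pairs produced from the two sample lines in the Mapper's comment, this reducer receives (hello, <1,1>), (welcome, <1,1>) and (world, <1>), and writes (hello, 2), (welcome, 2) and (world, 1).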
## Creating the Driver to Run the Word Count
~~~
package com.bizzbee.bigdata.hadoop.mr.wc;

import com.bizzbee.bigdata.hadoop.hdfs.ParamsUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;
import java.util.Properties;

public class WordCountApp {

    public static void main(String[] args) throws Exception {
        // Load the project's configuration file.
        Properties properties = ParamsUtils.getProperties();

        // Set the HDFS user; without this line the job fails with a
        // permission error.
        System.setProperty("HADOOP_USER_NAME", "bizzbee");

        Configuration configuration = new Configuration();
        configuration.set("dfs.client.use.datanode.hostname", "true");
        configuration.set("fs.defaultFS", "hdfs://tencent2:8020");

        Job job = Job.getInstance(configuration);

        // Job parameter: the main class.
        job.setJarByClass(WordCountApp.class);

        // Job parameters: the custom Mapper and Reducer classes.
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Job parameters: the Mapper's output key and value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Job parameters: the Reducer's output key and value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // If the output directory already exists, delete it first.
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://tencent2:8020"), configuration, "bizzbee");
        Path outputPath = new Path("/bizzbee/output/result");
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        // Job parameters: the input and output paths.
        FileInputFormat.setInputPaths(job, new Path("/bizzbee/input/article"));
        FileOutputFormat.setOutputPath(job, outputPath);

        // Submit the job and wait for it to finish.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : -1);
    }
}
~~~
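Run from the IDE, the driver talks to the HDFS instance configured above. Alternatively, the project can be packaged into a jar and submitted on the cluster with `hadoop jar`, passing `com.bizzbee.bigdata.hadoop.mr.wc.WordCountApp` as the main class.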
![](https://img.kancloud.cn/c1/3a/c13aec83757736335a9de5a5cfe753c1_174x691.png)
## Combiner
A combiner performs a first round of aggregation on the map side, before the data is shuffled to the reducers.
Advantages: less disk I/O and less network traffic, so better overall performance.
Drawback: it only works for operations that are associative and commutative; a few operations, such as computing an average, cannot use it.
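Because the word-count reduction is a plain sum, which is associative and commutative, the existing `WordCountReducer` can double as the combiner. A minimal sketch of the extra wiring, assuming the `WordCountApp` driver above (only the `setCombinerClass` call is new; the class name here is made up):
~~~
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class WordCountWithCombiner {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setMapperClass(WordCountMapper.class);
        // Pre-aggregate on the map side: (hello, <1,1,1>) is collapsed
        // to (hello, 3) before it crosses the network.
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
    }
}
~~~
Averaging shows why the caveat exists: avg(avg(1, 2), 3) = 2.25, but avg(1, 2, 3) = 2, so pre-averaging on the map side would change the final result.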
![](https://img.kancloud.cn/7b/80/7b80f62278f1e890c750ff82356f9444_975x521.png)