企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## MapReduce 编程模型 ![](https://img.kancloud.cn/10/50/10504daab5da985afd79efd0eb6987f9_918x407.png) > 业务被分成map阶段和reduce阶段。 ## Mapreduce执行步骤 ![](https://img.kancloud.cn/d7/c0/d7c022a25de361c148150aaa8cf15baa_837x713.png) ## 自定义mapper步骤 ~~~ package com.bizzbee.bigdata.hadoop.mr.wc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /* * Mapper是一个范型类 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> * KEYIN:map 任务读数据的key类型 long * VALUEIN :map读数据的value类型 string * * 词频统计 * hello world welcome * hello welcome * * 输出--》(word,1) * * KEYOUT key输出类型 String * VALUEOUT value输出类型 integer * * *因为Long String Integer是Java里面的数据类型。 * Hadoop自定义类型:序列化和反序列化 * * * */ public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> { /* * 重写map方法 * */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //每一行数据用tab分割拆开 String[] words = value.toString().split("\t"); for(String word:words){ context.write(new Text(word),new IntWritable(1)); } } } ~~~ ## 自定义reduce ~~~ package com.bizzbee.bigdata.hadoop.mr.wc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.Iterator; /* * 输入: (hello,1)(hello,1) * (welcome,1) * map输出到reduce,是按照相同的key分发到一个reduce上去执行。 * * reduce1:(hello,1)(hello,1)(hello,1)==》(hello,<1,1,1>) * */ public class WordCountReducer extends Reducer<Text, IntWritable, Text,IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count =0; Iterator<IntWritable> iterator = values.iterator(); while (iterator.hasNext()){ IntWritable value = iterator.next(); //get就是把IntWritable转换回int count +=value.get(); } context.write(key,new IntWritable(count)); } } ~~~ ## 创建driver运行统计 ~~~ package com.bizzbee.bigdata.hadoop.mr.wc; import com.bizzbee.bigdata.hadoop.hdfs.Constants; import com.bizzbee.bigdata.hadoop.hdfs.ParamsUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.net.URI; import java.util.Properties; public class WordCountApp { public static void main(String[] args) throws Exception{ //加载配置文件 Properties properties = ParamsUtils.getProperties(); //填写hdfs上面的用户,不加这句报错 System.setProperty("HADOOP_USER_NAME","bizzbee"); Configuration configuration = new Configuration(); configuration.set("dfs.client.use.datanode.hostname", "true"); configuration.set("fs.defaultFS","hdfs://tencent2:8020"); Job job = Job.getInstance(configuration); job.setJarByClass(WordCountApp.class); // 设置Job对应的参数: 主类 job.setJarByClass(WordCountApp.class); // 设置Job对应的参数: 设置自定义的Mapper和Reducer处理类 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 设置Job对应的参数: Mapper输出key和value的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 设置Job对应的参数: Reduce输出key和value的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 如果输出目录已经存在,则先删除 FileSystem fileSystem = FileSystem.get(new URI("hdfs://tencent2:8020"),configuration, "bizzbee"); Path outputPath = new Path("/bizzbee/output/result"); if(fileSystem.exists(outputPath)) { fileSystem.delete(outputPath,true); } // 设置Job对应的参数: Mapper输出key和value的类型:作业输入和输出的路径 FileInputFormat.setInputPaths(job, new Path("/bizzbee/input/article")); FileOutputFormat.setOutputPath(job, outputPath); // 提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : -1); } } ~~~ ![](https://img.kancloud.cn/c1/3a/c13aec83757736335a9de5a5cfe753c1_174x691.png) ## combiner combiner是在map阶段线进行一次聚合操作。 优点:减少io,提高性能,可以节省网络带宽。 缺点:求平均数等几种操作不可用。 ![](https://img.kancloud.cn/7b/80/7b80f62278f1e890c750ff82356f9444_975x521.png)