### 5.1 实验目的
基于MapReduce思想,编写WordCount程序。
### 5.2 实验要求
1. 理解MapReduce编程思想;
2. 会编写MapReduce版本WordCount;
3. 会执行该程序;
4. 自行分析执行过程。
### 5.3 实验原理
MapReduce是一种计算模型,简单的说就是将大批量的工作(数据)分解(MAP)执行,然后再将结果合并成最终结果(REDUCE)。这样做的好处是可以在任务被分解后,可以通过大量机器进行并行计算,减少整个操作的时间。
适用范围:数据量大,但是数据种类小可以放入内存。
基本原理及要点:将数据交给不同的机器去处理,数据划分,结果归约。
理解MapReduce和Yarn:在新版Hadoop中,Yarn作为一个资源管理调度框架,是Hadoop下MapReduce程序运行的生存环境。其实MapRuduce除了可以运行Yarn框架下,也可以运行在诸如Mesos,Corona之类的调度框架上,使用不同的调度框架,需要针对Hadoop做不同的适配。
一个完成的MapReduce程序在Yarn中执行过程如下:
(1)ResourcManager JobClient向ResourcManager提交一个job。
(2)ResourcManager向Scheduler请求一个供MRAppMaster运行的container,然后启动它。
(3)MRAppMaster启动起来后向ResourcManager注册。
(4)ResourcManagerJobClient向ResourcManager获取到MRAppMaster相关的信息,然后直接与MRAppMaster进行通信。
(5)MRAppMaster算splits并为所有的map构造资源请求。
(6)MRAppMaster做一些必要的MR OutputCommitter的准备工作。
(7)MRAppMaster向RM(Scheduler)发起资源请求,得到一组供map/reduce task运行的container,然后与NodeManager一起对每一个container执行一些必要的任务,包括资源本地化等。
(8)MRAppMaster 监视运行着的task 直到完成,当task失败时,申请新的container运行失败的task。
(9)当每个map/reduce task完成后,MRAppMaster运行MR OutputCommitter的cleanup 代码,也就是进行一些收尾工作。
(10)当所有的map/reduce完成后,MRAppMaster运行OutputCommitter的必要的job commit或者abort APIs。
(11)MRAppMaster退出。
#### 5.3.1 MapReduce编程
编写在Hadoop中依赖Yarn框架执行的MapReduce程序,并不需要自己开发MRAppMaster和YARNRunner,因为Hadoop已经默认提供通用的YARNRunner和MRAppMaster程序, 大部分情况下只需要编写相应的Map处理和Reduce处理过程的业务程序即可。
编写一个MapReduce程序并不复杂,关键点在于掌握分布式的编程思想和方法,主要将计算过程分为以下五个步骤:
(1)迭代。遍历输入数据,并将之解析成key/value对。
(2)将输入key/value对映射(map)成另外一些key/value对。
(3)依据key对中间数据进行分组(grouping)。
(4)以组为单位对数据进行归约(reduce)。
(5)迭代。将最终产生的key/value对保存到输出文件中。
#### 5.3.2 Java API解析
(1)InputFormat:用于描述输入数据的格式,常用的为TextInputFormat提供如下两个功能:
数据切分: 按照某个策略将输入数据切分成若干个split,以便确定Map Task个数以及对应的split。
为Mapper提供数据:给定某个split,能将其解析成一个个key/value对。
(2)OutputFormat:用于描述输出数据的格式,它能够将用户提供的key/value对写入特定格式的文件中。
(3)Mapper/Reducer: Mapper/Reducer中封装了应用程序的数据处理逻辑。
(4)Writable:Hadoop自定义的序列化接口。实现该类的接口可以用作MapReduce过程中的value数据使用。
(5)WritableComparable:在Writable基础上继承了Comparable接口,实现该类的接口可以用作MapReduce过程中的key数据使用。(因为key包含了比较排序的操作)。
### 5.4 实验步骤
本实验主要分为,确认前期准备,编写MapReduce程序,打包提交代码。查看运行结果这几个步骤,详细如下:
#### 5.4.1 启动Hadoop
执行命令启动前面实验部署好的Hadoop系统。
~~~
[root@master ~]# cd /usr/cstor/hadoop/
[root@master hadoop]# sbin/start-all.sh
~~~
#### 5.4.2 验证HDFS上没有wordcount的文件夹
~~~
[root@client ~]# cd /usr/cstor/hadoop/
[root@client hadoop]# bin/hadoop fs -ls / #查看HDFS上根目录文件 /
~~~
此时HDFS上应该是没有wordcount文件夹。
#### 5.4.3 上传数据文件到HDFS
~~~
[root@client ~]# cd /usr/cstor/hadoop/
[root@client hadoop]# bin/hadoop fs -put /root/data/5/word /
~~~
#### 5.4.4 编写MapReduce程序
主要编写Map和Reduce类,其中Map过程需要继承org.apache.hadoop.mapreduce包中Mapper类,并重写其map方法;Reduce过程需要继承org.apache.hadoop.mapreduce包中Reduce类,并重写其reduce方法。
~~~
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import java.io.IOException;
import java.util.StringTokenizer;
public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
//map方法,划分一行文本,读一个单词写出一个<单词,1>
public void map(Object key, Text value, Context context)throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);//写出<单词,1>
}}}
//定义reduce类,对相同的单词,把它们<K,VList>中的VList值全部相加
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();//相当于<Hello,1><Hello,1>,将两个1相加
}
result.set(sum);
context.write(key, result);//写出这个单词,和这个单词出现次数<单词,单词出现次数>
}}
public static void main(String[] args) throws Exception {//主方法,函数入口
Configuration conf = new Configuration(); //实例化配置文件类
Job job = new Job(conf, "WordCount"); //实例化Job类
job.setInputFormatClass(TextInputFormat.class); //指定使用默认输入格式类
TextInputFormat.setInputPaths(job, args[0]); //设置待处理文件的位置
job.setJarByClass(WordCount.class); //设置主类名
job.setMapperClass(TokenizerMapper.class); //指定使用上述自定义Map类
job.setCombinerClass(IntSumReducer.class); //指定开启Combiner函数
job.setMapOutputKeyClass(Text.class); //指定Map类输出的<K,V>,K类型
job.setMapOutputValueClass(IntWritable.class); //指定Map类输出的<K,V>,V类型
job.setPartitionerClass(HashPartitioner.class); //指定使用默认的HashPartitioner类
job.setReducerClass(IntSumReducer.class); //指定使用上述自定义Reduce类
job.setNumReduceTasks(Integer.parseInt(args[2])); //指定Reduce个数
job.setOutputKeyClass(Text.class); //指定Reduce类输出的<K,V>,K类型
job.setOutputValueClass(Text.class); //指定Reduce类输出的<K,V>,V类型
job.setOutputFormatClass(TextOutputFormat.class); //指定使用默认输出格式类
TextOutputFormat.setOutputPath(job, new Path(args[1])); //设置输出结果文件位置
System.exit(job.waitForCompletion(true) ? 0 : 1); //提交任务并监控任务状态
}
}
~~~
#### 5.3.5 使用Eclipse开发工具将该代码打包
假定打包后的文件名为hdpAction.jar,主类WordCount位于包njupt下,则可使用如下命令向YARN集群提交本应用。
~~~
[root@client ~]# ./yarn jar hdpAction.jar njupt.WordCount /word /wordcount 1
~~~
其中“yarn”为命令,“jar”为命令参数,后面紧跟打包后的代码地址,“njupt”为包名,“WordCount”为主类名,“/word”为输入文件在HDFS中的位置,/wordcount为输出文件在HDFS中的位置。
### 5.5 实验结果
#### 5.5.1 程序运行成功控制台上的显示内容
如图5-1所示:
![](https://box.kancloud.cn/63ec34a7bb9a266709913690e17d7723_383x332.jpg)
图5-1 提交wordcount
#### 5.5.2 在HDFS上查看结果
如图5-2所示:
![](https://box.kancloud.cn/b4cf5b3e6ff0cf4bab163183280ea2a3_529x166.jpg)
- GitHub-资源收集
- 【GitHub/Gitee】收录总榜单
- 【Office & Markdown & PDF】资源收集
- 【前端】资源收集
- 【开源项目】资源收集
- 【代码备份】资源收集
- 【代码加密】资源收集
- 【好文章推荐】资源收集
- Java大数据实践
- 基础实验操作
- 【一】基础操作实验
- HDFS
- 【二】部署HDFS
- 【三】读写HDFS文件
- YARN
- 【四】部署YARN集群
- MapReduce
- 【五】单词计数
- Hive
- 【十】部署Hive
- 【十一】新建Hive表
- 【十二】Hive分区
- ZooKeeper
- 【二十】部署ZooKeeper
- 【二十一】进程协作
- HBase
- 【二十二】部署HBase
- 【二十三】新建HBase表
- Storm
- 【二十四】部署Storm
- 【二十五】实时WordCountTopology
- Kafka
- 【二十七】Kafka订阅推送示例
- Redis
- 【二十九】Redis部署与简单使用
- 【三十】MapReduce与Spark读写Redis
- MongoDB
- 【三十一】读写MongoDB
- PHP实践
- 环境搭建
- PHP安装
- macOS搭建PHP开发环境
- laravel
- 【Laravel-admin】实践方案
- 技术选型
- 技术选型结果
- PHP开发流程
- Laravel自带异常
- 技术选型问题 & 解决方法
- 修改(Admin)文件夹路径
- 两个用户表合并
- 创建Token,获取接口数据
- CreateFreshApiToken中间件使用
- Generator从表生成文件,不包括迁移文件
- 添加用户的同时生产令牌
- 其它参考文章
- Laravel-admin常见问题
- form(),show()获取对象数据
- Form右上角按钮重写
- form回调中的错误提醒,回调传参
- 【小工具类】实践方案
- 字符串
- 数组
- 无限级分类递归
- 时间
- 正则表达式
- 文件
- 经纬度、时区
- DataEdit快捷操作类库
- 数据库表结构管理
- 【Guzzle】实践方案---工具类
- 【队列---Laravel-Horizon 】实践方案
- 【laravel-snappy】实践方案
- 【开发规范】实践方案
- PHP深入学习
- 缓存在高并发场景下的常见问题
- 一、缓存一致性问题
- 二、缓存并发问题
- 三、缓存穿透问题
- 四、缓存颠簸问题
- 五、缓存的雪崩现象
- 六、缓存无底洞现象
- Laravel源码解析(知识点)
- 闭包、IOC容器服务绑定延迟加载
- 延迟静态绑定基类
- 反射,依赖注入
- __callStatic 魔术方法,Facade 工作原理
- array_reduce,中间件解析
- Eloquent核心
- 线程、进程、协程
- Linux进程、线程、协程
- poll、epoll
- epoll原理
- Liunx线程调度算法
- 红黑树
- 同步/异步、阻塞/非阻塞
- PHP-FPM
- Nginx
- Git-PHPStorm-Composer工具使用
- git常用命令
- .gitignore忽略规则
- PHPStorm第一次使用
- PHPStorm关联gitlab
- 在Docker中使用Xdebug
- PHPStorm中使用Xdebug调试
- PHP Xdebug 远程调试
- Composer修改镜像源
- Swoole
- Go
- 惊群问题
- 线程模型比较
- 并发模型比较
- Lua
- OpenResty
- 数据一致性
- 悲观锁--VS--乐观锁
- 事务--mysql VS redis
- 事务嵌套--Doctrine VS Laravel
- 单体应用中执行顺序问题
- 数据一致性问题描述
- 分布式理论
- 数据一致性---接口幂等性
- 分布式事务---2PC VS 3PC
- 分布式事务---TCC
- 分布式事务---基于消息
- 接口安全性
- Nginx
- 优化常识
- nginx常用优化
- nginx解决本地开发时调用远程AIP跨域问题
- Nginx反向代理实现均衡负载
- 大型网站架构演变
- Keepalived+Nginx 高可用集群(主从模式)
- MySQL
- 关于最重要的参数选项调整建议
- 索引,Explain优化工具
- 事务级别
- sql好的书写习惯
- limit(分页)
- 赶集网Mysql36条军规
- 分库分表技术演进&最佳实践
- MariaDB 和 MySQL 全面对比
- 永远不要在 MySQL 中使用“utf8”
- 看云--推荐的Mysql优化
- 完整、详细的MySQL规范
- 慢查询日志
- pt-query-digest结果分析
- Oracle
- Oracle数据库备份/导出(exp/expd)、导入(imp/impd)
- [Oracle]EXPDP和IMPDP数据泵进行导出导入的方法
- 使用PLSQL进行Oracle数据导入导出
- Redis
- 看云-推荐的redis学习
- Memcache和Redis不同
- 阿里云Redis开发规范
- Centos7
- 虚拟机配置网络
- 硬盘挂载、分区、文件大小
- 防火墙(firewalld、firewalld-cmd、systemctl、iptables)
- 两个机器互相拷贝文件
- 查进程、查端口
- 压缩、解压
- 查看物理CPU个数、CPU内核数、线程数
- apt-get源--阿里
- Docker
- Dockerfile制作常用命令
- registry私有仓库
- PHP_7.2
- Dockerfile
- php.ini
- 使用说明
- Nginx_1.15
- Dockerfile
- nginx.conf
- prod_nginx.conf
- 使用说明
- MySql_5.7
- Dockerfile
- my.cnf
- 使用说明
- redmine_3.4
- Dockerfile
- 使用说明
- gitlab-ce_11.9.6-ce.0
- 使用说明
- Redis_5.0
- Dockerfile
- redis.conf
- 使用说明
- Jenkins
- Dockerfile
- 使用说明
- webssh--python3.7
- Dockerfile
- 使用说明
- 进阶使用
- 高阶使用
- minio
- 使用说明
- aws_cloud9_ide
- 使用说明-aws
- VNC
- 使用说明
- jdk1.8——yum安装
- tomcat9——安装
- guacamole——0.9.13
- libreoffice
- Dockerfile
- 使用说明
- Kubernetes
- kubectl常用命令
- 环境搭建(1.9.6)
- kubernetes1.9.6墙内离线部署
- kubernetes1.9.6单机器部署
- helm安装
- helm常用命令
- Laradock
- Swoole
- 环境的搭建
- swoole的简单实例
- 服务端的cli方式运行
- 客户端的运行方式
- 定时任务的入门
- 删除定时任务
- 初始化定时任务
- 日志管理
- 具体任务的异常捕获
- 手动重启shell脚本
- Elasticsearch
- Elasticsearch检索实践
- 读后感
- 【读书】登天的感觉——岳晓东
- 【读书】为何家会伤人——武志红
- 【读书】思考与致富——拿破仑-希尔
- 【感受】做事讲方法
- 【感受】未来畅想
- 【素材】智力问答
- 【百家】曾国藩家训
- 【百家】正说和珅
- 【感受】谈判小技巧
- 【读书】股票作手回忆录——利弗莫尔
- 【感受】最幸福的人——工匠
- 【收藏】土味情话大合集
- 【读书】解忧杂货店——东野圭吾
- 【读书】把时间当作朋友——李笑来
- 【感受】舆论和八卦
- 【读书】老人与海——海明威
- 【读书】必然——凯文凯利
- 【经典】逍遥游——庄周