[TOC]
# MapReduce重要配置参数
## 资源相关参数
**以下参数是在用户自己的mr应用程序中配置就可以生效**
1. **mapreduce.map.memory.mb**: 一个Map Task可使用的资源上限(单位:MB),默认为1024。如果Map Task实际使用的资源量超过该值,则会被强制杀死。
2. **mapreduce.reduce.memory.mb**: 一个Reduce Task可使用的资源上限(单位:MB),默认为1024。如果Reduce Task实际使用的资源量超过该值,则会被强制杀死。
3. **mapreduce.map.cpu.vcores**: 每个Map task可使用的最多cpu core数目, 默认值: 1
4. **mapreduce.reduce.cpu.vcores**: 每个Reduce task可使用的最多cpu core数目, 默认值: 1
5. `mapreduce.map.java.opts`: Map Task的JVM参数,你可以在此配置默认的java heap size等参数, e.g.
`“-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc” `(@taskid@会被Hadoop框架自动换为相应的taskid), 默认值: “”
6. `mapreduce.reduce.java.opts`: Reduce Task的JVM参数,你可以在此配置默认的java heap size等参数, e.g.
`“-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc”`, 默认值: “”
**应该在yarn启动之前就配置在服务器的配置文件中才能生效**
7. `yarn.scheduler.minimum-allocation-mb` 1024 给应用程序container分配的最小内存
8. `yarn.scheduler.maximum-allocation-mb` 8192 给应用程序container分配的最大内存
9. `yarn.scheduler.minimum-allocation-vcores` 1
10. `yarn.scheduler.maximum-allocation-vcores` 32
11. `yarn.nodemanager.resource.memory-mb` 8192
**shuffle性能优化的关键参数,应在yarn启动之前就配置好**
12. mapreduce.task.io.sort.mb 100 ` //shuffle的环形缓冲区大小,默认100m`
14. mapreduce.map.sort.spill.percent 0.8 `//环形缓冲区溢出的阈值,默认80%`
## 容错相关参数
1. `mapreduce.map.maxattempts`: 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
2. `mapreduce.reduce.maxattempts`: 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
3. `mapreduce.map.failures.maxpercent`: 当失败的Map Task失败比例超过该值为,整个作业则失败,默认值为0. 如果你的应用程序允许丢弃部分输入数据,则该该值设为一个大于0的值,比如5,表示如果有低于5%的Map Task失败(如果一个Map Task重试次数超过mapreduce.map.maxattempts,则认为这个Map Task失败,其对应的输入数据将不会产生任何结果),整个作业扔认为成功。
4. `mapreduce.reduce.failures.maxpercent`: 当失败的Reduce Task失败比例超过该值为,整个作业则失败,默认值为0.
5. `mapreduce.task.timeout`: Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该task处于block状态,可能是卡住了,也许永远会卡主,为了防止因为用户程序永远block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是300000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是`“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”`
## 本地运行mapreduce 作业
设置以下几个参数:
~~~
mapreduce.framework.name=local
mapreduce.jobtracker.address=local
fs.defaultFS=local
~~~
## 效率和稳定性相关参数
1. mapreduce.map.speculative: 是否为Map Task打开推测执行机制,默认为false
2. mapreduce.reduce.speculative: 是否为Reduce Task打开推测执行机制,默认为false
3. mapreduce.job.user.classpath.first & mapreduce.task.classpath.user.precedence:当同一个class同时出现在用户jar包和hadoop jar中时,优先使用哪个jar包中的class,默认为false,表示优先使用hadoop jar中的class。
4. mapreduce.input.fileinputformat.split.minsize: FileInputFormat做切片时的最小切片大小,
5. mapreduce.input.fileinputformat.split.maxsize: FileInputFormat做切片时的最大切片大小
(切片的默认大小就等于blocksize,即 134217728)
# 全局计数器
在实际生产代码中,常常需要将数据处理过程中遇到的不合规数据行进行全局计数,类似这种需求可以借助mapreduce框架中提供的全局计数器来实现
![](https://box.kancloud.cn/785f99ed03a9fa67c23ae1e8645a1d07_1051x899.png)
这个统计是全局的
# 多job串联
一个稍复杂点的处理逻辑往往需要多个mapreduce程序串联处理,多job的串联可以借助mapreduce框架的JobControl实现
1. 我们可以用shell脚本,根据状态返回,来决定下一步的shell执行还是不执行
2. 可以设置多个job他们的依赖关系
~~~
ControlledJob cJob1 = new ControlledJob(job1.getConfiguration());
ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());
ControlledJob cJob3 = new ControlledJob(job3.getConfiguration());
cJob1.setJob(job1);
cJob2.setJob(job2);
cJob3.setJob(job3);
// 设置作业依赖关系,job2执行依赖job1,job3依赖job2
cJob2.addDependingJob(cJob1);
cJob3.addDependingJob(cJob2);
//设置JobControl,里面放一个组名
JobControl jobControl = new JobControl("RecommendationJob");
jobControl.addJob(cJob1);
jobControl.addJob(cJob2);
jobControl.addJob(cJob3);
// 新建一个线程来运行已加入JobControl中的作业,开始进程并等待结束
Thread jobControlThread = new Thread(jobControl);
jobControlThread.start();
//判断是不是已经finish了,没有finish就继续执行
while (!jobControl.allFinished()) {
Thread.sleep(500);
}
jobControl.stop();
return 0;
~~~
# 数据压缩
## 概述
这是mapreduce的一种优化策略:通过压缩编码对mapper或者reducer的输出进行压缩,以减少磁盘IO,提高MR程序运行速度(但相应增加了cpu运算负担)
1. Mapreduce支持将map输出的结果或者reduce输出的结果进行压缩,以减少网络IO或最终输出数据的体积
2. 压缩特性运用得当能提高性能,但运用不当也可能降低性能
3. 基本原则:
运算密集型的job,少用压缩
IO密集型的job,多用压缩
## MR支持的压缩编码
![](https://box.kancloud.cn/438a3c2e9ecc41a083c8bbcfc05ffa66_657x202.png)
## Reducer输出压缩
在配置参数或在代码中都可以设置reduce的输出压缩
1. 在配置参数中设置
~~~
mapreduce.output.fileoutputformat.compress=false
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
mapreduce.output.fileoutputformat.compress.type=RECORD
~~~
2. 在代码中设置
~~~
Job job = Job.getInstance(conf);
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, (Class<? extends CompressionCodec>) Class.forName(""));
~~~
## Mapper输出压缩
在配置参数或在代码中都可以设置reduce的输出压缩
1. 在配置参数中设置
~~~
mapreduce.map.output.compress=false
mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
~~~
2. 在代码中设置:
~~~
conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);
~~~
## 压缩文件的读取(源码)
Hadoop自带的InputFormat类内置支持压缩文件的读取,比如TextInputformat类,在其initialize方法中:
~~~
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
// open the file and seek to the start of the split
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file);
//根据文件后缀名创建相应压缩编码的codec
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
if (null!=codec) {
isCompressedInput = true;
decompressor = CodecPool.getDecompressor(codec);
//判断是否属于可切片压缩编码类型
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn =
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
//如果是可切片压缩编码,则创建一个CompressedSplitLineReader读取压缩数据
in = new CompressedSplitLineReader(cIn, job,
this.recordDelimiterBytes);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn;
} else {
//如果是不可切片压缩编码,则创建一个SplitLineReader读取压缩数据,并将文件输入流转换成解压数据流传递给普通SplitLineReader读取
in = new SplitLineReader(codec.createInputStream(fileIn,
decompressor), job, this.recordDelimiterBytes);
filePosition = fileIn;
}
} else {
fileIn.seek(start);
//如果不是压缩文件,则创建普通SplitLineReader读取数据
in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
filePosition = fileIn;
}
~~~
- linux
- 常用命令
- 高级文本命令
- 面试题
- redis
- String
- list
- hash
- set
- sortedSet
- 案例-推荐
- java高级特性
- 多线程
- 实现线程的三种方式
- 同步关键词
- 读写锁
- 锁的相关概念
- 多线程的join
- 有三个线程T1 T2 T3,保证顺序执行
- java五种线程池
- 守护线程与普通线程
- ThreadLocal
- BlockingQueue消息队列
- JMS
- 反射
- volatile
- jvm
- IO
- nio
- netty
- netty简介
- 案例一发送字符串
- 案例二发送对象
- 轻量级RPC开发
- 简介
- spring(IOC/AOP)
- spring初始化顺序
- 通过ApplicationContextAware加载Spring上下文
- InitializingBean的作用
- 结论
- 自定义注解
- zk在框架中的应用
- hadoop
- 简介
- hadoop集群搭建
- hadoop单机安装
- HDFS简介
- hdfs基本操作
- hdfs环境搭建
- 常见问题汇总
- hdfs客户端操作
- mapreduce工作机制
- 案列-单词统计
- 局部聚合Combiner
- 案列-流量统计(分区,排序,比较)
- 案列-倒排索引
- 案例-共同好友
- 案列-join算法实现
- 案例-求topN(分组)
- 自定义inputFormat
- 自定义outputFormat
- 框架运算全流程
- mapreduce的优化方案
- HA机制
- Hive
- 安装
- DDL操作
- 创建表
- 修改表
- DML操作
- Load
- insert
- select
- join操作
- 严格模式
- 数据类型
- shell参数
- 函数
- 内置运算符
- 内置函数
- 自定义函数
- Transform实现
- 特殊分割符处理
- 案例
- 级联求和accumulate
- flume
- 简介
- 安装
- 常用的组件
- 拦截器
- 案例
- 采集目录到HDFS
- 采集文件到HDFS
- 多个agent串联
- 日志采集和汇总
- 自定义拦截器
- 高可用配置
- 使用注意
- sqoop
- 安装
- 数据导入
- 导入数据到HDFS
- 导入关系表到HIVE
- 导入表数据子集
- 增量导入
- 数据导出
- 作业
- 原理
- azkaban
- 简介
- 安装
- 案例
- 简介
- command类型单一job
- command类型多job工作流flow
- HDFS操作任务
- mapreduce任务
- hive脚本任务
- hbase
- 简介
- 安装
- 命令行
- 基本CURD
- 过滤器查询
- 系统架构
- 物理存储
- 寻址机制
- 读写过程
- Region管理
- master工作机制
- 建表高级属性
- 与mapreduce结合
- 协处理器
- 点击流平台开发
- 简介
- storm
- 简介
- 安装
- 集群启动及任务过程分析
- 单词统计
- 并行度
- ACK容错机制
- ACK简介