[TOC]
# 注意启动脚本命令的书写
agent 的名称别写错了,后台执行加上`nohup ... &`
# channel参数
~~~
capacity:默认该通道中最大的可以存储的event数量
trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量
keep-alive:event添加到通道中或者移出的允许时间
注意:capacity > trasactionCapacity
~~~
# 日志采集到HDFS配置
## 说明1(sink端)
~~~
#定义sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://192.168.200.101:9000/source/logs/%{type}/%Y%m%d
a1.sinks.k1.hdfs.filePrefix =events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
#时间类型
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件不按条数生成
a1.sinks.k1.hdfs.rollCount = 0
#生成的文件按时间生成
a1.sinks.k1.hdfs.rollInterval = 30
#生成的文件按大小生成
a1.sinks.k1.hdfs.rollSize = 10485760
#批量写入hdfs的个数
a1.sinks.k1.hdfs.batchSize = 10000
flume操作hdfs的线程数(包括新建,写入等)
a1.sinks.k1.hdfs.threadsPoolSize=10
#操作hdfs超时时间
a1.sinks.k1.hdfs.callTimeout=30000
~~~
## 说明2 (sink端)
| hdfs.round | false | 如果时间戳向下舍入(如果为true,则会影响除%t之外的所有基于时间的转义序列) |
| --- | --- | --- |
| hdfs.roundValue | 1 | 舍入到最高倍数(在使用hdfs.roundUnit配置的单位中),小于当前时间 |
| hdfs.roundUnit | second | 舍入值的单位 - second,分钟或小时 |
* round: 默认值:false 是否启用时间上的”舍弃”,这里的”舍弃”,类似于”四舍五入”
* roundValue:默认值:1 时间上进行“舍弃”的值;
* roundUnit: 默认值:seconds时间上进行”舍弃”的单位,包含:second,minute,hour
案例(1):
~~~
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H:%M/%S
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
~~~
当时间为`2015-10-16 17:38:59`时候,hdfs.path依然会被解析为:
~~~
/flume/events/2015-10-16/17:30/00
/flume/events/2015-10-16/17:40/00
/flume/events/2015-10-16/17:50/00
~~~
因为设置的是舍弃10分钟内的时间,因此,该目录每10分钟新生成一个。
案例(2):
~~~
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H:%M/%S
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
~~~
现象:10秒为时间梯度生成对应的目录,目录下面包括很多小文件!!!
HDFS产生的数据目录格式如下:
~~~
/flume/events/2016-07-28/18:45/10
/flume/events/2016-07-28/18:45/20
/flume/events/2016-07-28/18:45/30
/flume/events/2016-07-28/18:45/40
/flume/events/2016-07-28/18:45/50
/flume/events/2016-07-28/18:46/10
/flume/events/2016-07-28/18:46/20
/flume/events/2016-07-28/18:46/30
/flume/events/2016-07-28/18:46/40
/flume/events/2016-07-28/18:46/50
~~~
# 断点续传
日志采集使用tail -F 监控一个文件新增的内容
(详细见案例:flume的第6个配置案例-分类收集数据-使用static拦截器)
Source端的代码:
~~~
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /root/data/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx
~~~
这里会出现这样一个情况,当你的这个flume agent程序挂了或者是服务器宕机了,那么随着文件内容的增加,下次重启时,会消费到重复的数据, 怎么办呢?
解决方案:使用改进版的配置信息,修改信息
~~~
a1.sources.r2.command= tail -n +$(tail -n1 /root/log) -F /root/data/nginx.log | awk 'ARGIND==1{i=$0;next}{i++;if($0~/^tail/){i=0};print $0;print i >> "/root/log";fflush("")}' /root/log-
~~~
意思就是说:Source每次读取一条信息,就往/root/log文件记住当前消息的行数。这样的话当你的程序挂了之后,重启时先获取上次读取所在的行数,依次从下读,这样避免了数据重复。
而在flume1.7已经集成了该功能
配置文件:
~~~
a1.channels = ch1
a1.sources = s1
a1.sinks = hdfs-sink1
#channel
a1.channels.ch1.type = memory
a1.channels.ch1.capacity=100000
a1.channels.ch1.transactionCapacity=50000
#source
a1.sources.s1.channels = ch1
#监控一个目录下的多个文件新增的内容
a1.sources.s1.type = taildir
#通过 json 格式存下每个文件消费的偏移量,避免从头消费
a1.sources.s1.positionFile = /var/local/apache-flume-1.7.0-bin/taildir_position.json
a1.sources.s1.filegroups = f1 f2 f3
a1.sources.s1.filegroups.f1 = /root/data/access.log
a1.sources.s1.filegroups.f2 = /root/data/nginx.log
a1.sources.s1.filegroups.f3 = /root/data/web.log
a1.sources.s1.headers.f1.headerKey = access
a1.sources.s1.headers.f2.headerKey = nginx
a1.sources.s1.headers.f3.headerKey = web
a1.sources.s1.fileHeader = true
##sink
a1.sinks.hdfs-sink1.channel = ch1
a1.sinks.hdfs-sink1.type = hdfs
a1.sinks.hdfs-sink1.hdfs.path =hdfs://master:9000/demo/data
a1.sinks.hdfs-sink1.hdfs.filePrefix = event_data
a1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
a1.sinks.hdfs-sink1.hdfs.rollSize = 10485760
a1.sinks.hdfs-sink1.hdfs.rollInterval =20
a1.sinks.hdfs-sink1.hdfs.rollCount = 0
a1.sinks.hdfs-sink1.hdfs.batchSize = 1500
a1.sinks.hdfs-sink1.hdfs.round = true
a1.sinks.hdfs-sink1.hdfs.roundUnit = minute
a1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
a1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
a1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
a1.sinks.hdfs-sink1.hdfs.fileType =DataStream
a1.sinks.hdfs-sink1.hdfs.writeFormat = Text
a1.sinks.hdfs-sink1.hdfs.callTimeout = 60000
~~~
# flume的header参数配置讲解
~~~
#配置信息test-header.conf
a1.channels = c1
a1.sources = r1
a1.sinks = k1
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=50000
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /var/tmp
a1.sources.r1.batchSize= 100
a1.sources.r1.inputCharset = UTF-8
a1.sources.r1.fileHeader = true
# 控制台的key,mmm,文件的绝对路径
a1.sources.r1.fileHeaderKey = mmm
a1.sources.r1.basenameHeader = true
# 文件的名称,看下面的控制台
a1.sources.r1.basenameHeaderKey = nnn
#sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
~~~
执行脚本:
~~~
bin/flume-ng agent -c conf -f conf/test-header.conf -name a1 -Dflume.root.logger=DEBUG,console
~~~
看到内容控制台打印的信息:
~~~
Event: { headers:{mmm=/var/tmp/bbb, nnn=bbb} body: 30 30 30 000 }
Event: { headers:{mmm=/var/tmp/aaa, nnn=aaa} body: 31 31 31 111 }
~~~
其中aaa, bbb 为目录/var/tmp 下面的2个文件名称
官网描述:
![](https://box.kancloud.cn/56d588f162df251b48f67f321d1531b0_799x375.png)
- linux
- 常用命令
- 高级文本命令
- 面试题
- redis
- String
- list
- hash
- set
- sortedSet
- 案例-推荐
- java高级特性
- 多线程
- 实现线程的三种方式
- 同步关键词
- 读写锁
- 锁的相关概念
- 多线程的join
- 有三个线程T1 T2 T3,保证顺序执行
- java五种线程池
- 守护线程与普通线程
- ThreadLocal
- BlockingQueue消息队列
- JMS
- 反射
- volatile
- jvm
- IO
- nio
- netty
- netty简介
- 案例一发送字符串
- 案例二发送对象
- 轻量级RPC开发
- 简介
- spring(IOC/AOP)
- spring初始化顺序
- 通过ApplicationContextAware加载Spring上下文
- InitializingBean的作用
- 结论
- 自定义注解
- zk在框架中的应用
- hadoop
- 简介
- hadoop集群搭建
- hadoop单机安装
- HDFS简介
- hdfs基本操作
- hdfs环境搭建
- 常见问题汇总
- hdfs客户端操作
- mapreduce工作机制
- 案列-单词统计
- 局部聚合Combiner
- 案列-流量统计(分区,排序,比较)
- 案列-倒排索引
- 案例-共同好友
- 案列-join算法实现
- 案例-求topN(分组)
- 自定义inputFormat
- 自定义outputFormat
- 框架运算全流程
- mapreduce的优化方案
- HA机制
- Hive
- 安装
- DDL操作
- 创建表
- 修改表
- DML操作
- Load
- insert
- select
- join操作
- 严格模式
- 数据类型
- shell参数
- 函数
- 内置运算符
- 内置函数
- 自定义函数
- Transform实现
- 特殊分割符处理
- 案例
- 级联求和accumulate
- flume
- 简介
- 安装
- 常用的组件
- 拦截器
- 案例
- 采集目录到HDFS
- 采集文件到HDFS
- 多个agent串联
- 日志采集和汇总
- 自定义拦截器
- 高可用配置
- 使用注意
- sqoop
- 安装
- 数据导入
- 导入数据到HDFS
- 导入关系表到HIVE
- 导入表数据子集
- 增量导入
- 数据导出
- 作业
- 原理
- azkaban
- 简介
- 安装
- 案例
- 简介
- command类型单一job
- command类型多job工作流flow
- HDFS操作任务
- mapreduce任务
- hive脚本任务
- hbase
- 简介
- 安装
- 命令行
- 基本CURD
- 过滤器查询
- 系统架构
- 物理存储
- 寻址机制
- 读写过程
- Region管理
- master工作机制
- 建表高级属性
- 与mapreduce结合
- 协处理器
- 点击流平台开发
- 简介
- storm
- 简介
- 安装
- 集群启动及任务过程分析
- 单词统计
- 并行度
- ACK容错机制
- ACK简介