企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] # 注意启动脚本命令的书写 agent 的名称别写错了,后台执行加上`nohup ... &` # channel参数 ~~~ capacity:默认该通道中最大的可以存储的event数量 trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量 keep-alive:event添加到通道中或者移出的允许时间 注意:capacity > trasactionCapacity 容量要大于事务大小 ~~~ # 日志采集到HDFS配置 ## 说明1(sink端) ~~~ #定义sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path=hdfs://192.168.200.101:9000/source/logs/%{type}/%Y%m%d a1.sinks.k1.hdfs.filePrefix =events a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.writeFormat = Text #时间类型 a1.sinks.k1.hdfs.useLocalTimeStamp = true # 如果不想用某个条件就设为0 #生成的文件不按条数生成 a1.sinks.k1.hdfs.rollCount = 0 #生成的文件按时间生成 a1.sinks.k1.hdfs.rollInterval = 30 #生成的文件按大小生成 a1.sinks.k1.hdfs.rollSize = 10485760 #批量写入hdfs的个数 a1.sinks.k1.hdfs.batchSize = 10000 flume操作hdfs的线程数(包括新建,写入等) a1.sinks.k1.hdfs.threadsPoolSize=10 #操作hdfs超时时间 a1.sinks.k1.hdfs.callTimeout=30000 ~~~ ## 说明2 (sink端) | hdfs.round | false | 如果时间戳向下舍入(如果为true,则会影响除%t之外的所有基于时间的转义序列) | | --- | --- | --- | | hdfs.roundValue | 1 | 舍入到最高倍数(在使用hdfs.roundUnit配置的单位中),小于当前时间 | | hdfs.roundUnit | second | 舍入值的单位 - second,分钟或小时 | * round: 默认值:false 是否启用时间上的”舍弃”,这里的”舍弃”,类似于'四舍五入' * roundValue:默认值:1 时间上进行'舍弃'的值; * roundUnit: 默认值:seconds时间上进行”舍弃”的单位,包含:second,minute,hour 案例(1): ~~~ a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H:%M/%S a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute ~~~ 当时间为`2015-10-16 17:38:59`时候,hdfs.path依然会被解析为: ~~~ /flume/events/2015-10-16/17:30/00 /flume/events/2015-10-16/17:40/00 /flume/events/2015-10-16/17:50/00 ~~~ 因为设置的是舍弃10分钟内的时间,因此,该目录每10分钟新生成一个。 案例(2): ~~~ a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H:%M/%S a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = second ~~~ 现象:10秒为时间梯度生成对应的目录,目录下面包括很多小文件!!! HDFS产生的数据目录格式如下: ~~~ /flume/events/2016-07-28/18:45/10 /flume/events/2016-07-28/18:45/20 /flume/events/2016-07-28/18:45/30 /flume/events/2016-07-28/18:45/40 /flume/events/2016-07-28/18:45/50 /flume/events/2016-07-28/18:46/10 /flume/events/2016-07-28/18:46/20 /flume/events/2016-07-28/18:46/30 /flume/events/2016-07-28/18:46/40 /flume/events/2016-07-28/18:46/50 ~~~ # 断点续传 日志采集使用tail -F 监控一个文件新增的内容 (详细见案例:flume的第6个配置案例-分类收集数据-使用static拦截器) Source端的代码: ~~~ a1.sources.r2.type = exec a1.sources.r2.command = tail -F /root/data/nginx.log a1.sources.r2.interceptors = i2 a1.sources.r2.interceptors.i2.type = static a1.sources.r2.interceptors.i2.key = type a1.sources.r2.interceptors.i2.value = nginx ~~~ 这里会出现这样一个情况,当你的这个flume agent程序挂了或者是服务器宕机了,那么随着文件内容的增加,下次重启时,会消费到重复的数据, 怎么办呢? 解决方案:使用改进版的配置信息,修改信息 ~~~ a1.sources.r2.command= tail  -n +$(tail -n1 /root/log) -F /root/data/nginx.log | awk 'ARGIND==1{i=$0;next}{i++;if($0~/^tail/){i=0};print $0;print i >> "/root/log";fflush("")}' /root/log-  ~~~ 意思就是说:Source每次读取一条信息,就往/root/log文件记住当前消息的行数。这样的话当你的程序挂了之后,重启时先获取上次读取所在的行数,依次从下读,这样避免了数据重复。 而在flume1.7已经集成了该功能 存下每个文件消费的偏移量,避免从头消费 但是 Flume1.7 中如果文件重命名,那么会被当成新文件而被重新采集 配置文件: ~~~ a1.channels = ch1 a1.sources = s1 a1.sinks = hdfs-sink1 #channel a1.channels.ch1.type = memory a1.channels.ch1.capacity=100000 a1.channels.ch1.transactionCapacity=50000 #source a1.sources.s1.channels = ch1 #监控一个目录下的多个文件新增的内容 a1.sources.s1.type = taildir #通过 json 格式存下每个文件消费的偏移量,避免从头消费 a1.sources.s1.positionFile = /var/local/apache-flume-1.7.0-bin/taildir_position.json a1.sources.s1.filegroups = f1 f2 f3 a1.sources.s1.filegroups.f1 = /root/data/access.log a1.sources.s1.filegroups.f2 = /root/data/nginx.log a1.sources.s1.filegroups.f3 = /root/data/web.log a1.sources.s1.headers.f1.headerKey = access a1.sources.s1.headers.f2.headerKey = nginx a1.sources.s1.headers.f3.headerKey = web a1.sources.s1.fileHeader = true ##sink a1.sinks.hdfs-sink1.channel = ch1 a1.sinks.hdfs-sink1.type = hdfs a1.sinks.hdfs-sink1.hdfs.path =hdfs://master:9000/demo/data a1.sinks.hdfs-sink1.hdfs.filePrefix = event_data a1.sinks.hdfs-sink1.hdfs.fileSuffix = .log a1.sinks.hdfs-sink1.hdfs.rollSize = 10485760 a1.sinks.hdfs-sink1.hdfs.rollInterval =20 a1.sinks.hdfs-sink1.hdfs.rollCount = 0 a1.sinks.hdfs-sink1.hdfs.batchSize = 1500 a1.sinks.hdfs-sink1.hdfs.round = true a1.sinks.hdfs-sink1.hdfs.roundUnit = minute a1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25 a1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true a1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1 a1.sinks.hdfs-sink1.hdfs.fileType =DataStream a1.sinks.hdfs-sink1.hdfs.writeFormat = Text a1.sinks.hdfs-sink1.hdfs.callTimeout = 60000 ~~~ # header参数配置 ~~~ #配置信息test-header.conf a1.channels = c1 a1.sources = r1 a1.sinks = k1 #channel a1.channels.c1.type = memory a1.channels.c1.capacity=100000 a1.channels.c1.transactionCapacity=50000 #source a1.sources.r1.channels = c1 # 监控目录下的文件 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /var/tmp a1.sources.r1.batchSize= 100 a1.sources.r1.inputCharset = UTF-8 # 是否要添加一个标题存储绝对路径的文件名 a1.sources.r1.fileHeader = true # 控制台的key是mmm,文件的绝对路径作为值加到里面 a1.sources.r1.fileHeaderKey = mmm  # 是否要添加头文件的存储 a1.sources.r1.basenameHeader = true # 控制台的key是nn,值是文件名,加到里面 a1.sources.r1.basenameHeaderKey = nnn # 文件的名称,看下面的控制台 #sink a1.sinks.k1.type = logger a1.sinks.k1.channel = c1 ~~~ 执行脚本: ~~~ bin/flume-ng agent -c conf -f conf/test-header.conf  -name a1 -Dflume.root.logger=DEBUG,console ~~~ 看到内容控制台打印的信息: ~~~ Event: { headers:{mmm=/var/tmp/bbb, nnn=bbb} body: 30 30 30 000 } Event: { headers:{mmm=/var/tmp/aaa, nnn=aaa} body: 31 31 31 111 } ~~~ 其中aaa, bbb 为目录/var/tmp 下面的2个文件名称 官网描述: ![](https://box.kancloud.cn/1981ea4e5fdd6a6c766977dd3a341c74_800x377.png)