💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
[TOC] # 分析 采集需求:比如业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到hdfs ![](https://box.kancloud.cn/b6ab7695deefefa0d7e25d526fa14e59_664x267.png) 根据需求,首先定义以下3大要素 * 采集源,即source——监控文件内容更新 : `exec 'tail -F file'` * 下沉目标,即sink——HDFS文件系统 : hdfs sink * Source和sink之间的传递通道——channel,可用file channel 也可以用 内存channel # 配置文件 ~~~ # 定义名称 agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1 # Describe/configure tail -F source1 # 定义source # source的类型的exec,这是个命令行,需要个命令 agent1.sources.source1.type = exec # tail -F监控 这个文件的新增的变化 agent1.sources.source1.command = tail -F /root/data/access_log # 用的是bash agent1.sources.source1.shell=/bin/bash -c #configure host for source # 使用2个拦截器,i1和i2,多个拦截器用空格分开 agent1.sources.source1.interceptors = i1 i2 # 类型是host agent1.sources.source1.interceptors.i1.type = host # 解析对应host里面的hostname agent1.sources.source1.interceptors.i1.hostHeader = hostname # 主机名默认是不是使用ip,如果是false,这解析就是对应的主机名了 agent1.sources.source1.interceptors.i1.userIP=true agent1.sources.source1.interceptors.i2.type = timestamp # Describe sink1 agent1.sinks.sink1.type = hdfs # 这边写hdfs的 agent1.sinks.sink1.hdfs.path=hdfs://master:9000/file/%{hostname}/%y-%m-%d/%H-%M # 文件的前缀,在hdfs上的前缀 agent1.sinks.sink1.hdfs.filePrefix = access_log # 批次大小,就是文件达到多少条才提交到hdfs agent1.sinks.sink1.hdfs.batchSize= 100 # 当前文件存储数据类型,还可以用压缩格式 agent1.sinks.sink1.hdfs.fileType = DataStream # 文件的格式类型 agent1.sinks.sink1.hdfs.writeFormat =Text # 达到下面的三个任何一个就按照那个标准生成一个新文件 #滚动生成的文件按大小生成 agent1.sinks.sink1.hdfs.rollSize = 10240 #滚动生成的文件按行数生成 agent1.sinks.sink1.hdfs.rollCount = 1000 #滚动生成的文件按时间生成,秒 agent1.sinks.sink1.hdfs.rollInterval = 10 # 整体就是每10分钟滚动生成一个目录 #开启滚动生成目录 agent1.sinks.sink1.hdfs.round = true #以10为一梯度滚动生成,单位在下面 agent1.sinks.sink1.hdfs.roundValue = 10 #单位为分钟 agent1.sinks.sink1.hdfs.roundUnit = minute # Use a channel which buffers events in memory # 管道的类型 agent1.channels.channel1.type = memory # 多久之后将数据从source移动到channel,channel移动到sink agent1.channels.channel1.keep-alive = 120 # 管道的容量,字节 agent1.channels.channel1.capacity = 500000 # 事务的类型,多少条之后source推送到channel或者channel推送到sinks agent1.channels.channel1.transactionCapacity = 600 # Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1 ~~~ # 测试 启动 ~~~ flume-ng agent -c conf -f fhd.conf -n agent1 -Dflume.root.logger=INFO,console ~~~ fhd.conf换成你自己写的,不同的目录加上目录 -n代表上面定义的agent的名字 `/root/hadoop2/logs/access_log`这个文件有改动就会记录上传到hdfs