企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] # 分析 采集需求:比如业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到hdfs,使用agent串联 ![](https://box.kancloud.cn/e18d4e601a0465768b2a1cb254c179b7_1035x303.png) 根据需求,首先定义以下3大要素 第一台flume agent * 采集源,即source——监控文件内容更新 : exec ‘tail -F file’ * 下沉目标,即sink——数据的发送者,实现序列化 : avro sink * Source和sink之间的传递通道——channel,可用file channel 也可以用 内存channel 第二台flume agent * 采集源,即source——接受数据。并实现反序列化 : avro source * 下沉目标,即sink——HDFS文件系统 : HDFS sink * Source和sink之间的传递通道——channel,可用file channel 也可以用 内存channel # 配置文件 第一台配置 Flume-agent1 ~~~ #tail-avro-avro-logger.conf # Name the components on this agent # 定义名称 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = exec # 监听这个文件 a1.sources.r1.command = tail -F /root/logs/test.log a1.sources.r1.channels = c1 # Describe the sink ##sink端的avro是一个数据发送者 a1.sinks.k1.type = avro # 推给这个机器,自己定义 a1.sinks.k1.hostname = master # 端口 a1.sinks.k1.port = 41414 # 批量大小 a1.sinks.k1.batch-size = 10 # Use a channel which buffers events in memory # 内存channels a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel # 组装起来 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 ~~~ Flume-agent2: avro-hdfs.conf ~~~ a1.sources = r1 a1.sinks =s1 a1.channels = c1 ##source中的avro组件是一个接收者服务 a1.sources.r1.type = avro # 绑定一个ip和端口 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 41414 a1.sinks.s1.type=hdfs # hdfs目录 a1.sinks.s1.hdfs.path=hdfs://master:9000/flumedata a1.sinks.s1.hdfs.filePrefix = access_log a1.sinks.s1.hdfs.batchSize= 100 a1.sinks.s1.hdfs.fileType = DataStream a1.sinks.s1.hdfs.writeFormat =Text a1.sinks.s1.hdfs.rollSize = 10240 a1.sinks.s1.hdfs.rollCount = 1000 a1.sinks.s1.hdfs.rollInterval = 10 a1.sinks.s1.hdfs.round = true a1.sinks.s1.hdfs.roundValue = 10 a1.sinks.s1.hdfs.roundUnit = minute a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.s1.channel = c1 ~~~