企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] # 简介 目标: 使用flume-1监控文件变动,flume-1将变动内容传递给flume-2,flume-2负责存储到HDFS. 同时flume-1将变动内容传递给flume-3,flume-3负责输出到local filesystem. ![](https://box.kancloud.cn/dbf30f8bc3c2a7bec63c08567b2002df_1850x526.png) # 实现 1. 创建flume-1.conf,用于监控hive.log文件的变动,同时产生两个channel和两个sink分别输送给flume-2和flume-3 **flume-1.conf** ~~~ a1.sources=r1 a1.sinks=k1 k2 a1.channels=c1 c2 # 将数据流复制给多个channel a1.sources.r1.selector.type=replicating # 定义sources a1.sources.r1.type=exec a1.sources.r1.command=tail -F /root/data/hive.log a1.sources.r1.shell=/bin/bash -c # 定义sinks a1.sinks.k1.type=avro a1.sinks.k1.hostname=master a1.sinks.k1.port=4141 a1.sinks.k2.type=avro a1.sinks.k2.hostname=master a1.sinks.k2.port=4142 # 定义channel a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=100 a1.channels.c2.type=memory a1.channels.c2.capacity=1000 a1.channels.c2.transactionCapacity=100 # 绑定 a1.sources.r1.channels=c1 c2 a1.sinks.k1.channel=c1 a1.sinks.k2.channel=c2 ~~~ **flume-2.conf** ~~~ a2.sources=r1 a2.sinks=k1 a2.channels=c1 # 定义sources a2.sources.r1.type=avro a2.sources.r1.bind=master a2.sources.r1.port=4141 # 定义sink a2.sinks.k1.type=hdfs a2.sinks.k1.hdfs.path=hdfs://master:8020/flume2?%Y-%m-%d-%H #上传文件的前缀 a2.sinks.k1.hdfs.filePrefix=flume2- #是否按照时间滚动文件夹 a2.sinks.k1.hdfs.round=true #多少时间单位创建一个新的文件夹 a2.sinks.k1.hdfs.roundValue=1 #重新定义时间单位 a2.sinks.k1.hdfs.roundUnit=hour #是否使用本地时间戳 a2.sinks.k1.hdfs.useLocalTimeStamp=true #积攒多少个Event才flush到HDFS一次 a2.sinks.k1.hdfs.batchSize=100 #设置文件类型,可支持压缩 a2.sinks.k1.hdfs.fileType=DataStream #多久生成一个新的文件 a2.sinks.k1.hdfs.rollInterval=600 #设置每个文件的滚动大小大概是128M a2.sinks.k1.hdfs.rollSize=134217700 #文件的滚动与Event数量无关 a2.sinks.k1.hdfs.rollCount=0 #最小冗余数 a2.sinks.k1.hdfs.minBlockReplicas=1 # 定义channel a2.channels.c1.type=memory a2.channels.c1.capacity=1000 a2.channels.c1.transactionCapacity=100 # 绑定 a2.sources.r1.channels=c1 a2.sinks.k1.channel=c1 ~~~ **flume-3.conf** ~~~ a3.sources=r1 a3.sinks=k1 a3.channels=c1 # 定义source a3.sources.r1.type=avro a3.sources.r1.bind=master a3.sources.r1.port=4142 # 定义sink a3.sinks.k1.type=file_roll a3.sinks.k1.sink.directory=/root/flume3 # 定义channel a3.channels.c1.type=memory a3.channels.c1.capacity=1000 a3.channels.c1.transactionCapacity=100 # 绑定 a3.sources.r1.channels=r1 a3.sinks.k1.channel=c1 ~~~