企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
:-: **常用的Source类型** | 类型 | 描述 | | --- | --- | | netcat source(TCP) | 它打开指定的端口并侦听数据。期望提供的数据是换行分隔的文本。每行文本被转换成一个 Flume 事件并通过连接的通道发送。 | | exec source | 执行Linux指令,并消费指令返回的结果。 | | spooling directory source | 该 Source 将监视指定目录的新文件,一旦有新文件产生立即解析它们。将给定的文件完全读入Channel后,默认情况下通过重命名该文件来表示完成,或者可以删除该文件,或者使用 trackerDir 跟踪已处理的文件。 | | http source | 用于接收HTTP的Get和Post请求。 | | avro source | 监听 Avro 端口并从外部 Avro 客户端流接收事件。当与另一个(前一跳)Flume Agent 上的 Avro Sink 一起使用时,它可以创建分层收集拓扑。 | | kafka source | 它读取来自Kafka主题的消息。 | | taildir source|可以同时监控一个或多个文件, 并带有偏移量存储文件来记录上次读到的位置, 下次可以接着读。<br/> 注意:taildir source 目前不能运行在 windows 系统上。 | **如何使用这些不同的Source:** (1)查看官方文档 https://flume.apache.org/releases/content/1.8.0/FlumeUserGuide.html 检查Source都有哪些配置。如下为 exec source的配置: ![](https://img.kancloud.cn/c4/5d/c45dd2ca9386f5fc26dea3facabdff2d_1291x503.png) (2)按照【Flume使用过程】这一节将Source的相关配置替换。<br/> 下面就上表列出的Source分别做一个使用例子。<br/> [TOC] # 1. exec source 举例 (1)创建一个`exec_source.conf`文件,编写Agent、Source、Channel、Sink的配置信息。 ```xml ############## 1. Agent初始化 ############ # agent为Agent的名字,可以随便命名 # s1、c1、sk1是在该agent下的Source、Channel、Sink,也可以随便命名 agent.sources = s1 agent.channels = c1 agent.sinks = sk1 ############## 2. Source配置 ############# # 设置Source的类型、Linux命令、通道 agent.sources.s1.type = exec # tail命令可以实时检测一个文件的内容是否发生了更改 agent.sources.s1.command = tail -f /opt/install/flume/myconf/learn.log agent.sources.s1.channels = c1 ############## 3. Sink配置 ############# # 设置Sink类型为logger模式 agent.sinks.sk1.type = logger # Sink从c1通道获取数据 agent.sinks.sk1.channel = c1 ############ 4. Channel配置 ############ # 设置Channel为内存模式,容量1000,传输参数100 agent.channels.c1.type = memory ``` (2)启动Agent服务 ```shell -- 切换到flume的根目录下执行下面的语句。当然如果你已经配置了环境变量则就不需要了 bin/flume-ng agent -c conf -f myconf/exec_source.conf --name agent -Dflume.root.logger=INFO,console ``` 可以看到打印出了learn.log文件的内容。 ![](https://img.kancloud.cn/1f/27/1f2790306531fe5f5712b3414613e136_1702x90.png) (3)当你改变learn.log文件的内容时,会被监测到。 ![](https://img.kancloud.cn/ea/be/eabe9182c40f576d013f1fa6576431f3_1690x184.png) <br/> # 2. spooling directory source 举例 (1)创建一个`spooldir_source.conf`文件,编写Agent、Source、Channel、Sink的配置信息。 ```xml ############## 1. Agent初始化 ############ # agent为Agent的名字,可以随便命名 # s1、c1、sk1是在该agent下的Source、Channel、Sink,也可以随便命名 agent.sources = s1 agent.channels = c1 agent.sinks = sk1 ############## 2. Source配置 ############# # 设置Source的类型、需要监控的目录、通道 agent.sources.s1.type = spooldir agent.sources.s1.spoolDir = /opt/install/flume/myconf/ agent.sources.s1.channels = c1 ############## 3. Sink配置 ############# # 设置Sink类型为logger模式 agent.sinks.sk1.type = logger # Sink从c1通道获取数据 agent.sinks.sk1.channel = c1 ############ 4. Channel配置 ############ # 设置Channel为内存模式 agent.channels.c1.type = memory ``` (2)启动Agent服务 ```shell -- 切换到flume的根目录下执行下面的语句。当然如果你已经配置了环境变量则就不需要了 bin/flume-ng agent -c conf -f myconf/spooldir_source.conf --name agent -Dflume.root.logger=INFO,console ``` (3)向/opt/install/flume/myconf新建一个test.log文件,查看Flume的输出如下: ![](https://img.kancloud.cn/ba/0d/ba0de959c33dbb55602321612d2275e3_1030x69.png) (4)查看/opt/install/flume/myconf下的所有文件都被新增了`.COMPLETED`后缀 ![](https://img.kancloud.cn/90/21/9021387d70a40c88badb569ff2b23c78_745x156.png) <br/> # 3. http source 举例 (1)创建一个`http_source.conf`文件,编写Agent、Source、Channel、Sink的配置信息。 ```xml ############## 1. Agent初始化 ############ # agent为Agent的名字,可以随便命名 # s1、c1、sk1是在该agent下的Source、Channel、Sink,也可以随便命名 agent.sources = s1 agent.channels = c1 agent.sinks = sk1 ############## 2. Source配置 ############# # 设置Source的类型、端口、通道 agent.sources.s1.type = http agent.sources.s1.port = 5678 agent.sources.s1.channels = c1 ############## 3. Sink配置 ############# # 设置Sink类型为logger模式 agent.sinks.sk1.type = logger # Sink从c1通道获取数据 agent.sinks.sk1.channel = c1 ############ 4. Channel配置 ############ # 设置Channel为内存模式 agent.channels.c1.type = memory ``` (2)启动Agent服务 ```shell -- 切换到flume的根目录下执行下面的语句。当然如果你已经配置了环境变量则就不需要了 bin/flume-ng agent -c conf -f myconf/http_source.conf --name agent -Dflume.root.logger=INFO,console ``` (3)发送post请求, 查看Flume输出 ```shell curl -XPOST hadoop101:5678 -d '[{"headers":{"h1":"v1","h2":"v2"},"body":"hello body"}]' ``` ![](https://img.kancloud.cn/ad/3f/ad3fad8751f167a88304b49c4fb928ff_1699x99.png) <br/> # 4. taildir source 举例 (1)创建一个`tail_source.conf`文件,编写Agent、Source、Channel、Sink的配置信息。 ```xml ############## 1. Agent初始化 ############ # agent为Agent的名字,可以随便命名 # s1、c1、sk1是在该agent下的Source、Channel、Sink,也可以随便命名 agent.sources = s1 agent.channels = c1 agent.sinks = sk1 ############## 2. Source配置 ############# # 设置Source的类型 agent.sources.s1.type = TAILDIR # 定义偏移量存储路径 agent.sources.s1.positionFile = /opt/install/flume/myconf/taildir_position.json # 定义文件组, 多个文件 f1,f2 agent.sources.s1.filegroups = f1 f2 # 对f1指定绝对路径 agent.sources.s1.filegroups.f1 = /opt/install/flume/myconf/f1.log # 向f1的header添加kv对 agent.sources.s1.headers.f1.headerKey1 = value1 # 对f2指定绝对路径 agent.sources.s1.filegroups.f2 = /opt/install/flume/myconf/f2.log # 向f2的header添加kv对 agent.sources.s1.headers.f2.headerKey2 = value2 # 是否添加一个头信息来存储文件的绝对路径, 默认是false agent.sources.s1.fileHeader = true # 指定通道 agent.sources.s1.channels = c1 ############## 3. Sink配置 ############# # 设置Sink类型为logger模式 agent.sinks.sk1.type = logger # Sink从c1通道获取数据 agent.sinks.sk1.channel = c1 ############ 4. Channel配置 ############ # 设置Channel为内存模式 agent.channels.c1.type = memory ``` (2)启动Agent服务 ```shell -- 切换到flume的根目录下执行下面的语句。当然如果你已经配置了环境变量则就不需要了 bin/flume-ng agent -c conf -f myconf/tail_source.conf --name agent -Dflume.root.logger=INFO,console ``` 暂时不知道tail source有什么效果。