多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
:-: **常用的Sink类型** | 类型 | 描述 | | --- | --- | | avro sink | 作为avro客户端向avro服务端发送avro事件。 | | HDFS sink | 将事件写入Hadoop分布式文件系统(HDFS)。 | | Hive sink | 包含分隔文本或JSON数据流事件直接进入Hive表或分区;<br/>传入的事件数据字段映射到Hive表中相应的列。 | | HBase sink | 此 Sink 将数据写入 HBase。 | | Kafka sink | 此 Sink 将数据写入 Kafka。 | <br/> **怎么使用?** (1)查看官方文档:https://flume.apache.org/releases/content/1.8.0/FlumeUserGuide.html ![](https://img.kancloud.cn/d3/7a/d37a76729351c90c36a14f91b3f6a71f_1301x395.png) <br/> [TOC] # 1. http和avro的配合进行PRC(远程)调用 (1)编写Flume文件,设置Agent、Source、Channel、Sink的配置信息。 *`http_source.conf`* ```xml ############## 1. Agent初始化 ############ # agent为Agent的名字,可以随便命名 # s1、c1、sk1是在该agent下的Source、Channel、Sink,也可以随便命名 agent.sources = s1 agent.channels = c1 agent.sinks = sk1 ############## 2. Source配置 ############# # 指定Source类型、端口号、通道 agent.sources.s1.type = http agent.sources.s1.port = 5678 agent.sources.s1.channels = c1 ############ 3. Channel配置 ############ # 设置Channel为内存模式 agent.channels.c1.type = memory ############ 4. Sink配置 ############# # 指定sink类型、主机、端口号(不能与Source的端口号相同)、通道 agent.sinks.sk1.type = avro agent.sinks.sk1.hostname = hadoop101 agent.sinks.sk1.port = 4444 agent.sinks.sk1.channel = c1 ``` <br/> *`avro_source.conf`* ```xml ############## 1. Agent初始化 ############ # agent为Agent的名字,可以随便命名 # s1、c1、sk1是在该agent下的Source、Channel、Sink,也可以随便命名 agent.sources = s1 agent.channels = c1 agent.sinks = sk1 ############## 2. Source配置 ############# # 指定Source类型、主机、端口号(与上面的sink端口号相同)、通道 agent.sources.s1.type = avro agent.sources.s1.bind = hadoop101 agent.sources.s1.port = 4444 agent.sources.s1.channels = c1 ############ 3. Channel配置 ############ # 指定Channel类型 agent.channels.c1.type = memory ############ 4. Sink配置 ############# # 指定sink类型、通道 agent.sinks.sk1.type = logger agent.sinks.sk1.channel = c1 ``` (2)启动Flume ```shell -- 切换到flume的根目执行语句。当然如果你已经配置了环境变量则就不需要了 -- 先启动avro_source bin/flume-ng agent -c conf -f myconf/avro_source.conf --name agent -Dflume.root.logger=INFO,console -- 再启动http_source bin/flume-ng agent -c conf -f myconf/http_source.conf --name agent -Dflume.root.logger=INFO,console ``` ![](https://img.kancloud.cn/09/20/092010769e43ffdface01f84b5c997c8_1394x106.png) ![](https://img.kancloud.cn/ff/57/ff578f6b5e8e8cdc4e0e93a40cd0a4fc_1426x110.png) (3)发送post请求,查看avro_source的Flume输出 ```shell curl -XPOST hadoop101:5678 -d '[{"headers":{"h1":"v1","h2":"v2"},"body":"hello body"}]' ``` ![](https://img.kancloud.cn/3e/67/3e675f8fe0426abda4970b73716cb0f3_1697x115.png) <br/> # 2. HDFS Sink的使用 (1)编写`hdfs_sink.conf`文件 ```conf ############## 1. Agent初始化 ############ # agent为Agent的名字,可以随便命名 # s1、c1、sk1是在该agent下的Source、Channel、Sink,也可以随便命名 agent.sources = s1 agent.channels = c1 agent.sinks = sk1 ############## 2. Source配置 ############# # 设置Source的类型、Linux命令、通道 agent.sources.s1.type = netcat agent.sources.s1.bind = hadoop101 agent.sources.s1.port = 5678 agent.sources.s1.channels = c1 ############ 3. Channel配置 ############ # 设置Channel为内存模式 agent.channels.c1.type = memory ############ 4. Sink配置 ############# # 设置Sink类型 agent.sinks.sk1.type = hdfs agent.sinks.sk1.hdfs.path = /flume/data/ # Sink从c1通道获取数据 agent.sinks.sk1.channel = c1 ``` (2)启动Flume ```shell [root@hadoop101 flume]# bin/flume-ng agent -c conf -f myconf/hdfs_sink.conf --name agent -Dflume.root.logger=INFO,console ``` (3)启动telnet发送数据 ```shell [root@hadoop101 flume]# telnet hadoop101 5678 Trying 192.168.75.129... Connected to hadoop101. Escape character is '^]'. hello world OK hello spark OK hello hadoop scala OK ``` (4)查看Flume日志输出 ![](https://img.kancloud.cn/4a/8a/4a8a2de00f36b083976b407e6d1885a7_1162x246.png) (5)查看hdfs上的文件 ```shell [root@hadoop101 hadoop]# bin/hdfs dfs -text /flume/data/FlumeData.1611000631177 21/01/19 04:15:33 WARN util.NativeCodeLoader: Unable to load 1611000632253 68 65 6c 6c 6f 20 77 6f 72 6c 64 0d 1611000636996 68 65 6c 6c 6f 20 73 70 61 72 6b 0d 1611000644003 68 65 6c 6c 6f 20 68 61 64 6f 6f 70 20 73 63 61 6c 61 0d ```