![](https://img.kancloud.cn/39/fa/39fab755449eb98c4ab6ed8c82360c1e_676x432.png)

* Create a new Flume configuration file for the Kafka sink, `avro-memory-kafka.conf` (see the [Flume documentation](http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.7.0/FlumeUserGuide.html)).

```
# Name the components on this agent
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel

# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = spark
avro-memory-kafka.sources.avro-source.port = 44444

# Describe the sink
# Use KafkaSink to publish messages to Kafka
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.topic = bizzbee-replicated-topic
avro-memory-kafka.sinks.kafka-sink.brokerList = localhost:9092
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1
avro-memory-kafka.sinks.kafka-sink.batchSize = 5

# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory

# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
```

* Make sure ZooKeeper is already running.
* Start Kafka.
* Start Flume. Two agents are needed, and one of them uses the new configuration file.
* The first Flume agent monitors `/home/bizzbee/data/data.log`:

~~~
flume-ng agent \
--name exec-memory-avro \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console
~~~

* The new Flume agent receives events over Avro and forwards them to Kafka:

~~~
flume-ng agent \
--name avro-memory-kafka \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/avro-memory-kafka.conf \
-Dflume.root.logger=INFO,console
~~~

* Then start a Kafka console consumer on the same topic:

```
kafka-console-consumer.sh --bootstrap-server spark:9093 --topic bizzbee-replicated-topic
```

* Finally, write some content to `data.log`. The new lines should show up in the consumer's output.

![](https://img.kancloud.cn/26/fd/26fddf38382b3378bbd2793150852be9_1160x460.png)
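The last step, writing content to `data.log`, can be scripted so the test is easy to repeat. The following is only a minimal sketch: the function name `append_test_lines` and the message text are made up for illustration, and it assumes the first agent picks up lines appended to the monitored file.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Flume or Kafka): append numbered test
# lines to a file so they can be traced through the pipeline.
append_test_lines() {
  local file="$1"   # target file, e.g. the log monitored by the first agent
  local count="$2"  # number of lines to append
  local i
  for ((i = 1; i <= count; i++)); do
    # The message format is arbitrary and only for illustration.
    echo "flume-kafka test message $i" >> "$file"
  done
}
```

For example, `append_test_lines /home/bizzbee/data/data.log 5` appends five lines. If the pipeline is wired up as described, the same lines should appear in the console consumer.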