![](https://img.kancloud.cn/39/fa/39fab755449eb98c4ab6ed8c82360c1e_676x432.png)
* Create a new Flume configuration file for the Kafka sink, `avro-memory-kafka.conf`
[Flume documentation](http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.7.0/FlumeUserGuide.html)
```
# Name the components on this agent
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel
# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = spark
avro-memory-kafka.sources.avro-source.port = 44444
# Describe the sink
# Use KafkaSink to publish messages to Kafka
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.topic = bizzbee-replicated-topic
avro-memory-kafka.sinks.kafka-sink.brokerList = localhost:9092
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1
avro-memory-kafka.sinks.kafka-sink.batchSize = 5
# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory
# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
```
* First, ZooKeeper should already be running.
* Start Kafka.
* Start Flume (two agents, one of which uses the new configuration file).
* The first Flume agent monitors `/home/bizzbee/data/data.log`:
~~~
flume-ng agent --name exec-memory-avro --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-avro.conf -Dflume.root.logger=INFO,console
~~~
* The new Flume agent:
~~~
flume-ng agent --name avro-memory-kafka --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/avro-memory-kafka.conf -Dflume.root.logger=INFO,console
~~~
* Then start a Kafka consumer:
```
kafka-console-consumer.sh --bootstrap-server spark:9093 --topic bizzbee-replicated-topic
```
* Finally, write some content to `data.log`:
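One way to generate test input for this last step is to append a few numbered lines to the monitored file. This is only a minimal sketch: the helper name `append_test_lines` and the message text are hypothetical and not part of the original setup. The only detail taken from the steps above is the file path.

```shell
# Hypothetical helper: append COUNT numbered test lines to FILE.
# Appending with >> leaves existing lines in place, so only new
# lines are added to the end of the file.
append_test_lines() {
  file="$1"
  count="${2:-5}"   # default to 5 lines if no count is given
  i=1
  while [ "$i" -le "$count" ]; do
    echo "test message $i" >> "$file"
    i=$((i + 1))
  done
}

# Usage against the file monitored by the first agent:
# append_test_lines /home/bizzbee/data/data.log 5
```

If both agents and the consumer are running, the appended lines should show up in the consumer terminal shortly afterwards.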
![](https://img.kancloud.cn/26/fd/26fddf38382b3378bbd2793150852be9_1160x460.png)