## Push的方式
* 从配置文件中拷贝一份配置进行修改。
```
cp exec-memory-avro.conf flume_push_streaming.conf
```
* 配置文件内容
```
simple-agent.sources = netcat-source
simple-agent.sinks = avro-sink
simple-agent.channels = memory-channel
simple-agent.sources.netcat-source.type = netcat
#数据的来源
# 要从本机的44444端口获取数据
simple-agent.sources.netcat-source.bind = spark
simple-agent.sources.netcat-source.port = 44444
simple-agent.sinks.avro-sink.type = avro
#接受数据的机器
# 实验时为我的开发机器mac的地址。
simple-agent.sinks.avro-sink.hostname = 192.168.31.131
simple-agent.sinks.avro-sink.port = 41414
#simple-agentwhich buffers events in memory
simple-agent.channels.memory-channel.type = memory
#simple-agente and sink to the channel
simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.avro-sink.channel = memory-channel
```
* 在开发机mac上开发
~~~
object FlumePushWordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount");
val ssc = new StreamingContext(sparkConf,Seconds(5));
val flumeStream = FlumeUtils.createStream(ssc,"0.0.0.0",41414)
flumeStream.map(x=>new String(x.event.getBody.array()).trim).flatMap(_.split(" ")).map((_,1))
.reduceByKey(_+_).print();
ssc.start();
ssc.awaitTermination();
}
}
~~~
* 然后启动,会坚挺开发机的41414端口。
* 启动linux上的flume。
```
flume-ng agent --name simple-agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/flume_push_streaming.conf -Dflume.root.logger=INFO,console &
```
* 在linux机器上向本机44444端口传输数据。
* 注意机器名字要用配置文件中的。(不然连不上,暂不知原因)
[bizzbee@spark conf]$ telnet spark 44444
Trying 192.168.31.70...
Connected to spark.
Escape character is '^]'.
### 提交到生产运行
* 打包
```
mvn clean package -DskipTests
```
* 上传
```
scp spark-train-1.0.jar bizzbee@192.168.31.70:~/lib
```
* 修改flume配置文件
```
simple-agent.sinks.avro-sink.hostname = spark
```
* 启动应用程序
* 这里--packages参数第一个是需要下载的依赖,会较长时间下载。
```
spark-submit --class com.bizzbee.spark.streaming.FlumePushWordCount --master local[2] --packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 spark-train-1.0.jar spark 41414
```
* 启动flume
```
flume-ng agent --name simple-agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/flume_push_streaming.conf -Dflume.root.logger=INFO,console &
```
* 发送数据到44444端口
```
[bizzbee@spark conf]$ telnet spark 44444
Trying 192.168.31.70...
Connected to spark.
Escape character is '^]'.
啊
OK
s
OK
s
OK
OK
```
```
-------------------------------------------
Time: 1587472795000 ms
-------------------------------------------
(sk,2)
(ks,3)
('kk,1)
(k,6)
(jk,1)
```