**Unlike the push approach, the pull approach starts Flume first, and then starts Spark Streaming.**
## Flume configuration file
```
simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = memory-channel
simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = spark
simple-agent.sources.netcat-source.port = 44444
simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
simple-agent.sinks.spark-sink.hostname = spark
simple-agent.sinks.spark-sink.port = 41414
simple-agent.channels.memory-channel.type = memory
simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.spark-sink.channel = memory-channel
```
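Pull mode relies on the custom `SparkSink`, so the Spark sink jar and its dependencies must be on Flume's classpath before the agent starts. A sketch of the required copies follows; the version numbers are assumptions and must match your Spark and Scala installations:

```
cp spark-streaming-flume-sink_2.11-2.2.0.jar $FLUME_HOME/lib/
cp scala-library-2.11.8.jar $FLUME_HOME/lib/
cp commons-lang3-3.5.jar $FLUME_HOME/lib/
```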
## Start Flume
```
flume-ng agent --name simple-agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/flume_pull_streaming.conf -Dflume.root.logger=INFO,console &
```
## Start telnet
```
[bizzbee@spark ~]$ telnet spark 44444
Trying 192.168.31.70...
Connected to spark.
Escape character is '^]'.
```
## Run the Spark Streaming code
* The run arguments are: `spark 41414`
* i.e. the Linux hostname and the Flume sink's port
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePullWordCount {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: FlumePullWordCount <hostname> <port>")
      System.exit(1)
    }

    val Array(hostname, port) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePullWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Pull mode: Spark Streaming polls the Flume SparkSink for events.
    val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)

    // Decode each event body, then run a word count on every 5-second batch.
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```
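The per-batch transformation chain can be illustrated on a plain Scala collection, independent of Spark (a sketch only; the hypothetical `batch` stands in for the decoded Flume event bodies of one 5-second batch, and `groupBy` stands in for `reduceByKey`):

```scala
val batch = Seq("hello spark", "hello flume")
val counts = batch
  .flatMap(_.split(" "))                   // split each line into words
  .groupBy(identity)                       // local stand-in for reduceByKey
  .map { case (w, ws) => (w, ws.size) }
// counts: Map(hello -> 2, spark -> 1, flume -> 1)
```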
## Then send data to port 44444 on the spark host via the telnet session
## Running in production
* Comment out `setMaster` in the code (the master is supplied at submission time instead).
* Package the project:
```
wangyijiadeMacBook-Air:sparktrain bizzbee$ mvn clean package -DskipTests
```
* Upload the jar to the server:
```
scp spark-train-1.0.jar bizzbee@192.168.31.70:~/lib/
```
* Start the streaming job on the server
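A `spark-submit` invocation along these lines should work; the class name, jar path, and arguments come from the steps above, while the master setting and the Spark version in `--packages` are assumptions that must match your environment:

```
spark-submit \
  --class FlumePullWordCount \
  --master local[2] \
  --packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
  ~/lib/spark-train-1.0.jar \
  spark 41414
```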