**Unlike the push approach, the pull approach requires starting Flume first, then starting Spark Streaming.**

## Configuration file

```
simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = memory-channel

simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = spark
simple-agent.sources.netcat-source.port = 44444

simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
simple-agent.sinks.spark-sink.hostname = spark
simple-agent.sinks.spark-sink.port = 41414

simple-agent.channels.memory-channel.type = memory

simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.spark-sink.channel = memory-channel
```

## Start Flume

```
flume-ng agent --name simple-agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/flume_pull_streaming.conf -Dflume.root.logger=INFO,console &
```

## Start telnet

```
[bizzbee@spark ~]$ telnet spark 44444
Trying 192.168.31.70...
Connected to spark.
Escape character is '^]'.
```

## Run the Streaming code

* The run arguments are: `spark 41414`
* That is, the Linux hostname and the Flume sink's port.

~~~
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumePullWordCount {

  def main(args: Array[String]): Unit = {

    if (args.length != 2) {
      System.err.println("Usage: FlumePullWordCount <hostname> <port>")
      System.exit(1)
    }

    val Array(hostname, port) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePullWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Integrate Flume with Spark Streaming: pull events from the SparkSink
    val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)

    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
~~~

## Send data to port 44444 on the spark host

Type words into the telnet session; the word counts are printed every 5 seconds.

## Running in production

* Comment out `setMaster` in the code.
* Package the project.

```
wangyijiadeMacBook-Air:sparktrain bizzbee$ mvn clean package -DskipTests
```

* Upload the jar.

```
scp spark-train-1.0.jar bizzbee@192.168.31.70:~/lib/
```

* Start the streaming job on the server.
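Starting the streaming job on the server is typically done with `spark-submit`. A minimal sketch, with assumptions: the fully qualified class name (`com.bizzbee.spark.FlumePullWordCount`) and the Spark/Scala versions (`2.2.0` / `2.11`) are placeholders to adjust to your project; the `--packages` coordinate pulls the Flume integration jar in at runtime:

```shell
# Hypothetical package path and versions -- match these to your build.
spark-submit \
  --class com.bizzbee.spark.FlumePullWordCount \
  --master local[2] \
  --packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
  ~/lib/spark-train-1.0.jar \
  spark 41414
```

Since `setMaster` is commented out in the code, the master must be given on the command line; replace `local[2]` with `yarn` when submitting to a cluster.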