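## Push approach

* Copy an existing config file and modify the copy.

```
cp exec-memory-avro.conf flume_push_streaming.conf
```

* Config file contents

```
simple-agent.sources = netcat-source
simple-agent.sinks = avro-sink
simple-agent.channels = memory-channel

simple-agent.sources.netcat-source.type = netcat
# Data source: read from port 44444 on this machine
simple-agent.sources.netcat-source.bind = spark
simple-agent.sources.netcat-source.port = 44444

simple-agent.sinks.avro-sink.type = avro
# Machine that receives the data.
# During the experiment this is the address of my Mac development machine.
simple-agent.sinks.avro-sink.hostname = 192.168.31.131
simple-agent.sinks.avro-sink.port = 41414

# Use a channel which buffers events in memory
simple-agent.channels.memory-channel.type = memory

# Bind the source and sink to the channel
simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.avro-sink.channel = memory-channel
```

* Write the application on the Mac development machine.

~~~
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePushWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
    // One batch every 5 seconds
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Avro receiver that the Flume avro sink pushes events to
    val flumeStream = FlumeUtils.createStream(ssc, "0.0.0.0", 41414)

    // Decode each event body, split it into words, and count them per batch
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
~~~

* Start the application. It listens on port 41414 of the development machine.
* Start Flume on the Linux machine.

```
flume-ng agent --name simple-agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/flume_push_streaming.conf -Dflume.root.logger=INFO,console &
```

* On the Linux machine, send data to port 44444 of that same machine.
  * Note: use the hostname from the config file. Otherwise the connection fails. I have not confirmed the cause; a likely one is that the netcat source binds only to the address that `spark` resolves to (192.168.31.70 in the session below), so a connection to `localhost` is refused.

        [bizzbee@spark conf]$ telnet spark 44444
        Trying 192.168.31.70...
        Connected to spark.
        Escape character is '^]'.
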
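The per-batch transformation in `FlumePushWordCount` can be tried without Spark or Flume. The following is a minimal sketch on plain Scala collections, assuming each event body is UTF-8 text; `WordCountSketch` and `countWords` are hypothetical names used only for illustration, `groupBy` stands in for `reduceByKey`, and the `filter(_.nonEmpty)` step is an addition that the streaming job above does not have.

```scala
import java.nio.charset.StandardCharsets

object WordCountSketch {
  // Counts words in one batch of event bodies, mirroring the streaming job:
  // decode + trim -> split on spaces -> count occurrences per word.
  def countWords(bodies: Seq[Array[Byte]]): Map[String, Int] =
    bodies
      .map(b => new String(b, StandardCharsets.UTF_8).trim)
      .flatMap(_.split(" "))
      .filter(_.nonEmpty) // addition: drop empty tokens from blank lines
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }

  def main(args: Array[String]): Unit = {
    // Two lines as telnet would send them, including the trailing CRLF
    val batch = Seq("a b a\r\n", "b c\r\n").map(_.getBytes(StandardCharsets.UTF_8))
    countWords(batch).toSeq.sortBy(_._1).foreach(println)
  }
}
```

The `trim` call matters here because telnet terminates each line with `\r\n`; without it the last word of every line would carry a trailing carriage return and be counted separately.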
### Submitting to production

* Package

```
mvn clean package -DskipTests
```

* Upload

```
scp spark-train-1.0.jar bizzbee@192.168.31.70:~/lib
```

* Modify the Flume config file so that the avro sink points at the machine where the application now runs.

```
simple-agent.sinks.avro-sink.hostname = spark
```

* Start the application
  * The `--packages` option names the dependency to fetch. The first run takes a long time because the dependency has to be downloaded.
  * The trailing `spark 41414` are program arguments (hostname and port). They only take effect if the application reads them from `args` instead of hardcoding the address.

```
spark-submit --class com.bizzbee.spark.streaming.FlumePushWordCount --master local[2] --packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 spark-train-1.0.jar spark 41414
```

* Start Flume

```
flume-ng agent --name simple-agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/flume_push_streaming.conf -Dflume.root.logger=INFO,console &
```

* Send data to port 44444

```
[bizzbee@spark conf]$ telnet spark 44444
Trying 192.168.31.70...
Connected to spark.
Escape character is '^]'.
啊
OK
s
OK
s
OK
OK
```

```
-------------------------------------------
Time: 1587472795000 ms
-------------------------------------------
(sk,2)
(ks,3)
('kk,1)
(k,6)
(jk,1)
```
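The `spark-submit` command passes `spark 41414` after the jar name, while the development listing hardcodes `"0.0.0.0"` and `41414`. One way the application could pick up those arguments is sketched below. This is an assumption about how the submitted jar might be written, not the author's confirmed code; the object name `FlumePushWordCountArgs` and the helper `parseArgs` are hypothetical, and the block needs the `spark-streaming-flume` dependency from `--packages` on the classpath.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

import scala.util.Try

object FlumePushWordCountArgs {

  // Hypothetical helper: turns "<hostname> <port>" into a typed pair,
  // or None when an argument is missing or the port is not a number.
  def parseArgs(args: Array[String]): Option[(String, Int)] = args match {
    case Array(hostname, port) => Try(port.toInt).toOption.map(p => (hostname, p))
    case _                     => None
  }

  def main(args: Array[String]): Unit = {
    val (hostname, port) = parseArgs(args).getOrElse {
      System.err.println("Usage: FlumePushWordCountArgs <hostname> <port>")
      sys.exit(1)
    }

    // No setMaster here: the master is taken from spark-submit's --master flag.
    val sparkConf = new SparkConf().setAppName("FlumePushWordCountArgs")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Receiver address now comes from the command line
    val flumeStream = FlumeUtils.createStream(ssc, hostname, port)
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Leaving `setMaster` out of the code keeps the choice of master in the `spark-submit` command, since values set directly on `SparkConf` take precedence over command-line flags.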