## Receiver-based integration

* Start ZooKeeper first.
* Start Kafka

```
[bizzbee@spark bin]$ ./kafka-server-start.sh -daemon /home/bizzbee/app/kafka_2.11-2.1.1/config/server.properties
```

* Create the topic

```
[bizzbee@spark bin]$ ./kafka-topics.sh --create --zookeeper spark:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming_topic
```

* List the existing topics

```
[bizzbee@spark bin]$ ./kafka-topics.sh --list --zookeeper spark:2181
__consumer_offsets
bizzbee
bizzbee-replicated-topic
bizzbee-topic
bizzbee_topic
jjj
kafka_steaming_topic
```

* Start a console producer

```
[bizzbee@spark bin]$ ./kafka-console-producer.sh --broker-list spark:9092 --topic kafka_streaming_topic
>
```

* Start a console consumer

```
kafka-console-consumer.sh --bootstrap-server spark:9092 --topic kafka_streaming_topic

# Old-style consumer (older Kafka versions)
kafka-console-consumer.sh --zookeeper spark:2181 --topic kafka_streaming_topic
```

* Write the program.

**Use the integration package for Kafka 0.8 and above**

~~~
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.2.0
~~~

**Program arguments**

```
spark:2181 test kafka_streaming_topic 1
```

~~~
package com.bizzbee.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaReceiverWordCount {

  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads) = args

    val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Map each topic name to the number of threads used to consume it
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    // Connect Spark Streaming to Kafka through the receiver-based API
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    // Each record is a (key, value) pair, e.g. (null,ddd ddd ddd)
    messages.print()

    // Keep only the value, e.g. ddd ddd ddd
    messages.map(_._2).print()

    // Split into words; "aaa aaa aaa aaa aaa" prints aaa five times
    messages.map(_._2).flatMap(_.split(" ")).print()

    // Pair each word with 1; prints (aaa,1) five times
    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).print()

    // The key is null in the sample output above, so the message text is the
    // second element of the pair. Try it yourself to see why _._2 is used.
    // Final word count, e.g. (aaa,5)
    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
~~~

* Then send messages from the console producer.

### Package and run on the server

* Package

```
mvn clean package -DskipTests
```

* Upload

```
scp target/spark-train-1.0.jar bizzbee@spark:~/lib
```

* Start Spark Streaming

```
spark-submit --class com.bizzbee.spark.streaming.KafkaReceiverWordCount --master local[2] --name KafkaReceiverWordCount --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 /home/bizzbee/lib/spark-train-1.0.jar spark:2181 test kafka_streaming_topic 1
```

* Check the Streaming UI.

**It is served on port 4040 of the server.**

![](https://img.kancloud.cn/0c/6c/0c6c43485aae51f5f018335ad610b34a_1006x476.png)
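
To see why the word count takes the second element of each record, the same transformation chain can be tried on plain Scala collections, with no Spark or Kafka involved. This is only a minimal sketch: the object name `ReceiverWordCountSketch` and the helper `wordCounts` are hypothetical, and the sample batch simply mimics the `(null,...)` record shape shown in the program's output comments.

```scala
object ReceiverWordCountSketch {

  // Hypothetical helper: counts words in the value part of (key, value) records,
  // mirroring map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  def wordCounts(batch: Seq[(String, String)]): Map[String, Int] =
    batch
      .map(_._2)                 // drop the key, keep the message text
      .flatMap(_.split(" "))     // split each message into words
      .filter(_.nonEmpty)        // ignore empty tokens from repeated spaces
      .groupBy(identity)         // group identical words together
      .map { case (word, occurrences) => (word, occurrences.size) }

  def main(args: Array[String]): Unit = {
    // Assumed sample batch: one record whose key is null, as in the notes above
    val batch = Seq((null: String, "aaa aaa aaa aaa aaa"))
    println(wordCounts(batch)) // prints Map(aaa -> 5)
  }
}
```

Because the key of the sample record is `null`, only the second element carries the message text, which is why every step of the streaming job begins with `map(_._2)`.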