## Receiver-based integration
* First, start ZooKeeper
* Start Kafka
```
[bizzbee@spark bin]$ ./kafka-server-start.sh -daemon /home/bizzbee/app/kafka_2.11-2.1.1/config/server.properties
```
* Create the topic
```
[bizzbee@spark bin]$ ./kafka-topics.sh --create --zookeeper spark:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming_topic
```
* List the existing topics
```
[bizzbee@spark bin]$ ./kafka-topics.sh --list --zookeeper spark:2181
__consumer_offsets
bizzbee
bizzbee-replicated-topic
bizzbee-topic
bizzbee_topic
jjj
kafka_streaming_topic
```
* Start a console producer
```
[bizzbee@spark bin]$ ./kafka-console-producer.sh --broker-list spark:9092 --topic kafka_streaming_topic
>
```
* Start a console consumer
```
kafka-console-consumer.sh --bootstrap-server spark:9092 --topic kafka_streaming_topic
# Old (ZooKeeper-based) consumer, for older Kafka versions
kafka-console-consumer.sh --zookeeper spark:2181 --topic kafka_streaming_topic
```
* Write the application.
**Use the integration package for Kafka 0.8 and later**
~~~
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.2.0
~~~
**Program arguments**
```
spark:2181 test kafka_streaming_topic 1
```
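The four arguments are, in order, the ZooKeeper quorum, the consumer group, a comma-separated topic list, and the number of receiver threads per topic. As a minimal sketch (the `ArgsSketch` object and its `topicMap` helper are hypothetical names used only for illustration), the last two can be turned into a topic-to-thread-count map with plain Scala collections, no Spark required:

```scala
object ArgsSketch {
  // Hypothetical helper: maps every topic in the comma-separated list
  // to the same number of receiver threads.
  def topicMap(topics: String, numThreads: String): Map[String, Int] =
    topics.split(",").map((_, numThreads.toInt)).toMap

  def main(args: Array[String]): Unit = {
    // Same values as the argument line above.
    val Array(zkQuorum, group, topics, numThreads) =
      Array("spark:2181", "test", "kafka_streaming_topic", "1")
    println(s"zkQuorum=$zkQuorum group=$group")
    println(topicMap(topics, numThreads)) // prints Map(kafka_streaming_topic -> 1)
  }
}
```

Passing several topics, for example `"a,b"` with `"2"`, would yield one entry per topic, each with two threads.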
~~~
package com.bizzbee.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaReceiverWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // Map each topic to the number of receiver threads that consume it
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Connect Spark Streaming to Kafka through the receiver-based API
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    // Raw records are (key, value) pairs:
    // (null,ddd ddd ddd)
    messages.print()
    // Values only:
    // ddd ddd ddd
    messages.map(_._2).print()
    // One word per record:
    // aaa
    // aaa
    // aaa
    // aaa
    // aaa
    messages.map(_._2).flatMap(_.split(" ")).print()
    // Each word paired with 1:
    // (aaa,1)
    // (aaa,1)
    // (aaa,1)
    // (aaa,1)
    // (aaa,1)
    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).print()
    // _2 is taken because _1 is the message key, which is null here
    // (aaa,5)
    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
~~~
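To see why the pipeline takes the second element of each record, the same steps can be replayed on an in-memory batch. This is only a sketch with plain Scala collections: `WordCountSketch` and `wordCount` are hypothetical names, and `groupBy` plus a sum stands in for `reduceByKey`, which exists only on Spark's pair DStreams and RDDs.

```scala
object WordCountSketch {
  // Hypothetical helper: mirrors the DStream pipeline on one in-memory batch.
  def wordCount(batch: Seq[(String, String)]): Map[String, Int] =
    batch
      .map(_._2)             // keep the message value, drop the (null) key
      .flatMap(_.split(" ")) // split each line into words
      .map((_, 1))           // pair every word with a count of 1
      .groupBy(_._1)         // plain-collection stand-in for reduceByKey
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    // One record shaped like the receiver's output: (key, value) with a null key.
    val batch = Seq((null: String, "aaa aaa aaa aaa aaa"))
    println(wordCount(batch)) // prints Map(aaa -> 5)
  }
}
```

Dropping the `.map(_._2)` step would leave tuples rather than strings, so the later `split` call would not compile.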
* Then send messages from the console producer.
### Package and run on the server
* Package
```
mvn clean package -DskipTests
```
* Upload
```
scp target/spark-train-1.0.jar bizzbee@spark:~/lib
```
* Start the Spark Streaming job
```
spark-submit --class com.bizzbee.spark.streaming.KafkaReceiverWordCount --master local[2] --name KafkaReceiverWordCount --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 /home/bizzbee/lib/spark-train-1.0.jar spark:2181 test kafka_streaming_topic 1
```
* Check the Streaming UI.
**It is served on port 4040 of the server**
![](https://img.kancloud.cn/0c/6c/0c6c43485aae51f5f018335ad610b34a_1006x476.png)