### Direct Approach
Instead of using receivers to pull in data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and defines the offset range of each batch accordingly.
* Coding
**Runtime arguments**
```
spark:9092 kafka_streaming_topic
```
~~~
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaDirectWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: KafkaDirectWordCount <brokers> <topics>")
      System.exit(1)
    }

    val Array(brokers, topics) = args

    val sparkConf = new SparkConf().setAppName("KafkaDirectWordCount")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // The Direct approach talks to the Kafka brokers directly,
    // so it takes a broker list rather than a ZooKeeper quorum.
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet
    )

    // Each record is a (key, value) pair; only the value is needed for word count.
    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
~~~
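Since the Direct approach itself defines the offset range consumed in each batch, those ranges can be read back from every batch RDD. The following is a minimal sketch (not part of the original example) using the 0-8 integration's `HasOffsetRanges`; `messages` refers to the stream created above, and in a real job the offsets would typically be persisted somewhere (e.g. ZooKeeper or a database) rather than just printed.
~~~
import org.apache.spark.streaming.kafka.HasOffsetRanges

// Print the offset range assigned to each topic+partition in the current batch.
// This must be applied to the RDD coming directly from createDirectStream.
messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} -> ${o.untilOffset}")
  }
}
~~~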
* Package and upload
* Launch the streaming job
```
spark-submit \
  --class com.bizzbee.spark.streaming.KafkaDirectWordCount \
  --master local[2] \
  --name KafkaDirectWordCount \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
  /home/bizzbee/lib/spark-train-1.0.jar \
  spark:9092 kafka_streaming_topic
```
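To verify the job end to end, test messages can be sent to the topic with the Kafka console producer. The broker address and topic name below assume the same `spark:9092` / `kafka_streaming_topic` setup used above; words typed into the producer should appear in the word-count output every 5 seconds.
```
kafka-console-producer.sh --broker-list spark:9092 --topic kafka_streaming_topic
```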