ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
因为 Kafka 项目在 0.8 和 0.10 版本之间引入了新的消费者 api,因此有两个单独的相应 Spark Streaming 包: ➢ spark-streaming-kafka-0-8 ➢ spark-streaming-kafka-0-10 注意选择正确的包,spark-streaming-kafka-0-8 与 Kafka Brokers 0.9 和 0.10 +兼容,但spark-streaming-kafka-0-10与Kafka Brokers 0.10之前版本不兼容。总之,向后兼容。具体区别如下图所示。 :-: ![](https://img.kancloud.cn/e4/e4/e4e495c80c535b2082c2bbfe10bd887d_957x358.png) spark-streaming-kafka-0-8 与 spark-streaming-kafka-0-10 区别 本次示例以0.10版本为例: ```xml <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.4.4</version> </dependency> ``` Spark Streaming 也提供了两种方式对接 Kafka 数据源:Receiver 和 Direct。 [TOC] # 1. Receiver 方式(了解) Receiver 是最早的方式。Receiver 方式通过 Receiver 来获取数据,是使用Kafka 的 High Level Consumer API 来实现的。Receiver 将 Kafka 数据源中获取的数据存储在 Spark Executor 的内存中,然后 Spark Streaming 启动的 job 会去处理那些数据。但是,在默认的配置下,这种方式可能会因为底层的故障而丢失数据。如果要启用高可靠机制,让数据零丢失,就<ins>必须启用 Spark Streaming 的预写日志机制(Write Ahead Log,WAL)</ins>。该机制会同步地将接收到的 Kafka 数据写入分布式文件系统(比如 HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。<br/> Receiver 方 式 只 有 在 streaming-kafka-0-8 才有支持 , 在较新的streaming-kafka-0-10 已经不再支持。 <br/> # 2. Direct 方式(无 Receiver)(掌握) (1)编写Spark Streaming程序 ```scala import org.apache.kafka.clients.consumer.{ConsumerRecord} import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @Date 2021/2/8 */ object KafkaWordCount { def main(args: Array[String]): Unit = { /** ********* 1. 创建StreamingContext ************/ val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]") // Seconds(5)是批处理间隔,即将5秒内新收集的数据作为一个单位进行处理 val ssc = new StreamingContext(conf, Seconds(5)) /** ********* 2. 准备连接kafka时的一些参数 ************/ val topices = "test1,test2" val topicsSet: Set[String] = topices.split(",").toSet val kafkaParams = Map[String, String]( "bootstrap.servers" -> "hadoop101:9092", "group.id" -> "testGroup1", "enable.auto.commit" -> "true", "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer" ) /** ********* 3. 使用Kafka直连方式 ************/ val messages: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, // 分配策略 ConsumerStrategies.Subscribe(topicsSet, kafkaParams) // 订阅方式, 是按照topic订阅还是按照分区订阅 ) /** ********* 4. 加载数据 ************/ // 将从kafka中获取到的数据取出value值 val lines: DStream[String] = messages.map(_.value()) /** ************ 5. 进行统计 **************/ val result: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) result.print() // 打印输出 /** ********* 6. 启动Streaming程序 ************/ ssc.start() /** *********** 7. 等待应用程序终止 ****************/ ssc.awaitTermination() } } ``` (2)启动Kafka ```shell -- 启动Kafka [root@hadoop101 kafka]# bin/kafka-server-start.sh -daemon config/server.properties [root@hadoop101 kafka]# jps 3555 Kafka 3622 Jps ``` (3)创建主题并生产数据 ```shell -- 创建主题 [root@hadoop101 kafka]# bin/kafka-topics.sh --create --topic test1 --zookeeper hadoop101:2181 --replication-factor 1 --partitions 3 -- 启动kafka生产者并生产数据 [root@hadoop101 kafka]# bin/kafka-console-producer.sh --broker-list hadoop101:9092 --topic test1 >hello python hello ``` (4)然后Spark Streaming打印出如下信息 ```txt ------------------------------------------- Time: 1612781510000 ms ------------------------------------------- (python,1) (hello,2) ```