💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
### Direct方式 这种方法不使用接收者来接收数据,而是定期查询Kafka在每个主题+分区中的最新偏移量,并相应地定义每个批处理中的偏移范围。 * 编码 **运行参数** ``` spark:9092 kafka_streaming_topic ``` ~~~ object KafkaDirectWordCount { def main(args: Array[String]): Unit = { if(args.length != 2) { System.err.println("Usage: KafkaDirectWordCount <brokers> <topics>") System.exit(1) } val Array(brokers, topics) = args val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount") .setMaster("local[2]") val ssc = new StreamingContext(sparkConf, Seconds(5)) val topicsSet = topics.split(",").toSet val kafkaParams = Map[String,String]("metadata.broker.list"-> brokers) val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder]( ssc,kafkaParams,topicsSet ) messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print() ssc.start() ssc.awaitTermination() } } ~~~ * 打包-上传 * 启动streaming ``` spark-submit --class com.bizzbee.spark.streaming.KafkaDirectWordCount --master local[2] --name KafkaDirectWordCount --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 /home/bizzbee/lib/spark-train-1.0.jar spark:9092 kafka_streaming_topic ```