Input 1: TCP

Install netcat and start a listener on port 9999:

nc -l -p 9999

Code:

~~~
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    // Use at least two local threads: one for the socket receiver, one for processing
    val conf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
    val ssc = new StreamingContext(conf, Seconds(10))
    // Create a DStream via ssc: each record in lines is one line of text received from the server
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    // Count the words with map + reduceByKey; pairs is a DStream of (word, 1) tuples
    val pairs = words.map(w => (w, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()
    // Start the computation above and wait for termination
    ssc.start()
    ssc.awaitTermination()
  }
}
~~~

Input 2: File

~~~
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object FileWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("WordCount")
    val ssc = new StreamingContext(conf, Seconds(10))
    // Create a DStream via ssc: each new file dropped into d:/aa is read line by line
    val lines = ssc.textFileStream("d:/aa")
    val words = lines.flatMap(_.split(" "))
    // Count the words with map + reduceByKey; pairs is a DStream of (word, 1) tuples
    val pairs = words.map(w => (w, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()
    // Start the computation above and wait for termination
    ssc.start()
    ssc.awaitTermination()
  }
}
~~~

Input 3: Kafka

1) Start ZooKeeper

zkServer.sh start

Check the ZooKeeper status:

zkServer.sh status

2) Start Kafka

bin/kafka-server-start.sh -daemon config/server.properties

3) Create a Kafka topic (either with 2 replicas and 4 partitions, or a minimal single-replica, single-partition topic)

bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 2 --partitions 4 --topic test_1

bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic test_1

![](https://box.kancloud.cn/4c2987f3c2ae07507d113b8460a13983_576x61.png)

List the topics:

bin/kafka-topics.sh --list --zookeeper master:2181

4) Start a producer and a consumer

After kafka-server is up, run the producer:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_1

Run the consumer in another terminal:

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_1

5) Replace the console consumer with a program

~~~
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, StreamingContext}

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("kafka-spark-demo")
    val scc = new StreamingContext(sparkConf, Duration(5000))
    scc.sparkContext.setLogLevel("ERROR")
    scc.checkpoint(".") // checkpointing is required because updateStateByKey is used
    val topics = Set("test_1") // the Kafka topic to consume
    val brokers = "192.168.80.3:9092"
    val kafkaParam = Map[String, String](
      "zookeeper.connect" -> "192.168.80.3:2181",
      // "group.id" -> "test-consumer-group",
      "metadata.broker.list" -> brokers, // the Kafka broker list
      "serializer.class" -> "kafka.serializer.StringEncoder"
    )
    val stream: InputDStream[(String, String)] = createStream(scc, kafkaParam, topics)
    stream.map(_._2)                     // keep only the value (the message body)
      .flatMap(_.split(" "))             // split each message on spaces
      .map(r => (r, 1))                  // map each word to a (word, 1) pair
      .updateStateByKey[Int](updateFunc) // fold the current batch into the running totals
      .print()                           // print the first 10 results of each batch
    scc.start()            // actually start the job
    scc.awaitTermination() // block until the job is terminated
  }

  // Add the sum of the counts in the current batch to the previously accumulated count
  val updateFunc = (currentValues: Seq[Int], preValue: Option[Int]) => {
    val curr = currentValues.sum
    val pre = preValue.getOrElse(0)
    Some(curr + pre)
  }

  def createStream(scc: StreamingContext, kafkaParam: Map[String, String], topics: Set[String]) = {
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](scc, kafkaParam, topics)
  }
}
~~~
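The snippet above uses the Kafka 0.8 direct-stream API (KafkaUtils.createDirectStream with StringDecoder), which is not part of Spark core. Below is a minimal build.sbt sketch that pulls it in; the version numbers are assumptions and should be matched to the Spark and Scala versions actually installed on the cluster:

~~~
// build.sbt (sketch) - the versions below are assumptions; align them with your cluster
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                % "2.2.0",
  "org.apache.spark" %% "spark-streaming"           % "2.2.0",
  // Kafka 0.8 integration, which provides KafkaUtils.createDirectStream with StringDecoder
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0"
)
~~~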
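The console producer from step 4 can be replaced with a program in the same way. Here is a minimal, hypothetical sketch (the object name ConsoleLikeProducer is made up) that reads lines from stdin and sends each one to the test_1 topic using the standard kafka-clients producer, assuming that jar is on the classpath:

~~~
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hypothetical stand-in for kafka-console-producer.sh: forwards stdin lines to test_1
object ConsoleLikeProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.80.3:9092") // same broker address as above
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    try {
      // Each line typed on stdin becomes one Kafka message, like the console producer
      scala.io.Source.stdin.getLines().foreach { line =>
        producer.send(new ProducerRecord[String, String]("test_1", line))
      }
    } finally {
      producer.close()
    }
  }
}
~~~

Typing lines into this program while the streaming job from step 5 is running should produce the same word counts as typing them into the console producer.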