* Requirement: implement WordCount with Spark Streaming + Spark SQL
* Approach: convert each batch's RDD into a DataFrame and query it with SQL

(1) Write the Streaming program

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @Date 2021/2/8
 */
object NetWorkSQLWordCount {

  def main(args: Array[String]): Unit = {
    /** ********* 1. Create the StreamingContext ************/
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._
    // Seconds(5) is the batch interval: the data collected in each 5-second window is processed as one batch
    val ssc = new StreamingContext(spark.sparkContext, Seconds(5))

    /** ********* 2. Load the data ************/
    // socketTextStream(hostname, port, StorageLevel)
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop101", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)

    /** ************ 3. Compute the word count **************/
    val words: DStream[String] = lines.flatMap(_.split("\\s+"))
    words.foreachRDD(rdd => {
      if (rdd.count() != 0) {
        // Convert the batch's RDD to a DataFrame via the Word case class, then query it with Spark SQL
        val df: DataFrame = rdd.map(x => Word(x)).toDF()
        df.createOrReplaceTempView("tb_word")
        spark.sql("select word,count(1) from tb_word group by word").show()
      }
    })

    /** ************ 4. Start the Streaming application **************/
    ssc.start()

    /** *********** 5. Wait for the application to terminate ****************/
    ssc.awaitTermination()
  }

  // A case class used for the RDD-to-DataFrame conversion
  case class Word(word: String)
}
```

(2) Start `nc` and type a few words

```shell
[root@hadoop101 /]# nc -lk 9999
python python java javascript python
```

(3) Run the Streaming program above; it prints the following

```txt
+----------+--------+
|      word|count(1)|
+----------+--------+
|javascript|       1|
|      java|       1|
|    python|       3|
+----------+--------+
```
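One caveat about the program above: the driver-side `spark` session is captured inside the `foreachRDD` closure, which is fine for a `local[*]` run but fragile once the job is submitted to a cluster. The Spark Streaming programming guide instead recommends lazily fetching a singleton SparkSession from the RDD's SparkConf inside `foreachRDD`. A minimal sketch of that variant, reusing the `words` DStream, the `Word` case class, and the imports from the program above:

```scala
words.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Lazily get (or create) the singleton SparkSession from the RDD's SparkConf,
    // instead of capturing the driver-side session in the closure
    val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
    import spark.implicits._

    // Same RDD -> DataFrame conversion and SQL query as in step 3 above
    val df = rdd.map(x => Word(x)).toDF()
    df.createOrReplaceTempView("tb_word")
    spark.sql("select word,count(1) from tb_word group by word").show()
  }
}
```

Using `rdd.isEmpty()` for the empty-batch check is also cheaper than `rdd.count() != 0`, since it only has to find one element rather than count the whole batch.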
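For comparison only (not part of the original example): the same socket word count can be expressed with Structured Streaming, where the aggregation is declared once as a streaming query instead of running per-batch SQL against a temp view. A rough, self-contained sketch, assuming the same hadoop101:9999 source; the object name here is made up:

```scala
import org.apache.spark.sql.SparkSession

object StructuredNetWorkWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("StructuredNetWorkWordCount")
      .master("local[4]")
      .getOrCreate()
    import spark.implicits._

    // The socket source yields an unbounded DataFrame with a single string column "value"
    val lines = spark.readStream
      .format("socket")
      .option("host", "hadoop101")
      .option("port", 9999)
      .load()

    // Split each line into words and count occurrences
    val wordCounts = lines.as[String]
      .flatMap(_.split("\\s+"))
      .groupBy("value")
      .count()

    // "complete" output mode prints the full aggregated table after every trigger
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

Note the semantic difference: the DStream version counts words per 5-second batch, while this query in `complete` mode keeps a running total over everything received so far.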