💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
(1)编写Streaming程序 ```scala import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @Date 2021/01/20 */ object HDFSWordCount { def main(args: Array[String]): Unit = { /** ********* 1. 创建StreamingContext ************/ val conf: SparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getName) // Seconds(5)是批处理间隔,即将5秒内新收集的数据作为一个单位进行处理 val ssc: StreamingContext = new StreamingContext(conf, Seconds(5)) /** ********* 2. 加载数据 ************/ // 注意:textFileStream参数只能是目录,不能指定具体的文件 // 一行数据为DStream的一个元素 val lines: DStream[String] = ssc.textFileStream("hdfs://hadoop101:9000/spark/input") // val lines: DStream[String] = ssc.textFileStream("file:///E:\\hadoop\\input") // lines.count().print() /** ********* 3. 使用DSteam的各种算子进行计算 ************/ val result: DStream[(String, Int)] = lines.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _) result.print() // 打印输出 /** ********* 4. 启动Streaming程序 ************/ ssc.start() /** *********** 5. 等待应用程序终止 ****************/ // 或者调用ssc.stop()直接停止当前的Streaming程序 ssc.awaitTermination() } } ``` (2)启动上面的Streaming程序 (3)往HDFS`/spark-streaming/input`目录上传文件 ```shell [root@hadoop101 hadoop]# bin/hdfs dfs -mkdir /spark-streaming/input [root@hadoop101 hadoop]# bin/hdfs dfs -put /test-data/words1.txt /spark-streaming/input ``` (4)程序将打印出如下信息 ```txt ------------------------------------------- Time: 1612769255000 ms ------------------------------------------- (python,2) (hadoop,1) (hello,2) (kafka,1) ```