## updateStateByKey
Besides supporting most RDD operators, DStreams also provide some *transformation* operators of their own; one of the most commonly used is `updateStateByKey`. The word-count program at the beginning of this article only counts the words appearing in each individual batch of input. To accumulate counts over all input seen so far, you can use the `updateStateByKey` operator. The code is as follows:
~~~scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCountV2 {

  def main(args: Array[String]): Unit = {
    /*
     * When testing locally it is best to set the Hadoop user name explicitly;
     * otherwise the local OS user name is used, which may cause a
     * permission-denied exception when creating directories on HDFS.
     */
    System.setProperty("HADOOP_USER_NAME", "root")

    val sparkConf = new SparkConf().setAppName("NetworkWordCountV2").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    /* A checkpoint directory is mandatory for updateStateByKey */
    ssc.checkpoint("hdfs://hadoop001:8020/spark-streaming")

    val lines = ssc.socketTextStream("hadoop001", 9999)
    lines.flatMap(_.split(" ")).map(x => (x, 1))
      .updateStateByKey[Int](updateFunction _) // the updateStateByKey operator
      .print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Accumulate the counts.
   *
   * @param currentValues values for the key in the current batch
   * @param preValues     previously accumulated value for the key
   * @return              the new accumulated value
   */
  def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
    val current = currentValues.sum
    val pre = preValues.getOrElse(0)
    Some(current + pre)
  }
}
~~~
To use the `updateStateByKey` operator you must set a checkpoint directory with `ssc.checkpoint()`. On each batch the operator retrieves the state saved from the previous batch, adds the new values to it with the user-defined `updateFunction`, and returns the updated state.
## Processing Data from the File System
~~~scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileWordCount {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[3]").setAppName("FileWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Monitor a directory and treat every newly added file as a stream of text lines
    val lines = ssc.textFileStream("/Users/bizzbee/Desktop/work/projects/sparktrain/ss")
    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
~~~
* This **monitors files newly added to the `ss` directory**. Files must be moved into the directory from elsewhere; you cannot write to a file in place inside it (see the sketch after this list).
* Printed result:
![](https://img.kancloud.cn/f9/74/f974618e512ef6d787d1d758dc2d0bf2_171x240.png)
* All files must be in the same format.
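Because `textFileStream` only picks up files that appear atomically in the monitored directory, a common pattern is to write the file in a staging location first and then move (rename) it in. A minimal sketch, assuming the watched path from the example above; the staging directory `/tmp/staging` and the file name `words.txt` are hypothetical:

~~~scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardCopyOption}

object MoveFileIntoWatchedDir {
  def main(args: Array[String]): Unit = {
    // 1. Write the file in a staging directory that Spark is NOT watching.
    val staged = Paths.get("/tmp/staging/words.txt")
    Files.createDirectories(staged.getParent)
    Files.write(staged, "hello spark streaming".getBytes(StandardCharsets.UTF_8))

    // 2. Atomically move it into the watched directory, so the complete file
    //    becomes visible to textFileStream in a single step.
    val target = Paths.get("/Users/bizzbee/Desktop/work/projects/sparktrain/ss/words.txt")
    Files.move(staged, target, StandardCopyOption.ATOMIC_MOVE)
  }
}
~~~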
## foreachRDD: Writing Each Batch's RDD to a Database
~~~scala
import java.sql.{Connection, DriverManager}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ForeachRDDApp {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 6789)
    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    result.print()

    // Write each batch's word counts to MySQL, opening one connection per partition
    result.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        val connection = createConnection()
        partitionOfRecords.foreach(record => {
          val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
          connection.createStatement().execute(sql)
        })
        connection.close()
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Get a MySQL connection.
   */
  def createConnection(): Connection = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://localhost:3306/stark", "root", "934158")
  }
}
~~~
![](https://img.kancloud.cn/b2/9c/b29caee14401623e7dbbdfa1d7557b2c_227x204.png)
* Remaining issues (see the sketch below for one way to address them):
![](https://img.kancloud.cn/1a/34/1a346093196377b556bfe4c74ff7b8a9_1015x222.png)
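Opening a brand-new JDBC connection for every partition of every batch is expensive. One common improvement, sketched here rather than taken from the original code, is to reuse connections through a pool that lives for the lifetime of each executor; the pool below is built with Apache Commons DBCP2 (an assumption, not a dependency of the original project), and a `PreparedStatement` replaces the string-concatenated SQL:

~~~scala
import java.sql.Connection

import org.apache.commons.dbcp2.BasicDataSource

// A lazily initialised, executor-local connection pool (hypothetical helper object).
object ConnectionPool {
  private lazy val dataSource: BasicDataSource = {
    val ds = new BasicDataSource()
    ds.setDriverClassName("com.mysql.jdbc.Driver")
    ds.setUrl("jdbc:mysql://localhost:3306/stark")
    ds.setUsername("root")
    ds.setPassword("934158")
    ds
  }

  def getConnection: Connection = dataSource.getConnection
}

// Inside the streaming job, borrow a pooled connection per partition:
// result.foreachRDD(rdd => {
//   rdd.foreachPartition(partitionOfRecords => {
//     val connection = ConnectionPool.getConnection
//     val stmt = connection.prepareStatement("insert into wordcount(word, wordcount) values(?, ?)")
//     partitionOfRecords.foreach { case (word, count) =>
//       stmt.setString(1, word)
//       stmt.setInt(2, count)
//       stmt.executeUpdate()
//     }
//     stmt.close()
//     connection.close() // returns the connection to the pool instead of closing it
//   })
// })
~~~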
## Windowed DStreams
![](https://img.kancloud.cn/0c/7a/0c7a2234057597fb0944c5067d8093c3_994x388.png)
* *Window length* - the duration of the window (3 batch intervals in the figure).
* *Sliding interval* - the interval at which the window operation is performed (2 batch intervals in the figure).
~~~scala
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
~~~
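The `pairs` DStream in the one-liner above is not shown. A minimal self-contained sketch of a sliding-window word count, assuming the same socket source on `localhost:6789` as in the earlier examples, could look like this; note that both the window length and the sliding interval must be multiples of the batch interval:

~~~scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("WindowWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5)) // batch interval: 5 s

    val lines = ssc.socketTextStream("localhost", 6789)
    val pairs = lines.flatMap(_.split(" ")).map((_, 1))

    // Count words over the last 30 seconds of data, recomputed every 10 seconds
    val windowedWordCounts =
      pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
    windowedWordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
~~~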
## Blacklist Filtering
![](https://img.kancloud.cn/f9/fa/f9fa5af79846265e411885c3c9ae5c55_803x275.png)
![](https://img.kancloud.cn/55/50/5550ed4c49c21dde18fc717494baf9d1_629x205.png)
~~~scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TranformApp {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("TranformApp")

    /** Creating a StreamingContext takes two parameters: a SparkConf and the batch interval */
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    /** Build the blacklist as an RDD of (name, true) pairs */
    val blacks = List("wade", "james")
    val blacksRDD = ssc.sparkContext.parallelize(blacks).map(x => (x, true))

    val lines = ssc.socketTextStream("localhost", 6789)
    // Key each click-log line by the user name (second comma-separated field),
    // left-outer-join it with the blacklist RDD and keep only non-blacklisted records
    val clicklog = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
      rdd.leftOuterJoin(blacksRDD)
        .filter(x => !x._2._2.getOrElse(false))
        .map(x => x._2._1)
    })
    clicklog.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
~~~
![](https://img.kancloud.cn/d3/f8/d3f8565497301395f9abf9722cab31fc_154x165.png)