********前面 Transformationt 算子的测试都是在本地开发环境中直接跑代码,这里 Actions 算子的测试主要在 spark-shell 中进行操作。需要说明的 Actions 算子如下:
#### 下面来具体说明:
**(1)reduce**
通过函数 func 聚集数据集中的所有元素。Func 函数接受 2 个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行。
关于 reduce 的执行过程,可以对比 scala 中类似的 reduce 函数。
~~~
scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:29
scala> val ret = listRDD.reduce((v1, v2) => v1 + v2)
...
ret: Int = 21
~~~
注意:需要注意的是,不同于 Transformation 算子,其结果仍然是 RDD,但是`执行Actions算子之后,其结果不再是RDD,而是一个标量。`
**(2)collect**
在 Driver 的程序中,以数组的形式,返回数据集的所有元素。**这通常会在使用 filter 或者其它操作后**,返回一个足够小的数据子集再使用,直接将整个 RDD 集 Collect 返回,很可能会让 Driver 程序 OOM,这点尤其需要注意。
~~~
scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:29
scala> val ret = listRDD.collect()
...
ret: Array[Int] = Array(1, 2, 3, 4, 5, 6)
~~~
**(3)count**
返回数据集的元素个数
~~~
scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:29
scala> val ret = listRDD.count()
...
ret: Long = 6
~~~
**(4)take**
返回一个数组,由数据集的前 n 个元素组成。注意,这个操作目前并非在多个节点上,并行执行,而是 Driver 程序所在机器,单机计算所有的元素 (Gateway 的内存压力会增大,需要谨慎使用)。
~~~
scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:29
scala> listRDD.take(3)
...
res7: Array[Int] = Array(1, 2, 3)
~~~
**(5)first**
返回数据集的第一个元素(类似于 take(1))
~~~
scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:29
scala> listRDD.first()
...
res8: Int = 1
~~~
**(6)saveAsTextFile**
将数据集的元素,以 textfile 的形式,保存到本地文件系统,hdfs 或者任何其它 hadoop 支持的文件系统。Spark 将会调用每个元素的 toString 方法,并将它转换为文件中的`一行文本`。
~~~
scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:29
scala> listRDD.saveAsTextFile("file:///home/uplooking/data/spark/action")
...
~~~
可以在文件系统中查看到保存的文件:
~~~
[root@WGH action]$ pwd
/home/uplooking/data/spark/action
[root@WGH action]$ ls
part-00000 part-00001 part-00002 part-00003 _SUCCESS
~~~
其实可以看到,保存的跟 Hadoop 的格式是一样的。
当然因为我的 spark 集群中已经做了跟 hadoop 相关的配置,所以也可以把文件保存到 hdfs 中:
~~~
scala> listRDD.saveAsTextFile("hdfs://ns1/output/spark/action")
...
~~~
然后就可以在 hdfs 中查看到保存的文件:
~~~
[root@WGH action]$ hdfs dfs -ls /output/spark/action
18/04/27 10:27:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 5 items
-rw-r--r-- 3 root supergroup 0 2018-04-27 10:25 /output/spark/action/_SUCCESS
-rw-r--r-- 3 root supergroup 2 2018-04-27 10:25 /output/spark/action/part-00000
-rw-r--r-- 3 root supergroup 4 2018-04-27 10:25 /output/spark/action/part-00001
-rw-r--r-- 3 root supergroup 2 2018-04-27 10:25 /output/spark/action/part-00002
-rw-r--r-- 3 root supergroup 4 2018-04-27 10:25 /output/spark/action/part-00003
~~~
可以看到,保存的格式跟保存到本地文件系统是一样的。
**(7)foreach**
在数据集的每一个元素上,运行函数 func。这通常用于更新一个累加器变量,或者和外部存储系统做交互。
~~~
scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:29
scala> listRDD.foreach(println)
...
~~~
**(8)saveAsNewAPIHadoopFile**
也就是将数据保存到 Hadoop HDFS 中,但是需要注意的是,前面使用 saveAsTextFile 也可以进行相关操作,其使用的就是 saveAsNewAPIHadoopFile 或者 saveAsHadoopFile 这两个 API,而其两者的区别是:
saveAsHadoopFile 的 OutputFormat 使用的:org.apache.hadoop.mapred 中的早期的类
saveAsNewAPIHadoopFile 的 OutputFormat 使用的:org.apache.hadoop.mapreduce 中的新的类。但不管使用哪一个,都是可以完成工作的。
测试代码如下:
~~~
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}
/**
* Spark算子操作之Action
* saveAsNewAPIHAdoopFile
* * saveAsHadoopFile
* 和saveAsNewAPIHadoopFile的唯一区别就在于OutputFormat的不同
* saveAsHadoopFile的OutputFormat使用的:org.apache.hadoop.mapred中的早期的类
* saveAsNewAPIHadoopFile的OutputFormat使用的:org.apache.hadoop.mapreduce中的新的类
* 使用哪一个都可以完成工作
*
* 前面在使用saveAsTextFile时也可以保存到hadoop文件系统中,注意其源代码也是使用上面的操作的
*
* Caused by: java.net.UnknownHostException: ns1
... 35 more
找不到ns1,因为我们在本地没有配置,无法正常解析,就需要将hadoop的配置文件信息给我们加载进来
hdfs-site.xml.heihei,core-site.xml.heihei
*/
object _05SparkActionOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_05SparkActionOps.getClass.getSimpleName)
val sc = new SparkContext(conf)
val list = List("hello you", "hello he", "hello me")
val listRDD = sc.parallelize(list)
val pairsRDD = listRDD.map(word => (word, 1))
val retRDD = pairsRDD.reduceByKey((v1, v2) => v1 + v2)
retRDD.saveAsNewAPIHadoopFile(
"hdfs://ns1/spark/action", // 保存的路径
classOf[Text], // 相当于mr中的k3
classOf[IntWritable], // 相当于mr中的v3
classOf[TextOutputFormat[Text, IntWritable]] // 设置(k3, v3)的outputFormatClass
)
}
}
~~~
之后我们可以在 hdfs 中查看到相应的文件输出:
~~~
[root@WGH ~]$ hdfs dfs -ls /spark/action
18/04/27 12:07:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
-rw-r--r-- 3 Administrator supergroup 0 2018-04-27 12:07 /spark/action/_SUCCESS
-rw-r--r-- 3 Administrator supergroup 13 2018-04-27 12:07 /spark/action/part-r-00000
-rw-r--r-- 3 Administrator supergroup 11 2018-04-27 12:07 /spark/action/part-r-00001
[root@WGH ~]$ hdfs dfs -text /spark/action/part-r-00000
18/04/27 12:08:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
hello 3
me 1
[root@WGH ~]$ hdfs dfs -text /spark/action/part-r-00001
18/04/27 12:08:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
you 1
he 1
~~~
## Actions
下面的表格列了 Spark 支持的一些常用 actions。详细内容请参阅 RDD API 文档([Scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD), [Java](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaRDD.html), [Python](https://spark.apache.org/docs/latest/api/python/pyspark.rdd.RDD-class.html)) 和 PairRDDFunctions 文档([Scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions), [Java](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html))。
**Action 动作算子**:
| 动作算子 | 含义 |
| --------------------------------------- | ------------------------------------------------------------ |
| reduce(func) | 通过 func 函数聚集 RDD 中的所有元素,这个功能必须是可交换且可并联的 |
| collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
| count() | 返回 RDD 的元素个数 |
| first() | 返回 RDD 的第一个元素 (类似于 take(1)) |
| take(n) | 返回一个由数据集的前 n 个元素组成的数组 |
| takeSample(withReplacement,num, [seed]) | 返回一个数组,该数组由从数据集中随机采样的 num 个元素组成,可以选择是否用随机数替换不足的部分,seed 用于指定随机数生成器种子 |
| takeOrdered(n, [ordering]) | 返回自然顺序或者自定义顺序的前 n 个元素 |
| **saveAsTextFile**(path) | 将数据集的元素以 textfile 的形式保存到 HDFS 文件系统或者其他支持的文件系统,对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文本 |
| **saveAsSequenceFile**(path) | 将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录下,可以使 HDFS 或者其他 Hadoop 支持的文件系统 |
| saveAsObjectFile(path) | 将数据集的元素,以 Java 序列化的方式保存到指定的目录下 |
| **countByKey**() | 针对 (K,V) 类型的 RDD,返回一个 (K,Int) 的 map,表示每一个 key 对应的元素个数 |
| foreach(func) | 在数据集的每一个元素上,运行函数 func 进行更新 |
| **foreachPartition**(func) | 在数据集的每一个分区上,运行函数 func |
**统计操作**:
| 算子 | 含义 |
| -------------- | -------------------------- |
| count | 个数 |
| mean | 均值 |
| sum | 求和 |
| max | 最大值 |
| min | 最小值 |
| variance | 方差 |
| sampleVariance | 从采样中计算方差 |
| stdev | 标准差: 衡量数据的离散程度 |
| sampleStdev | 采样的标准差 |
| stats | 查看统计结果 |
- Introduction
- 快速上手
- Spark Shell
- 独立应用程序
- 开始翻滚吧!
- RDD编程基础
- 基础介绍
- 外部数据集
- RDD 操作
- 转换Transformations
- map与flatMap解析
- 动作Actions
- RDD持久化
- RDD容错机制
- 传递函数到 Spark
- 使用键值对
- RDD依赖关系与DAG
- 共享变量
- Spark Streaming
- 一个快速的例子
- 基本概念
- 关联
- 初始化StreamingContext
- 离散流
- 输入DStreams
- DStream中的转换
- DStream的输出操作
- 缓存或持久化
- Checkpointing
- 部署应用程序
- 监控应用程序
- 性能调优
- 减少批数据的执行时间
- 设置正确的批容量
- 内存调优
- 容错语义
- Spark SQL
- 概述
- SparkSQLvsHiveSQL
- 数据源
- RDDs
- parquet文件
- JSON数据集
- Hive表
- 数据源例子
- join操作
- 聚合操作
- 性能调优
- 其他
- Spark SQL数据类型
- 其它SQL接口
- 编写语言集成(Language-Integrated)的相关查询
- GraphX编程指南
- 开始
- 属性图
- 图操作符
- Pregel API
- 图构造者
- 部署
- 顶点和边RDDs
- 图算法
- 例子
- 更多文档
- 提交应用程序
- 独立运行Spark
- 在yarn上运行Spark
- Spark配置
- RDD 持久化