企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
********前面 Transformationt 算子的测试都是在本地开发环境中直接跑代码,这里 Actions 算子的测试主要在 spark-shell 中进行操作。需要说明的 Actions 算子如下: #### 下面来具体说明: **(1)reduce** 通过函数 func 聚集数据集中的所有元素。Func 函数接受 2 个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行。 关于 reduce 的执行过程,可以对比 scala 中类似的 reduce 函数。 ~~~ scala> val list = List(1, 2, 3, 4, 5, 6) list: List[Int] = List(1, 2, 3, 4, 5, 6) scala> val listRDD = sc.parallelize(list) listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:29 scala> val ret = listRDD.reduce((v1, v2) => v1 + v2) ... ret: Int = 21 ~~~ 注意:需要注意的是,不同于 Transformation 算子,其结果仍然是 RDD,但是`执行Actions算子之后,其结果不再是RDD,而是一个标量。` **(2)collect** 在 Driver 的程序中,以数组的形式,返回数据集的所有元素。**这通常会在使用 filter 或者其它操作后**,返回一个足够小的数据子集再使用,直接将整个 RDD 集 Collect 返回,很可能会让 Driver 程序 OOM,这点尤其需要注意。 ~~~ scala> val list = List(1, 2, 3, 4, 5, 6) list: List[Int] = List(1, 2, 3, 4, 5, 6) scala> val listRDD = sc.parallelize(list) listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:29 scala> val ret = listRDD.collect() ... ret: Array[Int] = Array(1, 2, 3, 4, 5, 6) ~~~ **(3)count** 返回数据集的元素个数 ~~~ scala> val list = List(1, 2, 3, 4, 5, 6) list: List[Int] = List(1, 2, 3, 4, 5, 6) scala> val listRDD = sc.parallelize(list) listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:29 scala> val ret = listRDD.count() ... ret: Long = 6 ~~~ **(4)take** 返回一个数组,由数据集的前 n 个元素组成。注意,这个操作目前并非在多个节点上,并行执行,而是 Driver 程序所在机器,单机计算所有的元素 (Gateway 的内存压力会增大,需要谨慎使用)。 ~~~ scala> val list = List(1, 2, 3, 4, 5, 6) list: List[Int] = List(1, 2, 3, 4, 5, 6) scala> val listRDD = sc.parallelize(list) listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:29 scala> listRDD.take(3) ... res7: Array[Int] = Array(1, 2, 3) ~~~ **(5)first** 返回数据集的第一个元素(类似于 take(1)) ~~~ scala> val list = List(1, 2, 3, 4, 5, 6) list: List[Int] = List(1, 2, 3, 4, 5, 6) scala> val listRDD = sc.parallelize(list) listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:29 scala> listRDD.first() ... res8: Int = 1 ~~~ **(6)saveAsTextFile** 将数据集的元素,以 textfile 的形式,保存到本地文件系统,hdfs 或者任何其它 hadoop 支持的文件系统。Spark 将会调用每个元素的 toString 方法,并将它转换为文件中的`一行文本`。 ~~~ scala> val list = List(1, 2, 3, 4, 5, 6) list: List[Int] = List(1, 2, 3, 4, 5, 6) scala> val listRDD = sc.parallelize(list) listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:29 scala> listRDD.saveAsTextFile("file:///home/uplooking/data/spark/action") ... ~~~ 可以在文件系统中查看到保存的文件: ~~~ [root@WGH action]$ pwd /home/uplooking/data/spark/action [root@WGH action]$ ls part-00000 part-00001 part-00002 part-00003 _SUCCESS ~~~ 其实可以看到,保存的跟 Hadoop 的格式是一样的。 当然因为我的 spark 集群中已经做了跟 hadoop 相关的配置,所以也可以把文件保存到 hdfs 中: ~~~ scala> listRDD.saveAsTextFile("hdfs://ns1/output/spark/action") ... ~~~ 然后就可以在 hdfs 中查看到保存的文件: ~~~ [root@WGH action]$ hdfs dfs -ls /output/spark/action 18/04/27 10:27:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Found 5 items -rw-r--r-- 3 root supergroup 0 2018-04-27 10:25 /output/spark/action/_SUCCESS -rw-r--r-- 3 root supergroup 2 2018-04-27 10:25 /output/spark/action/part-00000 -rw-r--r-- 3 root supergroup 4 2018-04-27 10:25 /output/spark/action/part-00001 -rw-r--r-- 3 root supergroup 2 2018-04-27 10:25 /output/spark/action/part-00002 -rw-r--r-- 3 root supergroup 4 2018-04-27 10:25 /output/spark/action/part-00003 ~~~ 可以看到,保存的格式跟保存到本地文件系统是一样的。 **(7)foreach** 在数据集的每一个元素上,运行函数 func。这通常用于更新一个累加器变量,或者和外部存储系统做交互。 ~~~ scala> val list = List(1, 2, 3, 4, 5, 6) list: List[Int] = List(1, 2, 3, 4, 5, 6) scala> val listRDD = sc.parallelize(list) listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:29 scala> listRDD.foreach(println) ... ~~~ **(8)saveAsNewAPIHadoopFile** 也就是将数据保存到 Hadoop HDFS 中,但是需要注意的是,前面使用 saveAsTextFile 也可以进行相关操作,其使用的就是 saveAsNewAPIHadoopFile 或者 saveAsHadoopFile 这两个 API,而其两者的区别是: saveAsHadoopFile 的 OutputFormat 使用的:org.apache.hadoop.mapred 中的早期的类 saveAsNewAPIHadoopFile 的 OutputFormat 使用的:org.apache.hadoop.mapreduce 中的新的类。但不管使用哪一个,都是可以完成工作的。 测试代码如下: ~~~ import org.apache.hadoop.io.{IntWritable, Text} import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat import org.apache.spark.{SparkConf, SparkContext} /** * Spark算子操作之Action * saveAsNewAPIHAdoopFile * * saveAsHadoopFile * 和saveAsNewAPIHadoopFile的唯一区别就在于OutputFormat的不同 * saveAsHadoopFile的OutputFormat使用的:org.apache.hadoop.mapred中的早期的类 * saveAsNewAPIHadoopFile的OutputFormat使用的:org.apache.hadoop.mapreduce中的新的类 * 使用哪一个都可以完成工作 * * 前面在使用saveAsTextFile时也可以保存到hadoop文件系统中,注意其源代码也是使用上面的操作的 * * Caused by: java.net.UnknownHostException: ns1 ... 35 more 找不到ns1,因为我们在本地没有配置,无法正常解析,就需要将hadoop的配置文件信息给我们加载进来 hdfs-site.xml.heihei,core-site.xml.heihei */ object _05SparkActionOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_05SparkActionOps.getClass.getSimpleName) val sc = new SparkContext(conf) val list = List("hello you", "hello he", "hello me") val listRDD = sc.parallelize(list) val pairsRDD = listRDD.map(word => (word, 1)) val retRDD = pairsRDD.reduceByKey((v1, v2) => v1 + v2) retRDD.saveAsNewAPIHadoopFile( "hdfs://ns1/spark/action", // 保存的路径 classOf[Text], // 相当于mr中的k3 classOf[IntWritable], // 相当于mr中的v3 classOf[TextOutputFormat[Text, IntWritable]] // 设置(k3, v3)的outputFormatClass ) } } ~~~ 之后我们可以在 hdfs 中查看到相应的文件输出: ~~~ [root@WGH ~]$ hdfs dfs -ls /spark/action 18/04/27 12:07:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Found 3 items -rw-r--r-- 3 Administrator supergroup 0 2018-04-27 12:07 /spark/action/_SUCCESS -rw-r--r-- 3 Administrator supergroup 13 2018-04-27 12:07 /spark/action/part-r-00000 -rw-r--r-- 3 Administrator supergroup 11 2018-04-27 12:07 /spark/action/part-r-00001 [root@WGH ~]$ hdfs dfs -text /spark/action/part-r-00000 18/04/27 12:08:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable hello 3 me 1 [root@WGH ~]$ hdfs dfs -text /spark/action/part-r-00001 18/04/27 12:08:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable you 1 he 1 ~~~ ## Actions 下面的表格列了 Spark 支持的一些常用 actions。详细内容请参阅 RDD API 文档([Scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD), [Java](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaRDD.html), [Python](https://spark.apache.org/docs/latest/api/python/pyspark.rdd.RDD-class.html)) 和 PairRDDFunctions 文档([Scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions), [Java](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html))。 **Action 动作算子**: | 动作算子 | 含义 | | --------------------------------------- | ------------------------------------------------------------ | | reduce(func) | 通过 func 函数聚集 RDD 中的所有元素,这个功能必须是可交换且可并联的 | | collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 | | count() | 返回 RDD 的元素个数 | | first() | 返回 RDD 的第一个元素 (类似于 take(1)) | | take(n) | 返回一个由数据集的前 n 个元素组成的数组 | | takeSample(withReplacement,num, [seed]) | 返回一个数组,该数组由从数据集中随机采样的 num 个元素组成,可以选择是否用随机数替换不足的部分,seed 用于指定随机数生成器种子 | | takeOrdered(n, [ordering]) | 返回自然顺序或者自定义顺序的前 n 个元素 | | **saveAsTextFile**(path) | 将数据集的元素以 textfile 的形式保存到 HDFS 文件系统或者其他支持的文件系统,对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文本 | | **saveAsSequenceFile**(path) | 将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录下,可以使 HDFS 或者其他 Hadoop 支持的文件系统 | | saveAsObjectFile(path) | 将数据集的元素,以 Java 序列化的方式保存到指定的目录下 | | **countByKey**() | 针对 (K,V) 类型的 RDD,返回一个 (K,Int) 的 map,表示每一个 key 对应的元素个数 | | foreach(func) | 在数据集的每一个元素上,运行函数 func 进行更新 | | **foreachPartition**(func) | 在数据集的每一个分区上,运行函数 func | **统计操作**: | 算子 | 含义 | | -------------- | -------------------------- | | count | 个数 | | mean | 均值 | | sum | 求和 | | max | 最大值 | | min | 最小值 | | variance | 方差 | | sampleVariance | 从采样中计算方差 | | stdev | 标准差: 衡量数据的离散程度 | | sampleStdev | 采样的标准差 | | stats | 查看统计结果 |