ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
对于转换操作,RDD 的所有转换都不会直接计算结果。Spark 仅记录作用于RDD 上的转换操作逻辑,<mark>当遇到动作算子(Action)时才会进行真正的计算</mark>。RDD全部转换算子如下表。 | Transformation | 描述 | | --- | --- | | `map(func)` | 通过函数 func 作用于源 RDD 中的每个元素,返回一个新的 RDD| | `filter(func)` | 选择源 RDD 中的使得函数 func 为 true 的元素,返回一个新的 RDD| | `flatMap(func)` | 与 map 类似,但是每个输入项可以映射到 0 或多个输出项(因此 func 应该返回一个 Seq,而不是单个项)。| | `mapValues(func)`| 原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素,仅适用于PairRDD | | `mapPartitions(func)` | 与 map 类似,但是在 RDD 的每个分区上单独运行,所以 func 在类型为 T 的 RDD 上运行时,必须是类型 Iterator`<T> `=> Iterator`<U>`| | `mapPartitionsWithIndex(func)` | 与 mapPartitions 类似,但为 func 多提供一个分区编号 ,所以 func 类型为:(Int, Iterator`<T>`) => Iterator`<U>`| | `sample(withReplacement, fraction, seed)` |使用给定的随机数生成器种子对数据的一部分进行采样。| | `union(otherDataset)` | 返回一个新数据集,该数据集包含源数据集中的元素和参数的并集| | `intersection(otherDataset)` | 返回一个新的 RDD,其中包含源数据集中的元素和参数的交集。| | `distinct([numPartitions]))` | 返回包含源数据集的不同元素的新数据集。| | `groupByKey([numPartitions])` | 当调用一个(K, V)对的数据集时,返回一个(K,Iterable<V>)对的数据集。| | `reduceByKey(func, [numPartitions])` | 对相同的key通过给定的函数进行聚合 | | `aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])`| seqOp 操作会聚合各分区中的元素,然后 combOp操作把所有分区的聚合结果再次聚合,两个操作的初始值都是 zeroValue. seqOp 的操作是遍历分区中的所有元素(T),第一个T跟zeroValue做操作,结果再作为与第二个T做操作的zeroValue,直到遍历完整个分区。combOp 操作是把各分区聚合的结果,再聚合。| | `sortByKey([ascending], [numPartitions])`| 根据 key 进行排序,默认为升序。ascending: Boolean = true| | `join(otherDataset, [numPartitions])`| 当在类型(K, V)和(K, W)的数据集上调用时,返回一个(K, (V,W))对的数据集,其中包含每个键的所有对元素。外部连接由 leftOuterJoin、rightOuterJoin和 fullOuterJoin 支持。| | `cogroup(otherDataset, [numPartitions])`| 当调用类型(K, V)和(K, W)的数据集时,返回一个(K,(Iterable,Iterable))元组的数据集。这个操作也称为groupWith。| | `cartesian(otherDataset)` | 在类型为 T 和 U 的数据集上调用时,返回一个(T, U)对(所有对元素)的数据集。| | `pipe(command, [envVars])` | 通过 shell 命令(例如 Perl 或 bash 脚本)对 RDD 的每个分区进行管道传输。将 RDD 元素写入进程的stdin,并将其输出到 stdout 的行作为字符串 RDD返回。| | `coalesce(numPartitions)` | 将 RDD 中的分区数量减少到numpartition。| | `repartition(numPartitions)` | 随机地重新 Shuffle RDD 中的数据,以创建更多或更少的分区,并在它们之间进行平衡。| 下面是一些常用转换算子的示例: ```scala package spark.core import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object TransformationOps { def main(args: Array[String]): Unit = { val conf:SparkConf = new SparkConf() .setMaster("local[4]") .setAppName(this.getClass.getName) val sc:SparkContext = SparkContext.getOrCreate(conf) // 1. map算子 // 通过函数 func 作用于源 RDD 中的每个元素,返回一个新的 RDD val rdd1 = sc.parallelize(1 to 9) rdd1.map(_*2).foreach(x => print(s"$x ")) // 14 6 16 8 18 2 10 4 12 println() // map算子将RDD变成PairRDD val rdd2 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle")) val rdd3 = rdd2.map(x => (x, 1)) rdd3.foreach(x => print(s"$x ")) // (tiger,1) (lion,1) (panther,1) (eagle,1) (cat,1) (dog,1) println() // 2. filter算子 // 选择源 RDD 中的使得函数 func 为 true 的元素,返回一个新的 RDD val rdd4 = sc.parallelize(1 to 10) rdd4.filter(_%2==0).foreach(x => print(s"$x ")) // 8 4 2 6 10 println() // 3. mapValues算子 // 原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素,仅适用于PairRDD val rdd5 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle")) val rdd6 = rdd5.map(x => (x.length, x)) rdd6.foreach(x => print(s"$x ")) // (3,dog) (5,tiger) (4,lion) (3,cat) (7,panther) (5,eagle) println() rdd6.mapValues("x"+_+"x").foreach(x => print(s"$x ")) // (5,xtigerx) (3,xdogx) (3,xcatx) (7,xpantherx) (4,xlionx) (5,xeaglex) println() // 4. distinct算子 // 返回包含源数据集的不同元素的新数据集 val rdd7 = sc.parallelize(List(1, 2, 3, 3, 4, 4, 4, 5)) rdd7.distinct.foreach(x => print(s"$x ")) // 4 3 1 2 5 println() // 5. flatMap算子 // 类似于scala中先进行map操作, 在进行flatten操作 val rdd8 = sc.parallelize(List("hello wrold hadoop spark ","spark python java hello")) rdd8.flatMap(_.split("\\s+")).foreach(x => print(s"$x ")) // hello wrold hadoop spark spark python java hello println() // 6. mapPartitions算子 // 遍历每一个分区, 对每一个分区进行操作, 返回一个新的RDD // 使用场景:例如创建数据库连接, mapPartitions是对分区进行操作, 创建连接数量会更少 val rdd9 = sc.parallelize(1 to 10) rdd9.repartition(2).mapPartitions(part => { part.map(_*2) }).foreach(x => print(s"$x ")) // 4 2 8 6 10 12 14 16 20 18 println() // 7. mapPartitionsWithIndex算子 // mapPartitionsWithIndex比mapPartitions多了一个下标索引 val rdd10 = sc.parallelize(1 to 10) rdd10.mapPartitionsWithIndex((index, part) => part.map((index, _))).foreach(x => print(s"$x ")) // (0,1) (0,2) (1,3) (1,4) (1,5) (3,8) (3,9) (3,10) (2,6) (2,7) println() // 8. sample抽样算子 // 第一个参数是否有放回,第二个参数是抽取的数据量比例, 第三个是随机种子 val rdd11 = sc.parallelize(1 to 10) rdd11.sample(true, 0.5, 2).foreach(x => print(s"$x ")) // 9 6 println() // 9. union算子 // union 对两个RDD求并集 val rdd12 = sc.parallelize(1 to 4) rdd12.union(sc.parallelize(3 to 9)).foreach(x => print(s"$x ")) // 2 1 3 4 3 4 5 6 7 8 9 println() // 10. intersection算子 // 对两个RDD求交集 val rdd13 = sc.parallelize(1 to 5) rdd13.intersection(sc.parallelize(2 to 7)).foreach(x => print(s"$x ")) // 3 2 5 4 println() // 11. groupByKey算子 // 相同的key分到一组 val rdd14 = sc.parallelize(List("hello wrold hadoop spark ","spark python java hello")) rdd14.flatMap(_.split("\\s+")).map((_, 1)).groupByKey().map(x => (x._1, x._2.size)) .foreach(x => print(s"$x ")) // (python,1) (wrold,1) (spark,2) (hadoop,1) (hello,2) (java,1) println() // 12. reduceByKey算子 // 对相同的key通过给定的函数进行聚合 val rdd15 = sc.parallelize(List("hello wrold hadoop spark ","spark python java hello")) rdd15.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey((x, y) => x+y).foreach(x => print(s"$x ")) // (hello,2) (java,1) (python,1) (wrold,1) (spark,2) (hadoop,1) // 这种写法与上面的效果是一样的 // rdd15.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_+_).foreach(x => print(s"$x ")) println() // 13. aggregateByKey算子 // 柯里化函数, 第一个参数列表传入0值 // 第二个参数列表需要传入两个参数, 第一个参数是本地聚合函数, 第二个参数是全局聚合函数 val rdd16 = sc.parallelize(List("hello wrold hadoop spark ","spark python java hello")) rdd16.flatMap(_.split("\\s+")).map((_, 1)).aggregateByKey(0)((x,y)=>x+y, (x,y)=>x+y).foreach(x => print(s"$x ")) // (spark,2) (hadoop,1) (python,1) (wrold,1) (hello,2) (java,1) // 或者下面这种写法也是同样效果 // rdd16.flatMap(_.split("\\s+")).map((_,1)).aggregateByKey(0)(_+_,_+_).foreach(x => print(s"$x ")) println() // 14. sortByKey算子 // 对key进行排序, 第一个参数是"是否是正序排列", 第二个参数是分区 // 注意: 只能是分区内有序, 不能全局有序 val rdd17 = sc.parallelize(List("hello wrold hadoop spark ","spark python java hello")) val sorted: RDD[(String, Int)] = rdd17.repartition(1).flatMap(_.split("\\s+")) .map((_,1)).aggregateByKey(0)(_+_,_+_).sortByKey() sorted.foreach(x => print(s"$x ")) // (hadoop,1) (hello,2) (java,1) (python,1) (spark,2) (wrold,1) println() val pairRdd1: RDD[(String, String)] = sc.parallelize(List(("1001","zhangsan"),("1002","lisi"),("1003","wangwu"))) val pairRdd2: RDD[(String, String)] = sc.parallelize(List(("1002","20"),("1003","15"),("1004","18"))) // 15. join算子 // 内连接 pairRdd1.join(pairRdd2).foreach(print) // (1003,(wangwu,15)) (1002,(lisi,20)) println() // 16. cogroup算子 // 相当于外连接, 和join不同的是会将value值分为一组 pairRdd1.cogroup(pairRdd2).foreach(println) // (1004,(CompactBuffer(),CompactBuffer(18))) // (1001,(CompactBuffer(zhangsan),CompactBuffer())) // (1003,(CompactBuffer(wangwu),CompactBuffer(15))) // (1002,(CompactBuffer(lisi),CompactBuffer(20))) // repartition算子 // 重分区,实际上调用了coalesce, 并且默认走shuffle // 我们自己调用coalesce函数的时候, 如果需要将分区由多变少, 可以传入false,不走shuffle val rdd18 = sc.parallelize(List("hello wrold hadoop spark ","spark python java hello")) println(rdd18.getNumPartitions) // 默认分区为4 val rdd19 = rdd18.repartition(3) println(rdd19.getNumPartitions) // 3 val rdd20:RDD[String] = rdd18.coalesce(3, false) println(rdd20.getNumPartitions) // 3 sc.stop() // 关闭资源 } } ```