## 一、常用的转换算子
需要操作的Transformation算子说明如下:
### 1.1. map
map(func)返回一个新的分布式数据集,由每个原元素经过func函数转换后组成
```
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps1(sc)
sc.stop()
}
/**
* 1、map:将集合中每个元素乘以7
* map(func):返回一个新的分布式数据集,由每个原元素经过func函数转换后组成
*/
def transformationOps1(sc:SparkContext): Unit = {
val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val listRDD = sc.parallelize(list)
val retRDD = listRDD.map(num => num * 7)
retRDD.foreach(num => println(num))
}
}
```
输出结果
```
42
7
49
14
56
21
63
28
70
35
```
### 1.2. filter
filter(func)返回一个新的数据集,由经过func函数后返回值为true的原元素组成
```
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps2(sc)
sc.stop()
}
/**
* 2、filter:过滤出集合中的奇数
* filter(func): 返回一个新的数据集,由经过func函数后返回值为true的原元素组成
*
* 一般在filter操作之后都要做重新分区(因为可能数据量减少了很多)
*/
def transformationOps2(sc:SparkContext): Unit = {
val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val listRDD = sc.parallelize(list)
val retRDD = listRDD.filter(num => num % 2 == 0)
retRDD.foreach(println)
}
}
```
输出结果
```
6
2
8
4
10
```
### 1.3.flatMap
flatMap(func)类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)
```
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps3(sc)
sc.stop()
}
/**
* 3、flatMap:将行拆分为单词
* flatMap(func):类似于map,但是每一个输入元素,
* 会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)
*/
def transformationOps3(sc:SparkContext): Unit = {
val list = List("hello you", "hello he", "hello me")
val listRDD = sc.parallelize(list)
val wordsRDD = listRDD.flatMap(line => line.split(" "))
wordsRDD.foreach(println)
}
}
```
输出结果
```
hello
hello
he
you
hello
mes
```
### 1.4. sample
sample(withReplacement, frac, seed)根据给定的随机种子seed,随机抽样出数量为frac的数据.
```
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps4(sc)
sc.stop()
}
/**
* 4、sample:根据给定的随机种子seed,随机抽样出数量为frac的数据
* sample(withReplacement, frac, seed): 根据给定的随机种子seed,随机抽样出数量为frac的数据
* 抽样的目的:就是以样本评估整体
* withReplacement:
* true:有放回的抽样
* false:无放回的抽样
* frac:就是样本空间的大小,以百分比小数的形式出现,比如20%,就是0.2
*
* 使用sample算子计算出来的结果可能不是很准确,1000个数,20%,样本数量在200个左右,不一定为200
*
* 一般情况下,使用sample算子在做spark优化(数据倾斜)的方面应用最广泛
*/
def transformationOps4(sc:SparkContext): Unit = {
val list = 1 to 1000
val listRDD = sc.parallelize(list)
val sampleRDD = listRDD.sample(false, 0.2)
sampleRDD.foreach(num => print(num + " "))
println
println("sampleRDD count: " + sampleRDD.count())
println("Another sampleRDD count: " + sc.parallelize(list).sample(false, 0.2).count())
}
}
```
输出结果
```
sampleRDD count: 219
Another sampleRDD count: 203
```
### 1.5.union
union(otherDataset)返回一个新的数据集,由原数据集和参数联合而成
```
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps5(sc)
sc.stop()
}
/**
* 5、union:返回一个新的数据集,由原数据集和参数联合而成
* union(otherDataset): 返回一个新的数据集,由原数据集和参数联合而成
* 类似数学中的并集,就是sql中的union操作,将两个集合的所有元素整合在一块,包括重复元素
*/
def transformationOps5(sc:SparkContext): Unit = {
val list1 = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val list2 = List(7, 8, 9, 10, 11, 12)
val listRDD1 = sc.parallelize(list1)
val listRDD2 = sc.parallelize(list2)
val unionRDD = listRDD1.union(listRDD2)
unionRDD.foreach(println)
}
}
```
输出结果
```
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps5(sc)
sc.stop()
}
/**
* 5、union:返回一个新的数据集,由原数据集和参数联合而成
* union(otherDataset): 返回一个新的数据集,由原数据集和参数联合而成
* 类似数学中的并集,就是sql中的union操作,将两个集合的所有元素整合在一块,包括重复元素
*/
def transformationOps5(sc:SparkContext): Unit = {
val list1 = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val list2 = List(7, 8, 9, 10, 11, 12)
val listRDD1 = sc.parallelize(list1)
val listRDD2 = sc.parallelize(list2)
val unionRDD = listRDD1.union(listRDD2)
unionRDD.foreach(println)
}
}
```
输出结果
```
1
6
2
7
3
8
4
9
5
10
7
8
9
10
11
12
```
### 1.6. groupByKey
groupByKey([numTasks])在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task
```
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps6(sc)
sc.stop()
}
/**
* 6、groupByKey:对数组进行 group by key操作
* groupByKey([numTasks]): 在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。
* 注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task
* mr中:
* <k1, v1>--->map操作---><k2, v2>--->shuffle---><k2, [v21, v22, v23...]>---><k3, v3>
* groupByKey类似于shuffle操作
*
* 和reduceByKey有点类似,但是有区别,reduceByKey有本地的规约,而groupByKey没有本地规约,所以一般情况下,
* 尽量慎用groupByKey,如果一定要用的话,可以自定义一个groupByKey,在自定义的gbk中添加本地预聚合操作
*/
def transformationOps6(sc:SparkContext): Unit = {
val list = List("hello you", "hello he", "hello me")
val listRDD = sc.parallelize(list)
val wordsRDD = listRDD.flatMap(line => line.split(" "))
val pairsRDD:RDD[(String, Int)] = wordsRDD.map(word => (word, 1))
pairsRDD.foreach(println)
val gbkRDD:RDD[(String, Iterable[Int])] = pairsRDD.groupByKey()
println("=============================================")
gbkRDD.foreach(t => println(t._1 + "..." + t._2))
}
}
```
输出结果
```
(hello,1)
(hello,1)
(you,1)
(he,1)
(hello,1)
(me,1)
=============================================
you...CompactBuffer(1)
hello...CompactBuffer(1, 1, 1)
he...CompactBuffer(1)
me...CompactBuffer(1)
```
### 1.7. reduceByKey
reduceByKey(func, [numTasks])在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。
```
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps7(sc)
sc.stop()
}
/**
* 7、reduceByKey:统计每个班级的人数
* reduceByKey(func, [numTasks]): 在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,
* key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。
*
* 需要注意的是还有一个reduce的操作,其为action算子,并且其返回的结果只有一个,而不是一个数据集
* 而reduceByKey是一个transformation算子,其返回的结果是一个数据集
*/
def transformationOps7(sc:SparkContext): Unit = {
val list = List("hello you", "hello he", "hello me")
val listRDD = sc.parallelize(list)
val wordsRDD = listRDD.flatMap(line => line.split(" "))
val pairsRDD:RDD[(String, Int)] = wordsRDD.map(word => (word, 1))
val retRDD:RDD[(String, Int)] = pairsRDD.reduceByKey((v1, v2) => v1 + v2)
retRDD.foreach(t => println(t._1 + "..." + t._2))
}
}
```
输出结果如下:
```
you...1
hello...3
he...1
me...1
```
### 1.8. join双流融合
join(otherDataset, [numTasks])在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集
```
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps8(sc)
sc.stop()
}
/**
* 8、join:打印关联的组合信息
* join(otherDataset, [numTasks]): 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集
* 学生基础信息表和学生考试成绩表
* stu_info(sid ,name, birthday, class)
* stu_score(sid, chinese, english, math)
*
* * Serialization stack:
- object not serializable
这种分布式计算的过程,一个非常重要的点,传递的数据必须要序列化
通过代码测试,该join是等值连接(inner join)
A.leftOuterJoin(B)
A表所有的数据都包涵,B表中在A表没有关联的数据,显示为null
之后执行一次filter就是join的结果
*/
def transformationOps8(sc: SparkContext): Unit = {
val infoList = List(
"1,钟 潇,1988-02-04,bigdata",
"2,刘向前,1989-03-24,linux",
"3,包维宁,1984-06-16,oracle")
val scoreList = List(
"1,50,21,61",
"2,60,60,61",
"3,62,90,81",
"4,72,80,81"
)
val infoRDD:RDD[String] = sc.parallelize(infoList)
val scoreRDD:RDD[String] = sc.parallelize(scoreList)
val infoPairRDD:RDD[(String, Student)] = infoRDD.map(line => {
val fields = line.split(",")
val student = new Student(fields(0), fields(1), fields(2), fields(3))
(fields(0), student)
})
val scorePairRDD:RDD[(String, Score)] = scoreRDD.map(line => {
val fields = line.split(",")
val score = new Score(fields(0), fields(1).toFloat, fields(2).toFloat, fields(3).toFloat)
(fields(0), score)
})
val joinedRDD:RDD[(String, (Student, Score))] = infoPairRDD.join(scorePairRDD)
joinedRDD.foreach(t => {
val sid = t._1
val student = t._2._1
val score = t._2._2
println(sid + "\t" + student + "\t" + score)
})
println("=========================================")
val leftOuterRDD:RDD[(String, (Score, Option[Student]))] = scorePairRDD.leftOuterJoin(infoPairRDD)
leftOuterRDD.foreach(println)
}
}
```
输出结果如下:
```
3 3 包维宁 1984-06-16 oracle 3 62.0 90.0 81.0
2 2 刘向前 1989-03-24 linux 2 60.0 60.0 61.0
1 1 钟 潇 1988-02-04 bigdata 1 50.0 21.0 61.0
=========================================
(4,(4 72.0 80.0 81.0,None))
(3,(3 62.0 90.0 81.0,Some(3 包维宁 1984-06-16 oracle)))
(2,(2 60.0 60.0 61.0,Some(2 刘向前 1989-03-24 linux)))
(1,(1 50.0 21.0 61.0,Some(1 钟 潇 1988-02-04 bigdata)))
```
### 1.9.sortByKey
测试代码如下:
~~~
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps7(sc)
sc.stop()
}
/**
* sortByKey:将学生身高进行(降序)排序
* 身高相等,按照年龄排(升序)
*/
def transformationOps9(sc: SparkContext): Unit = {
val list = List(
"1,李 磊,22,175",
"2,刘银鹏,23,175",
"3,齐彦鹏,22,180",
"4,杨 柳,22,168",
"5,敦 鹏,20,175"
)
val listRDD:RDD[String] = sc.parallelize(list)
/* // 使用sortBy操作完成排序
val retRDD:RDD[String] = listRDD.sortBy(line => line, numPartitions = 1)(new Ordering[String] {
override def compare(x: String, y: String): Int = {
val xFields = x.split(",")
val yFields = y.split(",")
val xHgiht = xFields(3).toFloat
val yHgiht = yFields(3).toFloat
val xAge = xFields(2).toFloat
val yAge = yFields(2).toFloat
var ret = yHgiht.compareTo(xHgiht)
if (ret == 0) {
ret = xAge.compareTo(yAge)
}
ret
}
} ,ClassTag.Object.asInstanceOf[ClassTag[String]])
*/
// 使用sortByKey完成操作,只做身高降序排序
val heightRDD:RDD[(String, String)] = listRDD.map(line => {
val fields = line.split(",")
(fields(3), line)
})
val retRDD:RDD[(String, String)] = heightRDD.sortByKey(ascending = false, numPartitions = 1) // 需要设置1个分区,否则只是各分区内有序
retRDD.foreach(println)
// 使用sortByKey如何实现sortBy的二次排序?将上面的信息写成一个java对象,然后重写compareTo方法,在做map时,key就为该对象本身,而value可以为null
}
}
~~~
输出结果如下:
~~~
(180,3,齐彦鹏,22,180)
(175,1,李 磊,22,175)
(175,2,刘银鹏,23,175)
(175,5,敦 鹏,20,175)
(168,4,杨 柳,22,168)
~~~
下面是一个快速入门的 demo:
~~~
scala> val rdd = sc.parallelize(Seq((1,"one"),(2,"two"),(3,"three")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[10] at parallelize at <console>:21
scala> rdd.sortByKey(true, 1).foreach(println)
(1,one)
(2,two)
(3,three)
~~~
### 1.10.combineByKey 与 aggregateByKey
下面的代码分别使用 combineByKey 和 aggregateByKey 来模拟 groupByKey 和 reduceBykey,所以是有 4 个操作,只要把 combineByKey 模拟 groupByKey 的例子掌握了,其它三个相对就容易许多了。
~~~
/**
* spark的transformation操作:
* aggregateByKey
* combineByKey
*
* 使用combineByKey和aggregateByKey模拟groupByKey和reduceByKey
*
* 通过查看源码,我们发现aggregateByKey底层,还是combineByKey
*
* 问题:combineByKey和aggregateByKey的区别?
* aggregateByKey是柯里化形式的,目前底层源码还没时间去分析,所知道的区别是这个
*/
object _03SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_03SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
// combineByKey2GroupByKey(sc)
// combineByKey2ReduceByKey(sc)
// aggregateByKey2ReduceByKey(sc)
aggregateByKey2GroupByKey(sc)
sc.stop()
}
/**
* 使用aggregateByKey模拟groupByKey
*/
def aggregateByKey2GroupByKey(sc: SparkContext): Unit = {
val list = List("hello bo bo", "zhou xin xin", "hello song bo")
val lineRDD = sc.parallelize(list)
val wordsRDD = lineRDD.flatMap(line => line.split(" "))
val pairsRDD = wordsRDD.map(word => (word, 1))
val retRDD:RDD[(String, ArrayBuffer[Int])] = pairsRDD.aggregateByKey(ArrayBuffer[Int]()) ( // 这里需要指定value的类型为ArrayBuffer[Int]()
(part, num) => {
part.append(num)
part
},
(part1, part2) => {
part1.++=(part2)
part1
}
)
retRDD.foreach(println)
}
/**
* 使用aggregateByKey模拟reduceByKey
* def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]
(zeroValue: U)就对应的是combineByKey中的第一个函数的返回值
seqOp 就对应的是combineByKey中的第二个函数,也就是mergeValue
combOp 就对应的是combineByKey中的第三个函数,也就是mergeCombiners
*/
def aggregateByKey2ReduceByKey(sc:SparkContext): Unit = {
val list = List("hello bo bo", "zhou xin xin", "hello song bo")
val lineRDD = sc.parallelize(list)
val wordsRDD = lineRDD.flatMap(line => line.split(" "))
val pairsRDD = wordsRDD.map(word => (word, 1))
val retRDD:RDD[(String, Int)] = pairsRDD.aggregateByKey(0) (
(partNum, num) => partNum + num, // 也就是mergeValue
(partNum1, partNum2) => partNum1 + partNum2 // 也就是mergeCombiners
)
retRDD.foreach(println)
}
/**
* 使用reduceByKey模拟groupByKey
*/
def combineByKey2ReduceByKey(sc:SparkContext): Unit = {
val list = List("hello bo bo", "zhou xin xin", "hello song bo")
val lineRDD = sc.parallelize(list)
val wordsRDD = lineRDD.flatMap(line => line.split(" "))
val pairsRDD = wordsRDD.map(word => (word, 1))
/**
* 对于createCombiner1 mergeValue1 mergeCombiners1
* 代码的参数已经体现得很清楚了,其实只要理解了combineByKey模拟groupByKey的例子,这个就非常容易了
*/
var retRDD:RDD[(String, Int)] = pairsRDD.combineByKey(createCombiner1, mergeValue1, mergeCombiners1)
retRDD.foreach(println)
}
/**
* reduceByKey操作,value就是该数值本身,则上面的数据会产生:
* (hello, 1) (bo, 1) (bo, 1)
* (zhou, 1) (xin, 1) (xin, 1)
* (hello, 1) (song, 1) (bo, 1)
* 注意有别于groupByKey的操作,它是创建一个容器
*/
def createCombiner1(num:Int):Int = {
num
}
/**
* 同一partition内,对于有相同key的,这里的mergeValue直接将其value相加
* 注意有别于groupByKey的操作,它是添加到value到一个容器中
*/
def mergeValue1(localNum1:Int, localNum2:Int): Int = {
localNum1 + localNum2
}
/**
* 将两个不同partition中的key相同的value值相加起来
* 注意有别于groupByKey的操作,它是合并两个容器
*/
def mergeCombiners1(thisPartitionNum1:Int, anotherPartitionNum2:Int):Int = {
thisPartitionNum1 + anotherPartitionNum2
}
/**
* 使用combineByKey模拟groupByKey
*/
def combineByKey2GroupByKey(sc:SparkContext): Unit = {
val list = List("hello bo bo", "zhou xin xin", "hello song bo")
val lineRDD = sc.parallelize(list)
val wordsRDD = lineRDD.flatMap(line => line.split(" "))
val pairsRDD = wordsRDD.map(word => (word, 1))
// 输出每个partition中的map对
pairsRDD.foreachPartition( partition => {
println("<=========partition-start=========>")
partition.foreach(println)
println("<=========partition-end=========>")
})
val gbkRDD:RDD[(String, ArrayBuffer[Int])] = pairsRDD.combineByKey(createCombiner, mergeValue, mergeCombiners)
gbkRDD.foreach(println)
// 如果要测试最后groupByKey的结果是在几个分区,可以使用下面的代码进行测试
/*gbkRDD.foreachPartition(partition => {
println("~~~~~~~~~~~~~~~~~~~~~~~~~~~")
partition.foreach(println)
})*/
}
/**
* 初始化,将value转变成为标准的格式数据
* 是在每个分区中进行的操作,去重后的key有几个,就调用次,
* 因为对于每个key,其容器创建一次就ok了,之后有key相同的,只需要执行mergeValue到已经创建的容器中即可
*/
def createCombiner(num:Int):ArrayBuffer[Int] = {
println("----------createCombiner----------")
ArrayBuffer[Int](num)
}
/**
* 将key相同的value,添加到createCombiner函数创建的ArrayBuffer容器中
* 一个分区内的聚合操作,将一个分区内key相同的数据,合并
*/
def mergeValue(ab:ArrayBuffer[Int], num:Int):ArrayBuffer[Int] = {
println("----------mergeValue----------")
ab.append(num)
ab
}
/**
* 将key相同的多个value数组,进行整合
* 分区间的合并操作
*/
def mergeCombiners(ab1:ArrayBuffer[Int], ab2:ArrayBuffer[Int]):ArrayBuffer[Int] = {
println("----------mergeCombiners----------")
ab1 ++= ab2
ab1
}
}
~~~
输出结果如下:
~~~
/*
combineByKey模拟groupByKey的一个输出效果,可以很好地说明createCombiner、mergeValue和mergeCombiners各个阶段的执行时机:
<=========partition-start=========>
<=========partition-start=========>
(hello,1)
(zhou,1)
(bo,1)
(xin,1)
(bo,1)
(xin,1)
<=========partition-end=========>
(hello,1)
(song,1)
(bo,1)
<=========partition-end=========>
----------createCombiner----------
----------createCombiner----------
----------createCombiner----------
----------createCombiner----------
----------mergeValue----------
----------mergeValue----------
----------createCombiner----------
----------createCombiner----------
----------createCombiner----------
----------mergeCombiners----------
----------mergeCombiners----------
(song,ArrayBuffer(1))
(hello,ArrayBuffer(1, 1))
(bo,ArrayBuffer(1, 1, 1))
(zhou,ArrayBuffer(1))
(xin,ArrayBuffer(1, 1))
*/
~~~
## 二、Transformations
下面的表格列了 Spark 支持的一些常用 transformations。详细内容请参阅 RDD API 文档([Scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD), [Java](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaRDD.html), [Python](https://spark.apache.org/docs/latest/api/python/pyspark.rdd.RDD-class.html)) 和 PairRDDFunctions 文档([Scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions), [Java](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html))。
**Transformation 转换算子**:
| 转换算子 | 含义 |
| ---------------------------------------------------- | ------------------------------------------------------------ |
| **map**(func) | 返回一个新的 RDD,该 RDD 由每一个输入元素经过 func 函数转换后组成 |
| **filter**(func) | 返回一个新的 RDD,该 RDD 由经过 func 函数计算后返回值为 true 的输入元素组成 |
| **flatMap**(func) | 类似于 map,但是每一个输入元素可以被映射为 0 或多个输出元素 (所以 func 应该返回一个序列,而不是单一元素) |
| **mapPartitions**(func) | 类似于 map,但独立地在 RDD 的每一个分片上运行,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是 Iterator[T] => Iterator[U] |
| **mapPartitionsWithIndex**(func) | 类似于 mapPartitions,但 func 带有一个整数参数表示分片的索引值,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是 (Int, Interator[T]) => Iterator[U] |
| sample(withReplacement, fraction, seed) | 根据 fraction 指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed 用于指定随机数生成器种子 |
| **union**(otherDataset) | 对源 RDD 和参数 RDD 求并集后返回一个新的 RDD |
| intersection(otherDataset) | 对源 RDD 和参数 RDD 求交集后返回一个新的 RDD |
| **distinct**([numTasks])) | 对源 RDD 进行去重后返回一个新的 RDD |
| **groupByKey**([numTasks]) | 在一个 (K,V) 的 RDD 上调用,返回一个 (K, Iterator[V]) 的 RDD |
| **reduceByKey**(func, [numTasks]) | 在一个 (K,V) 的 RDD 上调用,返回一个 (K,V) 的 RDD,使用指定的 reduce 函数,将相同 key 的值聚合到一起,与 groupByKey 类似,reduce 任务的个数可以通过第二个可选的参数来设置 |
| aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 对 PairRDD 中相同的 Key 值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和 aggregate 函数类似,aggregateByKey 返回值的类型不需要和 RDD 中 value 的类型一致 |
| **sortByKey**([ascending], [numTasks]) | 在一个 (K,V) 的 RDD 上调用,K 必须实现 Ordered 接口,返回一个按照 key 进行排序的 (K,V) 的 RDD |
| sortBy(func,[ascending], [numTasks]) | 与 sortByKey 类似,但是更灵活 |
| **join**(otherDataset, [numTasks]) | 在类型为 (K,V) 和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素对在一起的 (K,(V,W)) 的 RDD |
| cogroup(otherDataset, [numTasks]) | 在类型为 (K,V) 和(K,W)的 RDD 上调用,返回一个 (K,(Iterable,Iterable)) 类型的 RDD |
| cartesian(otherDataset) | 笛卡尔积 |
| pipe(command, [envVars]) | 对 rdd 进行管道操作 |
| **coalesce**(numPartitions) | 减少 RDD 的分区数到指定值。在过滤大量数据之后,可以执行此操作 |
| **repartition**(numPartitions) | 重新给 RDD 分区 |
- Introduction
- 快速上手
- Spark Shell
- 独立应用程序
- 开始翻滚吧!
- RDD编程基础
- 基础介绍
- 外部数据集
- RDD 操作
- 转换Transformations
- map与flatMap解析
- 动作Actions
- RDD持久化
- RDD容错机制
- 传递函数到 Spark
- 使用键值对
- RDD依赖关系与DAG
- 共享变量
- Spark Streaming
- 一个快速的例子
- 基本概念
- 关联
- 初始化StreamingContext
- 离散流
- 输入DStreams
- DStream中的转换
- DStream的输出操作
- 缓存或持久化
- Checkpointing
- 部署应用程序
- 监控应用程序
- 性能调优
- 减少批数据的执行时间
- 设置正确的批容量
- 内存调优
- 容错语义
- Spark SQL
- 概述
- SparkSQLvsHiveSQL
- 数据源
- RDDs
- parquet文件
- JSON数据集
- Hive表
- 数据源例子
- join操作
- 聚合操作
- 性能调优
- 其他
- Spark SQL数据类型
- 其它SQL接口
- 编写语言集成(Language-Integrated)的相关查询
- GraphX编程指南
- 开始
- 属性图
- 图操作符
- Pregel API
- 图构造者
- 部署
- 顶点和边RDDs
- 图算法
- 例子
- 更多文档
- 提交应用程序
- 独立运行Spark
- 在yarn上运行Spark
- Spark配置
- RDD 持久化