ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
**RDD 的五大属性:** 分区、分区计算函数、依赖、分区器、分区优先位置列表。 * 一系列的分区(分片)信息,每个任务处理一个分区; * 每个分区上都有compute函数,计算该分区中的数据; * RDD之间有一系列的依赖; * 分区器决定数据(key-value)分配至哪个分区; * 优先位置列表,将计算任务分派到其所在处理数据块的存储位置; [TOC] # 1. 分区(Partition) **1. RDD有分区构成** 一个RDD 是由多个分区构成的,每个Partition 都有一个唯一索引编号。 ```scala import org.apache.spark.{SparkConf, SparkContext} object SparkDemo1 { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() .setMaster("local[*]") .setAppName(this.getClass.getName) val sc: SparkContext = SparkContext.getOrCreate(conf) val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5)) println(rdd.getNumPartitions) // 8个分区 rdd.partitions.foreach(x => println(x.index)) // 0 1 2 3 4 5 6 7 } } ``` RDD 分区概念与MapReduce 的输入切片概念是类似的。对每个分区的运算会被当作一个 Task 执行,换句话说,<mark>分区是 Spark 任务执行的基本单位</mark>。 如果有 100 个分区,RDD 上有 n 个操作,那么将会产生有 `$ n * 100 $` 个任务。 <br/> **2. 使用分区的时候要了解的两条规则** (1)<ins>只有Key-Value类型的RDD(比如PairRDD)才有分区器</ins>,非Key-Value类型的RDD分区器的值是 None。 (2)每个 RDD 的分区 ID 范围:<ins>0~numPartitions-1</ins>,决定这个值是属于那个分区的。 <br/> **3. 分区的影响** (1)一个RDD拥有的分区越多,得到的并行性就越强; (2)每个分区都是被分发到不同 Worker Node 节点上被处理; (3)每个分区对应一个Task; ![](https://img.kancloud.cn/b7/af/b7af34c4d83e843e58c17e44288acb06_751x314.png) <br/> # 2. compute函数 RDD 的每个分区上都有一个函数去作用,<mark>Spark 中的 RDD 的计算是以分区为单位的</mark>,<ins>每个 RDD 都会实现 compute 函数</ins>以达到这个目的。不同的 RDD 的compute 函数逻辑各不一样,比如: 1. MapPartitionsRDD 的 compute 是将用户的转换逻辑作用到指定的 Partition上。因为 RDD 的 map 算子产生 MapPartitionsRDD,而 map 算子的参数(具体操作逻辑)是变化的。 2. HadoopRDD 的 compute 是读取指定 Partition 数据 。 因为`sc.hadoopFile("path")`读取 HDFS 文件返回的 RDD 具体类型便是 HadoopRDD,所以只需要读取数据即可。 3. CheckpointRDD 的 compute 是直接读取检查点的数据。一旦 RDD 进行checkpoint(在下一章中介绍),将变成 CheckpointRDD。如下图所示。 ![](https://img.kancloud.cn/f6/f1/f6f1c4e49a0d92beeb742ecf7c715d03_1025x159.png) <br/> # 3. RDD间的依赖 RDD 有依赖性,通常情况下一个 RDD 是来源于另一个 RDD,这个叫做 **lineage(血统)**。RDD 会记录下这些依赖,方便容错,也称 DAG。如下图所示,RDD 所依赖的其他RDD 作为构造器参数之一。 ![](https://img.kancloud.cn/0b/20/0b201f1b750197c7571c5006e5abead6_999x206.png) <br/> # 4. 分区器(Partitioner) RDD 的分区器是一个可选项,如果 RDD 里面存的数据是 key-value 形式,则可以传递一个自定义的 Partitioner 进行重新分区,例如这里自定义的 Partitioner是基于 key 进行分区,那则会将不同 RDD 里面的相同 key 的数据放到同一个Partition 里面。 <br/> Spark 有两种分区器: **HashPartitioner ( 哈 希 分 区 )** 和 **RangePartitioner(范围分区)**。 <br/> HashPartitioner 是默认的分区方式,其算法逻辑是:`$ partition = key.hashCode() % numPartitions $` <br/> RangePartitioner 通过两个步骤来实现: 第一步:先从整个 RDD 中抽取出样本数据,将样本数据排序,计算出每个分区的最大 key 值,形成一个 Array[KEY]类型的数组变量 rangeBounds; 第二步:判断 key 在 rangeBounds 中所处的范围,给出该 key 值在下一个 RDD中的分区 id 下标;该分区器要求 RDD 中的 KEY 类型必须是可以排序的。 <br/> # 5.分区优先位置列表 该列表<ins>存储了存取每个分区的优先位置</ins>。对于一个 HDFS 文件来说,这个列表保存了每个分区所在的数据块的位置。按照<ins>移动数据不如移动计算的</ins>的理念,Spark 在进行任务调度的时候,会尽可能的将计算任务移动到所要处理的数据块的存储位置。<br/> preferredLocations 返回每个分区所在的机器名或者 IP 地址,如果分区数据是多份存储,那么返回多个机器地址。如下图所示。 ![](https://img.kancloud.cn/52/49/5249f2e562d866646b995d1b1e5af7ca_996x214.png)