企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
**1. RDD创建方式** 使用集合创建RDD、使用外部存储创建 RDD。 使用外部存储创建 RDD方式包括:本地文件系统、HDFS、HBase、Cassandra 等。 [TOC] # 1. 使用集合创建RDD 通过集合创建 RDD 有两种方法:parallelize 与 makeRDD。 <br/> 两种方法基本上是一样的,不同的是 makeRDD 还有一个重载方法,该重载方法会分配一系列本地 Scala 集合形成一个 RDD,<ins>可以为每个集合对象创建一个分区,并指定优先位置便于在运行中优化调度</ins>。<br/> 使用本地集合创建 RDD 的问题在于:由于这种方法需要用到一台机器中集合的全部数据,所以这种方式在测试和原型构造之外很少使用。<br/> 示例代码: ```scala package spark.core import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf, SparkContext} /** * 通过集合创建RDD * Date: 2020/1/4 */ object CreateRDDByCollection { def main(args: Array[String]): Unit = { // 程序入口方式1 val conf: SparkConf = new SparkConf().setMaster("local[4]") .setAppName(this.getClass.getName) val sc: SparkContext = SparkContext.getOrCreate(conf) /* // 程序入口方式2 val spark = SparkSession.builder.master("local[2]") .appName("appName") .getOrCreate() val sc:SparkContext = spark.sparkContext */ // 通过集合创建RDD val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6)) // 获取元素个数 println(rdd1.count()) // 6 // 获取分区数 println(rdd1.getNumPartitions) // 4 // 1、Spark默认会根据集群的情况来设置分区的数量,也可以通过parallelize()第二参数来指定 // 2、Spark会为每一个分区运行一个任务进行处理 val rdd2: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6), 5) println(rdd2.count()) // 6 println(rdd2.getNumPartitions) // 5 val rdd3: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6)) println(rdd3.count()) // 6 println(rdd3.getNumPartitions) // 4 } } ``` <br/> # 2. 使用外部存储创建 RDD 任何 Hadoop 支持的存储类型都可以用于创建 RDD,包括:本地文件系统、HDFS、HBase、Cassandra 等。 <br/> **1. 加载本地文件或hdfs创建RDD** 对于本地文件、HDFS 及 Hadoop 支持的文件系统使用 textFile 创建 RDD,文件中每一行作为 RDD 中的一条数据。 ```scala package spark.core import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 通过本地文件或hdfs创建RDD * Date: 2021/1/4 */ object CreateRDDByFile { def main(args: Array[String]): Unit = { // 编程入口方式1 val conf:SparkConf = new SparkConf() .setMaster("local[4]") .setAppName(this.getClass.getName) val sc:SparkContext = new SparkContext(conf) /* // 编程入口方式2 val spark = SparkSession.builder.master("local[2]") .appName("appName") .getOrCreate() val sc:SparkContext = spark.sparkContext */ // 通过本地文件创建RDD val rdd1:RDD[String] = sc.textFile("file:///E:\\hadoop\\input\\hello.txt") rdd1.foreach(println) // 会按行输出hello.txt文件的内容 // 通过hdfs创建RDD val rdd2:RDD[String] = sc.textFile("hdfs://hadoop101:9000/spark/hello.txt") rdd2.foreach(println) } } ``` ```scala // 文件中的一行文本作为RDD的一个元素 val distFile=sc.textFile("file:///home/hadoop/data/hello.txt") // 支持目录、压缩文件以及通配符 sc.textFile("/my/directory") sc.textFile("/my/directory/*.txt") sc.textFile("/my/directory/*.gz") ``` 1、Spark默认访问的是HDFS; 2、Spark默认为HDFS文件的每一个数据块创建一个分区,也可以通过 textFile() 第二个参数指定,但只能比数据块数量多; 3、默认分区数量情况下不能超过 2。原因如下图所示。 ![](https://img.kancloud.cn/a9/11/a91149edd30fefa0569b06b815586683_999x264.png) 其中,totalCoreCount 是一个来跟踪集群中的核心总数原子变量。 <br/> **2. 通过PairRDD创建RDD** SparkContext.wholeTextFiles():可以针对一个目录中的大量小文件返回`<filename,fileContent>`作为PairRDD。 * 普通RDD:org.apache.spark.rdd.RDD[data_type] * PairRDD:org.apache.spark.rdd.RDD[(key_type, value_type)] <br/> Spark 可以为包含键值对类型的 RDD 提供了一些专有的操作,比如:reduceByKey()、groupByKey()等。 也可以通过键值对集合创建PairRDD:sc.parallelize(List((1,2),(1,3))) <br/> 示例代码: ```scala package spark.core import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 通过PairRDD方式创建RDD * Date: 2021/1/4 */ object CreateRDDByPairRDD { def main(args: Array[String]): Unit = { // 编程入口方式1 val conf: SparkConf = new SparkConf().setMaster("local[4]") .setAppName(this.getClass.getName) val sc: SparkContext = SparkContext.getOrCreate(conf) /* // 编程入口方式2 val spark = SparkSession.builder.master("local[2]") .appName("appName") .getOrCreate() val sc:SparkContext = spark.sparkContext */ // 通过PairRDD创建RDD val rdd:RDD[(String, String)] = sc.wholeTextFiles("hdfs://hadoop101:9000/spark/hello.txt") rdd.foreach(x=>println(s"文件名:${x._1}, 文件内容:${x._2}")) } } ``` <br/> **3. 其他创建RDD的方法** ```scala SparkContext.sequenceFile[K,V]() 提供对Hadoop SequenceFile的读写支持; SparkContext.hadoopRDD()、newAPIHadoopRDD()从Hadoop接口API创建RDD; SparkContext.objectFile() 是RDD.saveAsObjectFile()的逆操作; ``` 从已有 RDD 创建新的 RDD 通过转换算子实现。通常涉及:map,map, filter, count, distinct, flatMap 等。这些操作与 Scala 集合操作类似。