## 外部数据集
Spark 可以从任何一个 Hadoop 支持的存储源创建分布式数据集,包括
* 你的本地文件系统,HDFS,Cassandra,HBase,[Amazon S3](http://wiki.apache.org/hadoop/AmazonS3)等。
* Spark 支持文本文件(text files),[SequenceFiles](http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html) 和其他 Hadoop [InputFormat](http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html)。
> 文本文件 RDDs 可以使用 SparkContext 的 `textFile` 方法创建。 在这个方法里传入文件的 URI (机器上的本地路径或 `hdfs://`,`s3n://` 等),然后它会将文件读取成一个行集合。这里是一个调用例子:
```scala
scala> val distFile = sc.textFile("data.txt")
distFile: RDD[String] = MappedRDD@1d4cee08
```
一旦创建完成,`distFiile` 就能做数据集操作。例如,我们可以用下面的方式使用 `map` 和 `reduce` 操作将所有行的长度相加:`distFile.map(s => s.length).reduce((a, b) => a + b)`。
注意,Spark 读文件时:
- 如果使用本地文件系统路径,文件必须能在 work 节点上用相同的路径访问到。要么复制文件到所有的 workers,要么使用网络的方式共享文件系统。
- 所有 Spark 的基于文件的方法,包括 `textFile`,能很好地支持文件目录,压缩过的文件和通配符。例如,你可以使用 `textFile("/my/文件目录")`,`textFile("/my/文件目录/*.txt")` 和 `textFile("/my/文件目录/*.gz")`。
- `textFile` 方法也可以选择第二个可选参数来控制切片(_slices_)的数目。默认情况下,Spark 为每一个文件块(HDFS 默认文件块大小是 64M)创建一个切片(_slice_)。但是你也可以通过一个更大的值来设置一个更高的切片数目。注意,你不能设置一个小于文件块数目的切片值。
除了文本文件,Spark 的 Scala API 支持其他几种数据格式:
- `SparkContext.wholeTextFiles` 让你读取一个包含多个小文本文件的文件目录并且返回每一个(filename, content)对。与 `textFile` 的差异是:它记录的是每个文件中的每一行。
- 对于 [SequenceFiles](http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html),可以使用 SparkContext 的 `sequenceFile[K, V]` 方法创建,K 和 V 分别对应的是 key 和 values 的类型。像 [IntWritable](http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/IntWritable.html) 与 [Text](http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/Text.html) 一样,它们必须是 Hadoop 的 [Writable](http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/Writable.html) 接口的子类。另外,对于几种通用的 Writables,Spark 允许你指定原生类型来替代。例如: `sequenceFile[Int, String]` 将会自动读取 IntWritables 和 Text。
- 对于其他的 Hadoop InputFormats,你可以使用 `SparkContext.hadoopRDD` 方法,它可以指定任意的 `JobConf`,输入格式(InputFormat),key 类型,values 类型。你可以跟设置 Hadoop job 一样的方法设置输入源。你还可以在新的 MapReduce 接口(org.apache.hadoop.mapreduce)基础上使用 `SparkContext.newAPIHadoopRDD`(译者注:老的接口是 `SparkContext.newHadoopRDD`)。
- `RDD.saveAsObjectFile` 和 `SparkContext.objectFile` 支持保存一个RDD,保存格式是一个简单的 Java 对象序列化格式。这是一种效率不高的专有格式,如 Avro,它提供了简单的方法来保存任何一个 RDD。
## 创建方式举例
**1. 由外部存储系统的数据集创建,包括本地的文件系统,还有所有 `Hadoop` 支持的数据集,比如 `HDFS、Cassandra、HBase` 等:**
```
val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")
```
**2. 通过已有的 RDD 经过算子转换生成新的 RDD:**
```
val rdd2=rdd1.flatMap(_.split(" "))
```
**3. 由一个已经存在的 Scala 集合创建:**
```
val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
或者
val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))
```
`makeRDD` 方法底层调用了 `parallelize` 方法:
![](https://img.kancloud.cn/bc/9d/bc9dd3627d18cb8f61355c2d7cad72c3_1325x225.png)
**4. 还可以对接其他的中间件,比如:kafka**
参考: [大数据系列之Spark Streaming接入Kafka数据_pyspark.streaming ssc.sockettextstream 接受kafka-CSDN博客](https://blog.csdn.net/solihawk/article/details/116479840)
- Introduction
- 快速上手
- Spark Shell
- 独立应用程序
- 开始翻滚吧!
- RDD编程基础
- 基础介绍
- 外部数据集
- RDD 操作
- 转换Transformations
- map与flatMap解析
- 动作Actions
- RDD持久化
- RDD容错机制
- 传递函数到 Spark
- 使用键值对
- RDD依赖关系与DAG
- 共享变量
- Spark Streaming
- 一个快速的例子
- 基本概念
- 关联
- 初始化StreamingContext
- 离散流
- 输入DStreams
- DStream中的转换
- DStream的输出操作
- 缓存或持久化
- Checkpointing
- 部署应用程序
- 监控应用程序
- 性能调优
- 减少批数据的执行时间
- 设置正确的批容量
- 内存调优
- 容错语义
- Spark SQL
- 概述
- SparkSQLvsHiveSQL
- 数据源
- RDDs
- parquet文件
- JSON数据集
- Hive表
- 数据源例子
- join操作
- 聚合操作
- 性能调优
- 其他
- Spark SQL数据类型
- 其它SQL接口
- 编写语言集成(Language-Integrated)的相关查询
- GraphX编程指南
- 开始
- 属性图
- 图操作符
- Pregel API
- 图构造者
- 部署
- 顶点和边RDDs
- 图算法
- 例子
- 更多文档
- 提交应用程序
- 独立运行Spark
- 在yarn上运行Spark
- Spark配置
- RDD 持久化