### 3.4 Spark SQL 应用
#### 3.4.1 创建 DataFrame/DataSet
**方式一:读取本地文件**
**① 在本地创建一个文件,有 id、name、age 三列,用空格分隔,然后上传到 hdfs 上。**
```
vim /root/person.txt
```
内容如下:
```
1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 kobe 40
```
**② 打开 spark-shell**
```
spark/bin/spark-shell
##创建 RDD
val lineRDD= sc.textFile("hdfs://node1:8020/person.txt").map(_.split(" ")) //RDD[Array[String]]
```
**③ 定义 case class(相当于表的 schema)**
```
case class Person(id:Int, name:String, age:Int)
```
**④ 将 RDD 和 case class 关联**
```
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) //RDD[Person]
```
**⑤ 将 RDD 转换成 DataFrame**
```
val personDF = personRDD.toDF //DataFrame
```
**⑥ 查看数据和 schema**
```
personDF.show
```
**⑦ 注册表**
```
personDF.createOrReplaceTempView("t_person")
```
**⑧ 执行 SQL**
```
spark.sql("select id,name from t_person where id > 3").show
```
**⑨ 也可以通过 SparkSession 构建 DataFrame**
```
val dataFrame=spark.read.text("hdfs://node1:8020/person.txt")
dataFrame.show //注意:直接读取的文本文件没有完整schema信息
dataFrame.printSchema
```
**方式二:读取 json 文件**
```
val jsonDF= spark.read.json("file:///resources/people.json")
```
接下来就可以使用 `DataFrame` 的函数操作
```
jsonDF.show
```
注意:直接读取 `json` 文件有`schema` 信息,因为`json`文件本身含有`Schema`信息,`SparkSQL` 可以自动解析。
**方式三:读取 parquet 文件**
```
val parquetDF=spark.read.parquet("file:///resources/users.parquet")
```
接下来就可以使用 `DataFrame` 的函数操作
```
parquetDF.show
```
注意:直接读取 `parquet` 文件有 `schema` 信息,因为 `parquet` 文件中保存了列的信息。
#### 3.4.2 两种查询风格:DSL 和 SQL
DSL 风格示例:
```
personDF.select(personDF.col("name")).show
personDF.select(personDF("name")).show
personDF.select(col("name")).show
personDF.select("name").show
```
SQL 风格示例:
```
spark.sql("select * from t_person").show
```
**总结**:
- `DataFrame` 和 `DataSet` 都可以通过`RDD`来进行创建;
- 也可以通过读取普通文本创建–注意: 直接读取没有完整的约束,需要通过 `RDD`+`Schema`;
- 通过 `josn/parquet` 会有完整的约束;
- 不管是 `DataFrame` 还是 `DataSet` 都可以注册成表,之后就可以使用 `SQL` 进行查询了! 也可以使用 `DSL`!
#### 3.4.3 Spark SQL 面向多种数据源
读取 json 文件:
```
spark.read.json("D:\\data\\output\\json").show()
```
读取 csv 文件:
```
spark.read.csv("D:\\data\\output\\csv").toDF("id","name","age").show()
```
读取 parquet 文件:
```
spark.read.parquet("D:\\data\\output\\parquet").show()
```
读取 mysql 表:
```
val prop = new Properties()
prop.setProperty("user","root")
prop.setProperty("password","root")
spark.read.jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop).show()
```
写入 json 文件:
```
personDF.write.json("D:\\data\\output\\json")
```
写入 csv 文件:
```
personDF.write.csv("D:\\data\\output\\csv")
```
写入 parquet 文件:
```
personDF.write.parquet("D:\\data\\output\\parquet")
```
写入 mysql 表:
```
val prop = new Properties()
prop.setProperty("user","root")
prop.setProperty("password","root")
personDF.write.mode(SaveMode.Overwrite).jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)
```
- Introduction
- 快速上手
- Spark Shell
- 独立应用程序
- 开始翻滚吧!
- RDD编程基础
- 基础介绍
- 外部数据集
- RDD 操作
- 转换Transformations
- map与flatMap解析
- 动作Actions
- RDD持久化
- RDD容错机制
- 传递函数到 Spark
- 使用键值对
- RDD依赖关系与DAG
- 共享变量
- Spark Streaming
- 一个快速的例子
- 基本概念
- 关联
- 初始化StreamingContext
- 离散流
- 输入DStreams
- DStream中的转换
- DStream的输出操作
- 缓存或持久化
- Checkpointing
- 部署应用程序
- 监控应用程序
- 性能调优
- 减少批数据的执行时间
- 设置正确的批容量
- 内存调优
- 容错语义
- Spark SQL
- 概述
- SparkSQLvsHiveSQL
- 数据源
- RDDs
- parquet文件
- JSON数据集
- Hive表
- 数据源例子
- join操作
- 聚合操作
- 性能调优
- 其他
- Spark SQL数据类型
- 其它SQL接口
- 编写语言集成(Language-Integrated)的相关查询
- GraphX编程指南
- 开始
- 属性图
- 图操作符
- Pregel API
- 图构造者
- 部署
- 顶点和边RDDs
- 图算法
- 例子
- 更多文档
- 提交应用程序
- 独立运行Spark
- 在yarn上运行Spark
- Spark配置
- RDD 持久化