合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
### 3.4 Spark SQL 应用 #### 3.4.1 创建 DataFrame/DataSet **方式一:读取本地文件** **① 在本地创建一个文件,有 id、name、age 三列,用空格分隔,然后上传到 hdfs 上。** ``` vim /root/person.txt ``` 内容如下: ``` 1 zhangsan 20 2 lisi 29 3 wangwu 25 4 zhaoliu 30 5 tianqi 35 6 kobe 40 ``` **② 打开 spark-shell** ``` spark/bin/spark-shell ##创建 RDD val lineRDD= sc.textFile("hdfs://node1:8020/person.txt").map(_.split(" ")) //RDD[Array[String]] ``` **③ 定义 case class(相当于表的 schema)** ``` case class Person(id:Int, name:String, age:Int) ``` **④ 将 RDD 和 case class 关联** ``` val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) //RDD[Person] ``` **⑤ 将 RDD 转换成 DataFrame** ``` val personDF = personRDD.toDF //DataFrame ``` **⑥ 查看数据和 schema** ``` personDF.show ``` **⑦ 注册表** ``` personDF.createOrReplaceTempView("t_person") ``` **⑧ 执行 SQL** ``` spark.sql("select id,name from t_person where id > 3").show ``` **⑨ 也可以通过 SparkSession 构建 DataFrame** ``` val dataFrame=spark.read.text("hdfs://node1:8020/person.txt") dataFrame.show //注意:直接读取的文本文件没有完整schema信息 dataFrame.printSchema ``` **方式二:读取 json 文件** ``` val jsonDF= spark.read.json("file:///resources/people.json") ``` 接下来就可以使用 `DataFrame` 的函数操作 ``` jsonDF.show ``` 注意:直接读取 `json` 文件有`schema` 信息,因为`json`文件本身含有`Schema`信息,`SparkSQL` 可以自动解析。 **方式三:读取 parquet 文件** ``` val parquetDF=spark.read.parquet("file:///resources/users.parquet") ``` 接下来就可以使用 `DataFrame` 的函数操作 ``` parquetDF.show ``` 注意:直接读取 `parquet` 文件有 `schema` 信息,因为 `parquet` 文件中保存了列的信息。 #### 3.4.2 两种查询风格:DSL 和 SQL DSL 风格示例: ``` personDF.select(personDF.col("name")).show personDF.select(personDF("name")).show personDF.select(col("name")).show personDF.select("name").show ``` SQL 风格示例: ``` spark.sql("select * from t_person").show ``` **总结**: - `DataFrame` 和 `DataSet` 都可以通过`RDD`来进行创建; - 也可以通过读取普通文本创建–注意: 直接读取没有完整的约束,需要通过 `RDD`+`Schema`; - 通过 `josn/parquet` 会有完整的约束; - 不管是 `DataFrame` 还是 `DataSet` 都可以注册成表,之后就可以使用 `SQL` 进行查询了! 也可以使用 `DSL`! #### 3.4.3 Spark SQL 面向多种数据源 读取 json 文件: ``` spark.read.json("D:\\data\\output\\json").show() ``` 读取 csv 文件: ``` spark.read.csv("D:\\data\\output\\csv").toDF("id","name","age").show() ``` 读取 parquet 文件: ``` spark.read.parquet("D:\\data\\output\\parquet").show() ``` 读取 mysql 表: ``` val prop = new Properties() prop.setProperty("user","root") prop.setProperty("password","root") spark.read.jdbc( "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop).show() ``` 写入 json 文件: ``` personDF.write.json("D:\\data\\output\\json") ``` 写入 csv 文件: ``` personDF.write.csv("D:\\data\\output\\csv") ``` 写入 parquet 文件: ``` personDF.write.parquet("D:\\data\\output\\parquet") ``` 写入 mysql 表: ``` val prop = new Properties() prop.setProperty("user","root") prop.setProperty("password","root") personDF.write.mode(SaveMode.Overwrite).jdbc( "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop) ```