企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
**1. json格式文件** (1)示例数据`people.json` ```json {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19} ``` (2)示例代码 ```scala import org.apache.spark.SparkContext import org.apache.spark.sql.{DataFrame, SparkSession} object CreateDataFrame { def main(args: Array[String]): Unit = { val spark:SparkSession = SparkSession.builder() .master("local[4]") .appName(this.getClass.getName) .getOrCreate() val sc:SparkContext = spark.sparkContext import spark.implicits._ val df:DataFrame = spark.read.json("file:///E:\\hadoop\\input\\people.json") // 或者使用下面这个方法创建DataFrame也是一样的 // spark.read.format("json").load("file:///E:\\hadoop\\input\\people.json") df.printSchema() // 打印DataFrame的数据结构(Schema)信息 // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) df.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ // 和SQL的select一样, 对数据个别字段进行提取 df.select("name").show() // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+ // 还可以对某一个字段进行简单运算 df.select(df("name"), df("age")+1).show() // +-------+---------+ // | name|(age + 1)| // +-------+---------+ // |Michael| null| // | Andy| 31| // | Justin| 20| // +-------+---------+ // 和Sql的where一样, 对数据进行过滤 df.filter(df("age")>21).show() // 与下面的where方法是一样的效果 df.where(df("age")>21).show() // +---+----+ // |age|name| // +---+----+ // | 30|Andy| // +---+----+ // groupBy, 对数据聚合, 可以求最大值, 最小值, 数据条数, ... df.groupBy("age").count().show() // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+ // 创建/覆盖视图 df.createOrReplaceTempView("people") // 普通sql查询语句,需要已经创建了people视图 spark.sql("select * from people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ } } ``` <br/> **2. csv格式文件** (1)示例数据 ```csv id|name|age 1|darren|18 2|anne|18 3|"test"|18 4|'test2'|18 ``` (2)示例代码 ```scala import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf, SparkContext} object SparkDemo1 { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder() .master("local[*]") .appName(this.getClass.getName) .getOrCreate() import spark.implicits._ val result = spark.read.format("csv") .option("delimiter", "|") // 字段分隔符,默认为 ,” .option("header", "true") // 第一行作为 Schema,而非内容 .option("quote", "'") // 引号字符,默认为双引号 "" .option("nullValue", "\\N") // 指定一个字符串代表 null 值 .option("inferSchema", "true") // 自动推测字段类型 .load("file:///F:/spark/student.csv") result.show() // +---+------+---+ // | id| name|age| // +---+------+---+ // | 1|darren| 18| // | 2| anne| 18| // | 3|"test"| 18| // | 4| test2| 18| // +---+------+---+ result.printSchema() // root // |-- id: integer (nullable = true) // |-- name: string (nullable = true) // |-- age: integer (nullable = true) } } ```