# Spark SQL:通过 case class 创建 Dataset
Scala 中在 `class` 关键字前加上 `case` 关键字,这个类就成为了样例类。样例类和普通类的区别:

* 不需要 `new` 就可以直接生成对象。
* 默认实现序列化接口。
* 默认自动覆盖 `toString()`、`equals()`、`hashCode()`。

**1. 直接通过 case class 创建 Dataset**

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

/**
 * Example 1: build Datasets directly from in-memory Seqs of case-class
 * instances via the `toDS()` implicit, then join them.
 */
object CreateDataSetByCaseClass {

  // A labelled 2-D point.
  case class Point(label: String, x: Double, y: Double)
  // A category lookup row.
  case class Category(id: Long, name: String)

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .appName(this.getClass.getName)
      .getOrCreate()

    // Brings in the Encoders and the `toDS()` syntax for Seq/RDD.
    import spark.implicits._

    // Create a Dataset[Point] straight from a Seq of case-class values.
    val points: Dataset[Point] =
      Seq(Point("bar", 2.6, 3.5), Point("foo", 4.0, 3.7)).toDS()

    // Create a Dataset[Category] the same way.
    val categories: Dataset[Category] =
      Seq(Category(1, "bar"), Category(2, "foo")).toDS()

    // Inner join on points.label == categories.name.
    val joins = points.join(categories, points("label") === categories("name"))

    points.show()
    // +-----+---+---+
    // |label|  x|  y|
    // +-----+---+---+
    // |  bar|2.6|3.5|
    // |  foo|4.0|3.7|
    // +-----+---+---+

    categories.show()
    // +---+----+
    // | id|name|
    // +---+----+
    // |  1| bar|
    // |  2| foo|
    // +---+----+

    joins.show()
    // +-----+---+---+---+----+
    // |label|  x|  y| id|name|
    // +-----+---+---+---+----+
    // |  bar|2.6|3.5|  1| bar|
    // |  foo|4.0|3.7|  2| foo|
    // +-----+---+---+---+----+
  }
}
```

<br/>

**2. 
在开发中的常用写法是先创建 RDD,然后 RDD 与 case class 进行关联来创建 Dataset。**

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}

/**
 * Example 2: start from plain tuple RDDs, map each tuple onto a case
 * class, and convert the result to a typed Dataset with `toDS()`.
 */
object CreateDataSetByCaseClass {

  case class Point(label: String, x: Double, y: Double)
  case class Category(id: Long, name: String)

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .appName(this.getClass.getName)
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext

    // Brings in the Encoders and the `toDS()` syntax.
    import spark.implicits._

    // Step 1: build untyped tuple RDDs.
    val rawPoints: RDD[(String, Double, Double)] =
      sc.parallelize(List(("bar", 2.6, 3.5), ("foo", 4.0, 3.7)))
    val rawCategories: RDD[(Int, String)] =
      sc.parallelize(List((1, "bar"), (2, "foo")))

    // Step 2: associate each tuple with its case class, then convert
    // the RDD into a typed Dataset.
    val pointsDs: Dataset[Point] =
      rawPoints.map { case (label, x, y) => Point(label, x, y) }.toDS()
    val categoriesDs: Dataset[Category] =
      rawCategories.map { case (id, name) => Category(id, name) }.toDS()

    // Step 3: join the two Datasets on label == name.
    val joinDs = pointsDs.join(categoriesDs, pointsDs("label") === categoriesDs("name"))

    pointsDs.show()
    categoriesDs.show()
    joinDs.show()
  }
}
```