企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
**1. Estimator、Transformer、Parameter使用示例** ```scala import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.param.ParamMap import org.apache.spark.sql.{DataFrame, Row, SparkSession} object MLInstance1 { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().master("local[*]") .appName(this.getClass.getName) .getOrCreate() // 从(label,features)元组列表准备训练数据 val training:DataFrame = spark.createDataFrame(Seq( (1.0, Vectors.dense(0.0, 1.1, 0.1)), (0.0, Vectors.dense(2.0, 1.0, -1.0)), (0.0, Vectors.dense(2.0, 1.3, 1.0)), (1.0, Vectors.dense(0.0, 1.2, -0.5))) ).toDF("label", "features") training.show(5) // +-----+--------------+ // |label| features| // +-----+--------------+ // | 1.0| [0.0,1.1,0.1]| // | 0.0|[2.0,1.0,-1.0]| // | 0.0| [2.0,1.3,1.0]| // | 1.0|[0.0,1.2,-0.5]| // +-----+--------------+ // 创建一个逻辑回归(LogisticRegression)实例。 该实例是一个 Estimator。 val lr:LogisticRegression = new LogisticRegression() // 打印出参数,文档和任何默认值 println("LogisticRegression parameters:\n" + lr.explainParams() + "\n") // 我们可以使用 setter 方法设置参数 lr.setMaxIter(10).setRegParam(0.01) // 训练逻辑回归(LogisticRegression)模型。 这里使用存储在 lr 中的参数 val model1 = lr.fit(training) // 由于 model1 是模型(即 Estimator 产生的 Transformer) // 我们可以查看它在 fit()中使用的参数 // 这将打印参数(名称:值)对,其中名称是此参数的唯一 ID println("Model 1 was fit using parameters: " + model1.parent.extractParamMap) // 我们也可以使用 ParamMap 指定参数 // 支持多种用于指定参数的方法。 val paramMap = ParamMap(lr.maxIter -> 20) // 指定 1 个参数,这将覆盖原始的 maxIter .put(lr.maxIter, 30) // 指定多个参数 .put(lr.regParam -> 0.1, lr.threshold -> 0.55) // 也可以结合使用 ParamMaps val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") //更改输出列名称 val paramMapCombined = paramMap ++ paramMap2 // 现在使用 paramMapCombined 参数学习一个新模型 // paramMapCombined 会覆盖通过 lr.set *方法设置的所有参数 val model2 = lr.fit(training, paramMapCombined) println("Model 2 was fit using parameters: " + model2.parent.extractParamMap) // 准备测试数据 val test = spark.createDataFrame(Seq( (1.0, Vectors.dense(-1.0, 1.5, 1.3)), (0.0, Vectors.dense(3.0, 2.0, -0.1)), (1.0, Vectors.dense(0.0, 2.2, -1.5)))).toDF("label", "features") // 使用 Transformer.transform()方法对测试数据进行预测 // LogisticRegression.transform 将仅使用“feature”列 // 注意:model2.transform()输出的是“ myProbability”列,而不是'probability'列, // 因为我们之前已重命名了 lr.probabilityCol 参数。 model2.transform(test) .select("features", "label", "myProbability", "prediction") .collect() .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) => println(s"($features, $label) -> prob=$prob, prediction=$prediction") } } } ``` **2. Pipeline使用案例** ```scala import org.apache.spark.ml.{Pipeline, PipelineModel} import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.feature.{HashingTF, Tokenizer} import org.apache.spark.ml.linalg.Vector import org.apache.spark.sql.{DataFrame, Row, SparkSession} object PipelineInstance { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().master("local[*]") .appName(this.getClass.getName) .getOrCreate() // 从(id,text,label)元组列表准备测试文档 val training:DataFrame = spark.createDataFrame(Seq( (0L, "a b c d e spark", 1.0), (1L, "b d", 0.0), (2L, "spark f g h", 1.0), (3L, "hadoop mapreduce", 0.0))).toDF("id", "text", "label") training.show() // +---+----------------+-----+ // | id| text|label| // +---+----------------+-----+ // | 0| a b c d e spark| 1.0| // | 1| b d| 0.0| // | 2| spark f g h| 1.0| // | 3|hadoop mapreduce| 0.0| // +---+----------------+-----+ // 配置 ML 管道,该管道包括三个阶段:令牌生成器,hashingTF 和 lr val tokenizer:Tokenizer = new Tokenizer() .setInputCol("text") .setOutputCol("words") val hashingTF:HashingTF = new HashingTF() .setNumFeatures(1000) .setInputCol(tokenizer.getOutputCol) .setOutputCol("features") val lr:LogisticRegression = new LogisticRegression() .setMaxIter(10) .setRegParam(0.001) val pipeline:Pipeline = new Pipeline() .setStages(Array(tokenizer, hashingTF, lr)) // 使管道训练文档 val model:PipelineModel = pipeline.fit(training) // 现在,我们可以选择将已拟合的管道保存到磁盘 model.write.overwrite().save("F:/ml/tmp/spark-logistic-regression-model") // 我们也可以将此不合适的管道保存到磁盘 pipeline.write.overwrite().save("F:/ml/tmp/unfit-lr-model") // 在生产中将其加载回 val sameModel:PipelineModel = PipelineModel.load("F:/ml/tmp/spark-logistic-regression-model") // 准备没有标签(id,text)元组的测试文档 val test = spark.createDataFrame(Seq( (4L, "spark i j k"), (5L, "l m n"), (6L, "spark hadoop spark"), (7L, "apache hadoop"))).toDF("id", "text") // 对测试文档进行预测 model.transform(test) .select("id", "text", "probability", "prediction") .collect() .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) => println(s"($id, $text) --> prob=$prob, prediction=$prediction") } } } ```