**1. Estimator、Transformer、Parameter使用示例**
```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object MLInstance1 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local[*]")
.appName(this.getClass.getName)
.getOrCreate()
// 从(label,features)元组列表准备训练数据
val training:DataFrame = spark.createDataFrame(Seq(
(1.0, Vectors.dense(0.0, 1.1, 0.1)),
(0.0, Vectors.dense(2.0, 1.0, -1.0)),
(0.0, Vectors.dense(2.0, 1.3, 1.0)),
(1.0, Vectors.dense(0.0, 1.2, -0.5)))
).toDF("label", "features")
training.show(5)
// +-----+--------------+
// |label| features|
// +-----+--------------+
// | 1.0| [0.0,1.1,0.1]|
// | 0.0|[2.0,1.0,-1.0]|
// | 0.0| [2.0,1.3,1.0]|
// | 1.0|[0.0,1.2,-0.5]|
// +-----+--------------+
// 创建一个逻辑回归(LogisticRegression)实例。 该实例是一个 Estimator。
val lr:LogisticRegression = new LogisticRegression()
// 打印出参数,文档和任何默认值
println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
// 我们可以使用 setter 方法设置参数
lr.setMaxIter(10).setRegParam(0.01)
// 训练逻辑回归(LogisticRegression)模型。 这里使用存储在 lr 中的参数
val model1 = lr.fit(training)
// 由于 model1 是模型(即 Estimator 产生的 Transformer)
// 我们可以查看它在 fit()中使用的参数
// 这将打印参数(名称:值)对,其中名称是此参数的唯一 ID
println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)
// 我们也可以使用 ParamMap 指定参数
// 支持多种用于指定参数的方法。
val paramMap = ParamMap(lr.maxIter -> 20)
// 指定 1 个参数,这将覆盖原始的 maxIter
.put(lr.maxIter, 30)
// 指定多个参数
.put(lr.regParam -> 0.1, lr.threshold -> 0.55)
// 也可以结合使用 ParamMaps
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") //更改输出列名称
val paramMapCombined = paramMap ++ paramMap2
// 现在使用 paramMapCombined 参数学习一个新模型
// paramMapCombined 会覆盖通过 lr.set *方法设置的所有参数
val model2 = lr.fit(training, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)
// 准备测试数据
val test = spark.createDataFrame(Seq(
(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
(0.0, Vectors.dense(3.0, 2.0, -0.1)),
(1.0, Vectors.dense(0.0, 2.2, -1.5)))).toDF("label", "features")
// 使用 Transformer.transform()方法对测试数据进行预测
// LogisticRegression.transform 将仅使用“feature”列
// 注意:model2.transform()输出的是“ myProbability”列,而不是'probability'列,
// 因为我们之前已重命名了 lr.probabilityCol 参数。
model2.transform(test)
.select("features", "label", "myProbability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction:
Double) =>
println(s"($features, $label) -> prob=$prob, prediction=$prediction")
}
}
}
```
**2. Pipeline使用案例**
```scala
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object PipelineInstance {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local[*]")
.appName(this.getClass.getName)
.getOrCreate()
// 从(id,text,label)元组列表准备测试文档
val training:DataFrame = spark.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0))).toDF("id", "text", "label")
training.show()
// +---+----------------+-----+
// | id| text|label|
// +---+----------------+-----+
// | 0| a b c d e spark| 1.0|
// | 1| b d| 0.0|
// | 2| spark f g h| 1.0|
// | 3|hadoop mapreduce| 0.0|
// +---+----------------+-----+
// 配置 ML 管道,该管道包括三个阶段:令牌生成器,hashingTF 和 lr
val tokenizer:Tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF:HashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr:LogisticRegression = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001)
val pipeline:Pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
// 使管道训练文档
val model:PipelineModel = pipeline.fit(training)
// 现在,我们可以选择将已拟合的管道保存到磁盘
model.write.overwrite().save("F:/ml/tmp/spark-logistic-regression-model")
// 我们也可以将此不合适的管道保存到磁盘
pipeline.write.overwrite().save("F:/ml/tmp/unfit-lr-model")
// 在生产中将其加载回
val sameModel:PipelineModel = PipelineModel.load("F:/ml/tmp/spark-logistic-regression-model")
// 准备没有标签(id,text)元组的测试文档
val test = spark.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "spark hadoop spark"),
(7L, "apache hadoop"))).toDF("id", "text")
// 对测试文档进行预测
model.transform(test)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
}
}
```