The model is trained on the prepared feature set using a Spark ML Pipeline, with the random forest algorithm. The trained model is saved to HDFS for later use in prediction. The code below is written in PySpark (Python); it can also be implemented in Scala.
**1. PySpark implementation**
```python
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline, PipelineModel
def create_ml_model(spark):
"""
创建机器学习模型
"""
# 加载训练数据
df: DataFrame = spark.sql('select * from events.train_data').withColumnRenamed('user_interested', 'label')
df.show(5)
# +-----+----------+----------+-----------+-----------+-------------------+-
# |label| user_id| event_id|user_locale|user_gender| user_age| ...
# +-----+----------+----------+-----------+-----------+-------------------+-
# | 0|1000293064|2633697384| 11| 0| 0.0425531914893617|
# | 0|1000293064| 738328406| 11| 0| 0.0425531914893617|
# | 0|1000293064|3642199129| 11| 0| 0.0425531914893617| ...
# | 0|1000293064|3478765131| 11| 0| 0.0425531914893617|
# | 1|1006838695|1896310801| 11| 0|0.02127659574468085|
# +-----+----------+----------+-----------+-----------+-------------------+-
    # Cast every column to Double
    df: DataFrame = df.select([col(c).cast(DoubleType()) for c in df.columns])
    # Collect the feature columns; label and the ID columns are not features, so drop them
    feature_columns: list = [c for c in df.columns if c not in ('label', 'event_id', 'user_id')]
    # Transformer: VectorAssembler merges multiple columns into a single vector column
    # handleInvalid="skip" drops rows that contain null values
    assembler: VectorAssembler = VectorAssembler(inputCols=feature_columns, outputCol="features", handleInvalid="skip")
    # Random forest classifier
rf: RandomForestClassifier = RandomForestClassifier(impurity='gini', maxDepth=9, numTrees=30,
featureSubsetStrategy='auto', seed=21)
    # Build the Pipeline: assemble the features, then classify
    pipeline: Pipeline = Pipeline(stages=[assembler, rf])
    # Fit the model on the training data
    model: PipelineModel = pipeline.fit(df)
    # Save the model; the path is hardcoded here, but in practice it would be
    # passed in as a parameter (e.g. an HDFS path)
    model.write().overwrite().save('F:/ml/model/predicts')
def writeCopy(df, tbl):
"""
将处理后的数据保存到MySQL中
"""
df.write.format('jdbc') \
.option('driver', 'com.mysql.jdbc.Driver') \
.option('url', 'jdbc:mysql://hadoop101:3306/projectpredict') \
.option('dbtable', tbl) \
.option('user', 'root') \
.option('password', '123456') \
.mode('overwrite') \
.save()
def setup_bi_model(spark):
"""
将对应的Hive表中的数据写到MySQL中
"""
dfEventAttendees = spark.sql("select event_id, user_id, attend_type from events.event_attendee")
writeCopy(dfEventAttendees, 'event_attendee')
dfEventAttendeeCount = spark.sql("select event_id, attend_type, attend_count from events.event_attendee_count")
writeCopy(dfEventAttendeeCount, 'event_attendee_count')
dfEventCities = spark.sql("select city, level from events.event_cities")
writeCopy(dfEventCities, 'event_cities')
dfEventCountries = spark.sql("select country, level from events.event_countries")
writeCopy(dfEventCountries, 'event_countries')
dfEvents = spark.sql(
"select event_id, start_time, city, state, zip, country, latitude, longitude, user_id from events.events")
writeCopy(dfEvents, 'events')
dfFriendAttendSummary = spark.sql("select user_id, event_id, invited_friends_count, attended_friends_count, not_attended_friends_count, \
maybe_attended_friends_count from events.friend_attend_summary")
writeCopy(dfFriendAttendSummary, 'friend_attend_summary')
dfLocale = spark.sql("select locale_id, locale from events.locale")
writeCopy(dfLocale, 'locale')
dfTrain = spark.sql("select user_id, event_id, invited, time_stamp, interested from events.train")
writeCopy(dfTrain, 'train')
dfUserAttendEventCount = spark.sql("select user_id, invited_count, \
attended_count, not_attended_count, maybe_attended_count from \
events.user_attend_event_count")
writeCopy(dfUserAttendEventCount, 'user_attend_event_count')
dfUserEventCount = spark.sql("select user_id, event_count from events.user_event_count")
writeCopy(dfUserEventCount, 'user_event_count')
dfUserFriend = spark.sql("select user_id, friend_id from events.user_friend")
writeCopy(dfUserFriend, 'user_friend')
dfUserFriendCount = spark.sql("select user_id, friend_count from events.user_friend_count")
writeCopy(dfUserFriendCount, 'user_friend_count')
dfUsers = spark.sql("select user_id, birth_year, gender, locale, location, \
time_zone, joined_at from events.users")
writeCopy(dfUsers, 'users')
if __name__ == "__main__":
spark = SparkSession.builder.master("local[*]") \
.appName("pipelineDemo") \
.enableHiveSupport() \
.getOrCreate()
    try:
        create_ml_model(spark)
        # setup_bi_model(spark)
    except Exception as e:
        print(e)
    finally:
        spark.stop()
```
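
The model saved above can be reloaded later to make predictions. Below is a minimal sketch of that step, assuming the data to score carries the same feature columns as the training data; the table name `events.test_data` is hypothetical:

```python
from pyspark.ml import PipelineModel
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

def predict_with_model(spark):
    """
    Reload the trained pipeline and score new data.
    """
    # Load the fitted pipeline (VectorAssembler + random forest) from the save path
    model = PipelineModel.load('F:/ml/model/predicts')
    # The data to score must have the same feature columns as the training data;
    # 'events.test_data' is a hypothetical table name
    df = spark.sql('select * from events.test_data')
    df = df.select([col(c).cast(DoubleType()) for c in df.columns])
    # transform() adds the 'features', 'rawPrediction', 'probability' and 'prediction' columns
    predictions = model.transform(df)
    predictions.select('user_id', 'event_id', 'probability', 'prediction').show(5)
```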
**2. Scala implementation**
```scala
import java.util.Properties
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._
object BuildModel extends App {
val spark: SparkSession = SparkSession.builder()
.appName("PipelineDemo").master("local[*]")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
  //TODO Build the model
  def create_ml_model(): Unit = {
    //TODO Load the training data and rename the target column to "label"
    val df1: DataFrame = spark.sql("select * from events.train_data")
      .withColumnRenamed("user_interested", "label")
df1.show(5)
// +-----+----------+----------+-----------+-----------+-------------------+-
// |label| user_id| event_id|user_locale|user_gender| user_age| ...
// +-----+----------+----------+-----------+-----------+-------------------+-
// | 0|1000293064|2633697384| 11| 0| 0.0425531914893617|
// | 0|1000293064| 738328406| 11| 0| 0.0425531914893617|
// | 0|1000293064|3642199129| 11| 0| 0.0425531914893617| ...
// | 0|1000293064|3478765131| 11| 0| 0.0425531914893617|
// | 1|1006838695|1896310801| 11| 0|0.02127659574468085|
// +-----+----------+----------+-----------+-----------+-------------------+-
val colNames: Array[String] = df1.columns
colNames.foreach(println) // label user_id event_id user_locale user_gender user_age ...
var df2: DataFrame = df1
    //TODO Cast every column to Double
for (colName <- colNames) {
df2 = df2.withColumn(colName, col(colName).cast(DoubleType))
}
df2.show(5)
// +-----+-------------+-------------+-----------+-----------+-------------------+-
// |label| user_id| event_id|user_locale|user_gender| user_age|...
// +-----+-------------+-------------+-----------+-----------+-------------------+-
// | 0.0|1.000293064E9|2.633697384E9| 11.0| 0.0| 0.0425531914893617|
// | 0.0|1.000293064E9| 7.38328406E8| 11.0| 0.0| 0.0425531914893617|
// | 0.0|1.000293064E9|3.642199129E9| 11.0| 0.0| 0.0425531914893617| ...
// | 0.0|1.000293064E9|3.478765131E9| 11.0| 0.0| 0.0425531914893617|
// | 1.0|1.006838695E9|1.896310801E9| 11.0| 0.0|0.02127659574468085|
// +-----+-------------+-------------+-----------+-----------+-------------------+-
df2.printSchema()
// root
// |-- label: double (nullable = true)
// |-- user_id: double (nullable = true)
// |-- event_id: double (nullable = true)
// |-- user_locale: double (nullable = true)
// |-- user_gender: double (nullable = true)
// |-- user_age: double (nullable = true)
// ...
    //TODO Collect the feature columns; label and the ID columns are not features, so drop them
val feature_columns: Array[String] = df2.columns.filter(
x => (x != "label" && x != "event_id" && x != "user_id")
)
println(feature_columns.mkString("\n"))
// user_locale
// user_gender
// user_age
// user_time_zone
// user_member_days
// user_friend_count
// ...
    //TODO Transformer: VectorAssembler merges multiple columns into a single vector column
    val assembler: VectorAssembler = new VectorAssembler()
      .setInputCols(feature_columns)
      .setOutputCol("features")
      .setHandleInvalid("skip") // skip rows containing null values
println(assembler.getInputCols.mkString("\n"))
// user_locale
// user_gender
// user_age
// user_time_zone
// user_member_days
// user_friend_count
// ...
println(assembler.getOutputCol)
// features
    //TODO Call the random forest algorithm
    // Impurity measure: "gini" or "entropy"
    val impurity = "gini"
    // Maximum depth of each tree
    val maxDepth = 9
    // Number of trees
    val numTrees = 30
    // Feature-subset sampling strategy; "auto" lets the algorithm choose based on
    // the number of features among: "all" (all features), "sqrt" (square root of
    // the feature count, sampled randomly), "log2" (log base 2), "onethird" (one third)
    val featureSubsetStrategy = "auto"
    val seed = 21
    val rf = new RandomForestClassifier() // random forest classifier
      .setImpurity(impurity)
      .setMaxDepth(maxDepth)
      .setNumTrees(numTrees)
      .setFeatureSubsetStrategy(featureSubsetStrategy)
      .setSeed(seed)
    //TODO Build the pipeline
    val pipeline = new Pipeline()
      .setStages(Array(assembler, rf))
    //TODO Fit the model on the training data
    val model: PipelineModel = pipeline.fit(df2)
    //TODO Save the model
    model.write.overwrite().save("F:/mllib/model/projectTraining")
}
  //TODO Write out the data
  def setup_bi_model(): Unit = {
//TODO load event-attendee
val dfEventAttendees = spark.sql("select event_id, user_id, attend_type from events.event_attendee")
// write
writeCopy(dfEventAttendees, "event_attendee")
//TODO load event attendee count
val dfEventAttendeeCount = spark.sql("select event_id, attend_type, attend_count from events.event_attendee_count")
// write
writeCopy(dfEventAttendeeCount, "event_attendee_count")
//TODO load event cities
val dfEventCities = spark.sql("select city, level from events.event_cities")
// write
writeCopy(dfEventCities, "event_cities")
//TODO load event countries
val dfEventCountries = spark.sql("select country, level from events.event_countries")
// write
writeCopy(dfEventCountries, "event_countries")
//TODO load events
val dfEvents = spark.sql("select event_id, start_time, city, state, zip, country, latitude, longitude, user_id from events.events")
// write
writeCopy(dfEvents, "events")
//TODO load friend attend summary
val dfFriendAttendSummary = spark.sql("select user_id, event_id, invited_friends_count, attended_friends_count, not_attended_friends_count, maybe_attended_friends_count from events.friend_attend_summary")
// write
writeCopy(dfFriendAttendSummary, "friend_attend_summary")
//TODO load locale
val dfLocale = spark.sql("select locale_id, locale from events.locale")
// write
writeCopy(dfLocale, "locale")
//TODO load train
val dfTrain = spark.sql("select user_id, event_id, invited, time_stamp, interested from events.train")
// write
writeCopy(dfTrain, "train")
//TODO load user attend event count
val dfUserAttendEventCount = spark.sql("select user_id, invited_count, attended_count, not_attended_count, maybe_attended_count from events.user_attend_event_count")
// write
writeCopy(dfUserAttendEventCount, "user_attend_event_count")
//TODO load user_event_count
val dfUserEventCount = spark.sql("select user_id, event_count from events.user_event_count")
// write
writeCopy(dfUserEventCount, "user_event_count")
//TODO load user_friend
val dfUserFriend = spark.sql("select user_id, friend_id from events.user_friend")
// write
writeCopy(dfUserFriend, "user_friend")
//TODO load user_friend_count
val dfUserFriendCount = spark.sql("select user_id, friend_count from events.user_friend_count")
// write
writeCopy(dfUserFriendCount, "user_friend_count")
//TODO load users
val dfUsers = spark.sql("select user_id, birth_year, gender, locale, location, time_zone, joined_at from events.users")
// write
writeCopy(dfUsers, "users")
}
  //TODO Write the processed data to MySQL
def writeCopy(df: DataFrame, tbl: String): Unit = {
val url = "jdbc:mysql://hadoop101:3306/projectpredict"
val prop = new Properties
prop.setProperty("user", "root")
prop.setProperty("password", "123456")
prop.setProperty("driver", "com.mysql.jdbc.Driver")
df.write.mode("overwrite").jdbc(url, tbl, prop)
}
create_ml_model()
//setup_bi_model()
spark.stop()
}
```
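
Both implementations fit the pipeline on the entire training table and do not measure model quality. Before deploying, it can be worth holding out part of the data and checking a metric such as AUC. Below is a minimal PySpark sketch, under the assumption that `df` is the Double-cast training DataFrame and `pipeline` is the Pipeline built in `create_ml_model`:

```python
from pyspark.ml.evaluation import BinaryClassificationEvaluator

def evaluate_pipeline(pipeline, df):
    """
    Estimate model quality on a held-out split before training on all data.
    """
    # Random 80/20 split; a fixed seed keeps the split reproducible
    train_df, test_df = df.randomSplit([0.8, 0.2], seed=21)
    model = pipeline.fit(train_df)
    predictions = model.transform(test_df)
    # Default metric is area under the ROC curve
    evaluator = BinaryClassificationEvaluator(labelCol='label', rawPredictionCol='rawPrediction')
    auc = evaluator.evaluate(predictions)
    print('AUC on held-out data: {}'.format(auc))
    return auc
```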