Train the model on the prepared feature set using a Spark ML Pipeline; the algorithm is a random forest. The trained model is saved to HDFS for later use in prediction. The code is written in PySpark (Python); a Scala implementation is also provided.

**1. PySpark implementation**

```python
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline, PipelineModel


def create_ml_model(spark):
    """Build the machine learning model."""
    # Load the training data
    df: DataFrame = spark.sql('select * from events.train_data') \
        .withColumnRenamed('user_interested', 'label')
    df.show(5)
    # +-----+----------+----------+-----------+-----------+-------------------+-
    # |label|   user_id|  event_id|user_locale|user_gender|           user_age| ...
    # +-----+----------+----------+-----------+-----------+-------------------+-
    # |    0|1000293064|2633697384|         11|          0| 0.0425531914893617|
    # |    0|1000293064| 738328406|         11|          0| 0.0425531914893617|
    # |    0|1000293064|3642199129|         11|          0| 0.0425531914893617| ...
    # |    0|1000293064|3478765131|         11|          0| 0.0425531914893617|
    # |    1|1006838695|1896310801|         11|          0|0.02127659574468085|
    # +-----+----------+----------+-----------+-----------+-------------------+-

    # Cast every column to Double
    df = df.select([col(c).cast(DoubleType()) for c in df.columns])

    # Collect the feature columns; the training data also contains
    # non-feature columns, which are filtered out here
    feature_columns = [c for c in df.columns if c not in ('label', 'event_id', 'user_id')]

    # Transformer: VectorAssembler packs multiple columns into a single vector column
    # handleInvalid="skip": rows containing null values are skipped
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features",
                                handleInvalid="skip")

    # Random forest classifier
    rf = RandomForestClassifier(impurity='gini', maxDepth=9, numTrees=30,
                                featureSubsetStrategy='auto', seed=21)

    # Build the Pipeline
    pipeline = Pipeline(stages=[assembler, rf])

    # Fit the model
    model: PipelineModel = pipeline.fit(df)

    # Save the model (in practice the path can be passed in as an external argument)
    model.write().overwrite().save('F:/ml/model/predicts')


def writeCopy(df, tbl):
    """Write the processed data to MySQL."""
    df.write.format('jdbc') \
        .option('driver', 'com.mysql.jdbc.Driver') \
        .option('url', 'jdbc:mysql://hadoop101:3306/projectpredict') \
        .option('dbtable', tbl) \
        .option('user', 'root') \
        .option('password', '123456') \
        .mode('overwrite') \
        .save()


def setup_bi_model(spark):
    """Copy the corresponding Hive tables to MySQL."""
    dfEventAttendees = spark.sql("select event_id, user_id, attend_type from events.event_attendee")
    writeCopy(dfEventAttendees, 'event_attendee')

    dfEventAttendeeCount = spark.sql("select event_id, attend_type, attend_count from events.event_attendee_count")
    writeCopy(dfEventAttendeeCount, 'event_attendee_count')

    dfEventCities = spark.sql("select city, level from events.event_cities")
    writeCopy(dfEventCities, 'event_cities')

    dfEventCountries = spark.sql("select country, level from events.event_countries")
    writeCopy(dfEventCountries, 'event_countries')

    dfEvents = spark.sql(
        "select event_id, start_time, city, state, zip, country, latitude, longitude, user_id from events.events")
    writeCopy(dfEvents, 'events')

    dfFriendAttendSummary = spark.sql("select user_id, event_id, invited_friends_count, attended_friends_count, \
        not_attended_friends_count, maybe_attended_friends_count from events.friend_attend_summary")
    writeCopy(dfFriendAttendSummary, 'friend_attend_summary')

    dfLocale = spark.sql("select locale_id, locale from events.locale")
    writeCopy(dfLocale, 'locale')

    dfTrain = spark.sql("select user_id, event_id, invited, time_stamp, interested from events.train")
    writeCopy(dfTrain, 'train')

    dfUserAttendEventCount = spark.sql("select user_id, invited_count, attended_count, \
        not_attended_count, maybe_attended_count from events.user_attend_event_count")
    writeCopy(dfUserAttendEventCount, 'user_attend_event_count')

    dfUserEventCount = spark.sql("select user_id, event_count from events.user_event_count")
    writeCopy(dfUserEventCount, 'user_event_count')

    dfUserFriend = spark.sql("select user_id, friend_id from events.user_friend")
    writeCopy(dfUserFriend, 'user_friend')

    dfUserFriendCount = spark.sql("select user_id, friend_count from events.user_friend_count")
    writeCopy(dfUserFriendCount, 'user_friend_count')

    dfUsers = spark.sql("select user_id, birth_year, gender, locale, location, \
        time_zone, joined_at from events.users")
    writeCopy(dfUsers, 'users')


if __name__ == "__main__":
    spark = SparkSession.builder.master("local[*]") \
        .appName("pipelineDemo") \
        .enableHiveSupport() \
        .getOrCreate()
    try:
        create_ml_model(spark)
        # setup_bi_model(spark)
    except Exception as e:
        print(e)
    spark.stop()
```

**2. Scala implementation**

```scala
import java.util.Properties

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._

object BuildModel extends App {

  val spark: SparkSession = SparkSession.builder()
    .appName("PipelineDemo").master("local[*]")
    .enableHiveSupport()
    .getOrCreate()

  import spark.implicits._
  import org.apache.spark.sql.functions._

  //TODO Build the model
  def create_ml_model(): Unit = {
    //TODO Load the training data
    val df1: DataFrame = spark.sql("select * from events.train_data")
      .withColumnRenamed("user_interested", "label")
    df1.show(5)
    // +-----+----------+----------+-----------+-----------+-------------------+-
    // |label|   user_id|  event_id|user_locale|user_gender|           user_age| ...
    // +-----+----------+----------+-----------+-----------+-------------------+-
    // |    0|1000293064|2633697384|         11|          0| 0.0425531914893617|
    // |    0|1000293064| 738328406|         11|          0| 0.0425531914893617|
    // |    0|1000293064|3642199129|         11|          0| 0.0425531914893617| ...
    // |    0|1000293064|3478765131|         11|          0| 0.0425531914893617|
    // |    1|1006838695|1896310801|         11|          0|0.02127659574468085|
    // +-----+----------+----------+-----------+-----------+-------------------+-

    val colNames: Array[String] = df1.columns
    colNames.foreach(println)
    // label user_id event_id user_locale user_gender user_age ...

    var df2: DataFrame = df1
    //TODO Cast every column to Double
    for (colName <- colNames) {
      df2 = df2.withColumn(colName, col(colName).cast(DoubleType))
    }
    df2.show(5)
    // +-----+-------------+-------------+-----------+-----------+-------------------+-
    // |label|      user_id|     event_id|user_locale|user_gender|           user_age|...
    // +-----+-------------+-------------+-----------+-----------+-------------------+-
    // |  0.0|1.000293064E9|2.633697384E9|       11.0|        0.0| 0.0425531914893617|
    // |  0.0|1.000293064E9| 7.38328406E8|       11.0|        0.0| 0.0425531914893617|
    // |  0.0|1.000293064E9|3.642199129E9|       11.0|        0.0| 0.0425531914893617| ...
    // |  0.0|1.000293064E9|3.478765131E9|       11.0|        0.0| 0.0425531914893617|
    // |  1.0|1.006838695E9|1.896310801E9|       11.0|        0.0|0.02127659574468085|
    // +-----+-------------+-------------+-----------+-----------+-------------------+-

    df2.printSchema()
    // root
    //  |-- label: double (nullable = true)
    //  |-- user_id: double (nullable = true)
    //  |-- event_id: double (nullable = true)
    //  |-- user_locale: double (nullable = true)
    //  |-- user_gender: double (nullable = true)
    //  |-- user_age: double (nullable = true)
    //  ...
    //TODO Collect the feature columns; the training data also contains
    //     non-feature columns, which are filtered out here
    val feature_columns: Array[String] = df2.columns.filter(
      x => x != "label" && x != "event_id" && x != "user_id"
    )
    println(feature_columns.mkString("\n"))
    // user_locale
    // user_gender
    // user_age
    // user_time_zone
    // user_member_days
    // user_friend_count
    // ...

    //TODO Transformer: VectorAssembler packs multiple columns into a single vector column
    val assembler: VectorAssembler = new VectorAssembler()
      .setInputCols(feature_columns)
      .setOutputCol("features")
      .setHandleInvalid("skip") // skip rows containing null values
    println(assembler.getInputCols.mkString("\n"))
    // user_locale
    // user_gender
    // user_age
    // user_time_zone
    // user_member_days
    // user_friend_count
    // ...
    println(assembler.getOutputCol)
    // features

    //TODO Random forest classifier
    // impurity measure: "gini" / "entropy"
    val impurity = "gini"
    // maximum depth of each tree
    val maxDepth = 9
    // number of trees
    val numTrees = 30
    // feature subset sampling strategy; "auto" lets the algorithm choose based on the
    // number of features among: "all" (all features), "sqrt" (square root of the
    // feature count), "log2" (log base 2 of the feature count), "onethird" (one third)
    val featureSubsetStrategy = "auto"
    val seed = 21
    val rf = new RandomForestClassifier()
      .setImpurity(impurity)
      .setMaxDepth(maxDepth)
      .setNumTrees(numTrees)
      .setFeatureSubsetStrategy(featureSubsetStrategy)
      .setSeed(seed)

    //TODO Build the pipeline
    val pipeline = new Pipeline()
      .setStages(Array(assembler, rf))

    //TODO Fit the model on the training data
    val model: PipelineModel = pipeline.fit(df2)

    //TODO Save the model
    model.write.overwrite().save("F:/mllib/model/projectTraining")
  }

  //TODO Export the data
  def setup_bi_model() = {
    //TODO load event_attendee
    val dfEventAttendees = spark.sql("select event_id, user_id, attend_type from events.event_attendee")
    writeCopy(dfEventAttendees, "event_attendee")

    //TODO load event_attendee_count
    val dfEventAttendeeCount = spark.sql("select event_id, attend_type, attend_count from events.event_attendee_count")
    writeCopy(dfEventAttendeeCount, "event_attendee_count")

    //TODO load event_cities
    val dfEventCities = spark.sql("select city, level from events.event_cities")
    writeCopy(dfEventCities, "event_cities")

    //TODO load event_countries
    val dfEventCountries = spark.sql("select country, level from events.event_countries")
    writeCopy(dfEventCountries, "event_countries")

    //TODO load events
    val dfEvents = spark.sql("select event_id, start_time, city, state, zip, country, latitude, longitude, user_id from events.events")
    writeCopy(dfEvents, "events")

    //TODO load friend_attend_summary
    val dfFriendAttendSummary = spark.sql("select user_id, event_id, invited_friends_count, attended_friends_count, not_attended_friends_count, maybe_attended_friends_count from events.friend_attend_summary")
    writeCopy(dfFriendAttendSummary, "friend_attend_summary")

    //TODO load locale
    val dfLocale = spark.sql("select locale_id, locale from events.locale")
    writeCopy(dfLocale, "locale")

    //TODO load train
    val dfTrain = spark.sql("select user_id, event_id, invited, time_stamp, interested from events.train")
    writeCopy(dfTrain, "train")

    //TODO load user_attend_event_count
    val dfUserAttendEventCount = spark.sql("select user_id, invited_count, attended_count, not_attended_count, maybe_attended_count from events.user_attend_event_count")
    writeCopy(dfUserAttendEventCount, "user_attend_event_count")

    //TODO load user_event_count
    val dfUserEventCount = spark.sql("select user_id, event_count from events.user_event_count")
    writeCopy(dfUserEventCount, "user_event_count")

    //TODO load user_friend
    val dfUserFriend = spark.sql("select user_id, friend_id from events.user_friend")
    writeCopy(dfUserFriend, "user_friend")

    //TODO load user_friend_count
    val dfUserFriendCount = spark.sql("select user_id, friend_count from events.user_friend_count")
    writeCopy(dfUserFriendCount, "user_friend_count")

    //TODO load users
    val dfUsers = spark.sql("select user_id, birth_year, gender, locale, location, time_zone, joined_at from events.users")
    writeCopy(dfUsers, "users")
  }

  //TODO Write the processed data to MySQL
  def writeCopy(df: DataFrame, tbl: String): Unit = {
    val url = "jdbc:mysql://hadoop101:3306/projectpredict"
    val prop = new Properties
    prop.setProperty("user", "root")
    prop.setProperty("password", "123456")
    prop.setProperty("driver", "com.mysql.jdbc.Driver")
    df.write.mode("overwrite").jdbc(url, tbl, prop)
  }

  create_ml_model()
  //setup_bi_model()

  spark.stop()
}
```
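
The saved model is meant to be reused for prediction later. Below is a minimal PySpark sketch of that step, assuming the save path used in the PySpark code above and a hypothetical `events.test_data` Hive table with the same columns as the training data; because the `VectorAssembler` stage is part of the saved `PipelineModel`, loading the model and calling `transform` is enough.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
from pyspark.ml import PipelineModel

spark = SparkSession.builder.master("local[*]") \
    .appName("predictDemo") \
    .enableHiveSupport() \
    .getOrCreate()

# Load the PipelineModel saved by create_ml_model()
model = PipelineModel.load('F:/ml/model/predicts')

# 'events.test_data' is a placeholder table name; any table with the same
# feature columns as the training data will do
test_df = spark.sql('select * from events.test_data')

# Mirror the training step: cast every column to Double before assembling
test_df = test_df.select([col(c).cast(DoubleType()) for c in test_df.columns])

# transform() appends rawPrediction, probability and prediction columns
predictions = model.transform(test_df)
predictions.select('user_id', 'event_id', 'probability', 'prediction').show(5)

spark.stop()
```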