
National Vocational Skills Competition (Big Data Technology Track): Task B Offline Data Processing and Task C Data Mining
2023 National Vocational Skills Competition (Big Data Technology Track), Task Book No. 10
Preface:
There are many ways to approach this competition, so I will do my best to write things up. This walkthrough runs on a simulated cluster I built myself.
First, a quick look at the cluster: the components installed so far, plus the MySQL database.
I have been updating it bit by bit, and it took real effort to put together.
Also, if you want to practice the real-time task, we have a data generator we wrote ourselves (a rough sketch of one follows below). The most important thing is to learn the method, so you can adapt on the spot during the competition. Whether it is cluster setup, offline processing, data mining, real-time processing, or visualization, you need to run into plenty of errors while practicing before you can handle them calmly on competition day.
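For reference, here is a minimal sketch of what such a generator could look like. It is only an illustration: the broker address, topic name, and record layout below are assumptions, not the actual generator used in the competition.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.util.Random

// Minimal sketch of a real-time data generator (broker, topic and record layout are assumed)
object FakeChangeRecordGenerator {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "bigdata1:9092") // assumed Kafka broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    val states = Array("运行", "待机", "报警")
    while (true) {
      // one fake ChangeRecord-style row: machineId,state,eventTimeMillis
      val value = s"${Random.nextInt(100)},${states(Random.nextInt(states.length))},${System.currentTimeMillis()}"
      producer.send(new ProducerRecord[String, String]("ChangeRecord", value)) // assumed topic name
      Thread.sleep(1000)
    }
  }
}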
Task Book No. 10: no more chatter, straight to the code.
Preliminary setup
package org.xuejiujiu.spark
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import java.util.Properties
object packages {
def Hive():SparkSession={
val conf = new SparkConf().setAppName("TASK").setMaster("local").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("OFF")
// This setting is required when writing legacy timestamps such as 1900-01-01 00:00:00
spark.conf.set("spark.sql.legacy.avro.datetimeRebaseModeInWrite", "CORRECTED")
spark
}
def MySQL():Properties={
val properties = new Properties()
properties.put("user","root")
properties.put("password","123456")
properties.put("driver","com.mysql.jdbc.Driver")
properties
}
def getUrl():String={
val url = "jdbc:mysql://bigdata1:3306/shtd_industry"
url
}
def Clickhouse() : Properties= {
val properties = new Properties()
properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
properties.put("batchsize", "100000")
properties.put("socket_timeout", "300000")
properties.put("rewriteBatchedStatements", "true")
properties
}
}
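The helpers above assume that the Spark, Hudi, MySQL and ClickHouse client libraries are already on the classpath. A rough build.sbt sketch under that assumption follows; the versions are illustrative only and should be matched to your own cluster.

// build.sbt (illustrative versions only; match them to the cluster)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"   % "3.1.1" % "provided",
  "org.apache.spark" %% "spark-hive"  % "3.1.1" % "provided",
  "org.apache.spark" %% "spark-mllib" % "3.1.1" % "provided",
  "org.apache.hudi"  %% "hudi-spark3.1-bundle" % "0.12.0",
  "mysql"            %  "mysql-connector-java" % "5.1.47",
  "ru.yandex.clickhouse" % "clickhouse-jdbc"   % "0.3.2"
)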
Data extraction code
package org.xuejiujiu.spark
import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import packages._
import org.apache.spark.sql.functions._
object task1 {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val spark = Hive()
val properties = MySQL()
val table = Array("BaseMachine", "ChangeRecord", "EnvironmentData", "MachineData", "ProduceRecord")
table.foreach(tableName => {
spark.read.jdbc(getUrl(),tableName,properties).createTempView(s"a_$tableName")
spark.table(s"a_$tableName").show()
// show the table that was just registered
val frame = spark.table(s"a_$tableName")
.withColumn("etldate", date_format(date_sub(current_timestamp(), 1), "yyyyMMdd").cast("string"))
frame.createTempView(s"b_$tableName")
if(tableName.equals("EnvironmentData")){
spark.table(s"b_$tableName")
.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD.key(), "InPutTime")
.option(RECORDKEY_FIELD.key(), "EnvoId")
.option(PARTITIONPATH_FIELD.key(), "etldate")
.option(TBL_NAME.key(), "environmentdata")
.mode("overwrite")
.save("hdfs://bigdata1:8020/user/hive/warehouse/hudi_gy_ods.db/environmentdata")
println("----- 写入成功 -----")
}else if(tableName.equals("ChangeRecord")){
spark.table(s"b_$tableName")
.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD.key(), "ChangeEndTime")
.option(RECORDKEY_FIELD.key(), "ChangeMachineID,ChangeID")
.option(PARTITIONPATH_FIELD.key(), "etldate")
.option(TBL_NAME.key(), "changerecord")
.mode("overwrite")
.save("hdfs://bigdata1:8020/user/hive/warehouse/hudi_gy_ods.db/changerecord")
println("----- 写入成功 -----")
}else if(tableName.equals("BaseMachine")){
spark.table(s"b_$tableName")
.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD.key(), "MachineAddDate")
.option(RECORDKEY_FIELD.key(), "BaseMachineID")
.option(PARTITIONPATH_FIELD.key(), "etldate")
.option(TBL_NAME.key(), "basemachine")
.mode("overwrite")
.save("hdfs://bigdata1:8020/user/hive/warehouse/hudi_gy_ods.db/basemachine")
println("----- 写入成功 -----")
}else if(tableName.equals("ProduceRecord")) {
spark.table(s"b_$tableName").drop(col("ProducePrgCode"))
.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD.key(), "ProduceCodeEndTime")
// TODO: composite record key
.option(RECORDKEY_FIELD.key(), "ProduceRecordID,ProduceMachineID")
.option(PARTITIONPATH_FIELD.key(), "etldate")
.option(TBL_NAME.key(), "producerecord")
.mode("overwrite")
.save("hdfs://bigdata1:8020/user/hive/warehouse/hudi_gy_ods.db/producerecord")
println("----- 写入成功 -----")
}else if(tableName.equals("MachineData")) {
spark.table(s"b_$tableName")
.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD.key(), "MachineRecordDate")
.option(RECORDKEY_FIELD.key(), "MachineRecordID")
.option(PARTITIONPATH_FIELD.key(), "etldate")
.option(TBL_NAME.key(), "machinedata")
.mode("overwrite")
.save("hdfs://bigdata1:8020/user/hive/warehouse/hudi_gy_ods.db/machinedata")
println("----- 写入成功 -----")
}
})
}
}
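After the extraction job runs, it is worth reading one of the ODS Hudi tables back to confirm the partition was written. A quick sanity check, assuming the same paths as above:

// Sanity check: read an ODS table back and count rows per etldate partition
val ods = spark.read.format("hudi")
  .load("hdfs://bigdata1:8020/user/hive/warehouse/hudi_gy_ods.db/environmentdata")
ods.printSchema() // Hudi prepends its _hoodie_* meta columns to the original ones
ods.groupBy("etldate").count().show()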
Data cleaning code
package org.xuejiujiu.spark
import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.spark.sql.functions._
import org.xuejiujiu.spark.packages.{Hive, MySQL}
object task2 {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val spark = Hive()
val properties = MySQL()
val table = Array("basemachine", "changerecord", "environmentdata", "machinedata", "producerecord")
table.foreach(tableName => {
spark.read.format("hudi").load(s"hdfs://bigdata1:8020/user/hive/warehouse/hudi_gy_ods.db/$tableName").createOrReplaceTempView(s"a_$tableName")
spark.table(s"a_$tableName").show()
spark.table(s"a_$tableName")
.withColumn("dwd_insert_user",lit("user1"))
.withColumn("dwd_modify_user",lit("user1"))
.withColumn("dwd_insert_time",date_format(current_date(),"yyyyMMdd HH:mm:ss"))
.withColumn("dwd_modify_time",date_format(current_date(),"yyyyMMdd HH:mm:ss"))
.createOrReplaceTempView(s"b_$tableName")
if(tableName.equals("environmentdata")) {
spark.table(s"b_$tableName")
.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD.key(), "InPutTime")
.option(RECORDKEY_FIELD.key(), "EnvoId")
.option(PARTITIONPATH_FIELD.key(), "etldate")
.option(TBL_NAME.key(), "fact_environment_data")
.mode("overwrite")
.save("hdfs://bigdata1:8020/user/hive/warehouse/hudi_gy_dwd.db/fact_environment_data")
println("----- 写入成功 -----")
}else if(tableName.equals("changerecord")){
spark.table(s"b_$tableName")
.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD.key(), "ChangeEndTime")
.option(RECORDKEY_FIELD.key(), "ChangeMachineID,ChangeID")
.option(PARTITIONPATH_FIELD.key(), "etldate")
.option(TBL_NAME.key(), "fact_change_record")
.mode("overwrite")
.save("hdfs://bigdata1:8020/user/hive/warehouse/hudi_gy_dwd.db/fact_change_record")
println("----- 写入成功 -----")
}else if(tableName.equals("basemachine")){
spark.table(s"b_$tableName")
.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD.key(), "MachineAddDate")
.option(RECORDKEY_FIELD.key(), "BaseMachineID")
.option(PARTITIONPATH_FIELD.key(), "etldate")
.option(TBL_NAME.key(), "dim_machine")
.mode("overwrite")
.save("hdfs://bigdata1:8020/user/hive/warehouse/hudi_gy_dwd.db/dim_machine")
println("----- 写入成功 -----")
}else if(tableName.equals("producerecord")) {
spark.table(s"b_$tableName")
.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD.key(), "ProduceCodeEndTime")
// TODO: composite record key
.option(RECORDKEY_FIELD.key(), "ProduceRecordID,ProduceMachineID")
.option(PARTITIONPATH_FIELD.key(), "etldate")
.option(TBL_NAME.key(), "fact_produce_record")
.mode("overwrite")
.save("hdfs://bigdata1:8020/user/hive/warehouse/hudi_gy_dwd.db/fact_produce_record")
println("----- 写入成功 -----")
}else if(tableName.equals("machinedata")) {
spark.table(s"b_$tableName")
.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD.key(), "MachineRecordDate")
.option(RECORDKEY_FIELD.key(), "MachineRecordID")
.option(PARTITIONPATH_FIELD.key(), "etldate")
.option(TBL_NAME.key(), "fact_machine_data")
.mode("overwrite")
.save("hdfs://bigdata1:8020/user/hive/warehouse/hudi_gy_dwd.db/fact_machine_data")
println("----- 写入成功 -----")
}
})
}
}
Indicator computation code
(I wrote all the indicators in one program instead of splitting them into separate jobs, and I did not use DolphinScheduler for orchestration; that part is simple enough to learn on your own.)
package org.xuejiujiu.spark
import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs
import org.apache.hudi.config.HoodieWriteConfig.{CLIENT_HEARTBEAT_INTERVAL_IN_MS, TBL_NAME}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions
import org.apache.spark.sql.functions._
import org.xuejiujiu.spark.packages.{Clickhouse, Hive, MySQL}
object task3 {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val spark = Hive()
val properties = Clickhouse()
val table = Array("fact_environment_data", "fact_change_record", "dim_machine", "fact_produce_record", "fact_machine_data")
table.foreach(tableName => {
spark.read.format("hudi").load(s"hdfs://bigdata1:8020/user/hive/warehouse/hudi_gy_dwd.db/$tableName").createOrReplaceTempView(s"$tableName")
spark.table(s"$tableName").show()
})
val task1 = spark.sql(
"""
|select
|b.BaseMachineID as machine_id,
|date_format(a.ChangeStartTime, 'yyyy-MM-dd') AS machine_record_date,
|unix_timestamp(a.ChangeEndTime) - unix_timestamp(a.ChangeStartTime) as total_time
|from fact_change_record a
|join dim_machine b on a.ChangeMachineID = b.BaseMachineID
|where a.ChangeEndTime is not null and a.ChangeRecordState = '运行'
|""".stripMargin).distinct().withColumn("uuid", functions.expr("uuid()"))
task1.show()
task1.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD.key(), "total_time")
.option(RECORDKEY_FIELD.key(), "uuid")
.option(PARTITIONPATH_FIELD.key(), "machine_record_date")
.option(TBL_NAME.key(), "machine_data_total_time")
.mode("overwrite")
.save("hdfs://bigdata1:8020/user/hive/warehouse/hudi_gy_dws.db/machine_data_total_time")
println("----- 写入成功 -----")
val fact_produce_record = spark.table("fact_produce_record")
fact_produce_record.show()
val frame = fact_produce_record
.dropDuplicates(Seq("ProduceRecordID", "ProduceMachineID"))
.filter("ProduceCodeEndTime <> '1900-01-01 00:00:00'")
frame.show()
frame.createOrReplaceTempView("tmp")
val task2 = spark.sql(
"""
|select
|ProduceRecordID as produce_record_id,
|ProduceMachineID as produce_machine_id,
|unix_timestamp(ProduceCodeEndTime) - unix_timestamp(ProduceCodeStartTime) as producetime,
|CAST(AVG(UNIX_TIMESTAMP(ProduceCodeEndTime) - UNIX_TIMESTAMP(ProduceCodeStartTime)) OVER () AS INT) AS produce_per_avgtime
|from tmp
|""".stripMargin).distinct()
task2.show()
task2.write
.mode("overwrite")
.jdbc("jdbc:clickhouse://bigdata1:8123/shtd_industry", "machine_produce_per_avgtime", properties)
// Read the DWS table back
val inputDF = spark.read.format("hudi")
.load("hdfs://bigdata1:8020/user/hive/warehouse/hudi_gy_dws.db/machine_data_total_time")
// Define the window specification (per day, ordered by total_time descending)
val windowSpec = Window.partitionBy("machine_record_date").orderBy(desc("total_time"))
// Add a rank column
val rankedDF = inputDF.withColumn("rank", dense_rank().over(windowSpec))
// Keep the machines ranked in the top three per day (ties included)
val resultDF = rankedDF
.filter("rank <= 3")
.groupBy("machine_record_date")
.agg(
// collect_list alone does not guarantee ordering, so sort the structs by rank
sort_array(collect_list(struct("rank", "machine_id", "total_time"))).as("machines")
)
.selectExpr(
"machine_record_date",
"machines[0].machine_id as first_id",
"machines[0].total_time as first_time",
"machines[1].machine_id as second_id",
"machines[1].total_time as second_time",
"machines[2].machine_id as tertiary_id",
"machines[2].total_time as tertiary_time"
)
resultDF.write
.mode("overwrite")
.jdbc("jdbc:clickhouse://bigdata1:8123/shtd_industry", "machine_data_total_time_top3", properties)
}
}
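To double-check that the indicators actually landed in ClickHouse, the same JDBC properties can be reused for a read-back. A small sketch, assuming the target tables already exist in shtd_industry:

// Read the result tables back from ClickHouse through the same JDBC properties
spark.read.jdbc("jdbc:clickhouse://bigdata1:8123/shtd_industry",
  "machine_produce_per_avgtime", properties).show(10)
spark.read.jdbc("jdbc:clickhouse://bigdata1:8123/shtd_industry",
  "machine_data_total_time_top3", properties)
  .orderBy("machine_record_date").show(10)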
Data mining
Part 1
package org.xuejiujiu.spark.dataMining
import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.spark.sql.functions.{col, current_timestamp, date_format, first, lit, when}
import org.xuejiujiu.spark.packages.{Hive, MySQL, getUrl}
object task1 {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val spark = Hive()
val properties = MySQL()
val table = Array("BaseMachine", "ChangeRecord", "EnvironmentData", "MachineData", "ProduceRecord")
// TODO: pull the source tables from MySQL
table.foreach(tableName => {
spark.read.jdbc(getUrl(), tableName, properties).createTempView(s"$tableName")
})
val df = spark.table("MachineData").selectExpr("MachineRecordData","MachineRecordID")
.filter(col("MachineRecordData").isNotNull)
.filter(col("MachineRecordID").isNotNull)
df.show(false)
// Use a regex to extract each ColName and its <col></col> value
val colPattern = """<col ColName="([^"]*)">([^<]*)</col>""".r
import spark.implicits._
// Extract and flatten the data
val extractedDF = df.as[(String, Int)].flatMap { case (machineRecordData, machineRecordID) =>
colPattern.findAllMatchIn(machineRecordData)
.map { m =>
val colName = m.group(1)
val colValue = if (m.group(2) == "null") "0.0" else m.group(2) // replace the literal "null" with the default "0.0"
(machineRecordID, colName, colValue)
}
}.toDF("MachineRecordID", "ColName", "ColValue")
// Merge rows with the same MachineRecordID (pivot to a wide table)
val mergedDF = extractedDF.groupBy("MachineRecordID")
.pivot("ColName")
.agg(first("ColValue"))
.na.fill("0.0") // 将空值填充为 "0.0"
// Map rows where State is "报警" (alarm) to 1, everything else to 0
val transformedDF = mergedDF.withColumn("State",
when(col("State") === "报警", 1).otherwise(0)
)
transformedDF.show(false)
val frame = transformedDF.withColumnRenamed("State", "machine_record_state")
.withColumnRenamed("MachineID", "machine_id")
.withColumnRenamed("MachineRecordID", "machine_record_id")
.withColumnRenamed("主轴转速", "machine_record_mainshaft_speed")
.withColumnRenamed("主轴倍率", "machine_record_mainshaft_multiplerate")
.withColumnRenamed("主轴负载", "machine_record_mainshaft_load")
.withColumnRenamed("进给倍率", "machine_record_feed_speed")
.withColumnRenamed("进给速度", "machine_record_feed_multiplerate")
.withColumnRenamed("PMC程序号", "machine_record_pmc_code")
.withColumnRenamed("循环时间", "machine_record_circle_time")
.withColumnRenamed("运行时间", "machine_record_run_time")
.withColumnRenamed("有效轴数", "machine_record_effective_shaft")
.withColumnRenamed("总加工个数", "machine_record_amount_process")
.withColumnRenamed("已使用内存", "machine_record_use_memory")
.withColumnRenamed("未使用内存", "machine_record_free_memory")
.withColumnRenamed("可用程序量", "machine_record_amount_use_code")
.withColumnRenamed("注册程序量", "machine_record_amount_free_code")
.withColumn("machine_record_date", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
.withColumn("dwd_insert_user",lit("user1"))
.withColumn("dwd_modify_user",lit("user1"))
.withColumn("dwd_insert_time",date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
.withColumn("dwd_modify_time",date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
// Select the columns to persist
.select(
"machine_record_id",
"machine_id",
"machine_record_state",
"machine_record_mainshaft_speed",
"machine_record_mainshaft_multiplerate",
"machine_record_mainshaft_load",
"machine_record_feed_speed",
"machine_record_feed_multiplerate",
"machine_record_pmc_code",
"machine_record_circle_time",
"machine_record_run_time",
"machine_record_effective_shaft",
"machine_record_amount_process",
"machine_record_use_memory",
"machine_record_free_memory",
"machine_record_amount_use_code",
"machine_record_amount_free_code",
"machine_record_date",
"dwd_insert_user",
"dwd_modify_user",
"dwd_insert_time",
"dwd_modify_time"
)
frame.show()
frame.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD.key(), "machine_record_date")
.option(RECORDKEY_FIELD.key(), "machine_record_id")
.option(PARTITIONPATH_FIELD.key(), "machine_id")
.option(TBL_NAME.key(), "fact_machine_learning_data")
.mode("overwrite")
.save("hdfs://bigdata1:8020/user/hive/warehouse/hudi_gy_dwd.db/fact_machine_learning_data")
val updatedDF = frame.withColumn("machine_record_state", lit(""))
// TODO: here we write the test data for alarm prediction (machine_record_state left empty)
updatedDF.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD.key(), "machine_record_date")
.option(RECORDKEY_FIELD.key(), "machine_record_id")
.option(PARTITIONPATH_FIELD.key(), "machine_id")
.option(TBL_NAME.key(), "fact_machine_learning_data_test")
.mode("overwrite")
.save("hdfs://bigdata1:8020/user/hive/warehouse/hudi_gy_dwd.db/fact_machine_learning_data_test")
spark.stop()
}
}
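The regex in this block pulls every <col ColName="...">value</col> pair out of the MachineRecordData fragment. A small standalone check of that pattern on a made-up record (the sample string is only an assumed example of the format):

// Standalone check of the <col> extraction pattern on a made-up MachineRecordData fragment
val colPattern = """<col ColName="([^"]*)">([^<]*)</col>""".r
val sample = """<rec><col ColName="主轴转速">1200</col><col ColName="State">报警</col></rec>"""
colPattern.findAllMatchIn(sample)
  .map(m => (m.group(1), m.group(2)))
  .foreach(println) // prints (主轴转速,1200) and (State,报警)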
Part 2
package org.xuejiujiu.spark.dataMining
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, when}
import org.apache.spark.sql.types.DoubleType
import org.xuejiujiu.spark.packages._
object RandomForestModel {
def main(args: Array[String]): Unit = {
val spark = Hive()
val properties = MySQL()
// Load the training data
val data = spark.read.format("hudi")
.load("hdfs://bigdata1:8020/user/hive/warehouse/hudi_gy_dwd.db/fact_machine_learning_data")
.withColumn("machine_id", col("machine_id").cast(DoubleType))
.withColumn("machine_record_state", col("machine_record_state").cast(DoubleType))
.withColumn("machine_record_mainshaft_speed", col("machine_record_mainshaft_speed").cast(DoubleType))
.withColumn("machine_record_mainshaft_multiplerate", col("machine_record_mainshaft_multiplerate").cast(DoubleType))
.withColumn("machine_record_mainshaft_load", col("machine_record_mainshaft_load").cast(DoubleType))
.withColumn("machine_record_feed_speed", col("machine_record_feed_speed").cast(DoubleType))
.withColumn("machine_record_feed_multiplerate", col("machine_record_feed_multiplerate").cast(DoubleType))
.withColumn("machine_record_pmc_code", col("machine_record_pmc_code").cast(DoubleType))
.withColumn("machine_record_circle_time", col("machine_record_circle_time").cast(DoubleType))
.withColumn("machine_record_run_time", col("machine_record_run_time").cast(DoubleType))
.withColumn("machine_record_effective_shaft", col("machine_record_effective_shaft").cast(DoubleType))
.withColumn("machine_record_amount_process", col("machine_record_amount_process").cast(DoubleType))
.withColumn("machine_record_use_memory", col("machine_record_use_memory").cast(DoubleType))
.withColumn("machine_record_free_memory", col("machine_record_free_memory").cast(DoubleType))
.withColumn("machine_record_amount_use_code", col("machine_record_amount_use_code").cast(DoubleType))
.withColumn("machine_record_amount_free_code", col("machine_record_amount_free_code").cast(DoubleType))
.select(
"machine_record_id",
"machine_id",
"machine_record_state",
"machine_record_mainshaft_speed",
"machine_record_mainshaft_multiplerate",
"machine_record_mainshaft_load",
"machine_record_feed_speed",
"machine_record_feed_multiplerate",
"machine_record_pmc_code",
"machine_record_circle_time",
"machine_record_run_time",
"machine_record_effective_shaft",
"machine_record_amount_process",
"machine_record_use_memory",
"machine_record_free_memory",
"machine_record_amount_use_code",
"machine_record_amount_free_code"
)
// Handle missing values
val filledData = data.na.fill(0.0)
// Assemble the feature columns into a single feature vector
val featureCols = filledData.columns.filter(_ != "machine_record_state")
val assembler = new VectorAssembler()
.setInputCols(featureCols)
.setOutputCol("features")
val assembledData = assembler.transform(filledData)
// Split the dataset into training and test sets
val Array(trainingData, testData) = assembledData.randomSplit(Array(0.7, 0.3))
// Train the random forest model
val rf = new RandomForestClassifier()
.setLabelCol("machine_record_state")
.setFeaturesCol("features")
val model = rf.fit(trainingData)
// Evaluate the model on the held-out test set
val predictions = model.transform(testData)
val evaluator = new BinaryClassificationEvaluator()
.setLabelCol("machine_record_state")
.setRawPredictionCol("rawPrediction") // use the raw scores, not the hard 0/1 predictions, when computing AUC
.setMetricName("areaUnderROC")
val auc = evaluator.evaluate(predictions)
println(s"Area under ROC curve (test set): $auc")
// Load the prediction (test) data
val data2 = spark.read.format("hudi")
.load("hdfs://bigdata1:8020/user/hive/warehouse/hudi_gy_dwd.db/fact_machine_learning_data_test")
.withColumn("machine_id", col("machine_id").cast(DoubleType))
.withColumn("machine_record_state", col("machine_record_state").cast(DoubleType))
.withColumn("machine_record_mainshaft_speed", col("machine_record_mainshaft_speed").cast(DoubleType))
.withColumn("machine_record_mainshaft_multiplerate", col("machine_record_mainshaft_multiplerate").cast(DoubleType))
.withColumn("machine_record_mainshaft_load", col("machine_record_mainshaft_load").cast(DoubleType))
.withColumn("machine_record_feed_speed", col("machine_record_feed_speed").cast(DoubleType))
.withColumn("machine_record_feed_multiplerate", col("machine_record_feed_multiplerate").cast(DoubleType))
.withColumn("machine_record_pmc_code", col("machine_record_pmc_code").cast(DoubleType))
.withColumn("machine_record_circle_time", col("machine_record_circle_time").cast(DoubleType))
.withColumn("machine_record_run_time", col("machine_record_run_time").cast(DoubleType))
.withColumn("machine_record_effective_shaft", col("machine_record_effective_shaft").cast(DoubleType))
.withColumn("machine_record_amount_process", col("machine_record_amount_process").cast(DoubleType))
.withColumn("machine_record_use_memory", col("machine_record_use_memory").cast(DoubleType))
.withColumn("machine_record_free_memory", col("machine_record_free_memory").cast(DoubleType))
.withColumn("machine_record_amount_use_code", col("machine_record_amount_use_code").cast(DoubleType))
.withColumn("machine_record_amount_free_code", col("machine_record_amount_free_code").cast(DoubleType))
.select(
"machine_record_id",
"machine_id",
"machine_record_state",
"machine_record_mainshaft_speed",
"machine_record_mainshaft_multiplerate",
"machine_record_mainshaft_load",
"machine_record_feed_speed",
"machine_record_feed_multiplerate",
"machine_record_pmc_code",
"machine_record_circle_time",
"machine_record_run_time",
"machine_record_effective_shaft",
"machine_record_amount_process",
"machine_record_use_memory",
"machine_record_free_memory",
"machine_record_amount_use_code",
"machine_record_amount_free_code"
)
// Assemble the feature vector for the prediction data
val assembledData2 = assembler.transform(data2)
// Predict on the test data
val predictions2 = model.transform(assembledData2)
// Show the prediction results
val frame = predictions2.select("machine_id", "prediction").withColumnRenamed("prediction", "machine_record_state")
frame.show()
frame.distinct().write.mode("overwrite").jdbc(getUrl(),"ml_result",properties)
}
}
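If the trained model needs to be reused, for example to score new test partitions without retraining, Spark ML models can be saved to and loaded from HDFS. A sketch with an assumed model path:

import org.apache.spark.ml.classification.RandomForestClassificationModel

// Persist the fitted model and load it back later (the path is an assumption)
model.write.overwrite().save("hdfs://bigdata1:8020/user/models/rf_alarm_model")
val reloaded = RandomForestClassificationModel.load("hdfs://bigdata1:8020/user/models/rf_alarm_model")
val predictionsAgain = reloaded.transform(assembledData2)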