
职业院校大数据技术——数据挖掘2
前面一篇文章反响还行,这里加更一个工业数据挖掘的样例。反响不错的话再抽空整理出离线数据处理和可视化的内容。
·
前言
前面一篇文章反响还行,这里加更一个工业数据挖掘的样例。反响不错的话再抽空整理出离线数据处理和可视化的内容。欢迎私聊催更。本文代码只代表本人理解的需求完成,如有错误,请指出。
坑
工业示例数据可能没有根标签得自己加,不然无法解析
- 根标签就是指一对将所有xml标签元素都包裹的标签。下面的hh就是根标签。
<hh> .... </hh>
solution
我这里做了一点处理:
- 拿了一部分样例数据移到了本地跑。
- 训练数据和测试数据通过样例数据切分获得。
package Bgy
import Bds.DataFrameUtils._
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.dom4j._
import scala.collection.JavaConverters._
object dataMining {
// 下面两行都是封装的一些内容,在DataFrameUtils,没什么好说的,可以去看数据挖掘1
val spark = getSparkSession("dataMining")
val mySQLConfig = getMySQLCf("shtd_industry","","bigdata1")
spark.sparkContext.setLogLevel("OFF")
// 把需要处理的字段都整理
def addColumnMap(): Array[(String,String)] ={
val keys: Array[String] = "machine_record_mainshaft_speed\nmachine_record_mainshaft_multiplerate\nmachine_record_mainshaft_load\nmachine_record_feed_speed\nmachine_record_feed_multiplerate\nmachine_record_pmc_code\nmachine_record_circle_time\nmachine_record_run_time\nmachine_record_effective_shaft\nmachine_record_amount_process\nmachine_record_use_memory\nmachine_record_free_memory\nmachine_record_amount_use_code\nmachine_record_amount_free_code".split("\n")
val values: Array[String] = "主轴转速\n主轴倍率\n主轴负载\n进给倍率\n进给速度\nPMC程序号\n循环时间\n运行时间\n有效轴数\n总加工个数\n已使用内存\n未使用内存\n可用程序量\n注册程序量".split("\n")
keys.zip(values)
}
def feature(): DataFrame ={
// 这里我从本地获得数据,所以注释掉了
// mySQLConfig("dbtable") = "machinedata"
// val df = spark.read.format("jdbc").options(mySQLConfig).load()
// 从本地获取数据
val df = spark.read.option("header","false")
.csv("F:\\Workspace\\task_one23\\out\\artifacts\\task_one23_jar\\shtd_industry_machinedata.csv")
// 修改列名
val oldCols = df.columns
val newCols = Array("MachineRecordID","MachineID","MachineRecordState","MachineRecordData","MachineRecordDate")
val newDf = newCols.zip(oldCols).foldLeft(df){
(tmpDf,colNames)=>{
val (newName,oldName) = colNames
tmpDf.withColumnRenamed(oldName,newName)
}
}// 转换machine_record_state字段的值,若值为报警,则填写1,否则填写0
.withColumn("MachineRecordState",when(col("MachineRecordState")==="报警",1.0).otherwise(0.0))
// newDf.show()
// 定义解析XML的函数,注册为UDF
val parseXml = udf((xml: String) => {
// 加上了根元素
val document = DocumentHelper.parseText(s"<root>$xml</root>")
val rootElement: Element = document.getRootElement
// 遍历XML的元素
rootElement.elements("col").asScala.map { element =>
(element.attributeValue("ColName"), element.getText)
}.toMap// 得到一个 列名:内容 的map
}:Map[String,String])
// 应用UDF
val parsedDF = newDf.withColumn("parsedData",parseXml(col("MachineRecordData")))
// 解析后的字段值都提取
val colMap: Array[(String, String)] = addColumnMap()
val result = colMap.foldLeft(parsedDF){
(tmpDf,tup)=>{
val (colName,keyName) = tup
val parsedDataItem = col("parsedData").getItem(keyName)
tmpDf.withColumn(colName,
when(parsedDataItem.isNull || parsedDataItem==="null", 0.0)
.otherwise(parsedDataItem.cast("double"))
)
}
}
// 调整列名和列顺序
val colSequent: Array[String] = Array("machine_record_id","machine_id","machine_record_state")++colMap.map(_._1)++Array("machine_record_date")
result.drop("parsedData","MachineRecordData")
.withColumnRenamed("MachineRecordID","machine_record_id")
.withColumnRenamed("MachineID","machine_id")
.withColumnRenamed("MachineRecordState","machine_record_state")
.withColumnRenamed("MachineRecordDate","machine_record_date")
.select(
colSequent.head,
colSequent.tail: _*,
)
// 展示结果
// parsedDF.show(20,false)
}
def alarmPrediction(): Unit ={
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.RandomForestRegressor
import org.apache.spark.sql.functions._
// 读取训练数据
val data = feature()
// 定义特征列
val featureCols = Array("machine_record_mainshaft_speed", "machine_record_mainshaft_multiplerate", "machine_record_mainshaft_load", "machine_record_feed_speed", "machine_record_feed_multiplerate", "machine_record_pmc_code", "machine_record_circle_time", "machine_record_run_time", "machine_record_effective_shaft", "machine_record_amount_process", "machine_record_use_memory", "machine_record_free_memory", "machine_record_amount_use_code", "machine_record_amount_free_code")
// 创建VectorAssembler
val assembler = new VectorAssembler()
.setInputCols(featureCols)
.setOutputCol("features")
val trainAndTest_df = assembler.transform(data).select("machine_record_id","features", "machine_record_state")
// 注意,这里我只把特征列合并成了一个向量,具体可能还需要用standard处理一下,至于怎么使用,我上一篇有。
// 没有测试数据,把上个任务的结果分割,用于模拟
val Array(trainData, testData) = trainAndTest_df.randomSplit(Array(0.8, 0.2))
// 创建随机森林模型
val rf = new RandomForestClassifier()
.setLabelCol("machine_record_state") // 需要预测的结果列
.setFeaturesCol("features") // 特征列
.setNumTrees(100) // 参数你们可以自己填
.setMaxDepth(10)
.setSeed(42)
// 训练模型
val rf_model = rf.fit(trainData)
// 预测测试数据的标签
val pred_df = rf_model.transform(testData)
.select(col("machine_record_id"),col( "prediction").alias("predictState"))
// prediction列就是预测的结果
// 输出预测结果
pred_df.show()
}
def main(args: Array[String]): Unit = {
alarmPrediction()
// val feat = feature()
// feat.show()
spark.stop()
}
}
结语
如果觉得本文不错的话请点赞收藏支持。反响不错的话再抽空整理出离线数据处理和可视化的内容。欢迎私聊催更。建议自己读懂代码,如果需要代码讲解也可以联系我。
更多推荐
所有评论(0)