前言

由于任务书里的描述不够清晰,本文代码只代表本人理解的需求完成,如有错误,请指出。
在这里插入图片描述

会用到的库和方法

import org.apache.spark.ml.linalg.{Vector,Vectors}
import org.apache.spark.ml.feature.{StandardScaler,OneHotEncoder, StringIndexer, StringIndexerModel, VectorAssembler}
import org.apache.spark.ml.functions.vector_to_array

// 使用VectorAssembler将所有特征组合成一个向量
val assembler = new VectorAssembler().setInputCols(numericalCol).setOutputCol("features")
val assembled = assembler.transform(product_info)

// 使用StandardScaler对数值特征进行规范化处理
// assembled对feature字段里面的每个值做.setWithStd(true).setWithMean(true)处理
val scaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(true)
val scaled = scaler.fit(assembled).transform(assembled)

// 非数值类型的数据需要先进行StringIndexer处理后再进行onehot
// 其实就是把类型改为从0开始的键
val indexer: StringIndexerModel = new StringIndexer()
  .setInputCol("color")
  .setOutputCol("index_color")
  .fit(data)
// 如果输入字段本身就数字类型,那么稀疏向量的个数是最大值+1,
// 因为默认从0开始表示第一个类型
val encoder = new OneHotEncoder()
      .setInputCol("index_color")
      .setOutputCol("encoded_color")
      .setDropLast(false)
    encoder.fit(indexData).transform(indexData)

val indexData: DataFrame = indexer.transform(data)
/**
   * 计算两个列表的杰卡德相似度
   * @param list1
   * @param list2
   * @return 两个列表的杰卡德相似度
   */
  def jaccardSimilarity(list1: List[String], list2: List[String]): Double = {
    val set1 = list1.toSet
    val set2 = list2.toSet
    val intersection = set1.intersect(set2).size.toDouble
    val union = set1.union(set2).size.toDouble
    intersection / union
  }

StandardScaler是Spark ML库中的一个特征处理方法,用于对数值型特征进行标准化处理。标准化是一种常见的特征处理方法,它可以将数值型特征缩放到一个标准的范围,使得它们在数值上处于同一数量级,从而避免某些特征由于数值过大而在模型中占据主导地位。

具体来说,StandardScaler会对每个特征进行以下操作:

  1. 计算特征的平均值和标准差。
  2. 将特征的每个值减去平均值,然后除以标准差。
    这样处理后,每个特征的平均值会变为0,标准差会变为1。

例如,假设你有一个DataFrame,它包含两个字段:“length"和"price”。你可以使用VectorAssembler将这两个字段合并为一个向量列"features",然后使用StandardScaler对这个向量列进行标准化处理。假设"length"字段的平均值为a,标准差为b,"price"字段的平均值为c,标准差为d,那么StandardScaler会将"features"列中的每个向量 [ x , y ] 转换为 [ ( x − a ) / b , ( y − c ) / d ] [x, y] 转换为 [(x - a) / b, (y - c) / d] [x,y]转换为[(xa)/b,(yc)/d]

基础知识

余弦相似度

  • A,B为向量,向量长度必须一致,这表示维度一致。

  • 公式如下:
    余弦相似度计算公式

  • 代码如下:


import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf

// 定义一个UDF,用于计算两个向量的余弦相似度
val cosineSimilarity = udf((v1: Vector, v2: Vector) => {
  val dotProduct = v1.dot(v2)
  val norms = Vectors.norm(v1, 2) * Vectors.norm(v2, 2)
  dotProduct / norms
}:Double)


上面的v1,v2可以看作向量A、B。

这段代码是用来计算两个向量的余弦相似度。余弦相似度是一种常用的相似度度量方法,它可以衡量两个向量之间的夹角的余弦值。如果两个向量的方向完全相同,那么它们的余弦相似度就是1;如果两个向量的方向完全相反,那么它们的余弦相似度就是-1;如果两个向量是正交的,那么它们的余弦相似度就是0。

这段代码中的每一行都有特定的作用:

  1. val dotProduct = Vectors.dot(v1, v2):这行代码计算了向量v1和v2的点积。点积是将两个向量的对应元素相乘,然后将结果相加得到的一个标量。公式中对应分子的计算
  2. val norms = Vectors.norm(v1, 2) * Vectors.norm(v2, 2):这行代码计算了向量v1和v2的范数(也就是长度),然后将结果相乘。范数是将向量的每个元素的平方相加,然后取平方根得到的一个标量。公式中对应分母的计算。
  3. dotProduct / norms:这行代码计算了点积除以范数的结果。这就是余弦相似度。

23年国赛题代码

每个feature函数和它调用的子函数表示每个特征工程的任务代码。

为什么要这么写呢?
因为不可能一步到位,所以分步完成每一个任务。

注意:代码中数据获取皆从mysql中获取,任务书可能要求是hive、hudi。

package Bds

import Bds.DataFrameUtils._  // 这是封装的一个获取各种数据仓库配置项的模块
import org.apache.spark.ml.feature._
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.functions.vector_to_array
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object dataMining {
  val spark = getSparkSession("dataM")
  spark.sparkContext.setLogLevel("OFF")
  // 获取mysql配置项
  val mySQLConfig = getMySQLCf("shtd_store","sku_info","bigdata1")

  def order_detail(): DataFrame ={
    mySQLConfig("dbtable") = "order_info"
    val order = spark.read.format("jdbc").options(mySQLConfig).load()
    mySQLConfig("dbtable") = "order_detail"
    val detail = spark.read.format("jdbc").options(mySQLConfig).load()
    mySQLConfig("dbtable") = "sku_info"
    val sku_info = spark.read.format("jdbc").options(mySQLConfig).load()

    // 连接三个表即可剔除不存在现有维表的数据
    order.join(detail,order("id")===detail("order_id"))
      .select(order("user_id"),detail("sku_id"))
      .join(sku_info,col("sku_id")===col("id"))
      // 不确定种类是指sku_id还是category3_id,这里用的是category,如果使用sku_id注释掉下面这一行即可
      .select(col("user_id"),col("category3_id").as("sku_id"))
      .dropDuplicates("user_id","sku_id")
  }

  def feature1(targetUser:Int=6708): Unit ={
    val userBuySku = order_detail()

    val resultDf = userBuySku.filter(col("user_id")=!=targetUser).join(
      userBuySku.filter(col("user_id")===targetUser),
      Seq("sku_id"),"semi"
    ).groupBy("user_id").count()
    val result = resultDf.orderBy(desc("count")).limit(10)
      .collect().map(_.getLong(0)).mkString(",")
    println("-------------------相同种类前10的id结果展示为:--------------------")
    println(result)

  }


  def skuFeature(): DataFrame ={
    // 读取商品数据
    val sku = spark.read.format("jdbc").options(mySQLConfig).load()

    //    对price、weight进行规范化(StandardScaler)
    val vectorCol = Array("price","weight")
    val assembler = new VectorAssembler().setInputCols(vectorCol).setOutputCol("feature")
    val assembled= assembler.transform(sku)
    val Scaler = new StandardScaler().setInputCol("feature").setOutputCol("selectFeature")
      .setWithMean(true)
      .setWithStd(true)
    var scaled = Scaler.fit(assembled).transform(assembled)
      .withColumn("selectFeature", vector_to_array(col("selectFeature")))
      .withColumn("price",col("selectFeature").getItem(0))
      .withColumn("weight",col("selectFeature").getItem(1))


    //    scaled.show()
    val categoryColNames:Array[String] = Array("spu_id","tm_id","category3_id")
    // 如果类别字段是从1开始的,就使用下面的代码
    scaled = categoryColNames.foldLeft(scaled){
      (tmpDf,colName)=>{
        tmpDf.withColumn(colName,col(colName)-1)
      }
    }
	// 对类别字段做OneHot处理
    val encoder = new OneHotEncoder()
      .setInputCols(categoryColNames)
      .setOutputCols(Array("spu_idOH","tm_idOH","category3_idOH"))
      .setDropLast(false)
    val encoded = encoder.fit(scaled).transform(scaled)
    val result = encoded.drop(categoryColNames: _*)
      .select(
        col("id").cast("double"),
        col("price"),
        col("weight"),
        col("spu_idOH"),
        col("tm_idOH"),
        col("category3_idOH")
      )

    // 这里id还是整型,国赛要求是double
    result.orderBy("id")
  }

  def feature2(): Unit ={
    // 只要第一行,节省计算量
    val output = skuFeature().limit(1)
    
    val assembler = new VectorAssembler().setInputCols(Array("id","price","weight","spu_idOH","tm_idOH","category3_idOH"))
      .setOutputCol("V")
    val assembled = assembler.transform(output)
    println("--------------------第一条数据前10列结果展示为:---------------------")
    val V: SparseVector = assembled.first().getAs[SparseVector]("V")
    // 注意每个类型字段都被额外加了一个元素在下标为0的位置
    val result = (0 to 9).map(V(_)).mkString(",")
    println(result)
  }

  /**
   *
   * @param targetUser  目标用户
   * @param top10UserId   通过feature1获得
   */
  def recmd_sys(top10UserId:Array[Int],targetUser:Int=6708): Unit ={  
    // 筛除需要使用的数据
    val UserBuySku = order_detail().filter(col("user_id").isin(top10UserId) || col("user_id")===targetUser).cache()
    // 剔除目标用户购买过的商品
    val userT = UserBuySku.filter(col("user_id")===targetUser)  // user id == targetUser 的数据
    val top10UserBuySku= UserBuySku.join(userT,Seq("sku_id"),"left_anti")  // 返回不存在6708用户买过的商品数据
      .dropDuplicates("sku_id")  // 去重商品,避免留下多个用户买过的同一商品

    val sku_feature = skuFeature()

    // sku_feature所有字段做为向量
    val assembler = new VectorAssembler().setInputCols(sku_feature.columns).setOutputCol("V")
    val assembled = assembler.transform(sku_feature)
      .select(
        col("id").cast("Long"),
        col("V")
      )

    // 跟上各自商品的特征
    val joinOn = col("sku_id")===col("id")
    val userTWithV = userT.join(assembled,joinOn)
    val top10UserBuySkuWithV = top10UserBuySku.join(assembled,joinOn)

    val result = userTWithV.alias("u").crossJoin(top10UserBuySkuWithV.alias("t"))
      .withColumn("cosineSimilarity",cosineSimilarity(col("u.V"),col("t.V")))
      .groupBy("sku_id").agg(mean("cosineSimilarity").alias("cosineSimilarity"))

    println("------------------------推荐Top5结果如下------------------------")
    result.orderBy(col("cosineSimilarity").desc).limit(10)
      .collect().map(x=>(x.getLong(0),x.getDouble(1)))
      .zipWithIndex
      .foreach(x=>{
        println(s"相似度top${x._2}(商品id:${x._1._1},平均相似度:${x._1._2})")
      })
  }

  def main(args: Array[String]): Unit = {
    // feature1() // 参数填入任务书指定的用户id
    // feature2() // 特征2的结果
    // recmd_sys()  // 推荐系统的结果,参数有注释
    spark.stop()
  }
}

结语

如果觉得本文不错的话请点赞收藏支持。反响不错的话再抽空整理出工业的数据挖掘。欢迎私聊催更。建议自己读懂代码,如果需要代码讲解也可以联系我。

Logo

永洪科技,致力于打造全球领先的数据技术厂商,具备从数据应用方案咨询、BI、AIGC智能分析、数字孪生、数据资产、数据治理、数据实施的端到端大数据价值服务能力。

更多推荐