Structured Streaming provides two functions for custom grouped, stateful aggregation, mapGroupsWithState and flatMapGroupsWithState, which let developers run stateful stream computations over custom groups based on event time or processing time.

Grouping data with mapGroupsWithState and maintaining the group state manually

For each input word, count how many times the same word appears within the same event-time minute, grouping the data with mapGroupsWithState and maintaining the group state manually along the way.

package struct

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.streaming.{GroupStateTimeout, Trigger}

object StructStream10 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("StructStream10")
      .getOrCreate()

    import spark.implicits._
    spark.sparkContext.setLogLevel("WARN")

    val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm")

    val wordsDataFrame = spark.readStream
      .format("socket")
      .option("host", "linux01")
      .option("port", 9999)
      .load()
      .as[String]
      .map(s => {
        val arr = s.split(",")
        val date = sdf1.parse(arr(0))
        (new Timestamp(date.getTime), arr(1))
      })
      .toDF("ts", "word")

    val result = wordsDataFrame
      .withWatermark("ts", "3 minutes")   //设置过期时间3 minutes
      .groupByKey[String]((row: Row) => {  //调用groupByKey方法自定义分组聚合键,该方法可以接受一个函数,使用 mapGroupsWithState 前提时,必须使用groupByKey实现自定义分组
        val timestamp = row.getTimestamp(0)
        val currentEventTimeMinute = sdf2.format(new Date(timestamp.getTime))
        currentEventTimeMinute + "," + row.getString(1)
      })
      .mapGroupsWithState[(String, Long), (String, String, Long)](GroupStateTimeout.EventTimeTimeout())((timeAndWord, iterator, groupState) => {
        println("current key: " + timeAndWord)
        println("current watermark: " + groupState.getCurrentWatermarkMs())
        println("state exists: " + groupState.exists)
        println("state has timed out: " + groupState.hasTimedOut)

        var count = 0L
        if (groupState.hasTimedOut) {            // the state has expired, drop it
          groupState.remove()
        } else if (groupState.exists) {          // existing state: accumulate the count
          val groupCount = groupState.get._2
          if (groupCount >= 10) {
            groupState.remove()
          } else {
            count = groupState.getOption.getOrElse((timeAndWord, 0L))._2 + iterator.size
            groupState.update((timeAndWord, count))
          }
        } else {                                 // first time this key is seen: initialize state and its timeout
          count = iterator.size
          groupState.update((timeAndWord, count))
          val arr = timeAndWord.split(",")
          val timeoutTimestamp = sdf2.parse(arr(0)).getTime
          groupState.setTimeoutTimestamp(timeoutTimestamp)
        }

        if (count != 0) {
          val arr = timeAndWord.split(",")
          (arr(0), arr(1), count)
        } else {
          null                                   // dropped by the filter below
        }
      })
      .filter(_ != null)
      .toDF("time", "word", "count")

    val query = result.writeStream
      .outputMode("update")
      .trigger(Trigger.ProcessingTime(0))
      .format("console")
      .start()

    query.awaitTermination()


  }
}
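
To try the example, send lines of the form "<yyyy-MM-dd HH:mm:ss>,<word>" to the socket the program reads from. A minimal test session, assuming netcat is available on the host linux01 (the sample lines below are illustrative, not from the original article):

nc -lk 9999
2021-10-01 12:01:10,hadoop
2021-10-01 12:01:40,hadoop
2021-10-01 12:02:05,spark

Each line is split on the comma, the timestamp is truncated to the minute by sdf2, and the resulting grouping key looks like "2021-10-01 12:01,hadoop"; the state holds the running count for that key until it times out or reaches 10.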


Source code of mapGroupsWithState (from KeyValueGroupedDataset)

  @Experimental
  @InterfaceStability.Evolving
  def mapGroupsWithState[S: Encoder, U: Encoder](
      timeoutConf: GroupStateTimeout)(
      func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {
    val flatMapFunc = (key: K, it: Iterator[V], s: GroupState[S]) => Iterator(func(key, it, s))
    Dataset[U](
      sparkSession,
      FlatMapGroupsWithState[K, V, S, U](
        flatMapFunc.asInstanceOf[(Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any]],
        groupingAttributes,
        dataAttributes,
        OutputMode.Update,
        isMapGroupsWithState = true,
        timeoutConf,
        child = logicalPlan))
  }

mapGroupsWithState takes two type parameters: S, the type of the state kept for each group, and U, the return type of the user function func. As the source above shows, it simply wraps func into a function that returns a single-element Iterator and delegates to FlatMapGroupsWithState with OutputMode.Update.
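
A minimal sketch of how the type parameters line up (a simplified example rather than the article's program; it assumes a streaming Dataset[String] named words and the usual import spark.implicits._):

// S = Long: the running count stored in GroupState.
// U = (String, Long): the single row returned per group and trigger.
val counts = words
  .groupByKey(identity)                                   // K = String, V = String
  .mapGroupsWithState[Long, (String, Long)](GroupStateTimeout.NoTimeout())(
    (word, values, state) => {
      val count = state.getOption.getOrElse(0L) + values.size  // read and accumulate S
      state.update(count)                                      // write S back
      (word, count)                                            // exactly one U per group
    })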

Grouping data with flatMapGroupsWithState and maintaining the group state manually

flatMapGroupsWithState is similar to mapGroupsWithState, but its user function returns an Iterator, so one group can emit zero or more output rows per trigger; this makes it suitable for expanding a single input group into multiple output rows, as the example below does.
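
A minimal sketch of the difference (again a simplified, assumed example using the same hypothetical words Dataset as above, not the article's program): the function passed to flatMapGroupsWithState returns an Iterator[U], so one group may produce any number of rows per trigger.

val exploded = words
  .groupByKey(identity)
  .flatMapGroupsWithState[Long, (String, Long)](
    OutputMode.Update(), GroupStateTimeout.NoTimeout())(
    (word, values, state) => {
      val newEvents = values.size                        // rows for this key in this trigger
      val count = state.getOption.getOrElse(0L) + newEvents
      state.update(count)
      // Emit one row per "_"-separated token of the key, e.g. key "a_b" yields two rows;
      // this mirrors the expansion done in the full example below.
      if (newEvents == 0) Iterator.empty
      else word.split("_").iterator.map(token => (token, count))
    })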

package chapter9

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, Trigger}
import org.apache.spark.sql.{Row, SparkSession}


object test10{
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("test10")
      .getOrCreate()

    import spark.implicits._
    spark.sparkContext.setLogLevel("WARN")

    val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm")

    val wordsDataFrame = spark.readStream
      .format("socket")
      .option("host", "linux01")
      .option("port", 9999)
      .load()
      .as[String]
      .map(s => {
        val arr = s.split(",")
        val date = sdf1.parse(arr(0))
        (new Timestamp(date.getTime), arr(1))
      })
      .toDF("ts", "gift")

    val result = wordsDataFrame
      .withWatermark("ts", "3 minutes")
      .groupByKey[String]((row: Row) => {
        val timestamp = row.getTimestamp(0)
        val currentEventTimeMinute = sdf2.format(new Date(timestamp.getTime))
        currentEventTimeMinute + "," + row.getString(1)
      })
      .flatMapGroupsWithState[(String, Long), (String, String, Long)](OutputMode.Update(), GroupStateTimeout.EventTimeTimeout())((giftAndTime, iterator, groupState) => {
        println("current key: " + giftAndTime)
        println("current watermark: " + groupState.getCurrentWatermarkMs())
        println("state exists: " + groupState.exists)
        println("state has timed out: " + groupState.hasTimedOut)

        var count = 0L
        if (groupState.hasTimedOut) {            // the state has expired, drop it
          groupState.remove()
        } else if (groupState.exists) {          // existing state: accumulate the count
          val groupCount = groupState.get._2
          if (groupCount >= 10) {
            groupState.remove()
          } else {
            count = groupState.getOption.getOrElse((giftAndTime, 0L))._2 + iterator.size
            groupState.update((giftAndTime, count))
          }
        } else {                                 // first time this key is seen: initialize state and its timeout
          count = iterator.size
          groupState.update((giftAndTime, count))
          val arr = giftAndTime.split(",")
          val timeoutTimestamp = sdf2.parse(arr(0)).getTime
          groupState.setTimeoutTimestamp(timeoutTimestamp)
        }

        // Emit one output row per gift in the underscore-separated list ("flatMap" style output).
        val result = collection.mutable.ArrayBuffer[(String, String, Long)]()
        if (count != 0) {
          val arr1 = giftAndTime.split(",")
          val arr2 = arr1(1).split("_")
          for (s <- arr2) {
            result.append((arr1(0).trim, s.trim, count))
          }
        }
        result.iterator
      })
      .toDF("ts", "gift", "count")
//      .withWatermark("ts", "3 minutes")
//      .groupBy($"gift")
//      .count()

    val query = result.writeStream
      .outputMode("update")
      .trigger(Trigger.ProcessingTime(0))
      .format("console")
      .start()

    query.awaitTermination()

  }
}
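
The input this program expects on the socket is of the form "<yyyy-MM-dd HH:mm:ss>,<gift1>_<gift2>_...": the second field is split on "_" and each gift in the list is written out as its own row carrying the group's count. Two illustrative input lines (assumed, not from the original article):

2021-10-01 12:01:10,flower_car
2021-10-01 12:01:30,flower_car

Note that the grouping key is the whole combination, e.g. "2021-10-01 12:01,flower_car", so the state counts occurrences of that exact minute-and-combination; the split into individual gifts happens only when the output rows are built.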
