基于事件时间的窗口操作

在 Struct Strreaming中,可以按照事件真实发生时间对附近范围内的数据进行聚合操作,即基于事件时间窗口进行操作,在这种机制下,不必考虑事件到达顺序与事件发生顺序一致,大大减少了开发者工作量
一条数据可以被称为一个事件,在生成数据时携带的时间可以称为事件时间
案例

package struct

import java.text.SimpleDateFormat

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
import org.apache.spark.sql.streaming.Trigger

object StructStream06 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("StructStream06")
      .getOrCreate()
    import org.apache.spark.sql.functions._
    import spark.implicits._

    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val lines = spark.readStream.format("socket").option("host", "note01").option("port", 9999).load()
    val words = lines.as[String].map(s => {
      val arr = s.split(",")
      val date = sdf.parse(arr(0))
      (new Timestamp(date.getTime, arr(1)))
    }).toDF("ts", "word")

    val wordCounts = words.groupBy(
      window($"ts", "10 minutes", "2 minutes"), $"word"
    ).count()

    val query = wordCounts.writeStream.outputMode("complete").trigger(Trigger.ProcessingTime(0)).format("console")
      .start()
    query.awaitTermination()
  }
}

事件时间窗口方式
    val wordCounts = words.groupBy(
      window($"ts", "10 minutes", "2 minutes"), $"word"
    ).count()

$“ts” 指定存放时间的列,10 minutes窗口宽度,2 minutes滑动宽度

事件时间窗口生成规则
package struct

import java.sql.Date
import java.text.SimpleDateFormat

object StructStream07 {
  def main(args: Array[String]): Unit = {
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

    def main(args: Array[String]): Unit = {
      //事件时间
      val eventTime = sdf.parse("2019-03-08 12:00:00")
      val eventTimestamp = eventTime.getTime
      println(eventTimestamp)
      //起始时间
      val startTime = 0L
      //窗口时间宽度: minutes
      val windowDuration = 10 * 60 * 1000L
      //窗口滑动长度: minutes
      val slideDuration = 2 * 60 * 1000L

      if(slideDuration > windowDuration) {
        println("slideDuration必须小于或等于windowDuration")
        return
      }

      //根据窗口宽度与滑动宽度,计算窗口连续滑动多少次,滑动长度的总和才等于窗口宽度。如果滑动次数为小数,则进位。
      val overlappingWindows = math.ceil(windowDuration * 1.0 / slideDuration).toInt
      //生成每一个window的起始时间和结束时间
      for(i <- 0 until overlappingWindows){
        //根据事件时间,按照slideDuration宽度,计算供需多少次滚动,或者说是需要连续生成多少个window,才可以将窗口的起始时间滚动至即将脱离事件时间的位置
        val division = (eventTimestamp - startTime) / slideDuration.toDouble
        //如果无法整除,则意味着需要一个新的窗口容纳事件,即,小数部分直接进位。
        val ceil = math.ceil(division)
        //如果恰巧整除,即,移动次数恰巧为整数,则需要新增一个窗口,这是因为window的起始时间和结束时间的区间为“前闭后开”,即[startTime, endTime)
        //如果没有被整除,则已经进位,无需再+1。最后将该值作为windowId。
        val windowId = if(ceil == division) ceil + 1 else ceil
        //计算窗口起始时间 = ()
        val windowStart = (windowId + i - overlappingWindows) * slideDuration + startTime
        //计算窗口的结束时间 = 当前窗口的起始时间 + 滑动长度
        val windowEnd = windowStart + windowDuration

        if(eventTimestamp >= windowStart.toLong && eventTimestamp < windowEnd.toLong){
          val startTimeString = sdf.format(new Date(windowStart.toLong))
          val endTimeString = sdf.format(new Date(windowEnd.toLong))
          println("[" + startTimeString + ", " + endTimeString + ")")
        }
      }
    }
  }
}

Logo

永洪科技,致力于打造全球领先的数据技术厂商,具备从数据应用方案咨询、BI、AIGC智能分析、数字孪生、数据资产、数据治理、数据实施的端到端大数据价值服务能力。

更多推荐