目录

一、时间语义

1.1 三种时间概念

 1.1.1 ProcessTime 在代码中的使用

1.1.2 EventTime 在代码中的使用

1.1.3 关于窗口起始时间的计算值

 二、对事件的处理

2.1 有序事件

2.2 乱序事件

2.3 指定 Timestamps 与生成 Watermarks

2.4 使用 WatermarkStrategy 工具类指定时间戳和Watermark

2.5 自定义指定 Timestamps 和 Watermarks

2.6 对迟到数据的处理


一、时间语义

1.1 三种时间概念

Flink 根据时间产生的位置不同,可以将时间分为三种:

  • 事件时间:EventTime,数据产生的时间
  • 接入时间:IngestionTime,数据进入Flink的时间
  • 处理时间:ProcessTime,数据被算子处理的时间

 1.1.1 ProcessTime 在代码中的使用

ProcessTime 使用的是系统时间,直接使用对应的API即可,参考:大数据——Flink dataStream 中 窗口函数的使用

  • TumblingProcessingTimeWindows
  • SlidingProcessingTimeWindows
  • ProcessingTimeSessionWindows

1.1.2 EventTime 在代码中的使用

EventTime在数据中,需要在代码中指定

  • TumblingEventTimeWindows
  • SlidingEventTimeWindows
  • EventTimeSessionWindows

1.1.3 关于窗口起始时间的计算值

  • 左闭右开
  • timestamp -  (timestamp - offset + windowSize) % windowSize

 二、对事件的处理

2.1 有序事件

如果读取到的数据是有序并且是升序的,可以使用assignAscendingTimestamps

代码如下

package cn.kgc.window

import cn.kgc.bean.TrainAlarm
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time


/**
 * 基于EventTime实现的窗口计算
 */
object WindowEventTime {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputStream = env.socketTextStream("master", 666)
      .map(line => {
        val ps = line.split(",")
        TrainAlarm(ps(0), ps(1).toLong, ps(2).toDouble)
      })
      //如果读取到的数据是有序的并且升序,那么就使用assignAscendingTimestamps
      //指定EventTime,如果数据中的时间戳为10位数(到秒),需要乘以1000转换为到毫秒
        .assignAscendingTimestamps(_.ts * 1000L)
        .keyBy(_.id)

    //基于EventTime实现滚动窗口
    //起始时间为窗口大小的整数倍,比如:203就是[200,205)窗口内
    inputStream
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .maxBy("temp")

    //基于EventTime实现滑动窗口
    inputStream
        .window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(2)))
        .maxBy("temp")

    //基于EventTime实现会话窗口
    //会话窗口触发机制:同一字段,一条数据超过会话时间时触发
    inputStream
        .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
        .maxBy("temp")
        .print()
    env.execute()
  }
}
//列车温度传感器数据的样例类id表示列车id,ts表示事件戳,temp表示温度
case class TrainAlarm(id:String,ts:Long,temp:Double)

2.2 乱序事件

        由于网络或者系统等外部因素的影响,数据被传输到 Flink 的时间往往不是按照事件产生的顺序传输进来的,因而会造成乱序或者延迟等问题。比如基于事件时间的 Window 创建后,如何判断 Window 中的数据是否已经全部到达,全部到达则可以执行,未全部到达则需要继等待。比如在理想状态下,数据产生和到达的顺序都是一致的,而实际却是乱序的。在此情况下,就出现了 Watermark 机制,它能够衡量数据到达的进度和完整性。
        比如在理想状态下,数据产生和到达的顺序都是一致的,而实际却是乱序的。        

         对于上图中实际状态的数据到达情况,如果我们有一个 5s 的窗口算子,当 4 一直未到达时,这个窗口就得一直等待 4 的到来,如果 4 长时间未到达就会影 响整个窗口的计算,而 Watermark 就用来解决此问题。

        Watermark:Flink 将最新读取数据的最大的 EventTime 减去固定的时间间隔 作为 Watermark。固定的时间间隔其实就是指最大延迟时间。每条数据都会伴随 着一个 Watermark。如果有一条数据的 Watermark 大于了某个窗口的 EndTime, 就会默认该窗口内的数据已经全部到达,并触发执行。

         虽然 4 未到达,但是它的窗口已经被执行了,所以 4 默认不会被处理,因为它在延时时长内还未到达。

2.3 指定 Timestamps 与生成 Watermarks

        WatermarkStrategy 可以在 Flink 应用程序中的两处使用,第一种是直接在 数据源上使用,第二种是直接在非数据源的操作之后使用。

        第一种方式相比会更好,因为数据源可以利用 watermark 生成逻辑中有关 分片/分区(shards/partitions/splits)的信息。使用这种方式,数据源通常可以更 精准地跟踪 watermark,整体 watermark 生成将更精确。直接在源上指定 WatermarkStrategy 意味着你必须使用特定数据源接口。

        比如我们使用 Flink 读取 Kafka 中的数据的时候,Kafka 中的分区个数与 Flink 中的分区个数是一致的。多个分区常常并行使用,因此交错来自各个分区的事件 数据就会破坏每个分区的事件时间模式。在这种情况下,你可以使用 Flink 中可 识别 Kafka 分区的 watermark 生成机制。使用此特性,将在 Kafka 消费端内部 针对每个 Kafka 分区生成 watermark。

2.4 使用 WatermarkStrategy 工具类指定时间戳和Watermark

package cn.kgc.window

import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object EventTimeAndWaterMark {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val keyedStream = env.socketTextStream("master", 1234)
      .map(line => {
        val ps = line.split(",")
        TrainerAlarm(ps(0), ps(1).toLong, ps(2).toDouble)
      })

      //指定时间戳和注入水印
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness(Duration.ofSeconds(2))
          .withTimestampAssigner(new SerializableTimestampAssigner[TrainerAlarm] {
            override def extractTimestamp(element: TrainerAlarm, timestamps: Long): Long = {
              if (element.timestamp.toString.length == 10){
                element.timestamp*1000L
              }else{
                element.timestamp
              }
            }
          }))
      .keyBy(_.id)

    //指定窗口类型为滚动窗口
    keyedStream
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .maxBy("temp")
      .print()

    env.execute()
  }
}

case class TrainerAlarm(id:String,timestamp:Long,temperature:Double)

2.5 自定义指定 Timestamps 和 Watermarks

package cn.kgc.window

import org.apache.flink.api.common.eventtime.{TimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object EventTimeAndWaterMark {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val keyedStream = env.socketTextStream("master", 1234)
      .map(line => {
        val ps = line.split(",")
        TrainerAlarm(ps(0), ps(1).toLong, ps(2).toDouble)
      })
      //使用自定义指定的timestamp和水印
      .assignTimestampsAndWatermarks(new MyWatermarkStrategy)
      .keyBy(_.id)

    //指定窗口类型为滚动窗口
    keyedStream
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .maxBy("temp")
      .print()

    env.execute()
  }
}

case class TrainerAlarm(id:String,timestamps:Long,temperature:Double)

//继承WatermarkStrategy来自定义指定时间戳和创建水印
class MyWatermarkStrategy extends  WatermarkStrategy[TrainerAlarm]{

  override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[TrainerAlarm] = {
    new TimestampAssigner[TrainerAlarm] {
      override def extractTimestamp(element: TrainerAlarm, timestamps: Long): Long = {
        if (element.timestamps.toString.length == 10){
          element.timestamps*1000L
        }else{
          element.timestamps
        }
      }
    }
  }

  override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[TrainerAlarm] = {

    new WatermarkGenerator[TrainerAlarm] {
      val maxOutOfOrderness = 2000L
      var maxTimestamps = 0L
      override def onEvent(element: TrainerAlarm, timestamp: Long, watermarkOutput: WatermarkOutput): Unit = {
        //提取已经来的数据中最大的EventTime
        maxTimestamps = Math.max(maxTimestamps,element.timestamps)
      }

      override def onPeriodicEmit(watermarkOutput: WatermarkOutput): Unit = {
        //创建水印
        watermarkOutput.emitWatermark(new Watermark({
          if (maxTimestamps.toString.length == 10){
            maxTimestamps*1000L-maxTimestamps
          }else{
            maxTimestamps-maxTimestamps
          }
        }))
      }
    }
  }
}

2.6 对迟到数据的处理

        如果 Watermark 设置的延迟太久,收到结果的速度可能就会很慢,解决办 法是在 Watermark 到达之前输出一个近似的结果。

        如果 Watermark 到达的太早,则可能收到错误的结果,不过 Flink 处理延迟 到的数据机制可能解决这个问题。

        实际开发常见处理,一般都是设置比较少的延迟时间(可以解决大部分的乱序 数据的一个时间),然后使用延迟数据处理机制和侧输出流。

代码

package cn.kgc.window

import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object EventTimeAndWaterMark {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val keyedStream = env.socketTextStream("master", 1234)
      .map(line => {
        val ps = line.split(",")
        TrainerAlarm(ps(0), ps(1).toLong, ps(2).toDouble)
      })

      //指定时间戳和注入水印
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness(Duration.ofSeconds(2))
          .withTimestampAssigner(new SerializableTimestampAssigner[TrainerAlarm] {
            override def extractTimestamp(element: TrainerAlarm, timestamps: Long): Long = {
              if (element.timestamp.toString.length == 10){
                element.timestamp*1000L
              }else{
                element.timestamp
              }
            }
          }))
      .keyBy(_.id)

    val late = new OutputTag[TrainerAlarm]("late")
    //指定窗口类型为滚动窗口
    val result = keyedStream
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      //设置数据允许迟到时间:窗口触发后不会关闭,直到超过允许迟到时间时才会关闭窗口,若属于该窗口的数据在允许迟到时间内到达,仍会参与窗口计算
      .allowedLateness(Time.minutes(1))
      //超过允许迟到时间到达的数据会被记录到侧输出流
      .sideOutputLateData(late)
      .maxBy("temp")
      
    
    result.print()
    result.getSideOutput(late).print("late===")
    

    env.execute()
  }
}

case class TrainerAlarm(id:String,timestamp:Long,temperature:Double)

Logo

永洪科技,致力于打造全球领先的数据技术厂商,具备从数据应用方案咨询、BI、AIGC智能分析、数字孪生、数据资产、数据治理、数据实施的端到端大数据价值服务能力。

更多推荐