大部分的DataStream API的算子的输出是单一输出,也就是某种数据类型的流。除了split算子,可以将一条流分成多条流,这些流的数据类型也都相同。process function的side outputs功能可以产生多条流,并且这些流的数据类型可以不一样。一个side output可以定义为OutputTag[X]对象,X是输出流的数据类型。process function可以通过Context对象发射一个事件到一个或者多个side outputs。

  • 需求:温度大于30输出一条流和温度小于30输出一条流
    代码实现
import learning.SensorReading
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object SideOutputTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputStream = env.socketTextStream("hadoop12",9999)
    val dataStream = inputStream
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
      })

    //用ProcessFunction的测输出流实现分流操作
    val highTempStream = dataStream
      .process(new SplitTempProcessor(30))

    val lowTempStream = highTempStream.getSideOutput(new OutputTag[(String,Double,Long)]("low-temp"))

    //打印输出
    highTempStream.print("high")
    lowTempStream.print("low")

    env.execute("side output job")
  }

}
//自定义ProcessFunction,用于区分高低温度的数据
class SplitTempProcessor(threshold: Int) extends ProcessFunction[SensorReading,SensorReading]{
  override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]
    #Context, out: Collector[SensorReading]): Unit = {
    //判断当前数据的温度值,如果大于阈值,输出到主流,如果小于阈值,输出到测输出流
    if (value.temperature > threshold){
      out.collect(value)
    }else{
      ctx.output(new OutputTag[(String,Double,Long)]("low-temp"),(value.id,value.temperature,value.timestamp))
    }

  }
}

结果
在这里插入图片描述
在这里插入图片描述

Logo

永洪科技,致力于打造全球领先的数据技术厂商,具备从数据应用方案咨询、BI、AIGC智能分析、数字孪生、数据资产、数据治理、数据实施的端到端大数据价值服务能力。

更多推荐