大数据之Flink(8) | 侧输出流(SideOutput)
大部分的DataStream API的算子的输出是单一输出,也就是某种数据类型的流。除了split算子,可以将一条流分成多条流,这些流的数据类型也都相同。process function的side outputs功能可以产生多条流,并且这些流的数据类型可以不一样。一个side output可以定义为OutputTag[X]对象,X是输出流的数据类型。process function可以通过Cont
·
大部分的DataStream API的算子的输出是单一输出,也就是某种数据类型的流。除了split算子,可以将一条流分成多条流,这些流的数据类型也都相同。process function的side outputs功能可以产生多条流,并且这些流的数据类型可以不一样。一个side output可以定义为OutputTag[X]对象,X是输出流的数据类型。process function可以通过Context对象发射一个事件到一个或者多个side outputs。
- 需求:温度大于30输出一条流和温度小于30输出一条流
代码实现
import learning.SensorReading
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object SideOutputTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val inputStream = env.socketTextStream("hadoop12",9999)
val dataStream = inputStream
.map(data => {
val dataArray = data.split(",")
SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
})
//用ProcessFunction的测输出流实现分流操作
val highTempStream = dataStream
.process(new SplitTempProcessor(30))
val lowTempStream = highTempStream.getSideOutput(new OutputTag[(String,Double,Long)]("low-temp"))
//打印输出
highTempStream.print("high")
lowTempStream.print("low")
env.execute("side output job")
}
}
//自定义ProcessFunction,用于区分高低温度的数据
class SplitTempProcessor(threshold: Int) extends ProcessFunction[SensorReading,SensorReading]{
override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]
#Context, out: Collector[SensorReading]): Unit = {
//判断当前数据的温度值,如果大于阈值,输出到主流,如果小于阈值,输出到测输出流
if (value.temperature > threshold){
out.collect(value)
}else{
ctx.output(new OutputTag[(String,Double,Long)]("low-temp"),(value.id,value.temperature,value.timestamp))
}
}
}
结果
更多推荐
所有评论(0)