批处理间隔

val ssc = new StreamingContext(sc, Seconds(5))
对于spark处理数据,数据以流式方式进入划分为一个批次一个批次的,每一段数据合并成一个RDD,并将RDD添加到DStream的HashMap中进行维护,因此数据的处理时间要小于间隔时间,否则造成数据堆压

窗口时间宽度与滑动时间宽度

对于多个批次指定对应的起始批次与结束批次对应的时间,这个时间区间就是"窗口时间宽度",
随着时间推进,只指定了窗口时间宽度不能动态持续对数据进行局部聚合,设置滑动时间宽度,随着窗口推移,持续按照指定的宽度移动

注意:窗口时间宽度与滑动时间宽度大小必须是批处理间隔整数倍

案例:每2s统计10s内平均温度

package stream

import java.util

import com.alibaba.fastjson.{JSON, TypeReference}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Chapter8_4_2")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))

    val jsonDstream = ssc.socketTextStream("note01", 8888)
    val cityAndTemp = jsonDstream.map(json => {
      val json2JavaMap = JSON.parseObject(json, new TypeReference[util.Map[String, String]]() {})
      import scala.collection.JavaConverters._
      val json2ScalaMap = json2JavaMap.asScala
      json2ScalaMap
    })
    cityAndTemp.map(scalaMap => (scalaMap("city"),scalaMap("temp"))).mapValues(temp => (temp.toFloat,1))
      .reduceByKeyAndWindow(
        (t1:(Float,Int),t2:(Float,Int)) => (t1._1+t2._1,t1._2+t2._2),
        Seconds(10),
        Seconds(2)
      ).mapValues(x => x._1/x._2).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Logo

永洪科技,致力于打造全球领先的数据技术厂商,具备从数据应用方案咨询、BI、AIGC智能分析、数字孪生、数据资产、数据治理、数据实施的端到端大数据价值服务能力。

更多推荐