文章目录


关闭程序方法有4种
1.kill杀死,可能数据丢失
2,通过钩子(hook)关闭,需要写shell脚本,麻烦
3.在程序中建立http服务,接受外部消息在程序中关闭,代码较多
4.用hdfs中目录做标记,定期检查hdfs目录是否存在,存在关闭程序,简单方便

依赖

   <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.4</version>
        </dependency>
package stream

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.reflect.io.Path

object Test08 {
  var hadoopConf: Configuration = _
  val shutdownMarkerPath = new Path("hdfs://linux01:8020/user/admin/tmp/spark_shutdown_marker")
  var stopMarker: Boolean = false
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Chapter8_10_2")
    //      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")

    hadoopConf = ssc.sparkContext.hadoopConfiguration
    hadoopConf.set("fs.defaultFS", "hdfs://linux01:8020")

    val lines = ssc.socketTextStream("linux01", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordMap = words.map(x => (x, 1))
    val wordCounts = wordMap.reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    val checkIntervalMillis = 2 * 1000 * 1
    var isStopped = false

    while (!isStopped) {
      println("正在确认关闭状态: ")
      isStopped = ssc.awaitTerminationOrTimeout(checkIntervalMillis)
      if (isStopped)
        println("Spark Streaming Chapter8_10已关闭.")
      else
        println("Spark Streaming Chapter8_10运行中...")
      checkShutdownMarker
      if (!isStopped && stopMarker) {
        println("准备关闭Spark Streaming")
        ssc.stop(true, true)
      }
    }
  }

  def checkShutdownMarker = {
    if (!stopMarker) {
      val fs = FileSystem.get(hadoopConf)
      stopMarker = fs.exists(shutdownMarkerPath)
    }
  }
}


awaitTermination方法会阻塞Driver主线程,使代码无法下行,这里使用awaitTerminationOrTimeout

Logo

永洪科技,致力于打造全球领先的数据技术厂商,具备从数据应用方案咨询、BI、AIGC智能分析、数字孪生、数据资产、数据治理、数据实施的端到端大数据价值服务能力。

更多推荐