(基站信息,小区标识信息,移动电话设备识别码,应用名称,访问时间,上行字节数,下行字节数)
1,1,460028714280218,360,2020-05-01,7,1116
1,2,460028714280219,QQ,2020-05-02,8,121
1,3,460028714280220,YY,2020-05-03,9,122
1,4,460028714280221,360,2020-05-04,10,119
2,1,460028714280222,YY,2020-05-05,5,1119
2,2,460028714280223,360,2020-05-01,12,121
2,3,460028714280224,QQ,2020-05-02,13,122
3,1,460028714280225,QQ,2020-05-03,1,1117
3,2,460028714280226,QQ,2020-05-04,9,1118
3,3,460028714280227,QQ,2020-05-05,10,120
1,1,460028714280218,360,2020-06-01,11,1118
1,2,460028714280219,QQ,2020-06-02,2,1119
1,3,460028714280220,YY,2020-06-03,9,1120
1,4,460028714280221,360,2020-06-04,10,119
2,1,460028714280222,YY,2020-06-05,11,1118
2,2,460028714280223,360,2020-06-02,4,121
2,3,460028714280224,QQ,2020-06-03,17,1119
3,1,460028714280225,QQ,2020-06-04,18,119
3,2,460028714280226,QQ,2020-06-05,19,1119
3,3,460028714280227,QQ,2020-06-10,20,121

需求一:对app字段访问次数进行统计

package homework_App_Test

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable


object AppViewCount_Acc {

  /**
   * Requirement 1: count accesses per app name (4th CSV field) with a
   * custom AccumulatorV2, then print the counts in descending order.
   */
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Homework")
    val sc = new SparkContext(sparkConf)
    //TODO use an accumulator to count accesses per app field (requirement 1)

    // 1. Read the raw CSV lines.
    val dataRDD = sc.textFile("datas/mobile.txt")
    // 2. Drop blank lines so split(",") never sees an empty record.
    val filterRDD: RDD[String] = dataRDD.filter(_.nonEmpty)

    // 3. Register the accumulator so Spark merges per-task copies on the driver.
    val avAcc = new AppViewCountAccumulator
    sc.register(avAcc, "appviewAcc")

    // 4. Feed each record's app-name field (index 3) into the accumulator.
    //    foreach runs on the executors; only the accumulator flows back.
    filterRDD.foreach { line =>
      val fields: Array[String] = line.split(",")
      avAcc.add(fields(3))
    }

    // 5. Sort descending by count. sortBy on the negated count is a strict,
    //    total ordering — the original sortWith((l, r) => l._2 >= r._2) was a
    //    non-strict comparator, which violates sortWith's contract and can
    //    throw "Comparison method violates its general contract" on larger
    //    inputs.
    val sorted: List[(String, Int)] = avAcc.value.toList.sortBy(-_._2)

    sorted.foreach(println)
    sc.stop()
  }

  /**
   * Accumulator mapping app name -> access count.
   * IN  = String (one app name per record)
   * OUT = mutable.Map[String, Int] (final counts)
   */
  class AppViewCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] {
    private val avMap = mutable.Map[String, Int]()

    override def isZero: Boolean = avMap.isEmpty

    // Per the AccumulatorV2 contract, copy() must duplicate the current
    // state — the original returned an empty accumulator.
    override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
      val acc = new AppViewCountAccumulator
      acc.avMap ++= avMap
      acc
    }

    override def reset(): Unit = avMap.clear()

    // Increment the count for one observed app name.
    override def add(appname: String): Unit =
      avMap.update(appname, avMap.getOrElse(appname, 0) + 1)

    // Fold another task's partial counts into this one.
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit =
      other.value.foreach { case (name, count) =>
        avMap.update(name, avMap.getOrElse(name, 0) + count)
      }

    override def value: mutable.Map[String, Int] = avMap
  }

}

需求二:统计日活跃用户数和月活跃用户数

package homework_App_Test

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

object DayAndMonthActivity_Acc {

  /**
   * Requirement 2: count daily and monthly active users with two custom
   * accumulators.
   *
   * NOTE(review): the monthly figure is the sum of the daily counts, so a
   * user active on several days of the same month is counted once per day.
   * Confirm this matches the intended definition of "monthly active users".
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Homework")
    val sc = new SparkContext(sparkConf)

    //TODO use accumulators to count daily and monthly active users

    val dayAcc: DayAccumulator = new DayAccumulator
    val monAcc: MonthAccumulator = new MonthAccumulator
    sc.register(dayAcc, "dayAccumulator")
    // monAcc is only fed on the driver below, but registering it is harmless.
    sc.register(monAcc, "monAccumulator")

    val dataRDD = sc.textFile("datas/mobile.txt")
    val filterRDD: RDD[String] = dataRDD.filter(_.nonEmpty)

    // Fields: index 4 = access date (yyyy-MM-dd), index 2 = device id ("user").
    filterRDD.foreach { line =>
      val fields: Array[String] = line.split(",")
      dayAcc.add(DayAndUser(fields(4), fields(2)))
    }

    println("====日活跃用户=====")
    dayAcc.value.foreach(println)

    // Roll the per-day counts up to per-month on the driver. Keep the year in
    // the key ("2020-05") so the same month of different years is not merged —
    // the original keyed on the bare month number only.
    dayAcc.value.foreach { case (day, count) =>
      monAcc.add(monthday(day.substring(0, 7), count))
    }
    println("====月活跃用户=====")
    monAcc.value.foreach(println)

    sc.stop()
  }

  /** One observation: `user` was active on `day` (yyyy-MM-dd). */
  case class DayAndUser(day: String, user: String)

  /**
   * Accumulator mapping day -> active-record count.
   * NOTE(review): this counts records, not distinct users; in the sample data
   * each user appears at most once per day, so the two coincide — verify for
   * other inputs.
   */
  class DayAccumulator extends AccumulatorV2[DayAndUser, mutable.Map[String, Int]] {
    private val dayMap = mutable.Map[String, Int]()

    override def isZero: Boolean = dayMap.isEmpty

    // copy() must duplicate the current state per the AccumulatorV2 contract —
    // the original returned an empty accumulator.
    override def copy(): AccumulatorV2[DayAndUser, mutable.Map[String, Int]] = {
      val acc = new DayAccumulator
      acc.dayMap ++= dayMap
      acc
    }

    override def reset(): Unit = dayMap.clear()

    // Count one active record for the observation's day.
    override def add(v: DayAndUser): Unit =
      dayMap.update(v.day, dayMap.getOrElse(v.day, 0) + 1)

    // Fold another task's partial day counts into this one.
    override def merge(other: AccumulatorV2[DayAndUser, mutable.Map[String, Int]]): Unit =
      other.value.foreach { case (day, count) =>
        dayMap.update(day, dayMap.getOrElse(day, 0) + count)
      }

    override def value: mutable.Map[String, Int] = dayMap
  }

  /** One partial result: `count` active records for `month`. */
  case class monthday(month: String, count: Int)

  /** Accumulator mapping month -> summed active-record count. */
  class MonthAccumulator extends AccumulatorV2[monthday, mutable.Map[String, Int]] {
    // Private like DayAccumulator's map; the original exposed it as a public val.
    private val monMap = mutable.Map[String, Int]()

    override def isZero: Boolean = monMap.isEmpty

    // copy() must duplicate the current state per the AccumulatorV2 contract.
    override def copy(): AccumulatorV2[monthday, mutable.Map[String, Int]] = {
      val acc = new MonthAccumulator
      acc.monMap ++= monMap
      acc
    }

    override def reset(): Unit = monMap.clear()

    // Add one partial month count.
    override def add(v: monthday): Unit =
      monMap.update(v.month, monMap.getOrElse(v.month, 0) + v.count)

    // Fold another accumulator's month totals into this one.
    override def merge(other: AccumulatorV2[monthday, mutable.Map[String, Int]]): Unit =
      other.value.foreach { case (month, cnt) =>
        monMap.update(month, monMap.getOrElse(month, 0) + cnt)
      }

    override def value: mutable.Map[String, Int] = monMap
  }

}

需求三:统计不同应用上下行总流量

package homework_App_Test

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

object countAppFlow_Acc {

  /**
   * Requirement 3: total upstream/downstream traffic per application,
   * aggregated with a custom AccumulatorV2.
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Homework")
    val sc = new SparkContext(sparkConf)
    //TODO use an accumulator for requirement 3: per-app up/down traffic totals

    // 1. Read the raw CSV lines and drop blanks.
    val dataRDD = sc.textFile("datas/mobile.txt")
    val filterRDD: RDD[String] = dataRDD.filter(_.nonEmpty)

    val dayMonAcc = new DayAndMonthAccumulator
    sc.register(dayMonAcc, "dayMonAccumulator")

    // Fields: index 3 = app name, index 5 = upstream bytes, index 6 = downstream.
    // Pass an explicit tuple — the original passed two arguments and relied on
    // Scala's deprecated auto-tupling to adapt them to add's single parameter.
    filterRDD.foreach { line =>
      val fields: Array[String] = line.split(",")
      dayMonAcc.add((fields(3), INtupleValue(fields(5).toInt, fields(6).toInt))) //(Appname,(上行,下行))
    }

    val totals: mutable.Iterable[OUTtupleValue] = dayMonAcc.value.map(_._2)
    totals.foreach(println)

    sc.stop()
  }

  /** Input record: upstream/downstream byte counts of a single access. */
  case class INtupleValue(upvalue: Int, downValue: Int)

  /** Output record: running traffic totals for one application (mutable fields). */
  case class OUTtupleValue(appname: String, var UPdata: Int, var DOWNdata: Int)

  /**
   * Accumulator mapping app name -> traffic totals.
   * NOTE(review): the class name is misleading — it aggregates per-app
   * traffic, not day/month data; kept unchanged for source compatibility.
   */
  class DayAndMonthAccumulator extends AccumulatorV2[(String, INtupleValue), mutable.Map[String, OUTtupleValue]] {

    private val dataMap = mutable.Map[String, OUTtupleValue]()

    override def isZero: Boolean = dataMap.isEmpty

    // copy() must duplicate the current state (the original returned an empty
    // accumulator). OUTtupleValue has mutable fields, so deep-copy each value
    // to avoid sharing mutable state between accumulator instances.
    override def copy(): AccumulatorV2[(String, INtupleValue), mutable.Map[String, OUTtupleValue]] = {
      val acc = new DayAndMonthAccumulator
      dataMap.foreach { case (name, totals) =>
        acc.dataMap.update(name, totals.copy())
      }
      acc
    }

    override def reset(): Unit = dataMap.clear()

    // Add one record's traffic to its app's running totals.
    override def add(INdata: (String, INtupleValue)): Unit = {
      val (appname, traffic) = INdata
      val totals: OUTtupleValue = dataMap.getOrElseUpdate(appname, OUTtupleValue(appname, 0, 0))
      totals.UPdata += traffic.upvalue
      totals.DOWNdata += traffic.downValue
    }

    // Fold another task's per-app totals into this one.
    override def merge(other: AccumulatorV2[(String, INtupleValue), mutable.Map[String, OUTtupleValue]]): Unit = {
      other.value.foreach { case (name, partial) =>
        val totals: OUTtupleValue = dataMap.getOrElseUpdate(name, OUTtupleValue(name, 0, 0))
        totals.UPdata += partial.UPdata
        totals.DOWNdata += partial.DOWNdata
      }
    }

    override def value: mutable.Map[String, OUTtupleValue] = dataMap
  }

}

Logo

永洪科技,致力于打造全球领先的数据技术厂商,具备从数据应用方案咨询、BI、AIGC智能分析、数字孪生、数据资产、数据治理、数据实施的端到端大数据价值服务能力。

更多推荐