Spark移动互联网大数据分析——纯累加器实现
需求共有三个:对app字段访问次数进行统计、统计日活跃用户数和月活跃用户数、统计不同应用的上下行总流量。这三个简单的需求不用累加器也很容易实现,但为了熟悉累加器的用法,这里特意手写累加器来实现这三个需求,以作练习。
(基站信息,小区标识信息,移动电话设备识别码,应用名称,访问时间,上行字节数,下行字节数)
1,1,460028714280218,360,2020-05-01,7,1116
1,2,460028714280219,QQ,2020-05-02,8,121
1,3,460028714280220,YY,2020-05-03,9,122
1,4,460028714280221,360,2020-05-04,10,119
2,1,460028714280222,YY,2020-05-05,5,1119
2,2,460028714280223,360,2020-05-01,12,121
2,3,460028714280224,QQ,2020-05-02,13,122
3,1,460028714280225,QQ,2020-05-03,1,1117
3,2,460028714280226,QQ,2020-05-04,9,1118
3,3,460028714280227,QQ,2020-05-05,10,120
1,1,460028714280218,360,2020-06-01,11,1118
1,2,460028714280219,QQ,2020-06-02,2,1119
1,3,460028714280220,YY,2020-06-03,9,1120
1,4,460028714280221,360,2020-06-04,10,119
2,1,460028714280222,YY,2020-06-05,11,1118
2,2,460028714280223,360,2020-06-02,4,121
2,3,460028714280224,QQ,2020-06-03,17,1119
3,1,460028714280225,QQ,2020-06-04,18,119
3,2,460028714280226,QQ,2020-06-05,19,1119
3,3,460028714280227,QQ,2020-06-10,20,121
需求一:对app字段访问次数进行统计
package homework_App_Test
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable
object AppViewCount_Acc {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Homework")
    val sc = new SparkContext(sparkConf)

    // Requirement 1: count the number of visits per app (CSV field index 3).
    val dataRDD = sc.textFile("datas/mobile.txt")
    // Drop blank lines so split(",") never sees an empty record.
    val filterRDD: RDD[String] = dataRDD.filter(_.nonEmpty)

    val avAcc = new AppViewCountAccumulator // custom accumulator
    sc.register(avAcc, "appviewAcc")

    filterRDD.foreach { line =>
      val fields = line.split(",")
      // Guard against malformed rows with fewer than 4 columns.
      if (fields.length > 3) avAcc.add(fields(3))
    }

    // Sort descending by count. sortBy with a negated key provides a strict
    // ordering; the original sortWith used `>=`, which violates the
    // strict-weak-ordering contract required by the sort and can throw
    // "Comparison method violates its general contract" on larger inputs.
    val sorted: List[(String, Int)] = avAcc.value.toList.sortBy(-_._2)
    sorted.foreach(println)

    sc.stop()
  }

  /**
   * Accumulator mapping app name -> number of visits.
   * IN: app name (String); OUT: mutable.Map[appName, count].
   */
  class AppViewCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] {

    private val avMap = mutable.Map[String, Int]()

    override def isZero: Boolean = avMap.isEmpty

    // The AccumulatorV2 contract requires copy() to return an accumulator
    // holding the same data; the original returned an empty one.
    override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
      val newAcc = new AppViewCountAccumulator
      newAcc.avMap ++= this.avMap
      newAcc
    }

    override def reset(): Unit = avMap.clear()

    override def add(appname: String): Unit =
      avMap.update(appname, avMap.getOrElse(appname, 0) + 1)

    // Fold the other accumulator's counts into this one (driver-side merge).
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit =
      other.value.foreach { case (name, count) =>
        avMap.update(name, avMap.getOrElse(name, 0) + count)
      }

    override def value: mutable.Map[String, Int] = avMap
  }
}
需求二:统计日活跃用户数和月活跃用户数
package homework_App_Test
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable
object DayAndMonthActivity_Acc {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Homework")
    val sc = new SparkContext(sparkConf)

    // Requirement 2: daily and monthly active-user counts via accumulators.
    val dayAcc: DayAccumulator = new DayAccumulator
    val monAcc: MonthAccumulator = new MonthAccumulator
    sc.register(dayAcc, "dayAccumulator")
    sc.register(monAcc, "monAccumulator")

    val dataRDD = sc.textFile("datas/mobile.txt")
    val filterRDD: RDD[String] = dataRDD.filter(_.nonEmpty)

    filterRDD.foreach { line =>
      val fields = line.split(",")
      // fields(4) = access date (yyyy-MM-dd), fields(2) = device id.
      dayAcc.add(DayAndUser(fields(4), fields(2)))
    }

    println("====日活跃用户=====")
    dayAcc.value.foreach(println)

    // Derive monthly totals on the driver from the daily counts.
    // NOTE(review): this sums per-day counts, so a user active on several
    // days of one month is counted once per day — correct for the sample
    // data (one record per user per month) but not a true distinct-user MAU.
    dayAcc.value.foreach { case (day, count) =>
      val month = day.split("-")(1)
      monAcc.add(monthday(month, count))
    }

    println("====月活跃用户=====")
    monAcc.value.foreach(println)

    sc.stop()
  }

  /** (access date, device id) pair fed into [[DayAccumulator]]. */
  case class DayAndUser(day: String, user: String)

  /**
   * Accumulator mapping date -> number of records seen that day.
   * NOTE(review): counts records, not distinct users — assumes each user
   * produces at most one record per day (true for the sample data).
   */
  class DayAccumulator extends AccumulatorV2[DayAndUser, mutable.Map[String, Int]] {

    private val dayMap = mutable.Map[String, Int]()

    override def isZero: Boolean = dayMap.isEmpty

    // The AccumulatorV2 contract requires copy() to carry the current data;
    // the original returned an empty accumulator.
    override def copy(): AccumulatorV2[DayAndUser, mutable.Map[String, Int]] = {
      val newAcc = new DayAccumulator
      newAcc.dayMap ++= this.dayMap
      newAcc
    }

    override def reset(): Unit = dayMap.clear()

    override def add(v: DayAndUser): Unit =
      dayMap.update(v.day, dayMap.getOrElse(v.day, 0) + 1)

    override def merge(other: AccumulatorV2[DayAndUser, mutable.Map[String, Int]]): Unit =
      other.value.foreach { case (day, count) =>
        dayMap.update(day, dayMap.getOrElse(day, 0) + count)
      }

    override def value: mutable.Map[String, Int] = dayMap
  }

  /** (month, per-day count) pair fed into [[MonthAccumulator]]. */
  case class monthday(month: String, count: Int)

  /** Accumulator mapping month -> summed daily counts for that month. */
  class MonthAccumulator extends AccumulatorV2[monthday, mutable.Map[String, Int]] {

    val monMap = mutable.Map[String, Int]()

    override def isZero: Boolean = monMap.isEmpty

    // copy() must carry the current data (AccumulatorV2 contract).
    override def copy(): AccumulatorV2[monthday, mutable.Map[String, Int]] = {
      val newAcc = new MonthAccumulator
      newAcc.monMap ++= this.monMap
      newAcc
    }

    override def reset(): Unit = monMap.clear()

    override def add(v: monthday): Unit =
      monMap.update(v.month, monMap.getOrElse(v.month, 0) + v.count)

    // The original mixed two aliases of the same map (map1 / monMap);
    // use a single reference for clarity.
    override def merge(other: AccumulatorV2[monthday, mutable.Map[String, Int]]): Unit =
      other.value.foreach { case (month, cnt) =>
        monMap.update(month, monMap.getOrElse(month, 0) + cnt)
      }

    override def value: mutable.Map[String, Int] = monMap
  }
}
需求三:统计不同应用上下行总流量
package homework_App_Test
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable
object countAppFlow_Acc {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Homework")
    val sc = new SparkContext(sparkConf)

    // Requirement 3: total up/down traffic per app.
    val dataRDD = sc.textFile("datas/mobile.txt")
    val filterRDD: RDD[String] = dataRDD.filter(_.nonEmpty)

    val dayMonAcc = new DayAndMonthAccumulator
    sc.register(dayMonAcc, "dayMonAccumulator")

    filterRDD.foreach { line =>
      val fields = line.split(",")
      // Pass an explicit tuple: add takes a single (String, INtupleValue)
      // argument, and relying on argument auto-tupling is deprecated.
      // fields(3) = app name, fields(5)/fields(6) = up/down bytes.
      dayMonAcc.add((fields(3), INtupleValue(fields(5).toInt, fields(6).toInt)))
    }

    // Print one OUTtupleValue per app; .values replaces the map(_._2) step.
    val totals: mutable.Map[String, OUTtupleValue] = dayMonAcc.value
    totals.values.foreach(println)

    sc.stop()
  }

  /** Input record: up/down byte counts for a single access. */
  case class INtupleValue(upvalue: Int, downValue: Int)

  /** Output record: accumulated up/down byte totals for one app. */
  case class OUTtupleValue(appname: String, var UPdata: Int, var DOWNdata: Int)

  /** Accumulator mapping app name -> accumulated traffic totals. */
  class DayAndMonthAccumulator
    extends AccumulatorV2[(String, INtupleValue), mutable.Map[String, OUTtupleValue]] {

    private val dataMap = mutable.Map[String, OUTtupleValue]()

    override def isZero: Boolean = dataMap.isEmpty

    // copy() must duplicate the current state (AccumulatorV2 contract);
    // deep-copy the mutable OUTtupleValue entries so the two accumulators
    // never share and concurrently mutate the same instance.
    override def copy(): AccumulatorV2[(String, INtupleValue), mutable.Map[String, OUTtupleValue]] = {
      val newAcc = new DayAndMonthAccumulator
      dataMap.foreach { case (name, t) =>
        newAcc.dataMap.update(name, OUTtupleValue(t.appname, t.UPdata, t.DOWNdata))
      }
      newAcc
    }

    override def reset(): Unit = dataMap.clear()

    override def add(INdata: (String, INtupleValue)): Unit = {
      val (appname, traffic) = INdata
      val cur = dataMap.getOrElse(appname, OUTtupleValue(appname, 0, 0))
      cur.UPdata += traffic.upvalue
      cur.DOWNdata += traffic.downValue
      dataMap.update(appname, cur)
    }

    override def merge(other: AccumulatorV2[(String, INtupleValue), mutable.Map[String, OUTtupleValue]]): Unit =
      other.value.foreach { case (name, t) =>
        val cur = dataMap.getOrElse(name, OUTtupleValue(name, 0, 0))
        cur.UPdata += t.UPdata
        cur.DOWNdata += t.DOWNdata
        dataMap.update(name, cur)
      }

    override def value: mutable.Map[String, OUTtupleValue] = dataMap
  }
}
更多推荐
所有评论(0)