
【Spark大数据分析】Spark大数据分析复习资料
算法:对数据做归一化处理,计算当前样本和所有训练样本的距离,对距离排序,选择最近的K个样本,统计样本出现的频率,出现频率最多的样本作为预测类别。Spark GraphX基于pregel计算“5万结点、250万条边”图的单源最短路径,大概用时8分钟(8核心16线程,4.0G主频 CPU)1GB文件单词计数,大概用时1分钟-2分钟(8核心16线程,4.0G主频 CPU,分区数为16,集群中无其他作业调
一、
- Val 和Var 的区别:val是常量,var可以被多次赋值,多次修改
- Map映射:map(v1,v2=>(v1+v2)),使用哈希表结构
- for循环:for(i<-1 until 5)(0 1 2 3 4);for(i<-1 until 5)(0 1 2 3 4 5)
- 集合类型有数组、List、Map映射、元组、Set(可去重)
- 数组定义:val arr=Array[int](1,2,3);单变量访问:arr(0);常用方法:arr.min,arr.sorted,arr.min,arr.sorted.reverse;array和list的区别:array是顺序结构
- 二维list定义:list(list(1,0,0),list(0,1,0));val list=listBuffer(1,2,3)
- 元组可以使用_1、_2访问其中元素,如t._1;
- 单例对象:object person{def showoff():Unit{}}可以使用person.showoff()调用方法
- 伴生对象:当某个单例对象名称与它的类名一致时,这个对象被称为伴生对象,二者必须定义在同一个文件中,两者可以互相访问其私有成员,例如class person{private name=”zhangsan”def show{println(“年龄”+person.age)}};object person{private age=20 def arr():Unit{var person=new Person() println(“姓名”+per.name) per.show())}}。由此可见,伴生对象的作用是访问私有成员,优缺点是提高了安全性,但降低了运行效率。
- Scala构造器分为主构造器和辅助构造器(this),主构造器可以没有参数,一个类中如果没有显示地定义主构造器,就默认有一个无参构造器,构造参数不带val/var,默认为val,示例:class Person{ //主构造器private name=”wangwu” //辅助构造器def this(name:String){this()//一定要有,this.name=name}}
- Spark是用来做分布式计算的,它支持java,scala,python和R语言的使用
- Spark的主要组件有Spark Streaming、SQL、MLib、GraphX
- Spark可以通过parallelize()和makeRDD()的方法创建RDD,例如Val rdd=sc.parallelize(List(1,23,4,5,6));Val rdd=sc.makeRDD(List(1,23,4,5,6))
- 写出将所有元素*2形成新的RDD的语句val rdd2=rdd.map(x=>x*2)
- 写出对所有元素进行升序排列的RDD的语句sortwith(_<_)
- val sc = new SparkContext(conf) ,SparkContext的意思是创建SparkContext窗口,该对象是提交Spark应用程序的入口
- val sc = new SparkConf(),创建SparkConf窗口存储应用程序的配置信息。
- Sc.setAppName(),设置应用程序名称,可以在Spark WebUI中显示
- RDD支持两种算子:转化算子和行动算子
- filter算子:对源RDD的每个元素进行过滤
- flatMap算子:实现映射
- reduceByKey算子:将相同key的元素聚在一起,key不变,value求和或形成一个列表
- groupByKey:一个RDD将相同key的元素聚在一起,key不变,value形成一个集合
- union:2个RDD合并成为一个新的RDD,主要用于不同数据来源进行合并
- SortBy:RDD中元素按照某个规则进行排序
- sortByKey:按key进行排序
- join:2个RDD根据key进行连接操作
- intersection:2个RDD进行交集操作
- distinct:去重
- Repartition算子的作用是重新分区
- RDD是一个分布在多个节点上的数据集合,它的主要特征如下:
RDD是不可变的,但可以将RDD转换成新的RDD进行操作
RDD是可分区的。RDD由很多分区组成,每个分区对应一个Task任务来执行(关于分区将在3.4节详细讲解)
对RDD进行操作,相当于对RDD的每个分区进行操作
RDD拥有一系列对分区进行计算的函数,称为算子(关于算子将在3.3节详细讲解)。
RDD之间存在依赖关系,可以实现管道化,避免了中间数据的存储。
- Spark中的一个作业一般会被划分成多个 stage
- DataFrame和RDD的主要区别是增加了结构和描述信息
- val conf = new SparkConf().setAppName("test").setMaster("local[*]"),其中local[*]的意思是使用*个CPU核心
- val stu=(40,“zhang”),访问数据40的表达式是stu._1
- mapreduce的工作原理:用户编写Map和Reduce函数,并提交作业到MapReduce框架,框架将作业分解为多个任务,并将这些任务分配给集群中的不同节点执行,每个节点独立执行Map任务,生成中间结果,中间结果通过Shuffle过程被传输和合并,然后作为输入传递给Reduce任务,Reduce任务处理合并后的数据,并生成最终结果。
- kafka分布式消息系统(kafka是什么?有什么用?)Apache Kafka是一个分布式流处理平台,主要用于构建实时数据管道和流应用程序。它支持高吞吐量的消息传递,能够处理每秒数百万条消息。Kafka的分布式架构提供了高可扩展性和容错性,通过消息的持久化存储在磁盘上,保证了数据的可靠性。它还允许生产者和消费者解耦,支持异步通信。适用于日志聚合、实时分析、事件驱动架构等多种场景。Kafka的灵活性和强大的消息模型使其成为现代数据架构中不可或缺的组件。与其他分布式系统集成,实现数据传输和共享。
- KNN算法的主要原理和程序伪代码:
算法:对数据做归一化处理,计算当前样本和所有训练样本的距离,对距离排序,选择最近的K个样本,统计样本出现的频率,出现频率最多的样本作为预测类别。
伪代码:
(1)计算特征的最大最小值maxFeature和minFeature
(2)根据maxFeature和minFeature对数据做归一化处理
(3)计算distance值
(4)对distance进行sort
(5)take(K)取最近K个样本
(6)依次使用map、reduceByKey、sortBy,得到K个样本里出现频率最多的样本
- 本课程使用过的开发环境、相关软件及其配置:语言、软件和工具:scala语言,IDEA,Spark Shell,jdk,hadoop包等
- sparkStreaming的工作原理:接收数据流,以时间片为单位分成批次,生成RDD进行处理
- 程序运行的大致时间:
1000万数据二次排序,大概用时10到15秒(8核心16线程,4.0G主频 CPU)
鸢尾花样本预测,大概用时7到10秒(8核心16线程,4.0G主频 CPU)
1GB文件单词计数,大概用时1分钟-2分钟(8核心16线程,4.0G主频 CPU,分区数为16,集群中无其他作业调度使用)
Spark GraphX基于pregel计算“5万结点、250万条边”图的单源最短路径,大概用时8分钟(8核心16线程,4.0G主频 CPU)
- 简述Spark GraphX最短路径的计算原理和过程:
初始点的距离值设置为0,其它点设置为正无穷大;
选择距离值最小的结点作为当前处理结点,更新邻接点的距离值;
将当前结点设置为已处理结点;
重复以上步骤,直到无可以处理的结点。
-
图计算代码实现:
object SparkGraphX { def main(args: Array[String]) { …… val vertexArray = Array( Edge(2L,1L,7), Edge(2L,4L,2), …… ) val edgeArray = Array( Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 6L, 3), Edge(4L, 1L, 1), Edge(5L, 2L, 2), Edge(5L, 3L, 8), Edge(5L, 6L, 3) ) val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray) val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray) val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD) println("找出图中年龄大于 50的顶点:") graph.vertices.filter{x => x._2._2 > 50}.collect.foreach { case (id, (name, age)) => println(s"$name is $age") } println("顶点的转换操作,顶点 age + 5:") graph.mapVertices { case (id, (name, age)) => (id, (name, age + 10)}.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}")) } }
object SparkGraphX {
def main(args: Array[String]) {
……val vertexArray = Array(
Edge(2L,1L,7),
Edge(2L,4L,2),
…… )
val edgeArray = Array(
Edge(2L, 1L, 7),
Edge(2L, 4L, 2),
Edge(3L, 2L, 4),
Edge(3L, 6L, 3),
Edge(4L, 1L, 1),
Edge(5L, 2L, 2),
Edge(5L, 3L, 8),
Edge(5L, 6L, 3)
)val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)
val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)
println("找出图中年龄大于 50的顶点:")
graph.vertices.filter{x => x._2._2 > 50}.collect.foreach {
case (id, (name, age)) => println(s"$name is $age")
}println("顶点的转换操作,顶点 age + 5:")
graph.mapVertices { case (id, (name, age)) => (id, (name, age + 10)}.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))
}
}
-
数据库代码实现
Case class Student(id:Int,name:String,age:Int) extends Serializable case class Score(stu_id:Int,cs_id:Int,score:Int) extends Serializable object SparkSqlScore { def main(args: Array[String]): Unit = { …… var linesRDD: RDD[String] = sc.textFile("c:\\spark\\student.txt") var fieldsRDD: RDD[Array[String]] = linesRDD.map(_.split(" ")) val studentRDD: RDD[Student] = fieldsRDD.map(x=>Student(x(0).toInt,x(1),x(2).toInt)) linesRDD = sc.textFile("c:\\spark\\score.txt") fieldsRDD = linesRDD.map(_.split(" ")) val scoreRDD: RDD[Score] = fieldsRDD.map(x=>Score(x(0).toInt,x(1).toInt,x(2).toInt)) import spark.implicits._ val studentDF: DataFrame = studentRDD.toDF() val scoreDF: DataFrame = scoreRDD.toDF() studentDF.createOrReplaceTempView("t_student") scoreDF.createOrReplaceTempView("t_score") //sql语句进行操作(1)查询年龄大于等于20岁的学生记录(2)查询考试不及格的学 //生姓名、课程号、分数 spark.sql("select count()* as count form t_stduent where age>=20").show() spark.sql("select s.name, c.cs_id, c.score form t_stduent, t_score where t_student.name=t_score.name and t_score.score<60").show()
Case class Student(id:Int,name:String,age:Int) extends Serializable
case class Score(stu_id:Int,cs_id:Int,score:Int) extends Serializableobject SparkSqlScore {
def main(args: Array[String]): Unit = {……
var linesRDD: RDD[String] = sc.textFile("c:\\spark\\student.txt")
var fieldsRDD: RDD[Array[String]] = linesRDD.map(_.split(" "))
val studentRDD: RDD[Student] = fieldsRDD.map(x=>Student(x(0).toInt,x(1),x(2).toInt))
linesRDD = sc.textFile("c:\\spark\\score.txt")
fieldsRDD = linesRDD.map(_.split(" "))
val scoreRDD: RDD[Score] = fieldsRDD.map(x=>Score(x(0).toInt,x(1).toInt,x(2).toInt))import spark.implicits._
val studentDF: DataFrame = studentRDD.toDF()
val scoreDF: DataFrame = scoreRDD.toDF()studentDF.createOrReplaceTempView("t_student")
scoreDF.createOrReplaceTempView("t_score")//sql语句进行操作(1)查询年龄大于等于20岁的学生记录(2)查询考试不及格的学
//生姓名、课程号、分数
spark.sql("select count()* as count form t_stduent where age>=20").show()
spark.sql("select s.name, c.cs_id, c.score form t_stduent, t_score where t_student.name=t_score.name and t_score.score<60").show()
-
简述Spark Streaming的工作原理和过程
spark streaming接收数据流,以时间片为单位拆分成批次,生成RDD进行处理
-
流计算代码
object SparkStreaming { def main(args:Array[String]):Unit={ …… val ssc=new StreamingContext (conf,Seconds(seconds(2)) val lines=ssc.textFileStream("c:\\spark\\streaming") //输出分数为100分的学生的“学号、课程号、分数”,并统计100分的人数 val score=lines.map(x=>{ val stu_id=x.split(" ")(0).toInt val course_id=x.split(" ")(1).toInt val score=x.split(" ")(2).toInt (stu_id,course_id,score) }) score.filter(_._3==100).print() score.filter(_._3==100).count().print() ssc.start() ssc.awaitTermination() } }
object SparkStreaming {
def main(args:Array[String]):Unit={
……
val ssc=new StreamingContext (conf,Seconds(seconds(2))
val lines=ssc.textFileStream("c:\\spark\\streaming")//输出分数为100分的学生的“学号、课程号、分数”,并统计100分的人数
val score=lines.map(x=>{
val stu_id=x.split(" ")(0).toInt
val course_id=x.split(" ")(1).toInt
val score=x.split(" ")(2).toInt
(stu_id,course_id,score)
})
score.filter(_._3==100).print()
score.filter(_._3==100).count().print()ssc.start()
ssc.awaitTermination()}
}
-
编写scala程序,计算多种图形的面积。包括圆形、矩形。(用抽象类写)抽象类不能被实例化,子类可通过继承抽象类来实现抽象类的方法,若子类实现了抽象类中已经实现的方法,此称为重写,重写需要加override?
import scala.math.sqrt abstract class Shape { def Area:Double } class Circle(_r:Double) extends Shape{ var r:Double=_r def Area:Double={ 3.14*r*r } } class Rectangle(_length:Double,_width:Double) extends Shape{ var length:Double=_length var width:Double=_width def Area:Double={ length*width } } object ShapeArea { def main(args:Array[String]):Unit={ var shape:Shape=new Circle(3.0) println(shape.Area) shape=new Rectangle (3.0,4.0) println(shape.Area) } }
- 编写scala程序,统计单词个数并按字典序输出。
val wordMap:SortedMap[String,Int]=SortedMap() for(i<-wordList.indices){ if (!wordMap.contains(wordList(i))){ wordMap(wordList(i))=1 }else{ wordMap(wordList(i))+=1 } } println(wordMap)
- 编写Spark RDD TOPN程序,统计每个学生的最高3个分数。
val nameAndScoreTupleRDD: RDD[(String, Int)] = linesRDD.flatMap(_.split(";")).map(x => { val name = x.split(",")(0) val score = x.split(",")(1).toInt (name, score) }) val topNRDD = nameAndScoreTupleRDD.groupByKey().map(x => { val name: String = x._1 val scoreList: List[Int] = x._2.toList.sortWith(_ > _).take(3) (name, scoreList) }) topNRDD.repartition(1).foreach(x => { println() print(x._1 + ":") x._2.foreach(y => print(y + " ")) println() })
-
二次排序代码实现
更多推荐
所有评论(0)