【大数据分析】基本RDD操作
目录MLlib介绍创建RDDMLlib介绍MLlib是Spark中可扩展的机器学习库,她由一些列机器学习算法和实用程序组成。包含:线性回归、逻辑回归、贝叶斯分类、决策树分类、KMeans、LDA、KNN、Apriori、FPGrowth、协同过滤、ALS、BP、RBF、SVM等,另外还包括一些深度学习算法。创建RDDRDD(Resilient Distributed Dataset),即.....
RDD有两种操作:transformations和actions。transformations在一个RDD上进行有用的数据操作得到一个新的RDD。actions触发计算过程并且返回结果。Spark的transformation是“lazy”的,意味着它在遇到actions前不会触发真正的计算过程。
创建RDD
RDD(Resilient Distributed Dataset),即弹性分布式数据集,可以通过使用它很方便地将数据存储到磁盘和内存中,并能控制数据的分区数。
1、数据集合
val data = Array(1,2,3,4,5,6)
//3是分区数
val dataRDD = sc.parallelize(data,3)
每一个分区对应一个对应的任务Task。默认情况下集群中的每一个CPU对应运行2-4个分片。Spark会根据实际情况自行设定分片数量,但也可以自定义,如上面的3。
2、外部数据源
Spark支持文本文件、序列文件以及其他Hadoop支持的输入格式文件。
//当前目录文件
val dataRDD1 = sc.textFile("data.txt")
//HDFS文件,并指定3个分区
val dataRDD2 = sc.textFile("hdfs://<ip>:<port>/input/data.txt",3)
//本地文件
val dataRDD3= sc.textFile("/input/data.txt")
//多个文件
val dataRDD4 = sc.textFile("/input/001.txt","/input/002.txt")
//通配符
val dataRDD5 = sc.textFile("/input/*.txt")
transformations
map
map允许使用一个函数作用于rdd中所有的数据。
val rdd1 = sc.parallelize(1 to 9,3)
val rdd2 = rdd1.map(x=>x*2)
println(rdd2.map(x => x.toString).reduce((x,y) => x + "," + y))
2,4,6,8,10,12,14,16,18
filter
对RDD中的元素记性过滤
val rdd3 = rdd2.filter(x=>x>10)
println(rdd3.map(x => x.toString).reduce((x,y) => x + "," + y))
12,14,16,18
flatMap
在map功能的基础上添加了扁平化
val rdd4 = rdd3.flatMap(x=>x to 20)
println(rdd4.map(x => x.toString).reduce((x,y) => x + "," + y))
12,13,14,15,16,17,18,19,20,14,15,16,17,18,19,20,16,17,18,19,20,18,19,20
map与flatMap的区别
对于flatMap,传入的函数在处理完后,其返回值必须是List(或Seq),如果不是List(Seq),那么将出错。也就是说,传入的函数是有要求的 。
class person(val name:String,val relatedPeople:Seq[String])
def main(args:Array[String]):Unit={
val sparkAppName = "Basic"
val sc = getSparkContext(sparkAppName)
val p1=new person("A",Seq("A-1","A-2","A-3"))
val p2=new person("B",Seq("B-1","B-2","b-2"))
val seqs = Seq[person](p1,p2)
val m_rp = seqs.map(
p=>p.relatedPeople
)
println("---mapping---")
println(m_rp)
val fm_rp = seqs.flatMap(
p=>p.relatedPeople
)
println("---flatMapping---")
println(fm_rp)
sc.stop()
}
---mapping---
List(List(A-1, A-2, A-3), List(B-1, B-2, b-2))
---flatMapping---
List(A-1, A-2, A-3, B-1, B-2, b-2)
上面例子中的flatMap算子中的map操作对每个元素处理后都返回一个List,所以会得到一个包含List元素的List,而flat操作进一步将所有的内部list的元素取出来构成一个List返回。
mapPartitions
map的输入函数应用于RDD中的每个元素;而mapPartitions的输入函数针对每个分区的数据,分区内的数据将作为整体来处理。
def main(args:Array[String]):Unit={
val sparkAppName = "Basic"
val sc = getSparkContext(sparkAppName)
val data = Array(1,2,3,4,5,6,7,8,9)
val rdd1 = sc.parallelize(data,3)
val rdd5 = rdd1.mapPartitions(myfunc)
sc.stop()
}
def myfunc[T](iter:Iterator[T]):Iterator[(T,T)] = {
var res = List[(T,T)]()
var pre = iter.next()
while(iter.hasNext){
val cur = iter.next()
res.::=(pre,cur)
pre = cur
}
res.iterator
}
(2,3),(1,2),(5,6),(4,5),(8,9),(7,8)
mapPartitionsWithIndex
mapPartitionsWithIndex与mapPartitions的功能类似,额外多传split index,split index 是分区的索引。
sample
sample(withReplacement,fraction,seed)
withReplacement:是否放回抽样;fraction:比例,0.1表示10%;seed:随机种子。
val a = sc.parallelize(1 to 1000,3)
println(a.sample(false,0.1,0).count())
115
union
union(otherDataset)
数据合并,返回新的RDD。
val rdd8 = rdd1.union(rdd3)
println(rdd8.map(x => x.toString).reduce((x,y) => x + "," + y))
1,2,3,4,5,6,7,8,9,12,14,16,18
intersection
intersection(otherDataset)
返回两个RDD的交集。
val rdd9 = rdd8.intersection(rdd1)
println(rdd9.map(x => x.toString).reduce((x,y) => x + "," + y))
6,1,7,8,2,3,9,4,5
distinct
distinct([numTasks])
数据去重,numTasks用于设置并行任务数量
val rdd10 = rdd8.union(rdd9).distinct()
println(rdd10.map(x => x.toString).reduce((x,y) => x + "," + y))
12,1,14,2,3,4,16,5,6,18,7,8,9
gourpByKey
groupByKey([numTasks])
数据分组操作,同时也是shuffle算子。在一个由(K,V)对组成的数据集上使用,返回一个(K,Seq[V])对的数据集。
val rdd0 = sc.parallelize(Array((1,1),(1,2),(1,3),(2,1),(2,2),(2,3)),3)
val rdd11 = rdd0.groupByKey()
println(rdd11.map(x => x.toString).reduce((x,y) => x + "," + y))
(1,CompactBuffer(1, 2, 3)),(2,CompactBuffer(1, 2, 3))
reduceByKey
reduceByKey(func,[numTasks])
数据分组聚合操作,同时也是shuffle算子。在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集。
val rdd12 = rdd0.reduceByKey((x,y)=>x+y)
println(rdd12.map(x => x.toString).reduce((x,y) => x + "," + y))
(1,6),(2,6)
aggregateByKey
aggreateByKey(zeroValue:U)(seqOP:(U,T)=>U,combOp:(U,U)=>U)
shuffle算子。它有三个参数
~zeroValue:U,初始值,比如空列表{}。
~seqOp:(U,T)=>U,seq操作符。
~combOp:(U,U)=>U,comb操作符
combineByKey
combineByKey[C](createCombiner:(V)=>C,mergeValue:(C,V)=>C,mergeCombiners:(C,C)=>C,numPartitions:Int):RDD[(K,C)]
shuffle算子。把(K,V)类型的RDD转换为(K,C)类型的RDD,C和V可以不一样。combineByKey的三个参数如下
sortByKey
排序操作,对(K,V)类型的数据按照K进行排序,shuffle算子
join
将输入的数据集(K,V)和另一个数据集(K,W)进行join,得到(K,(V,W))。对相同K的V和W进行笛卡尔积操作。也就是V和W的所有组合。
cogroup
cogroup(otherDataset,[numTasks])
shuffle算子,将输入数据集(K,V)和另外一个数据集(K,W)进行cogroup,得到一个格式为(K,Seq[V],Seq[W])的数据集
val rdd16 = rdd0.cogroup(rdd0)
println(rdd16.map(x => x.toString).reduce((x,y) => x + "," + y))
(1,(CompactBuffer(1, 2, 3),CompactBuffer(1, 2, 3))),(2,(CompactBuffer(1, 2, 3),CompactBuffer(1, 2, 3)))
cartesian
对数据集T和U进行笛卡尔积,得到(T,U)格式的数据集
pipe
以shell命令处理RDD数据
randomSplit
randomSplit(weights:Array[Double],seed:Long=Utils.random.nextLong):Array[RDD[T]]。对RDD按照权重进行数据分割。
substract
subtract(other:RDD[T]):RDD[T]
对RDD进行减去操作。
zip
zip[U](other:RDD[U])(implicit arg0:ClassTag[U]):RDD[(T,U)]
对两个RDD进行拉链操作
coalesce
coalesce(numPartitions)
将RDD进行重分区,新分区的个数由numPartitions定义,默认不进行shuffle
同时也是shuffle算子
treeAggregate
treeAggregate[U](zeroValue:U)(seqOp:(U,T)=>U,combOp:(U,U)=>U,depth:Int=2)
zeroValue:U,初始值,比如空列表{},0等。
seqOp:(U,T)=>U,seq操作符,描述如何将T合并两个U,比如合并两个列表。
actions
Spark RDD在遇到Action算子时,才会真正触发计算。
reduce
reduce(func)对所有元素执行函数func。
collect
将RDD类型的数据转化为数组,同时从远程集群是拉取数据到driver端。
count
返回数据集中元素的个数
first
返回数据集中的第一个元素,类似于take(1)
take
take(n)返回包含前n个元素的数组。
takeSample
takeSample(withReplacement,num,[seed])返回包含随机的num个元素的数组。
takeOrdered
takeOrdered(n,[ordering])是返回包含随机的n个元素的数组,按照顺序输出。
saveAsTextFile
把数据集中的元素写到一个文本文件,Spark会对每个元素调用toString方法来把每个元素存成文本文件的一行。
countByKey
对于(K,V)类型的RDD,返回一个(K,int)的map,Int为K的个数
foreach
foreach(func)是对数据集中的每个元素都执行func函数
更多推荐
所有评论(0)