《Spark快速大数据分析》笔记 第三章RDD编程
以下内容均摘抄自《Spark快速大数据分析》运行规则总的来说,每个 Spark 程序或 shell 会话都按如下方式工作。从外部数据创建出输入 RDD。使用诸如 filter() 这样的转化操作对 RDD 进行转化,以定义新的 RDD。告诉 Spark 对需要被重用的中间结果 RDD 执行 persist() 操作。使用行动操作(例如 count() 和 first() 等)来触发一次并行
以下内容均摘抄自《Spark快速大数据分析》
运行规则
总的来说,每个 Spark 程序或 shell 会话都按如下方式工作。
- 从外部数据创建出输入 RDD。
- 使用诸如 filter() 这样的转化操作对 RDD 进行转化,以定义新的 RDD。
- 告诉 Spark 对需要被重用的中间结果 RDD 执行 persist() 操作。
- 使用行动操作(例如 count() 和 first() 等)来触发一次并行计算, Spark 会对计算进行 优化后再执行。
RDD创建
RDD操作
转化操作
返回一个新的 RDD 的操作,比如 map() 和 filter()。
转化出来的 RDD 是惰性求值的,只有在行动操作中用到这些 RDD 时才会被计算。
行动操作
行动操作则是向驱动器程序返回结果或把结果写入外部系统的操作,会触发实际的计算,比如 count() 和 first()。
转化操作返回的是 RDD,而行动操作返回的是其他的数据类型。
惰性求值
RDD 的转化操作都是惰性求值的。这意味着在被调用行动操作之前 Spark 不会开始计算。
惰性求值意味着当我们对 RDD 调用转化操作(例如调用 map())时,操作不会立即执行。相反, Spark 会在内部记录下所要求执行的操作的相关信息。我们不应该把 RDD 看作存放着特定数据的数据集, 而最好把每个 RDD 当作我们通过转化操作构建出来的、记录如何计算数据的指令列表。 把数据读取到 RDD 的操作也同样是惰性的。因此,当我们调用sc.textFile() 时,数据并没有读取进来,而是在必要时才会读取。和转化操作一样的是,读取数据的操作也有可能会多次执行。
Spark 使用惰性求值,这样就可以把一些操作合并到一起来减少计算数据的步骤。在类似Hadoop MapReduce 的系统中,开发者常常花费大量时间考虑如何把操作组合到一起,以减少 MapReduce 的周期数。而在 Spark 中,写出一个非常复杂的映射并不见得能比使用很多简单的连续操作获得好很多的性能。 因此,用户可以用更小的操作来组织他们的程序,这样也使这些操作更容易管理。
向Spark传递函数
Spark 的大部分转化操作和一部分行动操作,都需要依赖用户传递的函数来计算。
传递函数时需要小心的一点是, Python 会在你不经意间把函数所在的对象也序列化传出去。当你传递的对象是某个对象的成员, 或者包含了对某个对象中一个字段的引用时(例如 self.field), Spark 就会把整个对象发到工作节点上,这可能比你想传递的东西大得多(见例 3-19)。
常见的转化操作和行动操作
基本RDD
针对各个元素的转化操作: map() 和 filter()、flatMap()。
伪集合操作:尽管 RDD 本身不是严格意义上的集合,但它也支持许多数学上的集合操作,比如合并和相交操作,如下图。
需要注意, distinct() 操作的开销很大,因为它需要将所有数据通过网络进行混洗(shuffle),以确保每个元素都只有一份。
行动操作:
在不同RDD类型间转换
有些函数只能用于特定类型的 RDD,比如 mean() 和 variance() 只能用在数值 RDD 上,而 join() 只能用在键值对 RDD 上。
在 Scala 中,将 RDD 转为有特定函数的 RDD(比如在 RDD[Double] 上进行数值操作)是由隐式转换来自动处理的。
在 Java 中,各种 RDD 的特殊类型间的转换更为明确。 Java 中有两个专门的类 JavaDoubleRDD和 JavaPairRDD,来处理特殊类型的 RDD。
Python 的 API 结构与 Java 和 Scala 有所不同。在 Python 中,所有的函数都实现在基本的RDD 类中,但如果操作对应的 RDD 数据类型不正确,就会导致运行时错误。
持久化 persist()(缓存)
Spark RDD 是惰性求值的,如果简单地对 RDD 调用行动操作, Spark 每次都会重算 RDD 以及它的所有依赖。为了避免多次计算同一个 RDD,可以让 Spark 对数据进行持久化。
当我们让 Spark 持久化存储一个 RDD 时,计算出 RDD 的节点会分别保存它们所求出的分区数据。
在 Scala和 Java 中,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中。在 Python 中,我们会始终序列化要持久化存储的数据,所以持久化级别默认值就是以序列化后的对象存储在 JVM 堆空间中。
如果要缓存的数据太多, 内存中放不下, Spark 会自动利用最近最少使用(LRU)的缓存策略把最老的分区从内存中移除。
更多推荐
所有评论(0)