【大数据分析】Spark的joins研究
数据的join操作(数据连接)对于数据分析来说是非常重要的组成部分,不管是Spark Core还是Spark SQL都支持joins的相同基本类型。joins一种很常见,但又最容易造成性能问题的操作。因为它可能会造成大量的网络传输,尤其是当使用Spark Core组件的时候,因为DAG optimizer(DAG 优化器)无法对数据进行重排列(按照列)并且降低filters的复杂度。在输入数据集的
概述
数据的join操作(数据连接)对于数据分析来说是非常重要的组成部分,不管是Spark Core还是Spark SQL都支持joins的相同基本类型。joins一种很常见,但又最容易造成性能问题的操作。因为它可能会造成大量的网络传输,尤其是当使用Spark Core组件的时候,因为DAG optimizer(DAG 优化器)无法对数据进行重排列(按照列)并且降低filters的复杂度。
join的类型
在输入数据集的记录之间应用连接条件之后,join类型会影响join操作的结果,假设有两个数据表A和B。
A表的数据如下:
Aid | Aname |
---|---|
1 | A1 |
2 | A2 |
3 | A3 |
4 | A4 |
5 | A5 |
6 | A6 |
7 | A7 |
8 | A8 |
9 | A9 |
B表的数据如下:
Bid | Bname |
---|---|
2 | 100 |
6 | 101 |
3 | 102 |
7 | 103 |
6 | 104 |
8 | 105 |
2 | 106 |
2 | 107 |
11 | 108 |
(1)内连接(join或inner join)。可获取两表公共部分的记录。SQL语句:select * from A JOIN B ON A.Aid=B.Bid
Aid | Aname | Bid | Bname |
---|---|---|---|
2 | A2 | 2 | 100 |
2 | A2 | 2 | 106 |
2 | A2 | 2 | 107 |
6 | A6 | 6 | 104 |
6 | A6 | 6 | 101 |
3 | A3 | 3 | 102 |
7 | A7 | 7 | 103 |
8 | A8 | 8 | 105 |
(2)左外连接(left out join)。可获取公共部分的数据集和left join关键字左边的表的数据集。SQL语句:select * from A left join B on A.Aid=B.Bid
Aid | Aname | Bid | Bname |
---|---|---|---|
1 | A1 | NULL | NULL |
2 | A2 | 2 | 100 |
2 | A2 | 2 | 106 |
2 | A2 | 2 | 107 |
3 | A3 | 3 | 102 |
4 | A4 | NULL | NULL |
5 | A5 | NULL | NULL |
6 | A6 | 6 | 104 |
6 | A6 | 6 | 101 |
7 | A7 | 7 | 103 |
8 | A8 | 8 | 105 |
9 | A9 | NULL | NULL |
(3)右外连接(right out join)。可获取公共部分的数据集和right join右边的表的数据集。SQL语句:select * from A right join B on A.Aid=B.Bid
(4)半连接(semi join)。semi join关键字右边的表只用于过滤左边的表的数据而不出现在结果集中。
(5)交叉连接(cross join)。交叉连接返回左表中的所有行,和左表中每一行与右表中所有行的组合。交叉连接也称为笛卡尔积。
Spark执行join的5种策略
Spark提供5种join机制来执行具体的join操作。分别是:shuffle hash join,broadcast hash join,sort merge join,cartesian join,broadcast nested loop join。
(1)shuffle hash join
如果两个表的其中之一(或者其中之二)的数据量比较大,就可以选择shuffle hash join,这样可以保证每个相同的key都发送到同一个分区中。
shuffle hash join的基本步骤主要由以下两点:
(1)对于两张参与join的表,分别按照join key进行重分区,该过程会有shuffle,目的是将相同key的数据发送到同一个分区(一个分区可以有多个key),然后方便分区内执行join。
(2)对于每个shuffle之后的分区,小表的分区数据会构建成一个hash table,然后根据join key与大表的分区数据记录进行匹配。
shuffle hash join的特点:
(1)仅支持等值连接,也就是A表和B表中需要存在某个相同字段,join key不需要排序。
(2)不支持全外连接(full outer joins)
(3)需要对小表构建hash map,属于内存密集型的操作,如果构建hash表的一侧数据也比较大,可能会造成OOM。
(4)需要将参数spark.sql.join.prefersortmergeJoin(default true)设为false
(2)broadcast hash join
如果join的其中一张表比较小,可以选择broadcast hash join,这样可以避免shuffle带来的开销(spark的shuffle操作是很耗时的),从而提高性能。比如事实表与维度表进行join时,由于维度表的数据通常比较小,可以将维度表的数据进行广播(broadcast)。
broadcast hash join的特点
(1)broadcast hash join相比其他join机制效率更高。但由于它本质是数据冗余传输机制,并且需要Driver端缓存数据,所以当小表的数据量也比较大时,会出现OOM。
(2)被广播的小表的数据量要小于spark.sql.autoBroadcastJoinThreshold值,默认是10MB。
(3)被广播的小表的大小阈值不能超过8G。
(3)Sort Merge Join
sort merge join机制是Spark中默认,可以通过参数spark.sqlspark.sql.join.preferSortMergeJoin进行配置,默认是true,即优先使用sort merge join。一般是当需要join的两张表都比较大时,使用这个机制。sort merge join可以减少集群间的数据传输,该方式不会预先加载所有数据到内存,但是join需要对join key进行排序。
sort merge join主要包括三个阶段:
(1)shuffle phase:两张大表根据join key进行shuffle重分区。
(2)sort phase:每个分区内的数据进行排序。
(3)merge phase:对来自不同表的,排序好的分区数据进行join,通过遍历元素,连接具有相同join key值的行来合并数据集。
sort merge join的条件与特点:
(1)仅支持等值连接。
(2)支持所有情况下的join。
(3)join key是排序的。
(4)参数spark.sqlspark.sql.join.preferSortMergeJoin(默认true)设定为true。
(4)Cartesian Join
如果Spark中两张参与join的表没指定join key(on 条件)那么会产生cartesian product join,这个join得到的结果其实就是两张行数的乘积。
cartesian join的特点:
(1)仅支持内连接。
(2)支持等值和不等值连接。
(3)开启参数spark.sql.crossJoin.enable=true
各类join策略的优先级
五种join策略的优先级:Broadcast Hash Join>Sort Merge Join>Shuffle Hash Join>Cartsian Join>Broadcast Nested Loop Join。
更多推荐
所有评论(0)