背景

spark sql中,使用count(distinct)会使得数据发生expand,使得stage的全部task跑的慢
原因:数据expand会使得数据翻n倍,单个task处理过多的数据
conut(distinct)源码分析

优化方法

一、改写sql

适用于最后去重的字段时同一个,只是条件不同

  1. 将数据聚合到去重字段粒度
  2. 计算结果

二、膨胀前repartition

通过repartition或distribute by,在数据膨胀前,手动用一次shuffle将数据打散到更多的task
如:表有1280M,一个task读128M,在进行数据膨胀之前,通过repartition打散到100个task上,每个task只有12.8M,则发生数据膨胀也10倍也不会超过单个task的处理能力
注意:repartition的数量可以根据膨胀倍数进行确定

三、参数调优

让每个map读的数据变少,如设置12M,数据膨胀10倍至120M时一个task也能进行处理
存在的问题:读表变慢,因为需要用到更多的task

set spark.sql.files.maxPartitionBytes = 12m;

四、逻辑拆分

把逻辑拆开,分别计算指标,然后再join起来

总结

考虑去重字段数量

最后去重的字段是一个,仅条件不同

1>2>3>4

最后去重的字段是多个

2>3>4
(此时方法1不适用)

Logo

永洪科技,致力于打造全球领先的数据技术厂商,具备从数据应用方案咨询、BI、AIGC智能分析、数字孪生、数据资产、数据治理、数据实施的端到端大数据价值服务能力。

更多推荐