
大数据的小文件优化指引
如果结果文件的平均大小小于hive.merge.mapfiles设置的值,则额外启动一轮job进行小文件的合并,合并后的期望文件大小由max(hive.merge.size.per.task, hive.merge.smallfiles.avgsize)来决定。此时也需要在结果写入到hdfs之后启动一轮额外的任务来合并小文件,方法是使用distribute by 把相同分区的数据分发到相同的tas
1.危害
小文件的最大危害是拖慢任务执行时间,甚至会引发OOM
2.单分区的小文件合并
一个任务不管是spark还是mapreduce,大致的执行过程都是如下图所示
如图,我们可以在上面三个stage中进行小文件的合并
阶段1:map合并,发生在split操作,可以解决输入小文件的合并(也可以解决只有map阶段的输出小文件合并),方式是把多个小文件合并成一个数据分片(split),进而交给一个map task进行计算。
阶段2:reduce合并,发生在shuffle阶段,可以解决输出小文件的合并,方式是减少reduce task数量,减少小文件的形成
阶段3:写入hdfs之后,可以解决输出小文件的合并,方式是额外启动一个job,根据用户设置的阈值来合并已经生成的小文件
以上的小文件合并,都是可以通过参数的设置来优化,其中对于输出文件的优化(经过reduce的输出),在spark下推荐优先使用reduce合并
由于我们目前数仓中的表绝大多数是orc格式的表,并且input format默认是CombineHiveInputFormat,所以这里针对这种设置进行一个总结
hive |
spark |
|||
参数设置 |
解释 |
参数设置 |
解释 |
|
map合并 |
mapred.max.split.size mapred.min.split.size.per.node mapred.min.split.size.per.rack |
mapred.max.split.size控制切分时的数据分片大小 mapred.min.split.size.per.node和mapred.min.split.size.per.rack控制了切分后,在dn和机架内的合并阈值。 三个参数的值可以根据情况自由调整 |
set spark.hadoop.hive.exec.orc.split.strategy=ETL; spark.hadoopRDD.targetBytesInPartition spark.hadoop.mapreduce.input.fileinputformat.split.maxsize spark.hadoop.mapreduce.input.fileinputformat.split.minsize |
首先需要把切分策略设置成ETL模式,其余三个参数都可以控制合并后的文件大小。 |
reduce合并 |
hive.exec.reducers.bytes.per.reducer |
增大该参数值,可以增大每个reducer处理的数据量,进而减少reduce task的数量,最终减少了输出文件数量。 task_num=input_size/hive.exec.reducers.bytes.per.reducer 注意:一个hive任务会拆分成多个MR任务,该参数会对多个MR任务的reduce阶段作用,所以并不是一个很好的减少小文件的方式(有可能会导致同任务的其他MR job执行缓慢) |
set spark.sql.adaptive.enabled=true; spark.sql.adaptive.shuffle.targetPostShuffleInputSize |
打开自适应开关,在最后一个stage中增加每个task的处理量,进而减少task数量,最终减少小文件数量。 task_num=shuffle_read_size/spark.sql.adaptive.shuffle.targetPostShuffleInputSize |
写入后合并 |
set hive.merge.mapfiles=true; set hive.merge.mapredfiles=true; hive.merge.size.per.task hive.merge.smallfiles.avgsize |
首先打开map端和reduce端的结果文件合并开关 如果结果文件的平均大小小于hive.merge.mapfiles设置的值,则额外启动一轮job进行小文件的合并,合并后的期望文件大小由max(hive.merge.size.per.task, hive.merge.smallfiles.avgsize)来决定 注意:如果对mapjoin的结果进行合并,一定要设置hive.merge.mapredfiles=true |
spark.sql.mergeSmallFileSize spark.sql.targetBytesInPartitionWhenMerge |
spark.sql.mergeSmallFileSize和hive参数hive.merge.smallfiles.avgsize作用相同,是小文件合并的阈值,最终的文件大小由max(spark.sql.mergeSmallFileSize, spark.sql.targetBytesInPartitionWhenMerge , spark.hadoopRDD.targetBytesInPartition )来决定 注意:由于新起了一个stage做文件合并,所以参数spark.hadoopRDD.targetBytesInPartition也是其作用 |
3.多分区的小文件合并
如果一个任务最终会向多个分区插入数据,此时M个task会分别在同一个分区中写M个文件,这样N个分区就会形成M*N个文件。由于此种方式产生的小文件,处理起来比较特殊,单纯通过参数不能很好的解决。
此时也需要在结果写入到hdfs之后启动一轮额外的任务来合并小文件,方法是使用distribute by 把相同分区的数据分发到相同的task中,如果一个分区用一个task会处理较多的数据,可以把一个分区散列成多个task来处理。
比如一个表的分区字段是dt,一个任务同时处理了100天的数据,最终会在100个dt分区下写入文件。
使用distribute by dt,会把dt相同的数据分发到一个task中,这样一个分区就会生成一个文件
如果一个分一个文件数据量太大,可以"distribute by dt,pk%10" 或者“distribute by dt,cast(rand(11)*10 as int)”,这样把一个分区的数据散列到了10个task中处理,最终一个分区会生成10个文件
更多推荐
所有评论(0)