背景介绍
CDT Data团队每天需要处理来自eBay的所有数据,数据量非常大、存储的历史周期也特别长,在日常跑批或手动维护过程中,不可避免地产生大量的小文件。过多的小文件会对HDFS NameNode产生较大的压力,甚至造成灾难性的影响。处理小文件是一个比较通用的需求,也是Spark SQL和Hive处理数据的一个痛点。如何快速、高效地、以最少的资源合并小文件对于我们来说非常重要。
成因和危害
离线任务处理数据时,不可避免地产生小文件,其原因有很多,大致可以分为以下几个因素:
分区数量过多。在数据量非常大时,为了追求效率,开发者往往把 spark.sql.shuffle.partitions 设置得很大。如果分区数远大于数据量时,每个分区写入的文件就会很小。
数据倾斜。这是最常见的原因。数据分布往往不均匀,或不恰当的join条件,在处理数据时,会导致一部分分区产生的文件很小。
过滤操作。比如CTAS(Create Table As Select),如果查询时过滤了大部分的数据,也会产生小文件。
动态分区。可以看做是特殊场景的数据倾斜。特别是数据按照日期分区,在使用动态分区写入时,数据的分布往往不均匀。比如在早期,项目刚上线,数据量一般很小,后期会越来越大,特别是对于Risk数据,如果某些规则被触发的次数特别多,会导致某些天的数据暴增。
文件格式。某些文件格式(如Parquet或ORC)是以块的形式来组织数据的,不恰当的参数会导致小文件的产生。
数据源特性。从某些特定的数据源读取数据时,例如读取小文件集合,可能会保留原始文件的分区粒度,导致输出也是小文件。
小文件的危害大概有以下6个:
NameNode内存压力。HDFS的NameNode负责存储文件系统的元数据,包括文件名、权限、每个文件的块列表和块的位置信息等。如果有大量的小文件,每个文件的元数据都需要存储在NameNode的内存中,这会消耗大量的内存资源。
元数据操作开销。对于每个文件或目录的操作,NameNode都需要更新其元数据。如果有大量的小文件,即使是简单的文件系统操作也会导致大量的元数据更新,这会使NameNode的负载增加,影响其性能。
任务启动开销。通常每个HDFS块会启动一个Map任务。如果有大量的小文件,意味着会有大量的Map任务被创建,这会导致任务调度和启动的开销变大,从而影响整体的处理效率。
读取效率低。HDFS是优化了顺序读取大文件的性能,而对于小文件,因为每个文件都可能分布在不同的DataNode上,所以频繁地打开和关闭文件会增加网络开销和I/O开销,导致数据读取效率低下。
存储空间利用率低。虽然小文件本身不占用太多空间,但是HDFS的每个文件都是以块(block)为单位存储的,而每个块都有一个最小大小(默认为128MB)。如果文件远小于一个块的大小,那么剩余的块空间就会浪费。特别是对于类似Parquet这样的文件格式,还会存储额外的metadata信息,如果数据量过少,metadata占的比重就会增加。
复制开销。HDFS默认会对每个块进行多个副本的复制,以保证数据的可靠性。小文件过多会导致大量的复制操作,增加网络和存储的负担。
常见方案
CDT Data Team 产生小文件大致有2种场景:
Spark SQL 处理当天批量数据,产生大量小文件。
历史分区中的小文件。该类数据不会在每天的批量中读写,会被Hadoop team要求合并。
第一种场景比较简单,业界常用的方案就是调整Spark SQL或Hive的参数,或开启AQE,每天处理时减少生成的小文件。但如果参数调整不当,有可能还是无法避免小文件,或者影响批量的性能。如果每天的批量需要读写多个分区的数据,就需要很仔细地设计SQL和相关的参数,否则也很难避免小文件的产生。
第二种一般是通过SQL依次读、写一遍每个历史分区的数据,用合并后的小文件替换原有的小文件。如果数据量非常大,比如PB级别,重新读写一次会占用很多的资源,性能也不会很高。
SQL处理小文件,可以简单地看作是把结果数据进行一次shuffle,按照目标大小(一般是128MB)写入到文件。其本质大都是对产生的所有数据读、写一次进行合并。另外,eBay内部的数据绝大多数都是parquet表,且开启了压缩,这样读、写一次的代价将会非常大。
抽丝剥茧
考虑到我们team的数据量非常大,最好能以最少的资源、最快的速度合并小文件。那么有没有一个完美的方案来满足这两个看似矛盾的要求呢?
在解决这个问题之前,让我们抛开SQL,先来看一下文件合并最简单的场景:合并2个txt格式的小文件。合并过程非常简单,分别读取一次这两个文件,然后写入到一个比较大的文件:
我们稍微增加一下这个场景的复杂度,现在有3个文件需要合并,那么可以把file#2和file#3的内容拆开读写到两个不同的文件中。也就是按照目标大小,把这3个文件的数据平均写到文件中,这也是用SQL实现的基本逻辑。
但文件数量好像没有变化,合并后还是3个文件。其实file#3也没必要严格按照目标大小拆分,因为它是最后一个文件,可以把所有的数据都写到file#5中,这样文件数量变成了2个。
那么上面的方案是不是最优方案呢?也不一定。或许下面这种读写方案会好一些,至少file#2只需要一次顺序读就可以了,而顺序读在大数据领域比较重要。而且可以同时启动2个线程来处理这3个文件。
那么如果文件数量大于3个呢?文件合并将会变得非常复杂,需要仔细地设计合并算法来合并小文件。再进一步把这个问题复杂化,如果待合并的文件是gzip或parquet格式呢?就不得不解压文件中的数据,然后压缩到一个新的文件。
优化方案
通过深入的研究,我们把合并小文件的过程分为以下步骤:
以合适的算法把待合并的文件分组,组内的文件满足以下条件:
每组至少有一个文件。
每个文件只能属于一个分组——这样它的数据就不会被split到另一个文件中,也就是能完全按照顺序读合并到一个大文件中。
文件分组要尽可能的少——也就是合并后的文件要尽可能的少。
合并后的文件的目标大小不一定要完全等于目标值,可以适当地大于目标值——比如大于目标值的30%。
读取组内的文件,合并到一个新的大文件。
针对这两个步骤,我们采取了不同的优化方案。文件分组采取的策略是:
计算每个文件的size,过滤掉大于目标值的文件——也就是说合并时忽略掉大文件,不再全量读写数据,这一条对于提升性能非常关键。
把文件按照其size从小到大排序,然后依次遍历,计算相邻文件size的和。
如果刚好大于目标size(比如是128 Mb)的一定范围,比如大于 128Mb * %130 ~= 166Mb,则这些文件分为一组。
如果最后一组只有一个文件则把它划到前面一个分组。
合并文件看似很简单,还有没有优化的余地呢?
其实在我们抽丝剥茧探究合并方案时,是以txt文件为例的,txt文件有一个巨大的好处就是它属于plain text:也就是txt文件可以不用解析,直接进行二进制层面的copy。
那么parquet和gzip文件有没有类似的特性呢?在深入研究这些文件格式之后,我们发现他们同样有类似的读写方法。以Parquet为例,它是有RowGroup的概念的,即可以在RowGroup的层面来读、写数据,这一过程其实是节约了数据的压缩和解压缩。那么我们就可以一次性读取一个RowGroup的数据,然后写入这个RowGroup的二进制数据到另一个parquet文件中。Avro文件也可以采取类似的读写策略。对于gzip、bzip2文件,直接按照txt文件进行拼接就行了,这里就不过多介绍,感兴趣的可以自行研究。
具体实现
考虑到CDT Data Team绝大多数的离线任务都是Spark SQL,我们自定义了一个command来实现文件合并。其用法如下:
它的执行计划如下:
当然在合并过程中我们还有一些其他的特性,比如合并后的文件先放到临时目录,如果替换小文件过程中有异常,则还可以通过该命令进行回滚。合并之后,该命令还会自动生成一个临时视图,v_merged_files,查询该视图可以返回此次合并过程中的细节。比如文件size和文件组,以及合并后的文件名等信息。
单独使用该命令合并小文件还带来一个额外的好处就是,用户不需要调整现有的SQL和参数,可以按照最高的性能配置来跑批,在批量最后一步通过该命令合并文件。如果没有小文件,该命令不再读写一次数据,可以很快结束,对整体的批量性能没有影响。
总结
合并小文件在大数据领域是一个常见的问题,也是一个痛点。如果采取常见的策略,比如调整参数、添加distribute by、开启AQE、降低shuffle并行度等,往往会对批量的性能有一定的影响,或者仍然无法避免小文件的产生。
CDT Data团队拥有处理海量数据的经验,和专业的开发人员,在处理小文件时并没有直接、简单地参考和采用业界的方案,而是深入研究了parquet和gzip/bzip2/parquet/avro的文件格式。我们另辟蹊径,实现了一个Spark SQL命令处理这些海量的数据和小文件,同时又规避了常规方案严重的性能问题。
END