Kyuubi 优化小文件

Spark 小文件问题

Hive 表中太多的小文件会影响数据的查询性能和效率,同时加大了 HDFS NameNode 的压力。Hive (on MapReduce) 一般可以简单的通过一些参数来控制小文件,而 Spark 中并没有提供小文件合并的功能。下面我们来简单了解一下 Spark 小文件问题,以及如何处理小文件。

环境

Kyuubi 版本:1.6.0-SNAPSHOT

Spark 版本:3.1.3、3.2.0

TPCDS 数据集

Kyuubi 中提供了一个 TPCDS Spark Connector,可以通过配置 Catalog 的方式,在读取时自动生成 TPCDS 数据。

只需要将 kyuubi-spark-connector-tpcds_2.12-1.6.0-SNAPSHOT.jar 包放入 Spark jars 目录中,并配置:

1
spark.sql.catalog.tpcds=org.apache.kyuubi.spark.connector.tpcds.TPCDSCatalog;

这样我们就可以直接读取 TPCDS 数据集:

1
2
3
4
5
use tpcds;
show databases;
use sf3000;
show tables;
select * from sf300.catalog_returns limit 10;

小文件产生

首先我们在 Hive 中创建一个 sample.catalog_returns 表,用于写入生成的 TPCDS catalog_returns 数据,并添加一个 hash 字段作为分区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
create table sample.catalog_returns
(
cr_returned_date_sk bigint,
cr_returned_time_sk bigint,
cr_item_sk bigint,
cr_refunded_customer_sk bigint,
cr_refunded_cdemo_sk bigint,
cr_refunded_hdemo_sk bigint,
cr_refunded_addr_sk bigint,
cr_returning_customer_sk bigint,
cr_returning_cdemo_sk bigint,
cr_returning_hdemo_sk bigint,
cr_returning_addr_sk bigint,
cr_call_center_sk bigint,
cr_catalog_page_sk bigint,
cr_ship_mode_sk bigint,
cr_warehouse_sk bigint,
cr_reason_sk bigint,
cr_order_number bigint,
cr_return_quantity int,
cr_return_amount decimal(7, 2),
cr_return_tax decimal(7, 2),
cr_return_amt_inc_tax decimal(7, 2),
cr_fee decimal(7, 2),
cr_return_ship_cost decimal(7, 2),
cr_refunded_cash decimal(7, 2),
cr_reversed_charge decimal(7, 2),
cr_store_credit decimal(7, 2),
cr_net_loss decimal(7, 2)
) PARTITIONED BY(hash int)
stored as parquet;

我们先关闭 Kyuubi 的优化,读取 catalog_returns 数据并写入 Hive:

1
2
3
set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=false;

insert overwrite sample.catalog_returns partition (hash=0) select * from tpcds.sf300.catalog_returns;

Spark SQL 最终产生的文件数最多可能是最后一个写入的 Stage 的 Task 数乘以动态分区的数量。我们可以看到由于读取输入表的 Task 数是 44 个,所以最终产生了 44 个文件,每个文件大小约 69 M。

改变分区数(Repartition)

由于写入的文件数跟最终写入 Stage 的 Task 数据有关,那么我们可以通过添加一个 Repartition 操作,来减少最终写入的 task 数,从而控制小文件:

1
insert overwrite sample.catalog_returns partition (hash=0) select /*+ REPARTITION(10) */ * from tpcds.sf300.catalog_returns;

添加 REPARTITION(10) 后,会在读取后做一个 Repartition 操作,将 partition 数变成 10,所以最终写入的文件数变成 10 个。

Spark AQE 自动合并小分区

Spark 3.0 以后引入了自适应查询优化(Adaptive Query Execution, AQE),可以自动合并较小的分区。

开启 AQE,并通过添加 distribute by cast(rand() * 100 as int) 触发 Shuffle 操作:

1
2
3
4
5
6
set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=false;
set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.advisoryPartitionSizeInBytes=512M;
set spark.sql.adaptive.coalescePartitions.minPartitionNum=1;

insert overwrite sample.catalog_returns partition (hash=0) select * from tpcds.sf300.catalog_returns distribute by cast(rand() * 100 as int);

默认 Shuffle 分区数 spark.sql.shuffle.partitions=200,如果不开启 AQE 会产生 200 个小文件,开启 AQE 后,会自动合并小分区,根据 spark.sql.adaptive.advisoryPartitionSizeInBytes=512M 配置合并较小的分区,最终产生 12 个文件。

Kyuubi 小文件优化分析

Apache Kyuubi (Incubating) 作为增强版的 Spark Thrift Server 服务,可通过 Spark SQL 进行大规模的数据处理分析。Kyuubi 通过 Spark SQL Extensions 实现了很多的 Spark 优化,其中包括了 RepartitionBeforeWrite 的优化,再结合 Spark AQE 可以自动优化小文件问题,下面我们具体分析一下 Kyuubi 如何实现小文件优化。

Kyuubi 如何优化小文件

Kyuubi 提供了在写入前加上 Repartition 操作的优化,我们只需要将 kyuubi-extension-spark-3-1_2.12-1.6.0-SNAPSHOT.jar 放入 Spark jars 目录中,并配置 spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension。相关配置:

Name Default Value Description Since
spark.sql.optimizer.insertRepartitionBeforeWrite.enabled true Add repartition node at the top of query plan. An approach of merging small files. 1.2.0
spark.sql.optimizer.insertRepartitionNum none The partition number if spark.sql.optimizer.insertRepartitionBeforeWrite.enabled is enabled. If AQE is disabled, the default value is spark.sql.shuffle.partitions. If AQE is enabled, the default value is none that means depend on AQE. 1.2.0
spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 100 The partition number of each dynamic partition if spark.sql.optimizer.insertRepartitionBeforeWrite.enabled is enabled. We will repartition by dynamic partition columns to reduce the small file but that can cause data skew. This config is to extend the partition of dynamic partition column to avoid skew but may generate some small files. 1.2.0

通过 spark.sql.optimizer.insertRepartitionNum 参数可以配置最终插入 Repartition 的分区数,当不开启 AQE,默认为 spark.sql.shuffle.partitions 的值。需要注意,当我们设置此配置会导致 AQE 失效,所以开启 AQE 不建议设置此值。

对于动态分区写入,会根据动态分区字段进行 Repartition,并添加一个随机数来避免产生数据倾斜,spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 用来配置随机数的范围,不过添加随机数后,由于加大了动态分区的基数,还是可能会导致小文件。这个操作类似在 SQL 中添加 distribute by DYNAMIC_PARTITION_COLUMN, cast(rand() * 100 as int)

静态分区写入

开启 Kyuubi 优化和 AQE,测试静态分区写入:

1
2
3
4
5
6
7
set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=true;

set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.advisoryPartitionSizeInBytes=512M;
set spark.sql.adaptive.coalescePartitions.minPartitionNum=1;

insert overwrite sample.catalog_returns partition (hash=0) select * from tpcds.sf300.catalog_returns;

可以看到 AQE 生效了,很好的控制了小文件,产生了 11 个文件,文件大小 314.5 M 左右。

动态分区写入

我们测试一下动态分区写入的情况,先关闭 Kyuubi 优化,并生成 10 个 hash 分区:

1
2
3
set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=false;

insert overwrite sample.catalog_returns partition (hash) select *, cast(rand() * 10 as int) as hash from tpcds.sf300.catalog_returns;

产生了 44 × 10 = 440 个文件,文件大小 8 M 左右。

开启 Kyuubi 优化和 AQE:

1
2
3
4
5
6
7
8
set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=true;
set spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum=100;

set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.advisoryPartitionSizeInBytes=512M;
set spark.sql.adaptive.coalescePartitions.minPartitionNum=1;

insert overwrite sample.catalog_returns partition (hash) select *, cast(rand() * 10 as int) as hash from tpcds.sf300.catalog_returns;

产生了 12 × 10 = 120 个文件,文件大小 30 M 左右,可以看到小文件有所改善,不过任然不够理想。

此案例中 hash 分区由 rand 函数产生,分布比较均匀,所以我们将 spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 设置成 0,重新运行,同时将动态分区数设置为 5

1
2
3
4
5
6
7
8
set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=true;
set spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum=0;

set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.advisoryPartitionSizeInBytes=512M;
set spark.sql.adaptive.coalescePartitions.minPartitionNum=1;

insert overwrite sample.catalog_returns partition (hash) select *, cast(rand() * 5 as int) as hash from tpcds.sf300.catalog_returns;

由于动态分区数只有 5 个,所以实际上只有 5 个 Task 有数据写入,每个 Task 对应一个分区,导致最终每个分区只有一个较大的大文件。

通过上面的分析可以看到,对于动态分区写入,Repartition 的优化可以缓解小文件,配置 spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum=100 解决了数据倾斜问题,不过同时还是可能会有小文件。

Rebalance 优化

Spark 3.2+ 引入了 Rebalance 操作,借助于 Spark AQE 来平衡分区,进行小分区合并和倾斜分区拆分,避免分区数据过大或过小,能够很好的处理小文件问题。

Kyuubi 对于 Spark 3.2+ 的优化,是在写入前插入 Rebalance 操作,对于动态分区,则指定动态分区列进行 Rebalance 操作。不再需要 spark.sql.optimizer.insertRepartitionNumspark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 配置。

测试静态分区写入,使用 Spark 3.2.0 开启 Kyuubi 优化和 AQE:

1
2
3
4
5
6
7
set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=true;

set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.advisoryPartitionSizeInBytes=512M;
set spark.sql.adaptive.coalescePartitions.minPartitionNum=1;

insert overwrite sample.catalog_returns partition (hash=0) select * from tpcds.sf300.catalog_returns;

Repartition 操作自动合并了小分区,产生了 11 个文件,文件大小 334.6 M 左右,解决了小文件的问题。

测试动态分区写入,使用 Spark 3.2.0 开启 Kyuubi 优化和 AQE,生成 5 个动态分区:

1
2
3
4
5
6
7
set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=true;

set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.advisoryPartitionSizeInBytes=512M;
set spark.sql.adaptive.coalescePartitions.minPartitionNum=1;

insert overwrite sample.catalog_returns partition (hash) select *, cast(rand() * 5 as int) as hash from tpcds.sf300.catalog_returns;

Repartition 操作自动拆分较大分区,产生了 2 × 5 = 10 个文件,文件大小 311 M 左右,很好的解决的倾斜问题。

总结

从上面的分析可以看到,对于 Spark 3.2+,Kyuubi 结合 Rebalance 能够很好的解决小文件问题,对于 Spark 3.1,Kyuubi 也能自动优化小文件,不过动态分区写入的情况还是可能存在问题。

相关的配置总结:

1
2
3
4
5
6
7
8
9
10
# 配置 Kyuubi Spark SQL Extension
spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension

# 开启 RepartitionBeforeWrite 优化(默认开启)
spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=true;

# 配置 AQE
spark.sql.adaptive.enabled=true;
spark.sql.adaptive.advisoryPartitionSizeInBytes=512M;
spark.sql.adaptive.coalescePartitions.minPartitionNum=1;

更多 AQE 配置可以参考:How To Use Spark Adaptive Query Execution (AQE) in Kyuubi