wForget's blog


  • Home

  • About

  • Tags

  • Archives

Spark SQL 分析 Event Log

Posted on 2022-11-07

一、背景

Spark 程序在运行时会会在 Driver 端产生一些事件并保存在 EventLog 中,Spark History Server 通过读取并 Replay 这些 Event 可以渲染出 Spark UI 页面,Spark UI 页面中包括了我们排查问题所需的大部分指标和统计信息。

所以我们可以通过在 Spark Event Log 中提取一些关键信息,用于发现 Spark 的一些问题并进行优化,同时也可以对历史运行的任务进行统计,为一些优化效果提供数据支撑。

二、Spark SQL 读取 Event Log

Spark 中 Event log 是通过 JSON 格式进行序列化写入文件中,所以我们可以直接使用 Spark JOSN File Datasource 方式直接读取 Event Log。

1、直接读取Event Log

1
select * from `json`.`hdfs://rbf-bdxs-g1/system/spark/tmp/spark/logs/application_1663317691313_4806906_1` limit 10;

2、为 EventLog 文件创建一下视图

参考:https://spark.apache.org/docs/latest/sql-ref-syntax-ddl-create-table-datasource.html

1
2
3
4
5
6
CREATE OR REPLACE TEMPORARY VIEW `application_1663317691313_4806906` (
`Event` String, `Job ID` String,
`Completion Time` String,
`Job Result` Struct<`Result` String, `Exception` Struct<`Message` String, `Stack Trace` String>>)
USING `json`
OPTIONS (path 'hdfs://rbf-bdxs-g1/system/spark/tmp/spark/logs/application_1663317691313_4806906_1');

三、Hive SQL 读取 EventLog

Hive 中可以通过配置 TEXTFILE ‘org.apache.hadoop.hive.serde2.JsonSerDe’ 序列化方式的表也可以直接读取 JSON 文件。

1
2
3
4
5
6
7
8
9
10
11
CREATE EXTERNAL TABLE IF NOT EXISTS `sample`.`spark_event_log`(
`Event` String,
`Job ID` String,
`Completion Time` String,
`Job Result` Struct<`Result`: String, `Exception`: Struct<`Message`: String, `Stack Trace`: String>>
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
STORED AS TEXTFILE
LOCATION 'hdfs://rbf-bdxs-g1/system/spark/tmp/spark/logs/application_1663317691313_4806906_1';

select * from `sample`.`spark_event_log` limit 10;

四、分区读取 Event Log

通过上面一些尝试,我们可以确定能够通过 Spark SQL、Hive SQL 来读取 Event Log 文件。

不过由于所有的 Application 都写在同一个 Event Log 路径中,我们很难过滤部分 Application 的 EventLog,所以我们需要为 Event Log 添加一些分区,使得我们可以按照分区来过滤一些数据,不用每次都全量读取。

1、Load Data 方式加载到 Hive 表中

在 Hive 中创建一个分区表,然后通过 Load Data 方法将 Event Log 文件加载到对应的分区中,不过测试下来发现 Load data 命令会删除源文件。

我们也可以直接将文件拷贝到表的分区目录中,再为表添加对应的分区,不过这种方式相当于对源文件多个一个备份。

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE EXTERNAL TABLE IF NOT EXISTS `sample`.`spark_event_log_002`(
`Event` String,
`Job ID` String,
`Completion Time` String,
`Job Result` Struct<`Result`: String, `Exception`: Struct<`Message`: String, `Stack Trace`: String>>
)
PARTITIONED BY (dt String, app_id String)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
STORED AS TEXTFILE;

LOAD DATA INPATH 'hdfs://rbf-bdxs-g1/system/spark/tmp/spark/logs/application_1663317691313_4373493_1' INTO TABLE `sample`.`spark_event_log_002` PARTITION (dt='2022-10-31',app_id='application_1663317691313_4373493'); --- 会删除源文件

select * from `sample`.`spark_event_log_002` limit 10;

2、Spark EventLog Connector

为了让 Spark SQL 直接读取 Event Log 文件,并且对 Event Log 添加分区信息,我通过 DataSourceV2 的方式实现一个 EventLog Catalog 和 Table,复用了 JsonTable 的大部分逻辑,并重写 JsonScan 的 partitions 逻辑。

实现了直接通过 Catalog 读取 Event Log 并支持 dt、hour、app_id 三级分区。

代码:https://github.com/wForget/spark-connector-eventlog

具体使用:https://github.com/wForget/spark-connector-eventlog/blob/main/README.md

配置 EventLog Catalog:

1
2
spark.sql.catalog.eventlog=cn.wangz.spark.connector.eventlog.EventLogCatalog
spark.sql.catalog.eventlog.eventLogDir=hdfs://namenode01/tmp/spark/logs

访问 Event Log:

1
2
3
4
5
6
7
8
9
10
11
// 可通过设置 maxPartitionBytes,控制每个 task 处理的数据量
set spark.sql.files.maxPartitionBytes=1g

// 查询表
show tables in eventlog

// 查询分区
show partitions eventlog.spark_event_log

// 查询 EventLog
select * from eventlog.spark_event_log where dt = '2022-10-26'

Kyuubi Zorder 调研测试

Posted on 2022-10-18

Kyuubi Z-order 调研测试

Z-order 是一种将多维数据映射到一维上的排序算法,排序后使得多维数据彼此临近,更有利于数据压缩和查询时 Data Skip。

所谓的 Z-order 优化实际上是在数据写入时添加 Z-order 排序,将数据进行聚类,很好的提升了数据的压缩率和查询性能。

Kyuubi 通过 Spark 插件的形式,实现了 Z-order 优化,具体使用参考:Z-Ordering Support。

下面我参考了 Kyuubi 官方文档对 Z-order 功能进行调研测试:Z-order Benchmark

数据准备

使用 Beeline 连接 Kyuubi,准备测试数据。

1、创建原始数据表:

关闭 AQE ,便于控制输出文件数。

1
2
3
// 关闭 AQE
set spark.sql.adaptive.enabled=false;
set spark.sql.shuffle.partitions=200;

创建原始表:

1
2
3
4
5
create table sample.conn_random (
src_ip string,
src_port int,
dst_ip string,
dst_port int) stored as parquet;

使用 Scala 执行模式生成原始表数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 切换成 Scala 执行模式
set kyuubi.operation.language=scala;

import scala.util.Random;
def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".");
def randomPort(r: Random) = r.nextInt(65536);

case class ConnRecord(src_ip: String, src_port: Int, dst_ip: String, dst_port: Int);
def randomConnRecord(r: Random) = ConnRecord(src_ip = randomIPv4(r), src_port = randomPort(r), dst_ip = randomIPv4(r), dst_port = randomPort(r));

val numRecords = 100 * 1000 * 1000L;
val numFiles = 200;

val df = spark.range(0, numFiles, 1, numFiles).mapPartitions { it =>
val partitionID = it.toStream.head
val r = new Random(seed = partitionID)
Iterator.fill((numRecords / numFiles).toInt)(randomConnRecord(r))
};

// 创建原始测试数据表 sample.conn_random
df.write.mode("overwrite").format("parquet").insertInto("sample.conn_random");

// 切换成 SQL 执行模式
spark.sql("set kyuubi.operation.language=sql");

2、生成 Order by 排序的表:

1
2
3
4
5
6
7
// conn_order_only_ip
create table sample.conn_order_only_ip like sample.conn_random;
INSERT overwrite sample.conn_order_only_ip select * from sample.conn_random order by src_ip, dst_ip;

// conn_order
create table sample.conn_order like sample.conn_random;
INSERT overwrite sample.conn_order select * from sample.conn_random order by src_ip, src_port, dst_ip, dst_port;

2、生成 Z-order 优化的表:

1
2
3
4
5
6
7
8
9
// conn_zorder_only_ip
create table sample.conn_zorder_only_ip like sample.conn_random;
insert overwrite sample.conn_zorder_only_ip select * from sample.conn_random;
OPTIMIZE sample.conn_zorder_only_ip ZORDER BY src_ip, dst_ip;

// conn_zorder
create table sample.conn_zorder like sample.conn_random;
insert overwrite sample.conn_zorder select * from sample.conn_random;
OPTIMIZE sample.conn_zorder ZORDER BY src_ip, src_port, dst_ip, dst_port;

Zorder 优化效果对比分析

1、存储分析

表名 大小 生成时间
conn_random 2.7 G 1.1 min
conn_order_only_ip 2.2 G 1.3 min
conn_order 2.2 G 52 s
conn_zorder_only_ip 1510.3 MiB 1.9 min
conn_zorder 1510.4 MiB 1.9 min

2、查询分析

1
2
3
4
5
6
// query
select count(*) from sample.conn_random where src_ip like '157%' and dst_ip like '216.%';
select count(*) from sample.conn_order_only_ip where src_ip like '157%' and dst_ip like '216.%';
select count(*) from sample.conn_order where src_ip like '157%' and dst_ip like '216.%';
select count(*) from sample.conn_zorder_only_ip where src_ip like '157%' and dst_ip like '216.%';
select count(*) from sample.conn_zorder where src_ip like '157%' and dst_ip like '216.%';
表名 扫描数据量
conn_random 100,000,000
conn_order_only_ip 400,000
conn_order 400,648
conn_zorder_only_ip 120,000
conn_zorder 120,000

3、总结

从上面的测试结果中可以很直观的看到 Z-order 优化的一些特点:

  • 排序消耗:由于需要对数据进行排序,所以对于数据生产任务有少许额为的排序消耗。
  • 减少空间:数据进行了聚类,使得相邻数据有很高的相似性,大幅提高数据压缩率,减少存储空间。
  • 提高查询性能:数据临近也让 RowGroup 数据范围比较小,加大了查询时 Data skip 效果,减少扫描的数据量,提升查询性能。

Kyuubi 多数据源混合计算

Posted on 2022-07-17

Kyuubi 多数据源混合计算

Apache Kyuubi 作为一个分布式和多租户网关,用于在 Lakehouse 上提供 Serverless SQL。
借助于 Spark 引擎完整的生态,我们可以接入各种数据源,实现各种数据源的混合计算,以及不同数据源之间的数据同步。
第三方数据源通过实现 Spark DataSource 相关接口,提供 Spark Connector 依赖包来与 Spark 对接。Spark SQL 中可通过 DataSource 或 Catalog 的方式来定义数据源,进而通过 Spark SQL 读写数据源数据。

DataSource

Spark SQL 可使用 DataSource 配置,创建一个数据源的临时视图。

1
2
3
4
5
6
7
8
9
# 从 JSON 文件创建一个临时视图
CREATE TEMPORARY VIEW jsonTable
USING org.apache.spark.sql.json
OPTIONS (
path 'examples/src/main/resources/people.json'
)

# 查询定义的临时视图
SELECT * FROM jsonTable

Catalog

Spark 3 提供 CatalogPlugin 接口,可以为数据源实现自定义的 Catalog,通过相关配置加载数据源的 Catalog,进而访问该数据源的数据。

1
2
3
4
5
6
7
8
9
# 定义一个名称为 test 的 catalog,catalog_class 为 CatalogPlugin 的实现类
spark.sql.catalog.test catalog_class

# 配置 test catalog 的 options
spark.sql.catalog.test.optKey1 optValue1
spark.sql.catalog.test.optKey2 optValue2

# 通过 `test.db.table` 访问自定义 catalog 下的库表
select * from test.db.table

Spark SQL 连接各种数据源

HDFS

Spark SQL 中默认只能加载一个 Hive 集群配置,对于有跨集群访问 Hive 的需求,我们可以使用文件类型的数据源,定义其它集群的 HDFS 路径,跨集群的操作 Hive 数据。

Spark 内置了一些文件类型的数据源,可以用于读写 HDFS 文件,如:Parquet、ORC、JSON、CSV 等,不需要额外的依赖。

对于开启了 Kerberos 认证的集群,需要使用 kyuubi.credentials.hadoopfs.uris 配置跨集群访问的 Hadoop FileSystems。不过目前这个参数是 Kyuubi Server 的参数,运行时不可修改,后续可以进优化。

1
kyuubi.credentials.hadoopfs.uris=hdfs://192.168.1.199

通过 DataSource 方式访问 HDFS:

1
2
3
4
5
6
7
CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
path 'hdfs://192.168.1.199/user/hadoop/test/people.parquet'
);

select * from parquetTable limit 10;

JDBC

Spark 内置了 JDBC 的 DataSource 和 Catalog 实现,仅需要添加 JDBC Driver 依赖(我们 Spark jars 中添加了 MySQL Driver,所以不需要额外引入)。

通过 DataSource 方式访问 MySQL:

1
2
3
4
5
6
7
8
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
url 'jdbc:mysql://192.168.1.199:3306/database',
dbtable 'database.tablename',
user 'username',
password 'password'
);

通过 CatalogPlugin 方式访问 MySQL:

1
2
3
4
5
6
7
set spark.sql.catalog.mysql=org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog;
set spark.sql.catalog.mysql.url=jdbc:mysql://192.168.1.199/database;
set spark.sql.catalog.mysql.user=username;
set spark.sql.catalog.mysql.password=password;

use database;
select * from tablename;

HBase

使用 HBase Spark Connector 组件:hbase-connectors

HBase Shell 创建测试表:

1
2
3
create 'wangzhen_test', 'cf'
put 'wangzhen_test', 'row1', 'cf:a', 'value1'
scan 'wangzhen_test'

hbase-connectors 没有提供打入依赖的 shade 包,使用 maven-shade-plugin 插件制作完整的 shade 包便于添加 HBase Spark Connector 依赖,可以排除 Hadoop、Spark 等 provided 依赖包仅需要 HBase Client 相关依赖。
运行时可通过 add jars 命令加入依赖:

1
2
# 添加 HBase Spark Connector 依赖
add jars "hdfs://XXX/user/hadoop/jars/hbase-spark-1.0.1-SNAPSHOT.jar";

通过 DataSource 方式访问 HBase:

1
2
3
4
5
6
7
8
9
10
11
# 创建 HBase 表临时视图:
CREATE TEMPORARY VIEW hbaseTable
USING org.apache.hadoop.hbase.spark
OPTIONS (
'catalog' '{"table":{"namespace":"default", "name":"wangzhen_test"}, "rowkey":"key1", "columns":{"col1":{"cf":"rowkey", "col":"key1", "type":"string"}, "col2":{"cf":"cf", "col":"a", "type":"string"}}}',
'hbase.spark.config.location' 'hbase-site.xml',
'hbase.spark.use.hbasecontext' 'false'
);

# 读取 HBase 数据
select * from hbaseTable;

我们使用 Impersonation 模式运行 Kyuubi 服务,在 Kyuubi Server 中使用代理用户启动 Spark 引擎,Kyuubi Server 默认仅实现了获取 HDFS 和 Hive 的 Delegation Token。
我们 HBase 集群也开启了 Kerberos 认证,测试时通过指定 spark.kerberos.keytab 和 spark.kerberos.principal 方式运行 Spark Engine,后续需要在 Kyuubi Server 中实现 HBaseDelegationTokenProvider 来获取 HBase 的 Token。

ClickHouse

ClickHouse 提供了 JDBC 客户端,可以使用 Spark JDBC 的方式连接 ClickHouse。

另外开源的 spark-clickhouse-connector 组件,实现了 Spark ClickHouse Connector ,相对于 JDBC 的方式有更好的分区逻辑和更多的优化,对于读取操作应该有很好的性能提升,需要注意的是 spark-clickhouse-connector 是通过 GRPC 接口访问 ClickHouse 集群。

spark-clickhouse-connector 组件默认没有提供 Spark3.1的支持,我们通过复制 Spark3.3 实现的代码,简单的调整后测试可用。

使用 spark-clickhouse-connector 的 CatalogPlugin 方式访问 ClickHouse:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 添加 spark-clickhouse-connector 依赖
add jars 'hdfs://XXX/user/hadoop/jars/clickhouse-spark-runtime-3.1_2.12-0.4.0-SNAPSHOT.jar';

# 配置 clickhouse catalog
set spark.sql.catalog.clickhouse=xenon.clickhouse.ClickHouseCatalog;
set spark.sql.catalog.clickhouse.host=192.168.1.199;
set spark.sql.catalog.clickhouse.grpc_port=9100;
set spark.sql.catalog.clickhouse.user=username;
set spark.sql.catalog.clickhouse.password=password;
set spark.sql.catalog.clickhouse.database=database;

# 查询 clickhouse 数据
use clickhouse;
select * from tablename;

Kyuubi 优化小文件

Posted on 2022-06-04

Spark 小文件问题

Hive 表中太多的小文件会影响数据的查询性能和效率,同时加大了 HDFS NameNode 的压力。Hive (on MapReduce) 一般可以简单的通过一些参数来控制小文件,而 Spark 中并没有提供小文件合并的功能。下面我们来简单了解一下 Spark 小文件问题,以及如何处理小文件。

环境

Kyuubi 版本:1.6.0-SNAPSHOT

Spark 版本:3.1.3、3.2.0

TPCDS 数据集

Kyuubi 中提供了一个 TPCDS Spark Connector,可以通过配置 Catalog 的方式,在读取时自动生成 TPCDS 数据。

只需要将 kyuubi-spark-connector-tpcds_2.12-1.6.0-SNAPSHOT.jar 包放入 Spark jars 目录中,并配置:

1
spark.sql.catalog.tpcds=org.apache.kyuubi.spark.connector.tpcds.TPCDSCatalog;

这样我们就可以直接读取 TPCDS 数据集:

1
2
3
4
5
use tpcds;
show databases;
use sf3000;
show tables;
select * from sf300.catalog_returns limit 10;

小文件产生

首先我们在 Hive 中创建一个 sample.catalog_returns 表,用于写入生成的 TPCDS catalog_returns 数据,并添加一个 hash 字段作为分区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
create table sample.catalog_returns
(
cr_returned_date_sk bigint,
cr_returned_time_sk bigint,
cr_item_sk bigint,
cr_refunded_customer_sk bigint,
cr_refunded_cdemo_sk bigint,
cr_refunded_hdemo_sk bigint,
cr_refunded_addr_sk bigint,
cr_returning_customer_sk bigint,
cr_returning_cdemo_sk bigint,
cr_returning_hdemo_sk bigint,
cr_returning_addr_sk bigint,
cr_call_center_sk bigint,
cr_catalog_page_sk bigint,
cr_ship_mode_sk bigint,
cr_warehouse_sk bigint,
cr_reason_sk bigint,
cr_order_number bigint,
cr_return_quantity int,
cr_return_amount decimal(7, 2),
cr_return_tax decimal(7, 2),
cr_return_amt_inc_tax decimal(7, 2),
cr_fee decimal(7, 2),
cr_return_ship_cost decimal(7, 2),
cr_refunded_cash decimal(7, 2),
cr_reversed_charge decimal(7, 2),
cr_store_credit decimal(7, 2),
cr_net_loss decimal(7, 2)
) PARTITIONED BY(hash int)
stored as parquet;

我们先关闭 Kyuubi 的优化,读取 catalog_returns 数据并写入 Hive:

1
2
3
set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=false;

insert overwrite sample.catalog_returns partition (hash=0) select * from tpcds.sf300.catalog_returns;

Spark SQL 最终产生的文件数最多可能是最后一个写入的 Stage 的 Task 数乘以动态分区的数量。我们可以看到由于读取输入表的 Task 数是 44 个,所以最终产生了 44 个文件,每个文件大小约 69 M。

改变分区数(Repartition)

由于写入的文件数跟最终写入 Stage 的 Task 数据有关,那么我们可以通过添加一个 Repartition 操作,来减少最终写入的 task 数,从而控制小文件:

1
insert overwrite sample.catalog_returns partition (hash=0) select /*+ REPARTITION(10) */ * from tpcds.sf300.catalog_returns;

添加 REPARTITION(10) 后,会在读取后做一个 Repartition 操作,将 partition 数变成 10,所以最终写入的文件数变成 10 个。

Spark AQE 自动合并小分区

Spark 3.0 以后引入了自适应查询优化(Adaptive Query Execution, AQE),可以自动合并较小的分区。

开启 AQE,并通过添加 distribute by cast(rand() * 100 as int) 触发 Shuffle 操作:

1
2
3
4
5
6
set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=false;
set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.advisoryPartitionSizeInBytes=512M;
set spark.sql.adaptive.coalescePartitions.minPartitionNum=1;

insert overwrite sample.catalog_returns partition (hash=0) select * from tpcds.sf300.catalog_returns distribute by cast(rand() * 100 as int);

默认 Shuffle 分区数 spark.sql.shuffle.partitions=200,如果不开启 AQE 会产生 200 个小文件,开启 AQE 后,会自动合并小分区,根据 spark.sql.adaptive.advisoryPartitionSizeInBytes=512M 配置合并较小的分区,最终产生 12 个文件。

Kyuubi 小文件优化分析

Apache Kyuubi (Incubating) 作为增强版的 Spark Thrift Server 服务,可通过 Spark SQL 进行大规模的数据处理分析。Kyuubi 通过 Spark SQL Extensions 实现了很多的 Spark 优化,其中包括了 RepartitionBeforeWrite 的优化,再结合 Spark AQE 可以自动优化小文件问题,下面我们具体分析一下 Kyuubi 如何实现小文件优化。

Kyuubi 如何优化小文件

Kyuubi 提供了在写入前加上 Repartition 操作的优化,我们只需要将 kyuubi-extension-spark-3-1_2.12-1.6.0-SNAPSHOT.jar 放入 Spark jars 目录中,并配置 spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension。相关配置:

Name Default Value Description Since
spark.sql.optimizer.insertRepartitionBeforeWrite.enabled true Add repartition node at the top of query plan. An approach of merging small files. 1.2.0
spark.sql.optimizer.insertRepartitionNum none The partition number if spark.sql.optimizer.insertRepartitionBeforeWrite.enabled is enabled. If AQE is disabled, the default value is spark.sql.shuffle.partitions. If AQE is enabled, the default value is none that means depend on AQE. 1.2.0
spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 100 The partition number of each dynamic partition if spark.sql.optimizer.insertRepartitionBeforeWrite.enabled is enabled. We will repartition by dynamic partition columns to reduce the small file but that can cause data skew. This config is to extend the partition of dynamic partition column to avoid skew but may generate some small files. 1.2.0

通过 spark.sql.optimizer.insertRepartitionNum 参数可以配置最终插入 Repartition 的分区数,当不开启 AQE,默认为 spark.sql.shuffle.partitions 的值。需要注意,当我们设置此配置会导致 AQE 失效,所以开启 AQE 不建议设置此值。

对于动态分区写入,会根据动态分区字段进行 Repartition,并添加一个随机数来避免产生数据倾斜,spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 用来配置随机数的范围,不过添加随机数后,由于加大了动态分区的基数,还是可能会导致小文件。这个操作类似在 SQL 中添加 distribute by DYNAMIC_PARTITION_COLUMN, cast(rand() * 100 as int)。

静态分区写入

开启 Kyuubi 优化和 AQE,测试静态分区写入:

1
2
3
4
5
6
7
set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=true;

set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.advisoryPartitionSizeInBytes=512M;
set spark.sql.adaptive.coalescePartitions.minPartitionNum=1;

insert overwrite sample.catalog_returns partition (hash=0) select * from tpcds.sf300.catalog_returns;

可以看到 AQE 生效了,很好的控制了小文件,产生了 11 个文件,文件大小 314.5 M 左右。

动态分区写入

我们测试一下动态分区写入的情况,先关闭 Kyuubi 优化,并生成 10 个 hash 分区:

1
2
3
set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=false;

insert overwrite sample.catalog_returns partition (hash) select *, cast(rand() * 10 as int) as hash from tpcds.sf300.catalog_returns;

产生了 44 × 10 = 440 个文件,文件大小 8 M 左右。

开启 Kyuubi 优化和 AQE:

1
2
3
4
5
6
7
8
set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=true;
set spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum=100;

set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.advisoryPartitionSizeInBytes=512M;
set spark.sql.adaptive.coalescePartitions.minPartitionNum=1;

insert overwrite sample.catalog_returns partition (hash) select *, cast(rand() * 10 as int) as hash from tpcds.sf300.catalog_returns;

产生了 12 × 10 = 120 个文件,文件大小 30 M 左右,可以看到小文件有所改善,不过任然不够理想。

此案例中 hash 分区由 rand 函数产生,分布比较均匀,所以我们将 spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 设置成 0,重新运行,同时将动态分区数设置为 5:

1
2
3
4
5
6
7
8
set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=true;
set spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum=0;

set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.advisoryPartitionSizeInBytes=512M;
set spark.sql.adaptive.coalescePartitions.minPartitionNum=1;

insert overwrite sample.catalog_returns partition (hash) select *, cast(rand() * 5 as int) as hash from tpcds.sf300.catalog_returns;

由于动态分区数只有 5 个,所以实际上只有 5 个 Task 有数据写入,每个 Task 对应一个分区,导致最终每个分区只有一个较大的大文件。

通过上面的分析可以看到,对于动态分区写入,Repartition 的优化可以缓解小文件,配置 spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum=100 解决了数据倾斜问题,不过同时还是可能会有小文件。

Rebalance 优化

Spark 3.2+ 引入了 Rebalance 操作,借助于 Spark AQE 来平衡分区,进行小分区合并和倾斜分区拆分,避免分区数据过大或过小,能够很好的处理小文件问题。

Kyuubi 对于 Spark 3.2+ 的优化,是在写入前插入 Rebalance 操作,对于动态分区,则指定动态分区列进行 Rebalance 操作。不再需要 spark.sql.optimizer.insertRepartitionNum 和 spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 配置。

测试静态分区写入,使用 Spark 3.2.0 开启 Kyuubi 优化和 AQE:

1
2
3
4
5
6
7
set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=true;

set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.advisoryPartitionSizeInBytes=512M;
set spark.sql.adaptive.coalescePartitions.minPartitionNum=1;

insert overwrite sample.catalog_returns partition (hash=0) select * from tpcds.sf300.catalog_returns;

Repartition 操作自动合并了小分区,产生了 11 个文件,文件大小 334.6 M 左右,解决了小文件的问题。

测试动态分区写入,使用 Spark 3.2.0 开启 Kyuubi 优化和 AQE,生成 5 个动态分区:

1
2
3
4
5
6
7
set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=true;

set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.advisoryPartitionSizeInBytes=512M;
set spark.sql.adaptive.coalescePartitions.minPartitionNum=1;

insert overwrite sample.catalog_returns partition (hash) select *, cast(rand() * 5 as int) as hash from tpcds.sf300.catalog_returns;

Repartition 操作自动拆分较大分区,产生了 2 × 5 = 10 个文件,文件大小 311 M 左右,很好的解决的倾斜问题。

总结

从上面的分析可以看到,对于 Spark 3.2+,Kyuubi 结合 Rebalance 能够很好的解决小文件问题,对于 Spark 3.1,Kyuubi 也能自动优化小文件,不过动态分区写入的情况还是可能存在问题。

相关的配置总结:

1
2
3
4
5
6
7
8
9
10
# 配置 Kyuubi Spark SQL Extension
spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension

# 开启 RepartitionBeforeWrite 优化(默认开启)
spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=true;

# 配置 AQE
spark.sql.adaptive.enabled=true;
spark.sql.adaptive.advisoryPartitionSizeInBytes=512M;
spark.sql.adaptive.coalescePartitions.minPartitionNum=1;

更多 AQE 配置可以参考:How To Use Spark Adaptive Query Execution (AQE) in Kyuubi

Kyuubi 调研测试

Posted on 2021-11-23

一、背景

Kyuubi 是 Spark Thrift Server 增强版实现,实现 HiveServer2 协议,启动一个 Thrift 服务,通过 JDBC 方式接收 Spark SQL 请求并执行。我司通过提供 Kyuubi 服务, 实现 Hive SQL 到 Spark SQL 的迁移,同时提供 Ad-hoc 查询服务。

目前使用版本是 Kyuubi 0.7 版本,社区最新的 Kyuubi 1.4.0 版本带来了很大的架构优化,所以做如下的调研测试。

二、测试环境部署

版本信息

Kyuubi 版本:1.4.0 (未发布,master 分支)

Spark 版本:3.1.1

打包编译

拉取 Kyuubi 代码,并执行打包命令,完成后上传至服务器

1
./build/dist --tgz --spark-provided -Pkyuubi-extension-spark-3-1  # 打包

配置

  1. 绑定IP和端口

    配置 Kyuubi Server 服务 IP 和端口

    1
    2
    kyuubi.frontend.bind.host       0.0.0.0
    kyuubi.frontend.bind.port 10015
  2. 配置 KERBEROS 认证

    Hadoop 集群开启了 Kerberos 认证,则配置 Kerberos 认证,并添加 Kerberos 相关配置,使用 hue 用户代理运行。

    1
    2
    3
    kyuubi.authentication   KERBEROS
    kyuubi.kinit.principal hue/_HOST@***.COM
    kyuubi.kinit.keytab /etc/kyuubi/conf/hue.keytab
  3. Zookeeper 配置

    Kyuubi 依赖 Zookeeper 做服务发现和 HA,所以需要添加 Zookeeper 配置并使用了 DIGEST 认证。

    1
    2
    3
    kyuubi.ha.zookeeper.quorum=192.168.1.100:2181
    kyuubi.ha.zookeeper.auth.type=DIGEST
    kyuubi.ha.zookeeper.auth.digest=hue:hue
  4. 配置 Namespace

    不同集群的 Kyuubi Server 使用同一个 Zookeeper 集群,配置不同 Namespace 隔离,后续连接时只需要指定 zooKeeperNamespace 访问不同集群。

    1
    2
    kyuubi.ha.zookeeper.namespace=kyuubi_cluster001
    kyuubi.session.engine.initialize.timeout=180000

三、功能调研

Beeline 连接 Kyuubi

使用 beeline 工具连接 Kyuubi 进行测试,Kyuubi JDBC 链接包括了以下配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# zookeeper 连接
ZOOKEEPER_QUORUM=192.168.1.100:2181
# 集群名称
CLUSTER=cluster001
# spark 配置
SPARKI_CONFS=spark.executor.instances=10;spark.executor.memory=3g
# kyuubi 配置
KYUUBI_CONFS=$SPARKI_CONFS;kyuubi.engine.share.level=CONNECTION

# jdbc 链接
JBDC_URL="jdbc:hive2://${ZOOKEEPER_QUORUM}/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi_$CLUSTER?$KYUUBI_CONFS"

# 使用 beeline 连接
beeline -u "$JBDC_URL"

相关说明:

  • 由于 JDBC 链接会有分号等字符,beeline 连接时 JDBC 链接需要带上引号。
  • Hive 和 Spark 中都有 beeline 命令,可能与 Kyuubi Server 存在兼容性问题,需要使用合适的 beeline 路径。

共享引擎策略

  1. 引擎共享策略
    Kyuubi 支持共享引擎,可通 kyuubi.engine.share.level 配置不同共享级别,共享级别定义如下:
    • CONNECTION:连接级别,引擎适用于一次 jdbc connection,不做其他共享,此配置适用于离线 ETL 任务,使得不同任务之间相互隔离。
    • USER:用户级别共享,引擎可以在同一个用户的不同连接进行共享,适用于 AdHoc 查询和较小的任务,可以节省资源,并在有可用引擎时支持快速响应。
    • SERVER:服务级别共享(全局共享),引擎可以全局共享,所有连接可以共享一个引擎,不过启动引擎的用户需要具有较高权限才能满足访问不同用户的表。
  2. 引擎使用单个 SparkSession
    默认情况下,共享引擎对于新的 Connection 连接,使用的新的 SparkSession,不同连接共享 SparkContext 的资源,不过一些 session 级别的参数、函数、临时表等都是隔离开的。可以通过kyuubi.engine.single.spark.session参数,使用全局的 SparkSession,使得不同连接可以共享 Session 状态,包括参数、函数、临时表等。
  3. 引擎 TTL
    对于共享引擎,多个连接共享使用,并不由某个连接单独管理,在某个连接关闭后引擎不会马上退出,而是在引擎空闲的时间超过配置的超时时间后自动退出,通过kyuubi.session.engine.idle.timeout 参数进行配置。

对于共享引擎,官方公众号有更详细介绍:Apache Kyuubi:灵活运用引擎隔离共享,加速即席查询,支持大规模 ETL

用户默认配置

Kyuubi 支持用户级别的默认配置,可以为不同用户配置不同的参数,详见:Settings: User Defaults

下面示例,给 user1 和 user2 设置了不同的队列和动态资源最大 Executor 数:

1
2
3
4
5
6
7
8
9
10
11
12
# For system defaults
spark.master=yarn;
spark.submit.deployMode=cluster
spark.dynamicAllocation.enabled=true;

# user1
__user1__.spark.yarn.queue=root.queue1;
__user2__.spark.dynamicAllocation.maxExecutors=200;

# user2
__user2__.spark.yarn.queue=root.queue2;
__user2__.spark.dynamicAllocation.maxExecutors=500;

Kyuubi Spark SQL Extensions

Kyuubi 中实现了一些 Spark SQL 的优化,可通过 spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension 配置开启,具体:Auxiliary SQL extension for Spark SQL。

解决小文件问题:
KyuubiSparkSQLExtension 中定义了 RepartitionBeforeWritingHive 和 RepartitionBeforeWritingDatasource 规则,在写入 Hive 或 DataSource 前插入 Repartition 操作,来控制写入的分区数,可通过 spark.sql.optimizer.insertRepartitionNum 参数配置 Repartition 操作的分区数。
对于动态分区写入,加了一个随机数来解决 Repartition 可能带来的数据倾斜的问题,不过可能会导致小文件,通过 spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 配置可设置动态分区 Repartition 操作插入的随机分区数。

Kyuubi Metrics

Kyuubi Server 中也定义了一些监控指标,用于监控 Kyuubi Server 的运行状况,支持了很多的 Reporter,包括 Prometheus,后续工作需要将指标投递到 Prometheus 中,对 Kyuubi 服务进行监控告警。具体参考: Kyuubi Server Metrics。

Kyuubi Ctl

Kyuubi 的 bin 目录中提供了 kyuubi-ctl 工具,目前主要用于维护 Server 和 Engine 实例的状态,可以获取和删除 Server 和 Engine 在 Zookeeper 上的注册信息。

目前包括了,下面一些命令,可执行 bin/kyuubi-ctl --help 获取完整帮助信息。

1
2
3
4
5
6
Command: get [server|engine] [options]
Get the service/engine node info, host and port needed.
Command: delete [server|engine] [options]
Delete the specified service/engine node, host and port needed.
Command: list [server|engine] [options]
List all the service/engine nodes for a particular domain.

后续在服务做灰度升级时,可通过 kyuubi-ctl 命令,先下线 KyuubiServer 注册信息,切断 KyuubiServer 流量,等一段时间后该 KyuubiServer 上连接都关闭后,下线该服务。

后续规划

  1. 共享策略

    • 离线 SQL:对于离线 SQL 为了保证任务稳定性,不使用共享引擎,保证任务进行完全隔离不相互影响。
    • Adhoc 任务:使用 User 级别共享,加大 TTL 时间,让引擎尽量常驻,使得 Adhoc 查询能够及时响应;需要考虑 Spark 调度策略,防止资源抢占导致响应慢。
  2. 配置管理
    目前考虑将配置交由上游系统管理,根据标签设置不同配置,任务提交时带上相应的标签即可。

Spark Yarn ApplicationMaster 超时退出

Posted on 2021-07-28

问题说明

Spark 应用运行失败,查看 Spark UI 界面,发现任务执行正常,没有失败的 Job 和 Task。

问题分析

1. 查看 Yarn ResourceManger 的日志

从 Spark UI 界面中没有看到失败的 Job 和 Task,查看 Driver 日志看到 ApplicationMaster: RECEIVED SIGNAL TERM 信息,怀疑 Spark 应用被 kill 了,所以查看了 Yarn ResourceManger 日志,可以看到 Yarn 判断 ApplicationMaster 心跳超时导致,发送 TERM 信号。

1
Expired:appattempt_1623809535012_3914039_000001 Timed out after 90 secs

2. 分析 Spark ApplicationMaster 代码

ApplicationMaster 代码中启动 Reporter 线程调用 org.apache.spark.deploy.yarn.ApplicationMaster#allocationThreadImpl 方法,通过 while 循环向 Yarn 发送心跳。

怀疑是不是 Reporter 线程挂掉了,查看代码可确定日志中 Reporter 关键字,在 Driver 日志中并未发现异常退出信息。后直接在日志中查找 ApplicationMaster 关键字,发现如下信息:

1
2
3
4
5
6
7
21/07/26 16:00:21 INFO ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0

21/07/26 16:00:21 INFO SparkContext: Invoking stop() from shutdown hook

........

21/07/26 16:02:02 ERROR ApplicationMaster: RECEIVED SIGNAL TERM

可以看到先打印了 SUCCEEDED 状态,刚好大约 90s 后接收到 TERM 信号。

查看 ApplicationMaster 代码,在用户包的 main 方法执行完成后,改变为成功状态,并设置 finished=true,从而导致 Reporter 线程正常退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
val userThread = new Thread {
override def run() {
try {
if (!Modifier.isStatic(mainMethod.getModifiers)) {
logError(s"Could not find static main method in object ${args.userClass}")
finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
} else {
mainMethod.invoke(null, userArgs.toArray)
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
logDebug("Done running user class")
}
} catch {
......

成功状态后进程并未退出,可以看到 SparkContext.stop() 方法在 main 方法执行完成后在执行的,由于 SparkContext 中添加 ShutdownHook 执行 stop() 方法。

解决

通过上面分析得出结论,由于在执行完用户包的 mian 方法后,Reporter 线程退出,执行 SparkContext.stop() ,并且 SparkContext.stop() 方法执行时间超过 90s (yarn.am.liveness-monitor.expiry-interval-ms),导致 Yarn 认为 ApplicationMaster 超时,向 ApplicationMaster 发送 TERM 信号,停止任务。

根据以上结论,让用户在程序结束时显示调用 SparkContext.stop() 解决问题。

后续

继续排查 SparkContext.stop 慢的原因,发现大量 AppStatusListener 异常信息,怀疑是 AppStatusListener 接收到了太多的事件,导致处理过慢。

查看 SparkUI 界面,发现 Stage 的 task 数量很多,并且执行时间都非常短,查看用户配置,发现 spark.sql.shuffle.partitions 参数设置太大,用户修改后程序正常。

Kyuubi 架构对比

Posted on 2021-07-02

背景

提供 JDBC 服务,迁移 Hive SQL 。

提供 Ad-Hoc 查询能力。

Spark Thrift Server

Spark Thrift Server 是 Spark 社区基于 HiveServer2 协议实现的 Thrift 服务,提供 Spark SQL JDBC 服务。由于 Spark 进程常驻,提交请求后立即执行计算,响应迅速,可用于 Ad-Hoc 查询。

Spark Thrift Server 架构

Spark Thrift Server 是基于 SparkContext 多线程应用场景的实现。

Spark 中一次 Action 算子对应一个 Job,提交一个 Job 后会等待执行完成,所以在单线程的场景下,Job 是顺序执行的。SparkContext 通过多线程提交 Job 时,不同的 Job 可并发提交执行。

下图是 Spark Thrift Server 的架构。Spark Thrift Server 在启动时初始化一个 SparkContext。接受请求后,会新建或者复用 SparkSession, 并通过 SparkSession.sql() 进行执行,由于处理请求的线程不同,所以 SparkSession.sql() 提交的任务可以并发执行。

优点

  • 架构简单,方便部署运维。
  • 启动后常驻 Spark 相关服务,响应快。
  • 统一服务,隐藏集群配置,方便统一管理和优化。

缺点

  • 单用户运行,不支持多用户访问
  • 资源不隔离
  • 不支持高可用

Kyuubi 0.7.0

Kyuubi 是网易开源的增强版 Spark Thrift Server 实现。支持多租户、资源隔离、高可用等,使得 Spark Thrift Server 服务具有更好的可用性和稳定性。

Kyuubi 0.7.0 架构

Kyuubi 0.7.0 版本,实现用户 Session 级别的 SparkContext 的初始化、注册、缓存、回收机制。

Kyuubi 接收到请求后可根据不同用户创建多个 SparkContext,启动多个 Spark 实例,从而实现多租户和资源隔离。

Kyuubi 0.7.0 高可用

Kyuubi 0.7.0 版本,支持 Load Balance Mode 、Active/Standby Failover 两种模式的高可用,通过 Zookeeper 作服务发现。

Load Balance Mode:负载均衡模式下,所有的 Kyuubi 服务都是活跃状态,这种模式下可以减轻 Kyuubi Server 的负载,提高服务的并发。不过会加大 Yarn 集群的负载,可能导致一个用户连接在不同的 Kyuubi Server 上,启动多个 SparkContext,造成集群资源浪费。

Active/Standby Failover:主备故障切换模式下,只有一个几点是 Active 的状态对外提供服务,当 Active 节点发生故障时,Standby 节点选举成功后变成 Active 状态对外提供服务,从而达到高可用,不过此模式不会加大整体并发能力。

优点

  • 支持多租户
  • 支持代理用户(hive.server2.proxy.user)
  • 支持 Session 级别配置
  • Executors 资源隔离
  • 支持高可用

缺点

  • 只支持 Yarn-Client 模式,共用 Driver 进程,可能成为服务瓶颈
  • 高可用模式存在缺陷

Kyuubi 1.3.0

Kyuubi 1.X 版本中,将 Kyuubi Server 和 SparkContext 进行解耦。引入 Spark SQL Engine 进行 SparkContext 初始化和 SQL 执行,在 Kyuubi Server 中通过 spark-submit 命令启动,启动后将自身状态保持在 Zookeeper 中。Kyuubi Server 接收到请求后通过 Zookeeper 寻找可用引擎或启动新引擎进行处理。

Spark SQL Engine 作为独立的 Spark 应用,可以以不同的方式执行,并支持 yarn-cluster 模式。状态持续在 Zookeeper 中,使得 Kyuubi Server 之间可以共用 Engine,提高了 Kyuubi Server 的扩展能力。

优点

  • 支持多租户
  • 支持资源隔离
  • 支持不同级别的引擎共享策略(CONNECTION, USER, SERVER)
  • 支持高可用

缺点

  • 多一层服务,加大了服务的复杂性
  • 增加依赖 Zookeeper,作为服务端和引擎端的服务发现

参考

  • Distributed SQL Engine
  • Kyuubi v.s. Spark Thrift JDBC/ODBC Server (STS)
  • Kyuubi 0.7.0 Architecture
  • Kyuubi 1.X Architecture

Spark 跨集群读写 Iceberg

Posted on 2021-06-10

运行环境

Spark: 2.4.3
Iceberg: 1.2.0

IcebergSource

Iceberg 提供 DataSourceRegister 接口的实现类 org.apache.iceberg.spark.source.IcebergSource,在 IcebergSource 中实现了 createReader、createWriter等方法进行 DataFrame 的读写。

IcebergSource 中的 findTable 方法是从 Catalog 中获取 Iceberg Table,此方法进一步通过 CustomCatalogs.table 获取 table, 在 CustomCatalogs 中 buildCatalog 是通过 spark.sessionState().newHadoopConf 获取 Hadoop、Hive 相关配置,那么默认的行为将没有办法进行多集群间的读写。

自定义 IcebergSource

IcebergSource 的 findTable 方法是 protected 的,可以从 IcebergSource 派生出自定义的 IcebergSource,在自定义的 IcebergSource 中维护多个集群的 Catalog,覆盖 findTable 方法从对应集群的 Catalog 中获取 Iceberg Table 对象,进而可实现多集群间的读写。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class ClusterIcebergSource extends IcebergSource {

override def shortName(): String = SHORT_NAME

override def findTable(options: DataSourceOptions, conf: Configuration): Table = {
val path = options.get("path")
val cluster = options.get("cluster")
Preconditions.checkArgument(path.isPresent, "Cannot open table: path is not set".asInstanceOf[Object])
Preconditions.checkArgument(cluster.isPresent, "Cannot open table: cluster is not set".asInstanceOf[Object])

val catalog = loadClusterCatalog(cluster.get())
catalog.loadTable(tableIdentifier(path.get()))
}

}

object ClusterIcebergSource {

val SHORT_NAME = "iceberg-cluster"

val catalogs: util.Map[String, Catalog] = new ConcurrentHashMap[String, Catalog]()

def loadClusterCatalog(cluster: String): Catalog = {
if (!catalogs.containsKey(cluster)) catalogs synchronized {
if (!catalogs.containsKey(cluster)) {
val hiveCatalog = new HiveCatalog()
hiveCatalog.setConf(hadoopConf(cluster))
val properties = new util.HashMap[String, String]()
hiveCatalog.initialize(s"iceberg_catalog_$cluster", properties)
catalogs.put(cluster, hiveCatalog)
}
}
catalogs.get(cluster)
}

private def hadoopConf(cluster: String): Configuration = {
// TODO load cluster hadoop conf
null
}

private def tableIdentifier(path: String): TableIdentifier = {
val nameParts = path.split("\\.")
TableIdentifier.of(nameParts(0), nameParts(1))
}

}

注册自定义 IcebergSource

添加 META-INF/services/org.apache.spark.sql.sources.DataSourceRegister 文件

1
com.***.ClusterIcebergSource

读写 Iceberg 表

Read Iceberg Table

1
2
3
spark.read.option("cluster", cluster)
.format(ClusterIcebergSource.SHORT_NAME)
.load(s"$database.$table")

Write DataFrame

1
2
3
4
5
6
df.write
.option("cluster", cluster)
.option(SparkWriteOptions.FANOUT_ENABLED, "true")
.format(ClusterIcebergSource.SHORT_NAME)
.mode("append")
.save(s"$database.$table")

遇到问题

  1. loadTable 时 HiveMetaStore 初始化报错,No suitable driver found for jdbc:mysql:***
1
2
3
4
5
6
7
8
java.sql.SQLException: No suitable driver found for jdbc:mysql://***:***/hive_db?createDatabaseIfNotExist=true
at java.sql.DriverManager.getConnection(DriverManager.java:689)
at java.sql.DriverManager.getConnection(DriverManager.java:208)
at com.jolbox.bonecp.BoneCP.obtainRawInternalConnection(BoneCP.java:349)
at com.jolbox.bonecp.BoneCP.<init>(BoneCP.java:416)
at com.jolbox.bonecp.BoneCPDataSource.getConnection(BoneCPDataSource.java:120)
at org.datanucleus.store.rdbms.ConnectionFactoryImpl$ManagedConnectionImpl.getConnection(ConnectionFactoryImpl.java:501)
at org.datanucleus.store.rdbms.RDBMSStoreManager.<init>(RDBMSStoreManager.java:298)

解决:

1
DriverRegistry.register("com.mysql.jdbc.Driver")

JanusGraph: OLAP Traversals with Spark On Yarn-Client Mode

Posted on 2021-03-08

问题背景

JanusGraph 支持使用 Spark 进行一些 OLAP 计算。官方文档中只给出了 Spark Local 模式和 Spark Standalone Cluster 模式的例子。参考:JanusGraph with TinkerPop’s Hadoop-Gremlin

由于我们的大部分 Spark 是运行在 Yarn 上面的,尝试通过 Gremlin Console 使用 Spark On Yarn-Client 模型运行 JanusGraph 的 OLAP 任务。

JanusGraph 我们是用于 Atlas 的存储,也做一些 Atlas 的适配。

版本信息:

1
2
3
4
JanusGraph: 0.5.1
Spark: 2.4.0
Hadoop: 2.7.7
Atlas 2.1.0

环境适配

打包安装

1
2
3
4
mvn clean install -Pjanusgraph-release -Dgpg.skip=true -DskipTests=true

# 安装包
janusgraph-dist/target/janusgraph-0.5.1.zip

依赖整理

由于 Janusgraph-Hadoop 并没有引入 spark-yarn 和 相关 jar 包。所以想要支持 Spark On Yarn 模式运行必须要加入相关的依赖。
直接下载 Spark 2.4.0 的安装包,将 jars 里面的 jar 包放在 ${JANUSGRAPH_HOME}/ext/spark/jars 中。

1
2
3
4
5
6
wget https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
tar xzvf spark-2.4.0-bin-hadoop2.7.tgz
cp ${SPARK_HOME}/jars/* ${JANUSGRAPH_HOME}/ext/spark/jars

# 去掉冲突 Jar 包
rm -f ${JANUSGRAPH_HOME}/ext/spark/jars/guava-14.0.1.jar

需要支持 Atlas,顾也加上 Atlas 相关 Jar 包。

1
2
3
4
5
6
cp ${ATLAS_HOME}/server/webapp/atlas/WEB-INF/lib/* ${JANUSGRAPH_HOME}/ext/atlas/jars

# 去掉冲突 Jar 包
rm -f ${JANUSGRAPH_HOME}/ext/atlas/jars/atlas-webapp-2.1.0.jar
rm -f ${JANUSGRAPH_HOME}/ext/atlas/jars/netty-3.10.5.Final.jar
rm -f ${JANUSGRAPH_HOME}/ext/atlas/jars/netty-all-4.0.52.Final.jar

启动脚本

编辑 gremlin.sh,做了如下配置:

  • 配置 SPARK_HOME
  • 配置 HADOOP_CONF_DIR 和 HBASE_CONF_DIR 并加入 CLASSPATH
  • 配置 HADOOP_GREMLIN_LIBS,通过 SparkContext addJar 方式引入依赖。
1
2
3
4
5
export SPARK_HOME=${JANUSGRAPH_HOME}/ext/spark
export HADOOP_GREMLIN_LIBS=${JANUSGRAPH_HOME}/ext/spark:${JANUSGRAPH_HOME}/ext/atlas:${JANUSGRAPH_HOME}/lib
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HBASE_CONF_DIR=/etc/hbase/conf
export CLASSPATH=$CLASSPATH:$HADOOP_CONF_DIR:HBASE_CONF_DIR

JanusGraph 配置

hadoop-graph 配置

JanusGraph 后端使用 HBase 作为存储。

配置 ${JANUSGRAPH_HOME}/conf/hadoop-graph/read-hbase.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#
# Hadoop Graph Configuration
#
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.hbase.HBaseInputFormat
gremlin.hadoop.graphWriter=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat

gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output
gremlin.spark.persistContext=true

#
# JanusGraph HBase InputFormat configuration
#
janusgraphmr.ioformat.conf.storage.backend=hbase
janusgraphmr.ioformat.conf.storage.hostname=*****
janusgraphmr.ioformat.conf.storage.hbase.table=apache_atlas_janus

#
# SparkGraphComputer Configuration
#
#spark.master=local[*]
#spark.executor.memory=1g
spark.master=yarn-client
spark.executor.memory=2g
spark.executor.instances=4
spark.yarn.principal=******
spark.yarn.keytab=******.keytab
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=org.janusgraph.hadoop.serialize.JanusGraphKryoRegistrator

日志配置

修改 ${JANUSGRAPH_HOME}/conf/log4j-console.properties 配置

1
2
3
4
5
6
7
8
9
10
11
# 添加 Rolling_File Log Appender
log4j.appender.Rolling_File=org.apache.log4j.RollingFileAppender
log4j.appender.Rolling_File.Threshold=INFO
log4j.appender.Rolling_File.File=logs/gremlin.log
log4j.appender.Rolling_File.Append=true
log4j.appender.Rolling_File.MaxFileSize=100MB
log4j.appender.Rolling_File.MaxBackupIndex=10
log4j.appender.Rolling_File.layout=org.apache.log4j.PatternLayout
log4j.appender.Rolling_File.layout.ConversionPattern=%-d{yyyy-MM-dd HH\:mm\:ss,SSS} [%C]-[%p] %m%n

log4j.rootLogger=${gremlin.log4j.level}, A2, Rolling_File

启动时可通过 -l 修改日志级别

1
bin/gremlin.sh -l info

执行 Hive Table Count 任务

启动 Gremlin Console:bin/gremlin.sh (可加上 -l info 指定日志级别)。

执行下面语句统计 hive_table 数量,可查看 logs/gremlin.log 日志信息,可以看到 Spark on yarn 的 application 信息。

1
2
3
graph = GraphFactory.open('conf/hadoop-graph/read-hbase.properties')
g = graph.traversal().withComputer(SparkGraphComputer)
g.V().has("__typeName", "hive_table").count()

报错 groovy 脚本

保存一个 groovy 初始化脚本,不用每次都初始化 graph。

编辑 ${JANUSGRAPH_HOME}/scripts/graph-spark.groovy

1
2
graph = GraphFactory.open('conf/hadoop-graph/read-hbase.properties')
g = graph.traversal().withComputer(SparkGraphComputer)

使用 graph-spark.groovy 初始化 脚本启动 gremlin console

1
bin/gremlin.sh -i scripts/graph-spark.groovy

Atlas 1.1.0 Full GC 问题分析

Posted on 2021-02-24

Atlas 1.1.0 Full GC 问题分析

问题背景

频繁接收到 Atlas 拨测告警,拨测程序是访问 atlas 查询 entity 的接口,一段时间都是 502 的返回,持续几分钟后恢复。查看 Atlas gc 日志,发现 Full GC 日志,并且持续时间和服务不可用时间吻合,基本确定就是由于 Full GC 导致。

GC LOG 分析

JVM 相关参数调整

查看 GC 日志发现,(concurrent mode failure)导致的 Full GC。

进行 JVM 参数调整,将 -XX:CMSInitiatingOccupancyFraction 设置为 50,即当老年代内存达到 50% 时,触发 CMS GC。
,参考:CMS之promotion failed&concurrent mode failure

完整参数:

1
-XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+HeapDumpOnOutOfMemoryError -XX:CMSInitiatingOccupancyFraction=50 -XX:ParallelGCThreads=20 -XX:+CMSScavengeBeforeRemark -XX:MaxGCPauseMillis=400 -XX:HeapDumpPath=dumps/atlas_server.hprof -Xloggc:logs/gc-worker.log -verbose:gc -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10m -XX:+PrintGCDetails -XX:+PrintGCDateStamps

调整完参数后,执行以下命令,观察 GC 变化。

1
jstat -gcutil PID 3000

发现 old Space 达到 50% 时,会触发一次 old GC,old Space 使用会下降,不过持续观察一段时间后,还是会出现 Full GC,调整内存还是会出现 Full GC 问题。

GC log 分析工具

可以使用 GC Log 分析工具,直观的看到 GC 变化。

GCViewer

项目地址:https://github.com/chewiebug/GCViewer
使用说明:Results of log analysis

GCEasy

在线 GC log 分析,地址: https://blog.gceasy.io/

Dump 分析

经过一些 JVM 参数的调整和 GC Log 分析,还是无法避免 Full GC,感觉可能还是由于创建大量存活对象,顾对程序进行 Dump 分析。

生成 Dump 文件

生成 Dump 文件的方式有很多种,下面列举了几种方式。由于 jmap 生成 DUMP 文件时会导致服务挂起,对线上服务有影响,所以尝试使用 gcore 方式。不过将 gcore 导出的 core 文件转换成 bin 文件报错,没有具体研究。最终使用的 Arthas heapdump 命令(不太确定是否影响线上服务)。

Jmap 命令

1
2
3
4
5
6
7
8
# 显示Java堆详细信息
jmap -heap [pid]

# 显示堆中对象统计信息
jmap -histo:live [pid]

# 获取 dump 文件
jmap -dump:live,:format=b,file=文件名 [pid]

Arthas heapdump 命令

具体参考:Arthas heapdump

Gcore

具体可参考:记一次Java服务频繁Full GC的排查过程

分析 Dump 文件

使用 jhat 分析 dump 文件。

1
2
# dump.hprof 为 Dump 文件路径
jhat -J-d64 -J-mx8g -J-ms8g dump.hprof

分析完成后,访问 http://IP:7000 ,查看详情。点击 ‘Show heap histogram’ 可查看堆中对象统计信息。

点击占用内存较高的对象,并抽样分析引用该对象的对象(References to this object 中的对象),关注到大多都指向了 CacheEdge 和 CacheVertex 两个类型的对象,这两个对象应该是 JanusGraph 中顶底和边。

查看 CacheVertex 的关联对象,发现主要来自 LocalCache 和 CacheEdge 两个类型对象。

JanusGraph 缓存

分析 LocalCache 对象,来自 com.google.common.cache.LocalCache.LocalManualCache 对象,是 Guava 的缓存工具。从对象引用确定到与 JanusGraph 的两个地方的缓存有关:

1
2
org.janusgraph.diskstorage.keycolumnvalue.cache.ExpirationKCVSCache#ExpirationKCVSCache
org.janusgraph.graphdb.transaction.vertexcache.GuavaVertexCache#GuavaVertexCache

ExpirationKCVSCache 用于 JanusGraph 全局的 KeyColumnValue 缓存,db-cache-size 参数表示缓存最大占用总堆内存的百分比(小于 1 时),或指定缓存大小(大于 1 时),默认为 0.3。

GuavaVertexCache 用于 Transaction 中的 Vertex 缓存。tx-cache-size 参数表示一次事务中最大缓存 Vertex 的大小,JanusGraph 中默认为 20000,Atlas 里面默认为 15000。

怀疑是否是缓存过大导致,顾对以下参数进行调整。

1
2
atlas.graph.cache.db-cache-size=0.2 # 
atlas.graph.cache.tx-cache-size=100

查找元凶

对缓存进行调整后发现还是出现 Full GC 继续分析 Dump 文件。

CacheVertex 和 CacheEdge 对象相互引用,看到部分 CacheVertex 被大量 CacheEdge 引用。猜测是否由于一个定点有很多的边导致的。

使用 OQL 查询被引用对象超过 1000 的 CacheVertex,具体语法参考:Object Query Language。

1
select { name:v, id: v.id, count: count(referrers(v))} from org.janusgraph.graphdb.vertices.CacheVertex v where count(referrers(v))  > 1000

可以看到有个定点甚至有几万的边,使用 Gremlin 查询这些 Vertex,发现主要是下面两种情况,这里操作参考之前文章:Gremlin Server/Console 适配 Atlas JanusGraph。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
gremlin> g.V(1006100712).properties("__typeName")
==>vp[__typeName->hive_table]
gremlin> g.V(1006100712).inE().count()
==>34436
gremlin> g.V(1006100712).inE()
==>e[51zpq-1mank-2711-gn08a0][2719856-__Process.inputs->1006100712]
==>e[635se-1yhyo-2711-gn08a0][3289200-__Process.inputs->1006100712]
==>e[7tmvi-2iycw-2711-gn08a0][4243568-__Process.inputs->1006100712]
==>e[b50r2-3p6k0-2711-gn08a0][6213744-__Process.inputs->1006100712]
......

gremlin> g.V(98328).properties("__typeName")
==>vp[__typeName->hive_db]
gremlin> g.V(98328).inE().count()
==>5734
gremlin> g.V(98328).inE()
==>e[8o4f-2goo-acyd-23vc][114936-__hive_table.db->98328]
==>e[ej2i-44ls-acyd-23vc][192592-__hive_table.db->98328]
==>e[sl5n-6riw-acyd-23vc][315608-__hive_table.db->98328]
==>e[tpfy-9qwg-acyd-23vc][454768-__hive_table.db->98328]
==>e[c2oer-48hw8-acyd-23vc][7114904-__hive_table.db->98328]
......

可以看到 hive_db 的情况,应该是会查询出 hive_db 的所有 hive_table,正常的操作不应该查询出所有的关联对象,再对程序进行 jstack 分析。

解决问题

再对程序进行 jstack 分析,这里需要注意需要在程序正常运行是进行 jstack,不要在 Full GC 发生时进行 jstack。

通过查看 jstack log 注意到的 org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever#mapRelationshipAttributes 方法,这个方法会遍历 relationship 的对象,会导致大量对象的创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
"NotificationHookConsumer thread-40" #522 prio=5 os_prio=0 tid=0x00007f897794d000 nid=0x98e5 runnable [0x00007f69c4ecd000]
java.lang.Thread.State: RUNNABLE
// ......
at org.apache.tinkerpop.gremlin.structure.Vertex.property(Vertex.java:38)
at org.apache.atlas.repository.graphdb.janus.AtlasJanusElement.getProperty(AtlasJanusElement.java:65)
at org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex(AtlasGraphUtilsV2.java:105)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.mapVertexToRelatedObjectId(EntityGraphRetriever.java:1025)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.mapRelatedVertexToObjectId(EntityGraphRetriever.java:991)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.mapVertexToRelationshipAttribute(EntityGraphRetriever.java:976)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.mapRelationshipAttributes(EntityGraphRetriever.java:944)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.mapVertexToAtlasEntity(EntityGraphRetriever.java:418)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.mapVertexToAtlasEntity(EntityGraphRetriever.java:395)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.mapVertexToObjectId(EntityGraphRetriever.java:900)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.mapVertexToCollectionEntry(EntityGraphRetriever.java:819)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.mapVertexToArray(EntityGraphRetriever.java:787)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.mapVertexToAttribute(EntityGraphRetriever.java:709)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.mapAttributes(EntityGraphRetriever.java:568)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.mapVertexToAtlasEntity(EntityGraphRetriever.java:415)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.toAtlasEntityWithExtInfo(EntityGraphRetriever.java:183)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.toAtlasEntityWithExtInfo(EntityGraphRetriever.java:178)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.toAtlasEntityWithExtInfo(EntityGraphRetriever.java:166)
at org.apache.atlas.repository.converters.AtlasInstanceConverter.getAndCacheEntity(AtlasInstanceConverter.java:300)
at org.apache.atlas.repository.store.graph.v2.AtlasEntityChangeNotifier.toAtlasEntities(AtlasEntityChangeNotifier.java:409)
at org.apache.atlas.repository.store.graph.v2.AtlasEntityChangeNotifier.notifyV2Listeners(AtlasEntityChangeNotifier.java:305)
at org.apache.atlas.repository.store.graph.v2.AtlasEntityChangeNotifier.notifyListeners(AtlasEntityChangeNotifier.java:275)
at org.apache.atlas.repository.store.graph.v2.AtlasEntityChangeNotifier.onEntitiesMutated(AtlasEntityChangeNotifier.java:108)
at org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2.createOrUpdate(AtlasEntityStoreV2.java:732)
at org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2.createOrUpdate(AtlasEntityStoreV2.java:253)
// ......

"NotificationHookConsumer thread-11" #464 prio=5 os_prio=0 tid=0x00007f8977919000 nid=0x98c8 runnable [0x00007f69c6beb000]
java.lang.Thread.State: RUNNABLE
// ......
at org.apache.tinkerpop.gremlin.structure.Vertex.property(Vertex.java:72)
at org.apache.tinkerpop.gremlin.structure.Vertex.property(Vertex.java:38)
at org.apache.atlas.repository.graphdb.janus.AtlasJanusElement.getProperty(AtlasJanusElement.java:65)
at org.apache.atlas.repository.graph.GraphHelper.getTypeName(GraphHelper.java:1090)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.mapSystemAttributes(EntityGraphRetriever.java:1121)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.mapEdgeToAtlasRelationship(EntityGraphRetriever.java:1080)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.mapEdgeToAtlasRelationship(EntityGraphRetriever.java:1070)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.mapVertexToRelatedObjectId(EntityGraphRetriever.java:1033)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.mapRelationshipArrayAttribute(EntityGraphRetriever.java:1010)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.mapVertexToRelationshipAttribute(EntityGraphRetriever.java:981)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.mapRelationshipAttributes(EntityGraphRetriever.java:944)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.mapVertexToAtlasEntity(EntityGraphRetriever.java:418)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.mapVertexToAtlasEntity(EntityGraphRetriever.java:395)
at org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.toAtlasEntity(EntityGraphRetriever.java:162)
at org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2.createOrUpdate(AtlasEntityStoreV2.java:699)
at org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2.createOrUpdate(AtlasEntityStoreV2.java:253)
// ......

根据上面两个线程栈信息,定位在了下面两个方法,具体的修改参考 Atlas 2.1.0 代码进行修改,这里主要是去掉不必要的 Relationship 查询。

1
2
3
org.apache.atlas.repository.store.graph.v2.AtlasEntityChangeNotifier#toAtlasEntities

org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2#createOrUpdate(org.apache.atlas.repository.store.graph.v2.EntityStream, boolean, boolean)

修改后问题解决,消费性能大幅度提高,后续计划升级至 Atlas 2.1.0。

12…5<i class="fa fa-angle-right"></i>

45 posts
28 tags
GitHub
© 2022 wangz