Kyuubi 多数据源混合计算

Kyuubi 多数据源混合计算

Apache Kyuubi 作为一个分布式和多租户网关,用于在 Lakehouse 上提供 Serverless SQL。
借助于 Spark 引擎完整的生态,我们可以接入各种数据源,实现各种数据源的混合计算,以及不同数据源之间的数据同步
第三方数据源通过实现 Spark DataSource 相关接口,提供 Spark Connector 依赖包来与 Spark 对接。Spark SQL 中可通过 DataSourceCatalog 的方式来定义数据源,进而通过 Spark SQL 读写数据源数据。

DataSource

Spark SQL 可使用 DataSource 配置,创建一个数据源的临时视图。

1
2
3
4
5
6
7
8
9
# 从 JSON 文件创建一个临时视图
CREATE TEMPORARY VIEW jsonTable
USING org.apache.spark.sql.json
OPTIONS (
path 'examples/src/main/resources/people.json'
)

# 查询定义的临时视图
SELECT * FROM jsonTable

Catalog

Spark 3 提供 CatalogPlugin 接口,可以为数据源实现自定义的 Catalog,通过相关配置加载数据源的 Catalog,进而访问该数据源的数据。

1
2
3
4
5
6
7
8
9
# 定义一个名称为 test 的 catalog,catalog_class 为 CatalogPlugin 的实现类
spark.sql.catalog.test catalog_class

# 配置 test catalog 的 options
spark.sql.catalog.test.optKey1 optValue1
spark.sql.catalog.test.optKey2 optValue2

# 通过 `test.db.table` 访问自定义 catalog 下的库表
select * from test.db.table

Spark SQL 连接各种数据源

HDFS

Spark SQL 中默认只能加载一个 Hive 集群配置,对于有跨集群访问 Hive 的需求,我们可以使用文件类型的数据源,定义其它集群的 HDFS 路径,跨集群的操作 Hive 数据。

Spark 内置了一些文件类型的数据源,可以用于读写 HDFS 文件,如:Parquet、ORC、JSON、CSV 等,不需要额外的依赖。

对于开启了 Kerberos 认证的集群,需要使用 kyuubi.credentials.hadoopfs.uris 配置跨集群访问的 Hadoop FileSystems。不过目前这个参数是 Kyuubi Server 的参数,运行时不可修改,后续可以进优化。

1
kyuubi.credentials.hadoopfs.uris=hdfs://192.168.1.199

通过 DataSource 方式访问 HDFS:

1
2
3
4
5
6
7
CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
path 'hdfs://192.168.1.199/user/hadoop/test/people.parquet'
);

select * from parquetTable limit 10;

JDBC

Spark 内置了 JDBC 的 DataSource 和 Catalog 实现,仅需要添加 JDBC Driver 依赖(我们 Spark jars 中添加了 MySQL Driver,所以不需要额外引入)。

通过 DataSource 方式访问 MySQL:

1
2
3
4
5
6
7
8
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
url 'jdbc:mysql://192.168.1.199:3306/database',
dbtable 'database.tablename',
user 'username',
password 'password'
);

通过 CatalogPlugin 方式访问 MySQL:

1
2
3
4
5
6
7
set spark.sql.catalog.mysql=org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog;
set spark.sql.catalog.mysql.url=jdbc:mysql://192.168.1.199/database;
set spark.sql.catalog.mysql.user=username;
set spark.sql.catalog.mysql.password=password;

use database;
select * from tablename;

HBase

使用 HBase Spark Connector 组件:hbase-connectors

HBase Shell 创建测试表:

1
2
3
create 'wangzhen_test', 'cf'
put 'wangzhen_test', 'row1', 'cf:a', 'value1'
scan 'wangzhen_test'

hbase-connectors 没有提供打入依赖的 shade 包,使用 maven-shade-plugin 插件制作完整的 shade 包便于添加 HBase Spark Connector 依赖,可以排除 Hadoop、Spark 等 provided 依赖包仅需要 HBase Client 相关依赖。
运行时可通过 add jars 命令加入依赖:

1
2
# 添加 HBase Spark Connector 依赖
add jars "hdfs://XXX/user/hadoop/jars/hbase-spark-1.0.1-SNAPSHOT.jar";

通过 DataSource 方式访问 HBase:

1
2
3
4
5
6
7
8
9
10
11
# 创建 HBase 表临时视图:
CREATE TEMPORARY VIEW hbaseTable
USING org.apache.hadoop.hbase.spark
OPTIONS (
'catalog' '{"table":{"namespace":"default", "name":"wangzhen_test"}, "rowkey":"key1", "columns":{"col1":{"cf":"rowkey", "col":"key1", "type":"string"}, "col2":{"cf":"cf", "col":"a", "type":"string"}}}',
'hbase.spark.config.location' 'hbase-site.xml',
'hbase.spark.use.hbasecontext' 'false'
);

# 读取 HBase 数据
select * from hbaseTable;

我们使用 Impersonation 模式运行 Kyuubi 服务,在 Kyuubi Server 中使用代理用户启动 Spark 引擎,Kyuubi Server 默认仅实现了获取 HDFS 和 Hive 的 Delegation Token。
我们 HBase 集群也开启了 Kerberos 认证,测试时通过指定 spark.kerberos.keytabspark.kerberos.principal 方式运行 Spark Engine,后续需要在 Kyuubi Server 中实现 HBaseDelegationTokenProvider 来获取 HBase 的 Token。

ClickHouse

ClickHouse 提供了 JDBC 客户端,可以使用 Spark JDBC 的方式连接 ClickHouse。

另外开源的 spark-clickhouse-connector 组件,实现了 Spark ClickHouse Connector ,相对于 JDBC 的方式有更好的分区逻辑和更多的优化,对于读取操作应该有很好的性能提升,需要注意的是 spark-clickhouse-connector 是通过 GRPC 接口访问 ClickHouse 集群。

spark-clickhouse-connector 组件默认没有提供 Spark3.1的支持,我们通过复制 Spark3.3 实现的代码,简单的调整后测试可用。

使用 spark-clickhouse-connector 的 CatalogPlugin 方式访问 ClickHouse:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 添加 spark-clickhouse-connector 依赖
add jars 'hdfs://XXX/user/hadoop/jars/clickhouse-spark-runtime-3.1_2.12-0.4.0-SNAPSHOT.jar';

# 配置 clickhouse catalog
set spark.sql.catalog.clickhouse=xenon.clickhouse.ClickHouseCatalog;
set spark.sql.catalog.clickhouse.host=192.168.1.199;
set spark.sql.catalog.clickhouse.grpc_port=9100;
set spark.sql.catalog.clickhouse.user=username;
set spark.sql.catalog.clickhouse.password=password;
set spark.sql.catalog.clickhouse.database=database;

# 查询 clickhouse 数据
use clickhouse;
select * from tablename;