Kyuubi 调研测试

一、背景

Kyuubi 是 Spark Thrift Server 增强版实现,实现 HiveServer2 协议,启动一个 Thrift 服务,通过 JDBC 方式接收 Spark SQL 请求并执行。我司通过提供 Kyuubi 服务, 实现 Hive SQL 到 Spark SQL 的迁移,同时提供 Ad-hoc 查询服务。

目前使用版本是 Kyuubi 0.7 版本,社区最新的 Kyuubi 1.4.0 版本带来了很大的架构优化,所以做如下的调研测试。

二、测试环境部署

版本信息

Kyuubi 版本:1.4.0 (未发布,master 分支)

Spark 版本:3.1.1

打包编译

拉取 Kyuubi 代码,并执行打包命令,完成后上传至服务器

1
./build/dist --tgz --spark-provided -Pkyuubi-extension-spark-3-1  # 打包

配置

  1. 绑定IP和端口

    配置 Kyuubi Server 服务 IP 和端口

    1
    2
    kyuubi.frontend.bind.host       0.0.0.0
    kyuubi.frontend.bind.port 10015
  2. 配置 KERBEROS 认证

    Hadoop 集群开启了 Kerberos 认证,则配置 Kerberos 认证,并添加 Kerberos 相关配置,使用 hue 用户代理运行。

    1
    2
    3
    kyuubi.authentication   KERBEROS
    kyuubi.kinit.principal hue/_HOST@***.COM
    kyuubi.kinit.keytab /etc/kyuubi/conf/hue.keytab
  3. Zookeeper 配置

    Kyuubi 依赖 Zookeeper 做服务发现和 HA,所以需要添加 Zookeeper 配置并使用了 DIGEST 认证。

    1
    2
    3
    kyuubi.ha.zookeeper.quorum=192.168.1.100:2181
    kyuubi.ha.zookeeper.auth.type=DIGEST
    kyuubi.ha.zookeeper.auth.digest=hue:hue
  4. 配置 Namespace

    不同集群的 Kyuubi Server 使用同一个 Zookeeper 集群,配置不同 Namespace 隔离,后续连接时只需要指定 zooKeeperNamespace 访问不同集群。

    1
    2
    kyuubi.ha.zookeeper.namespace=kyuubi_cluster001
    kyuubi.session.engine.initialize.timeout=180000

三、功能调研

Beeline 连接 Kyuubi

使用 beeline 工具连接 Kyuubi 进行测试,Kyuubi JDBC 链接包括了以下配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# zookeeper 连接
ZOOKEEPER_QUORUM=192.168.1.100:2181
# 集群名称
CLUSTER=cluster001
# spark 配置
SPARKI_CONFS=spark.executor.instances=10;spark.executor.memory=3g
# kyuubi 配置
KYUUBI_CONFS=$SPARKI_CONFS;kyuubi.engine.share.level=CONNECTION

# jdbc 链接
JBDC_URL="jdbc:hive2://${ZOOKEEPER_QUORUM}/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi_$CLUSTER?$KYUUBI_CONFS"

# 使用 beeline 连接
beeline -u "$JBDC_URL"

相关说明:

  • 由于 JDBC 链接会有分号等字符,beeline 连接时 JDBC 链接需要带上引号。
  • Hive 和 Spark 中都有 beeline 命令,可能与 Kyuubi Server 存在兼容性问题,需要使用合适的 beeline 路径。

共享引擎策略

  1. 引擎共享策略
    Kyuubi 支持共享引擎,可通 kyuubi.engine.share.level 配置不同共享级别,共享级别定义如下:
    • CONNECTION:连接级别,引擎适用于一次 jdbc connection,不做其他共享,此配置适用于离线 ETL 任务,使得不同任务之间相互隔离。
    • USER:用户级别共享,引擎可以在同一个用户的不同连接进行共享,适用于 AdHoc 查询和较小的任务,可以节省资源,并在有可用引擎时支持快速响应。
    • SERVER:服务级别共享(全局共享),引擎可以全局共享,所有连接可以共享一个引擎,不过启动引擎的用户需要具有较高权限才能满足访问不同用户的表。
  2. 引擎使用单个 SparkSession
    默认情况下,共享引擎对于新的 Connection 连接,使用的新的 SparkSession,不同连接共享 SparkContext 的资源,不过一些 session 级别的参数、函数、临时表等都是隔离开的。可以通过kyuubi.engine.single.spark.session参数,使用全局的 SparkSession,使得不同连接可以共享 Session 状态,包括参数、函数、临时表等。
  3. 引擎 TTL
    对于共享引擎,多个连接共享使用,并不由某个连接单独管理,在某个连接关闭后引擎不会马上退出,而是在引擎空闲的时间超过配置的超时时间后自动退出,通过kyuubi.session.engine.idle.timeout 参数进行配置。

对于共享引擎,官方公众号有更详细介绍:Apache Kyuubi:灵活运用引擎隔离共享,加速即席查询,支持大规模 ETL

用户默认配置

Kyuubi 支持用户级别的默认配置,可以为不同用户配置不同的参数,详见:Settings: User Defaults

下面示例,给 user1 和 user2 设置了不同的队列和动态资源最大 Executor 数:

1
2
3
4
5
6
7
8
9
10
11
12
# For system defaults
spark.master=yarn;
spark.submit.deployMode=cluster
spark.dynamicAllocation.enabled=true;

# user1
__user1__.spark.yarn.queue=root.queue1;
__user2__.spark.dynamicAllocation.maxExecutors=200;

# user2
__user2__.spark.yarn.queue=root.queue2;
__user2__.spark.dynamicAllocation.maxExecutors=500;

Kyuubi Spark SQL Extensions

Kyuubi 中实现了一些 Spark SQL 的优化,可通过 spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension 配置开启,具体:Auxiliary SQL extension for Spark SQL

解决小文件问题:
KyuubiSparkSQLExtension 中定义了 RepartitionBeforeWritingHiveRepartitionBeforeWritingDatasource 规则,在写入 Hive 或 DataSource 前插入 Repartition 操作,来控制写入的分区数,可通过 spark.sql.optimizer.insertRepartitionNum 参数配置 Repartition 操作的分区数。
对于动态分区写入,加了一个随机数来解决 Repartition 可能带来的数据倾斜的问题,不过可能会导致小文件,通过 spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 配置可设置动态分区 Repartition 操作插入的随机分区数。

Kyuubi Metrics

Kyuubi Server 中也定义了一些监控指标,用于监控 Kyuubi Server 的运行状况,支持了很多的 Reporter,包括 Prometheus,后续工作需要将指标投递到 Prometheus 中,对 Kyuubi 服务进行监控告警。具体参考: Kyuubi Server Metrics

Kyuubi Ctl

Kyuubi 的 bin 目录中提供了 kyuubi-ctl 工具,目前主要用于维护 Server 和 Engine 实例的状态,可以获取和删除 Server 和 Engine 在 Zookeeper 上的注册信息。

目前包括了,下面一些命令,可执行 bin/kyuubi-ctl --help 获取完整帮助信息。

1
2
3
4
5
6
Command: get [server|engine] [options]
Get the service/engine node info, host and port needed.
Command: delete [server|engine] [options]
Delete the specified service/engine node, host and port needed.
Command: list [server|engine] [options]
List all the service/engine nodes for a particular domain.

后续在服务做灰度升级时,可通过 kyuubi-ctl 命令,先下线 KyuubiServer 注册信息,切断 KyuubiServer 流量,等一段时间后该 KyuubiServer 上连接都关闭后,下线该服务。

后续规划

  1. 共享策略

    • 离线 SQL:对于离线 SQL 为了保证任务稳定性,不使用共享引擎,保证任务进行完全隔离不相互影响。
    • Adhoc 任务:使用 User 级别共享,加大 TTL 时间,让引擎尽量常驻,使得 Adhoc 查询能够及时响应;需要考虑 Spark 调度策略,防止资源抢占导致响应慢。
  2. 配置管理
    目前考虑将配置交由上游系统管理,根据标签设置不同配置,任务提交时带上相应的标签即可。