运行环境
Spark: 2.4.3
Iceberg: 1.2.0
IcebergSource
Iceberg 提供 DataSourceRegister
接口的实现类 org.apache.iceberg.spark.source.IcebergSource
,在 IcebergSource 中实现了 createReader
、createWriter
等方法进行 DataFrame 的读写。
IcebergSource 中的 findTable
方法是从 Catalog 中获取 Iceberg Table,此方法进一步通过 CustomCatalogs.table 获取 table, 在 CustomCatalogs 中 buildCatalog 是通过 spark.sessionState().newHadoopConf
获取 Hadoop、Hive 相关配置,那么默认的行为将没有办法进行多集群间的读写。
自定义 IcebergSource
IcebergSource 的 findTable
方法是 protected
的,可以从 IcebergSource 派生出自定义的 IcebergSource,在自定义的 IcebergSource 中维护多个集群的 Catalog,覆盖 findTable
方法从对应集群的 Catalog 中获取 Iceberg Table 对象,进而可实现多集群间的读写。
1 | class ClusterIcebergSource extends IcebergSource { |
注册自定义 IcebergSource
添加 META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
文件
1 | com.***.ClusterIcebergSource |
读写 Iceberg 表
Read Iceberg Table
1 | spark.read.option("cluster", cluster) |
Write DataFrame
1 | df.write |
遇到问题
- loadTable 时 HiveMetaStore 初始化报错,
No suitable driver found for jdbc:mysql:***
1 | java.sql.SQLException: No suitable driver found for jdbc:mysql://***:***/hive_db?createDatabaseIfNotExist=true |
解决:
1 | DriverRegistry.register("com.mysql.jdbc.Driver") |