Reading and Writing Iceberg Across Clusters with Spark

Environment

Spark: 2.4.3
Iceberg: 1.2.0

IcebergSource

Iceberg ships org.apache.iceberg.spark.source.IcebergSource, an implementation of Spark's DataSourceRegister interface. IcebergSource implements methods such as createReader and createWriter that back DataFrame reads and writes.
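For reference, the stock source registers under the short name iceberg, so default single-cluster usage looks like this (db.table is a placeholder):

val df = spark.read.format("iceberg").load("db.table")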

The findTable method in IcebergSource resolves the Iceberg Table from a Catalog. It delegates to CustomCatalogs.table, and CustomCatalogs.buildCatalog obtains the Hadoop and Hive configuration via spark.sessionState().newHadoopConf(). Since that configuration is always derived from the current SparkSession, every table resolves against the local cluster's metastore and filesystem, so the default behavior cannot read from or write to another cluster.
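A quick spark-shell check makes the constraint visible: sparkContext.hadoopConfiguration is the base configuration that newHadoopConf() copies, and it always carries the local cluster's settings:

// In spark-shell: the session's Hadoop conf is always the local cluster's,
// so any Catalog built from it points at the local metastore.
val conf = spark.sparkContext.hadoopConfiguration
println(conf.get("hive.metastore.uris"))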

A Custom IcebergSource

findTable is declared protected, so we can derive a custom source from IcebergSource, maintain one Catalog per cluster inside it, and override findTable to load the Iceberg Table from the target cluster's Catalog. This makes reads and writes across multiple clusters possible.

import java.util
import java.util.concurrent.ConcurrentHashMap

import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.Table
import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.relocated.com.google.common.base.Preconditions
import org.apache.iceberg.spark.source.IcebergSource
import org.apache.spark.sql.sources.v2.DataSourceOptions

import ClusterIcebergSource._

class ClusterIcebergSource extends IcebergSource {

  override def shortName(): String = SHORT_NAME

  // Resolve the table from the Catalog of the cluster named in the "cluster" option,
  // instead of the Catalog built from the current SparkSession's Hadoop conf.
  override def findTable(options: DataSourceOptions, conf: Configuration): Table = {
    val path = options.get("path")
    val cluster = options.get("cluster")
    // The casts pick the checkArgument(boolean, Object) overload from Scala.
    Preconditions.checkArgument(path.isPresent, "Cannot open table: path is not set".asInstanceOf[Object])
    Preconditions.checkArgument(cluster.isPresent, "Cannot open table: cluster is not set".asInstanceOf[Object])

    val catalog = loadClusterCatalog(cluster.get())
    catalog.loadTable(tableIdentifier(path.get()))
  }

}

object ClusterIcebergSource {

  val SHORT_NAME = "iceberg-cluster"

  // One HiveCatalog per cluster, created lazily and cached for reuse.
  val catalogs: util.Map[String, Catalog] = new ConcurrentHashMap[String, Catalog]()

  def loadClusterCatalog(cluster: String): Catalog = {
    // Double-checked locking so each cluster's catalog is initialized only once.
    if (!catalogs.containsKey(cluster)) catalogs synchronized {
      if (!catalogs.containsKey(cluster)) {
        val hiveCatalog = new HiveCatalog()
        hiveCatalog.setConf(hadoopConf(cluster))
        val properties = new util.HashMap[String, String]()
        hiveCatalog.initialize(s"iceberg_catalog_$cluster", properties)
        catalogs.put(cluster, hiveCatalog)
      }
    }
    catalogs.get(cluster)
  }

  private def hadoopConf(cluster: String): Configuration = {
    // TODO load cluster hadoop conf (see the sketch below)
    null
  }

  private def tableIdentifier(path: String): TableIdentifier = {
    // The "path" passed to load()/save() is expected to be "<database>.<table>".
    val nameParts = path.split("\\.")
    TableIdentifier.of(nameParts(0), nameParts(1))
  }

}
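hadoopConf is left as a TODO above. One possible implementation, shown purely as a sketch, packages each cluster's client configs as resources in the jar and builds a Configuration from them; the conf/<cluster>/ layout and file names are assumptions, not from the original post:

// Sketch: a drop-in for the hadoopConf method in ClusterIcebergSource.
// Assumes resources like conf/a/core-site.xml, conf/a/hdfs-site.xml, conf/a/hive-site.xml.
private def hadoopConf(cluster: String): Configuration = {
  val conf = new Configuration(false) // false: don't inherit the local cluster's defaults
  Seq("core-site.xml", "hdfs-site.xml", "hive-site.xml").foreach { file =>
    val url = getClass.getClassLoader.getResource(s"conf/$cluster/$file")
    if (url != null) conf.addResource(url)
  }
  conf
}

Passing loadDefaults = false keeps the local cluster's core-site.xml from leaking into the remote catalog's configuration.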

Registering the Custom IcebergSource

Add a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file (under src/main/resources in a typical Maven/sbt layout; Spark discovers implementations via Java's ServiceLoader) containing the fully qualified class name:

com.***.ClusterIcebergSource

Reading and Writing Iceberg Tables

Read Iceberg Table

spark.read
  .option("cluster", cluster)
  .format(ClusterIcebergSource.SHORT_NAME)
  .load(s"$database.$table")

Write DataFrame

df.write
  .option("cluster", cluster)
  .option(SparkWriteOptions.FANOUT_ENABLED, "true")
  .format(ClusterIcebergSource.SHORT_NAME)
  .mode("append")
  .save(s"$database.$table")
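
Putting the two halves together, copying a table between clusters becomes a single job. The cluster names and table names below are placeholders:

// Read from cluster "a" and append into cluster "b".
val events = spark.read
  .option("cluster", "a")
  .format(ClusterIcebergSource.SHORT_NAME)
  .load("src_db.events")

events.write
  .option("cluster", "b")
  .format(ClusterIcebergSource.SHORT_NAME)
  .mode("append")
  .save("dst_db.events")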

Problems Encountered

  1. During loadTable, HiveMetaStore initialization fails with No suitable driver found for jdbc:mysql:***
java.sql.SQLException: No suitable driver found for jdbc:mysql://***:***/hive_db?createDatabaseIfNotExist=true
at java.sql.DriverManager.getConnection(DriverManager.java:689)
at java.sql.DriverManager.getConnection(DriverManager.java:208)
at com.jolbox.bonecp.BoneCP.obtainRawInternalConnection(BoneCP.java:349)
at com.jolbox.bonecp.BoneCP.<init>(BoneCP.java:416)
at com.jolbox.bonecp.BoneCPDataSource.getConnection(BoneCPDataSource.java:120)
at org.datanucleus.store.rdbms.ConnectionFactoryImpl$ManagedConnectionImpl.getConnection(ConnectionFactoryImpl.java:501)
at org.datanucleus.store.rdbms.RDBMSStoreManager.<init>(RDBMSStoreManager.java:298)

Solution: the MySQL JDBC driver is on the classpath but loaded by a classloader that java.sql.DriverManager does not consult, so register it explicitly through Spark's org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry before the first loadTable call:

import org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry

// Wraps the driver so DriverManager can see it regardless of which classloader loaded it.
DriverRegistry.register("com.mysql.jdbc.Driver")