Spark HiveMetastoreCatalog Infer Schema

Problem 1

Description

The error reports a failure to read a Parquet footer, yet the file being read is not in the partition specified by the query's filter.

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
......
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:633)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:241)
at org.apache.spark.sql.hive.HiveMetastoreCatalog.org$apache$spark$sql$hive$HiveMetastoreCatalog$$inferIfNeeded(HiveMetastoreCatalog.scala:239)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$$anonfun$4$$anonfun$5.apply(HiveMetastoreCatalog.scala:167)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$$anonfun$4$$anonfun$5.apply(HiveMetastoreCatalog.scala:156)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$$anonfun$4.apply(HiveMetastoreCatalog.scala:156)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$$anonfun$4.apply(HiveMetastoreCatalog.scala:148)
at org.apache.spark.sql.hive.HiveMetastoreCatalog.withTableCreationLock(HiveMetastoreCatalog.scala:54)
at org.apache.spark.sql.hive.HiveMetastoreCatalog.convertToLogicalRelation(HiveMetastoreCatalog.scala:148)
at org.apache.spark.sql.hive.RelationConversions.org$apache$spark$sql$hive$RelationConversions$$convert(HiveStrategies.scala:207)
at org.apache.spark.sql.hive.RelationConversions$$anonfun$apply$4.applyOrElse(HiveStrategies.scala:239)
at org.apache.spark.sql.hive.RelationConversions$$anonfun$apply$4.applyOrElse(HiveStrategies.scala:228)
at ......
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:290)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:538)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:611)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:603)
at ......
Caused by: java.io.IOException: Could not read footer for file: FileStatus{path=hdfs://hadoop-bdwg-ns01/hive/warehouse/cupid_bi.db/report_qixiao_tracking_event_count_daily/dt=2016-05-24/000000_0.gz; isDirectory=false; length=1191537; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:551)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:538)
at ......
Caused by: java.lang.RuntimeException: hdfs://hadoop-bdwg-ns01/hive/warehouse/cupid_bi.db/report_qixiao_tracking_event_count_daily/dt=2016-05-24/000000_0.gz is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [78, 59, 23, 1]
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:524)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:505)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:499)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:476)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:544)
... 9 more

Investigation

Key information:

org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:241)
org.apache.spark.sql.hive.HiveMetastoreCatalog.org$apache$spark$sql$hive$HiveMetastoreCatalog$$inferIfNeeded(HiveMetastoreCatalog.scala:239)

Walking the stack trace through the code points to two methods, ParquetFileFormat.inferSchema and HiveMetastoreCatalog.inferIfNeeded: the error happens while inferring the schema from the table's Parquet files.
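
To confirm that the file named in the error is indeed not a Parquet file, its footer magic bytes can be checked directly against HDFS: Parquet files end with the ASCII bytes "PAR1" ([80, 65, 82, 49] in the error above). A minimal sketch; the ParquetMagicCheck object and endsWithParquetMagic helper are illustrative and not part of Spark:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Minimal sketch: check whether a file ends with the Parquet magic bytes "PAR1".
object ParquetMagicCheck {
  def endsWithParquetMagic(pathStr: String): Boolean = {
    val path = new Path(pathStr)
    val fs = FileSystem.get(path.toUri, new Configuration())
    val len = fs.getFileStatus(path).getLen
    val in = fs.open(path)
    try {
      val magic = new Array[Byte](4)
      in.readFully(len - 4, magic)                      // read the last 4 bytes
      magic.sameElements("PAR1".getBytes("US-ASCII"))   // Parquet footer magic
    } finally {
      in.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // The offending file from the stack trace above; expected to print false.
    println(endsWithParquetMagic("hdfs://hadoop-bdwg-ns01/hive/warehouse/cupid_bi.db/report_qixiao_tracking_event_count_daily/dt=2016-05-24/000000_0.gz"))
  }
}

For the gzipped text file above this prints false, matching the "expected magic number at tail" message in the stack trace.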

Problem 2

Description

The Driver cannot be reached, appears to be stuck, and then the application simply exits. The application log contains no errors; the Executor logs show java.io.IOException: Connection reset by peer.

Application application_1574672087080_11986014 failed 1 times due to ApplicationMaster for attempt appattempt_1574672087080_11986014_000001 timed out. Failing the application.

Investigation

There is no real clue in this error. Seeing Connection reset by peer and timed out, the usual reaction is to raise the timeout, e.g. spark.network.timeout=1200s, but it did not help: the failure occurred before that limit was ever reached.
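
For reference, the timeout can be raised when building the session, though it did not solve this particular problem. A minimal sketch; the object name and app name are illustrative:

import org.apache.spark.sql.SparkSession

object TimeoutTuningExample {
  def main(args: Array[String]): Unit = {
    // Minimal sketch: raise the network timeout at session build time.
    // In this case the failure happened before the timeout was reached,
    // so this setting alone did not help.
    val spark = SparkSession.builder()
      .appName("timeout-tuning-example")
      .config("spark.network.timeout", "1200s")
      .enableHiveSupport()
      .getOrCreate()
    spark.stop()
  }
}

The same value can also be passed on spark-submit with --conf spark.network.timeout=1200s.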

Log on to the machine hosting the ApplicationMaster and analyze the thread stacks of the ApplicationMaster (Driver) with jstack. The key frames are:

"Driver" #38 prio=5 os_prio=0 tid=0x00002b504f82a000 nid=0x78b0 runnable [0x00002b50809a9000]
java.lang.Thread.State: RUNNABLE
at java.lang.String.indexOf(String.java:1769)
at java.lang.String.indexOf(String.java:1718)
at org.apache.commons.lang.StringUtils.replace(StringUtils.java:3807)
at org.apache.commons.lang.StringUtils.replace(StringUtils.java:3771)
at org.apache.hadoop.fs.Path.normalizePath(Path.java:240)
at org.apache.hadoop.fs.Path.initialize(Path.java:203)
at org.apache.hadoop.fs.Path.<init>(Path.java:172)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$bulkListLeafFiles$3$$anonfun$7.apply(InMemoryFileIndex.scala:251)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$bulkListLeafFiles$3$$anonfun$7.apply(InMemoryFileIndex.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$bulkListLeafFiles$3.apply(InMemoryFileIndex.scala:244)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$bulkListLeafFiles$3.apply(InMemoryFileIndex.scala:243)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:243)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:126)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:91)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:67)
at org.apache.spark.sql.execution.datasources.PrunedInMemoryFileIndex.<init>(CatalogFileIndex.scala:118)
at org.apache.spark.sql.execution.datasources.CatalogFileIndex.filterPartitions(CatalogFileIndex.scala:84)
at org.apache.spark.sql.execution.datasources.CatalogFileIndex.listFiles(CatalogFileIndex.scala:59)
at org.apache.spark.sql.hive.HiveMetastoreCatalog.org$apache$spark$sql$hive$HiveMetastoreCatalog$$inferIfNeeded(HiveMetastoreCatalog.scala:242)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$$anonfun$4$$anonfun$5.apply(HiveMetastoreCatalog.scala:167)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$$anonfun$4$$anonfun$5.apply(HiveMetastoreCatalog.scala:156)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$$anonfun$4.apply(HiveMetastoreCatalog.scala:156)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$$anonfun$4.apply(HiveMetastoreCatalog.scala:148)
at org.apache.spark.sql.hive.HiveMetastoreCatalog.withTableCreationLock(HiveMetastoreCatalog.scala:54)
at org.apache.spark.sql.hive.HiveMetastoreCatalog.convertToLogicalRelation(HiveMetastoreCatalog.scala:148)
at org.apache.spark.sql.hive.RelationConversions.org$apache$spark$sql$hive$RelationConversions$$convert(HiveStrategies.scala:207)
at ......

Drawing on the experience from Problem 1, the InMemoryFileIndex log was checked, and it shows InMemoryFileIndex scanning more than twenty thousand directories.

InMemoryFileIndex: Listing leaf files and directories in parallel under: hdfs://.....

Resolution

The code of org.apache.spark.sql.hive.HiveMetastoreCatalog#inferIfNeeded is shown below:

private def inferIfNeeded(
    relation: HiveTableRelation,
    options: Map[String, String],
    fileFormat: FileFormat,
    fileIndexOpt: Option[FileIndex] = None): CatalogTable = {
  val inferenceMode = sparkSession.sessionState.conf.caseSensitiveInferenceMode
  val shouldInfer = (inferenceMode != NEVER_INFER) && !relation.tableMeta.schemaPreservesCase
  val tableName = relation.tableMeta.identifier.unquotedString
  if (shouldInfer) {
    logInfo(s"Inferring case-sensitive schema for table $tableName (inference mode: " +
      s"$inferenceMode)")
    val fileIndex = fileIndexOpt.getOrElse {
      val rootPath = new Path(relation.tableMeta.location)
      new InMemoryFileIndex(sparkSession, Seq(rootPath), options, None)
    }

    val inferredSchema = fileFormat
      .inferSchema(
        sparkSession,
        options,
        fileIndex.listFiles(Nil, Nil).flatMap(_.files))
      .map(mergeWithMetastoreSchema(relation.tableMeta.dataSchema, _))

    inferredSchema match {
      case Some(dataSchema) =>
        if (inferenceMode == INFER_AND_SAVE) {
          updateDataSchema(relation.tableMeta.identifier, dataSchema)
        }
        val newSchema = StructType(dataSchema ++ relation.tableMeta.partitionSchema)
        relation.tableMeta.copy(schema = newSchema)
      case None =>
        logWarning(s"Unable to infer schema for table $tableName from file format " +
          s"$fileFormat (inference mode: $inferenceMode). Using metastore schema.")
        relation.tableMeta
    }
  } else {
    relation.tableMeta
  }
}

Schema inference runs when shouldInfer is true. How can it be made to skip inference? There are two ways:

  1. Set spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER.
  2. Inference is also skipped when schemaPreservesCase is true. This requires the Hive table to carry the spark.sql.sources.schema.* properties and the schema recorded there to match table.schema (the check is in org.apache.spark.sql.hive.HiveExternalCatalog#restoreHiveSerdeTable); see the sketch after this list for a quick way to inspect those properties.
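
A quick way to check whether a table already carries the Spark-written schema properties is to list its table properties. A minimal sketch, assuming an existing SparkSession named spark (e.g. in spark-shell); the table name cupid_bi.some_table is hypothetical:

// Minimal sketch: list table properties and keep only the Spark schema entries.
// "cupid_bi.some_table" is a hypothetical table name.
spark.sql("SHOW TBLPROPERTIES cupid_bi.some_table")
  .collect()
  .filter(_.getString(0).startsWith("spark.sql.sources.schema"))
  .foreach(row => println(s"${row.getString(0)} = ${row.getString(1)}"))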

We chose to set spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER, turning off schema inference, which solved the problem. In Spark 3.0 this configuration already defaults to NEVER_INFER.
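
Since it is a SQL conf, the setting can be applied per session or passed at submit time with --conf. A minimal sketch, assuming an existing SparkSession named spark:

// Minimal sketch: turn off case-sensitive schema inference for Hive tables.
spark.conf.set("spark.sql.hive.caseSensitiveInferenceMode", "NEVER_INFER")
// Equivalently via SQL:
spark.sql("SET spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER")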

Additional Notes

  1. Why schema inference exists
    Hive schemas are case-insensitive, while the schema inside a Parquet file is case-sensitive; without inference, reading Parquet files whose column names contain upper-case letters could produce wrong results.

  2. The three caseSensitiveInferenceMode modes
    caseSensitiveInferenceMode has three modes:
    INFER_AND_SAVE: infer the schema on first access and persist it to the Hive table properties (spark.sql.sources.schema.*).
    INFER_ONLY: infer the schema but do not persist it.
    NEVER_INFER: never infer.