@@ -345,39 +345,59 @@ private[sql] class ParquetRelation2(
// Schema of the whole table, including partition columns.
var schema: StructType = _

// Cache of leaf FileStatuses, keyed by path
val cachedLeaves: mutable.Map[Path, FileStatus] = new mutable.HashMap[Path, FileStatus]()

/**
* Refreshes `FileStatus`es, footers, partition spec, and table schema.
*/
def refresh(): Unit = {
// Lists `FileStatus`es of all leaf nodes (files) under all base directories.
// We only care about new and updated FileStatuses.
val leaves = cachedLeafStatuses().filter { f =>
isSummaryFile(f.getPath) ||
!(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
(isSummaryFile(f.getPath) ||
!(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))) &&
(!cachedLeaves.contains(f.getPath) ||
cachedLeaves(f.getPath).getModificationTime < f.getModificationTime)
}.map { f =>
cachedLeaves += (f.getPath -> f)
f
Contributor:

Unfortunately there's still a bug here :) We also need to remove those files that only exist in the old cache (namely removed files). This happens when an existing directory is overwritten.

I think we can first keep both the old cache and the result of cachedLeafStatuses(), then filter out the updated and new files, and finally update the old FileStatus cache.
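A minimal sketch of that bookkeeping (the names `reconcileCache` and `freshLeaves` are illustrative and not part of this PR; `freshLeaves` stands in for the result of `cachedLeafStatuses()`):

```scala
import scala.collection.mutable
import org.apache.hadoop.fs.{FileStatus, Path}

// Classify the fresh listing against the old cache, then bring the cache back
// in sync with the file system: drop removed files, record new/updated ones.
def reconcileCache(
    cachedLeaves: mutable.Map[Path, FileStatus],
    freshLeaves: Seq[FileStatus]): (Seq[FileStatus], Seq[Path]) = {
  val freshPaths = freshLeaves.map(_.getPath).toSet

  // New files, plus files whose modification time moved forward.
  val newOrUpdated = freshLeaves.filter { f =>
    cachedLeaves.get(f.getPath).forall(_.getModificationTime < f.getModificationTime)
  }

  // Files that only exist in the old cache, i.e. files that have been removed.
  val removed = cachedLeaves.keys.filterNot(freshPaths.contains).toSeq

  removed.foreach(cachedLeaves.remove)
  newOrUpdated.foreach(f => cachedLeaves(f.getPath) = f)

  (newOrUpdated, removed)
}
```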

Contributor:

I guess you are trying to keep only the FileStatuses of files that need to be touched during schema merging here. Actually, the FileStatus cache must stay consistent with the files stored on the file system, because we also inject the cache into ParquetInputFormat in ParquetRelation2.buildScan to avoid calling listStatus repeatedly there.

Member Author:

Hmm, I am wondering: if we only read the footers of the updated and newly added files, the merged schema may be incorrect. The same applies to removed files. If we don't re-merge the schemas from all footers, the result may not be correct.

So this PR should check whether we need to re-read all footers and re-merge the schema, based on whether any FileStatuses have changed.
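As a rough sketch, such a check might look like the following (`needsFullRefresh` is a made-up name, not something introduced by this PR):

```scala
import org.apache.hadoop.fs.{FileStatus, Path}

// True if anything changed since the last refresh: a file was added, removed,
// or rewritten (its modification time differs). Only then would we re-read all
// footers and re-merge the schema.
def needsFullRefresh(
    cachedLeaves: Map[Path, FileStatus],
    freshLeaves: Seq[FileStatus]): Boolean = {
  freshLeaves.size != cachedLeaves.size ||
    freshLeaves.exists { f =>
      cachedLeaves.get(f.getPath).forall(_.getModificationTime != f.getModificationTime)
    }
}
```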

Contributor:

This is a good point. After reading the footers and merging the schemas of new and updated files, we also need to merge the resulting schema with the old schema, because some columns may be missing from the new and/or updated files.

Actually I found it might be difficult to define the "correctness" of the merged schema. Take the following scenario as an example:

  1. Initially there is file f0, which comes with a single column c0.

    Merged schema: c0

  2. File f1 is added, which contains a single column c1

    Merged schema: c0, c1

  3. Removing f0

    Which is the "correct" merged schema now?

    a. c0, c1
    b. c1

    I tend to use (a), because removing existing columns can be dangerous and may confuse downstream systems. But currently Spark SQL uses (b). Also, we need to take the metastore schema into account for Parquet relations converted from metastore Parquet tables.

I think this issue is too complicated to be fixed in this PR. I agree with you that we should keep this PR simple and just re-read all the footers for now. It's already strictly better than the current implementation, not to mention that schema merging has been significantly accelerated by #7396.
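Purely to illustrate options (a) and (b) above, here is a toy model in plain Scala (not the actual StructType/footer machinery; it ignores data types, nesting, and the metastore schema):

```scala
object MergeSemantics {
  // In this toy model a schema is just an ordered list of column names.
  type Schema = Seq[String]

  // Merge schemas by unioning their columns, keeping first-seen order.
  def merge(schemas: Seq[Schema]): Schema =
    schemas.foldLeft(Seq.empty[String])((acc, s) => acc ++ s.filterNot(acc.contains))

  def main(args: Array[String]): Unit = {
    val f0: Schema = Seq("c0") // initial file
    val f1: Schema = Seq("c1") // file added later
    val previous = merge(Seq(f0, f1)) // table schema before f0 is removed: c0, c1

    // Option (b): re-merge only the footers of files that still exist.
    println(merge(Seq(f1)))           // List(c1)

    // Option (a): also merge with the previously computed table schema, so a
    // column never disappears just because the files that defined it were removed.
    println(merge(Seq(previous, f1))) // List(c0, c1)
  }
}
```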

}.toArray
Contributor:

This is irrelevant to this PR, but I'd like to point out that this check is unnecessary now. IIRC, at the time PR #6012 was merged, ParquetRelation2 still needed to be serialized to the executor side, so avoiding schema merging on the executor side made sense. However, after migrating to HadoopFsRelation, ParquetRelation2 won't be serialized anymore. You may notice that ParquetRelation2 is no longer a case class and doesn't even extend Serializable now. I made this change intentionally to make sure ParquetRelation2 is never serialized.


dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
commonMetadataStatuses =
leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)

// If we already have the schema, we don't need to re-compute it, since schema merging is
// time-consuming.
if (dataSchema == null) {
dataSchema = {
val dataSchema0 = maybeDataSchema
.orElse(readSchema())
.orElse(maybeMetastoreSchema)
.getOrElse(throw new AnalysisException(
s"Failed to discover schema of Parquet file(s) in the following location(s):\n" +
paths.mkString("\n\t")))

// If this Parquet relation is converted from a Hive Metastore table, we must reconcile
// case insensitivity issues and possible schema mismatches (probably caused by schema
// evolution).
maybeMetastoreSchema
.map(ParquetRelation2.mergeMetastoreParquetSchema(_, dataSchema0))
.getOrElse(dataSchema0)
}
dataSchema = {
val dataSchema0 = maybeDataSchema
.orElse(readSchema())
.orElse(maybeMetastoreSchema)

// If we already have a previously loaded schema, merge it with the newly loaded one.
val dataSchema1 =
if (dataSchema != null) {
if (dataSchema0.isDefined) {
dataSchema0.get.merge(dataSchema)
} else {
dataSchema
}
} else {
if (dataSchema0.isDefined) {
dataSchema0.get
} else {
throw new AnalysisException(
s"Failed to discover schema of Parquet file(s) in the following location(s):\n" +
paths.mkString("\n\t"))
}
}

// If this Parquet relation is converted from a Hive Metastore table, we must reconcile
// case insensitivity issues and possible schema mismatches (probably caused by schema
// evolution).
maybeMetastoreSchema
.map(ParquetRelation2.mergeMetastoreParquetSchema(_, dataSchema1))
.getOrElse(dataSchema1)
}
}

@@ -426,11 +446,15 @@
}

assert(
filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined,
"No predefined schema found, " +
s"and no Parquet data files or summary files found under ${paths.mkString(", ")}.")
filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined ||
dataSchema != null, "No predefined schema found, " +
Contributor:

dataSchema == null?

s"and no Parquet data files or summary files found under ${paths.mkString(", ")}.")
Contributor:

Please revert these two indentation changes.

Member Author:

Fixed. Thanks.


ParquetRelation2.mergeSchemasInParallel(filesToTouch, sqlContext)
if (filesToTouch.nonEmpty) {
ParquetRelation2.mergeSchemasInParallel(filesToTouch, sqlContext)
} else {
None
}
}
}
}