Closed
@@ -342,15 +342,35 @@ private[sql] class ParquetRelation2(
// Schema of the whole table, including partition columns.
var schema: StructType = _

// Cached leaf statuses
var localCachedLeafStatuses: Set[FileStatus] = _

var lastRefreshTime: Long = 0

// Cached leaves
var cachedLeaves: Array[FileStatus] = Array()

/**
* Refreshes `FileStatus`es, footers, partition spec, and table schema.
*/
def refresh(): Unit = {
// Check whether the cached leaf statuses have changed since the last refresh
val leafStatusesChanged = localCachedLeafStatuses != cachedLeafStatuses()

// Lists `FileStatus`es of all leaf nodes (files) under all base directories.
val leaves = cachedLeafStatuses().filter { f =>
isSummaryFile(f.getPath) ||
!(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
}.toArray
val leaves = if (leafStatusesChanged) {
localCachedLeafStatuses = cachedLeafStatuses()
Contributor:

Should we do a deep copy here? Currently it's OK because cachedLeafStatuses() always returns a new instance of Set[FileStatus], but it's possible that we use a mutable set object in the future. In that case, the != predicate above will always be true.
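As a hedged illustration of the aliasing pitfall this comment raises (the names below are hypothetical stand-ins, not the PR's code): if the cache ever handed out a shared mutable set instead of a fresh instance, an inequality check against a plain reference could no longer be trusted, whereas a defensive snapshot still detects the change.

```scala
import scala.collection.mutable

// Stands in for cachedLeafStatuses(); in this scenario it is mutated in place.
val cache = mutable.Set("part-00000")
val local = cache                 // no deep copy: both names alias one object
cache += "part-00001"             // the cache is updated in place
println(local != cache)           // false: the change goes unnoticed

val copied = Set(cache.toSeq: _*) // defensive immutable snapshot
cache += "part-00002"             // another in-place update
println(copied != cache)          // true: the snapshot detects the change
```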

Contributor:

This is irrelevant to this PR, but I'd like to point out that this check is unnecessary now. IIRC, at the time PR #6012 was merged, ParquetRelation2 still needed to be serialized to executor side, thus avoiding schema merging on executor side makes sense. However, after migrating to HadoopFsRelation, ParquetRelation2 won't be serialized anymore. You may see that ParquetRelation2 is no longer a case class and doesn't even extend Serializable now. I made this change intentionally to make sure ParquetRelation2 is never serialized.

val updatedLeaves = cachedLeafStatuses().filter { f =>
(isSummaryFile(f.getPath) ||
!(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))) &&
(f.getModificationTime > lastRefreshTime)
}.toArray
lastRefreshTime = System.currentTimeMillis
Contributor:

Using System.currentTimeMillis is not OK, because clocks on different nodes may differ.
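One clock-skew-free alternative, sketched here with a minimal hypothetical stand-in for Hadoop's `FileStatus` rather than the real class, is to advance the refresh watermark to the newest modification time actually observed among the listed files, so comparisons only ever use timestamps produced by the filesystem itself rather than the driver's wall clock.

```scala
// Hypothetical stand-in for org.apache.hadoop.fs.FileStatus.
case class LeafStatus(path: String, modificationTime: Long)

var lastRefreshTime: Long = 0L

// Keep only files modified since the last refresh, then advance the
// watermark to the newest filesystem timestamp we actually saw.
def updatedSince(leaves: Seq[LeafStatus]): Seq[LeafStatus] = {
  val updated = leaves.filter(_.modificationTime > lastRefreshTime)
  if (updated.nonEmpty)
    lastRefreshTime = updated.map(_.modificationTime).max
  updated
}

val first  = updatedSince(Seq(LeafStatus("a", 10L), LeafStatus("b", 20L)))
println(first.map(_.path))   // List(a, b); the watermark is now 20
val second = updatedSince(Seq(LeafStatus("a", 10L), LeafStatus("b", 20L)))
println(second.isEmpty)      // true: nothing is newer than 20
```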

Contributor:

We care about two kinds of files here:

  • New files: files that don't exist in the old cache.
  • Updated files: files that exist in the old cache but have a different modification time from their old copy.
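A sketch of that classification, under the assumption (hypothetical, not from the PR) that the old cache is kept as a map from path to last-seen modification time:

```scala
// Hypothetical stand-in for FileStatus: a path plus its modification time.
case class LeafStatus(path: String, modificationTime: Long)

// A file is "new" if its path was never cached, and "updated" if it was
// cached with a different modification time than it has now.
def changed(oldCache: Map[String, Long], current: Seq[LeafStatus]): Seq[LeafStatus] =
  current.filter { f =>
    oldCache.get(f.path) match {
      case None          => true                                // new file
      case Some(oldTime) => oldTime != f.modificationTime       // updated file
    }
  }

val oldCache = Map("a" -> 10L, "b" -> 20L)
val current  = Seq(LeafStatus("a", 10L),   // unchanged
                   LeafStatus("b", 25L),   // updated
                   LeafStatus("c", 30L))   // new
println(changed(oldCache, current).map(_.path))   // List(b, c)
```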

cachedLeaves = updatedLeaves
Contributor:

We shouldn't drop FileStatuses of non-updated leaves here.
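One way to address this, sketched with a hypothetical `LeafStatus` stand-in for `FileStatus`, is to merge the updated statuses into the existing cache keyed by path, so non-updated leaves survive the refresh while stale entries are overwritten:

```scala
// Hypothetical stand-in for FileStatus.
case class LeafStatus(path: String, modificationTime: Long)

// Merge updated statuses into the existing cache instead of replacing it.
// Keyed by path; an updated status overwrites its stale entry.
def mergeLeaves(cached: Array[LeafStatus], updated: Array[LeafStatus]): Array[LeafStatus] = {
  val merged = cached.map(f => f.path -> f).toMap ++ updated.map(f => f.path -> f).toMap
  merged.values.toArray
}

val cached  = Array(LeafStatus("a", 10L), LeafStatus("b", 20L))
val updated = Array(LeafStatus("b", 25L), LeafStatus("c", 30L))
val result  = mergeLeaves(cached, updated).sortBy(_.path)
println(result.map(f => f.path -> f.modificationTime).toList)
// List((a,10), (b,25), (c,30)): "a" is retained, "b" refreshed, "c" added
```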

cachedLeaves
} else {
cachedLeaves
}

dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)