Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,13 @@ case class FileSourceScanExec(
}

@transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
val startTime = System.nanoTime()
val ret = relation.location.listFiles(partitionFilters, dataFilters)
val timeTaken = (System.nanoTime() - startTime) / 1000 / 1000
val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000

metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
metrics("metadataTime").add(timeTaken)
metrics("metadataTime").add(timeTakenMs)

val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class CatalogFileIndex(
*/
def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = {
if (table.partitionColumnNames.nonEmpty) {
val startTime = System.nanoTime()
val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
table.identifier, filters)
val partitions = selectedPartitions.map { p =>
Expand All @@ -79,8 +80,9 @@ class CatalogFileIndex(
path.makeQualified(fs.getUri, fs.getWorkingDirectory))
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
val timeNs = System.nanoTime() - startTime
new PrunedInMemoryFileIndex(
sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs))
} else {
new InMemoryFileIndex(
sparkSession, rootPaths, table.storage.properties, partitionSchema = None)
Expand Down Expand Up @@ -111,7 +113,8 @@ private class PrunedInMemoryFileIndex(
sparkSession: SparkSession,
tableBasePath: Path,
fileStatusCache: FileStatusCache,
override val partitionSpec: PartitionSpec)
override val partitionSpec: PartitionSpec,
override val metadataOpsTimeNs: Option[Long])
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add param doc, as it's not immediately obvious what a user is supposed to supply here.
I'd say something like "time it took to obtain the partitionSpec from the Hive metastore", but maybe that's too specific..

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It actually includes more than that. We do file listing as part of that ...

extends InMemoryFileIndex(
sparkSession,
partitionSpec.partitions.map(_.path),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,14 @@ trait FileIndex {

/** Schema of the partitioning columns, or the empty schema if the table is not partitioned. */
def partitionSchema: StructType

/**
 * Returns an optional metadata operation time, in nanoseconds, spent listing files.
 *
 * We do file listing during query optimization (in order to get the proper statistics), and we
 * want that file listing time to be accounted for in physical execution (as metrics). To make
 * that possible, some implementations record the file listing time, and physical execution
 * calls this method to update the metrics. Implementations that do not track this return None
 * (the default).
 */
def metadataOpsTimeNs: Option[Long] = None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's hard to define the semantics of this method for a general FileIndex, as every time we call listFiles, the value of this method should be updated.

how about we only put this method in PrunedInMemoryFileIndex?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about that, but there is no API-level guarantee that we'd get a PrunedInMemoryFileIndex after partition pruning. It is just a current implementation detail. I'd rather have something that's more clearly specified in the API.

}