@@ -44,7 +44,7 @@ case class OrcScan(
     dataFilters: Seq[Expression] = Seq.empty) extends FileScan {
   override def isSplitable(path: Path): Boolean = {
     // If aggregate is pushed down, only the file footer will be read once,
-    // so file should be not split across multiple tasks.
+    // so file should not be split across multiple tasks.
     pushedAggregate.isEmpty
   }
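The sketch below is not part of the PR; it is a minimal illustration of why disabling `isSplitable` matters here. `planSplits`, `maxSplitBytes`, and the `(start, length)` pairs are hypothetical stand-ins for Spark's split planning and `PartitionedFile`, under the assumption that one split maps to one task:

```scala
// Hypothetical sketch, not Spark source: how an isSplitable-style flag
// changes split planning. (start, length) pairs stand in for PartitionedFile.
def planSplits(fileLength: Long,
               maxSplitBytes: Long,
               isSplitable: Boolean): Seq[(Long, Long)] =
  if (!isSplitable) {
    // Aggregate pushed down: one split for the whole file, since only the
    // footer will be read anyway.
    Seq((0L, fileLength))
  } else {
    // Normal scan: fan the file out into byte ranges for parallel tasks.
    (0L until fileLength by maxSplitBytes)
      .map(start => (start, math.min(maxSplitBytes, fileLength - start)))
  }

// With a 1 GB file and 128 MB target splits:
// planSplits(1L << 30, 128L << 20, isSplitable = true).length  == 8
// planSplits(1L << 30, 128L << 20, isSplitable = false).length == 1
```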
@@ -92,11 +92,6 @@ case class ParquetPartitionReaderFactory(
       ParquetFooterReader.readFooter(conf, filePath, SKIP_ROW_GROUPS)
     } else {
       // For aggregate push down, we will get max/min/count from footer statistics.
-      // We want to read the footer for the whole file instead of reading multiple
-      // footers for every split of the file. Basically if the start (the beginning of)
-      // the offset in PartitionedFile is 0, we will read the footer. Otherwise, it means
-      // that we have already read footer for that file, so we will skip reading again.
-      if (file.start != 0) return null

@c21 (Contributor, Author) commented on Nov 7, 2021:
Quick question on the existing unit tests in FileSourceAggregatePushDownSuite.scala: how did the existing tests pass before this PR?

@HyukjinKwon - I think we are on the same page based on your latest comment, but just to be noisy here in case anything is missing. Before this PR, when a single file was split into multiple splits across multiple tasks, the logic here processed a split only if file.start == 0, so only the first split of each file was processed, and every file was read exactly once. That is the trick: even before this PR, the Parquet aggregate push-down logic was still correct. This PR updates the logic so that we avoid the unnecessary file splitting in the first place.

       ParquetFooterReader.readFooter(conf, filePath, NO_FILTER)
     }
   }
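To restate the pre-PR trick from the comment above as runnable code, here is a hedged sketch; `PartitionedFile` and `readFooterOnce` below are simplified stand-ins for illustration, not Spark's actual classes:

```scala
// Simplified stand-in for Spark's PartitionedFile.
case class PartitionedFile(filePath: String, start: Long, length: Long)

// Pre-PR behavior sketched: a file may arrive as several splits, but its
// footer statistics must be consumed exactly once, so every split other
// than the one starting at offset 0 is skipped (the real code returned
// null instead of a reader).
def readFooterOnce(file: PartitionedFile): Option[String] =
  if (file.start != 0) None // another task owns the first split; skip
  else Some(s"footer statistics of ${file.filePath}")
```

After this PR, isSplitable returns false whenever an aggregate is pushed down, so each file arrives as exactly one split and the guard above becomes unnecessary.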
@@ -47,7 +47,11 @@ case class ParquetScan(
     pushedAggregate: Option[Aggregation] = None,
     partitionFilters: Seq[Expression] = Seq.empty,
     dataFilters: Seq[Expression] = Seq.empty) extends FileScan {
-  override def isSplitable(path: Path): Boolean = true
+  override def isSplitable(path: Path): Boolean = {
+    // If aggregate is pushed down, only the file footer will be read once,
+    // so file should not be split across multiple tasks.
+    pushedAggregate.isEmpty

Member: 👍

Member: Oh, okay. Got it now.

+  }

   override def readSchema(): StructType = {
     // If aggregate is pushed down, schema has already been pruned in `ParquetScanBuilder`
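As a final illustration of the readSchema comment: when max/min/count come from footer statistics, the scan's output schema describes only the aggregate results. A hedged sketch of what such a pruned schema could look like (field names are illustrative, not what `ParquetScanBuilder` actually emits):

```scala
import org.apache.spark.sql.types._

// For a query like SELECT MAX(a), COUNT(*) FROM t, the scan only has to
// produce the aggregate columns, never the full file schema.
val prunedAggSchema: StructType = StructType(Seq(
  StructField("max(a)", IntegerType, nullable = true),
  StructField("count(*)", LongType, nullable = false)
))
```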