diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
index 6b9d181a7f4c0..baf307257c3b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
@@ -44,7 +44,7 @@ case class OrcScan(
     dataFilters: Seq[Expression] = Seq.empty) extends FileScan {
   override def isSplitable(path: Path): Boolean = {
     // If aggregate is pushed down, only the file footer will be read once,
-    // so file should be not split across multiple tasks.
+    // so file should not be split across multiple tasks.
     pushedAggregate.isEmpty
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 6f021ff2e97f5..516d8cd845443 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -92,11 +92,6 @@ case class ParquetPartitionReaderFactory(
         ParquetFooterReader.readFooter(conf, filePath, SKIP_ROW_GROUPS)
       } else {
         // For aggregate push down, we will get max/min/count from footer statistics.
-        // We want to read the footer for the whole file instead of reading multiple
-        // footers for every split of the file. Basically if the start (the beginning of)
-        // the offset in PartitionedFile is 0, we will read the footer. Otherwise, it means
-        // that we have already read footer for that file, so we will skip reading again.
-        if (file.start != 0) return null
         ParquetFooterReader.readFooter(conf, filePath, NO_FILTER)
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index b92ed82190ae8..617faad8ab6d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -47,7 +47,11 @@ case class ParquetScan(
     pushedAggregate: Option[Aggregation] = None,
     partitionFilters: Seq[Expression] = Seq.empty,
     dataFilters: Seq[Expression] = Seq.empty) extends FileScan {
-  override def isSplitable(path: Path): Boolean = true
+  override def isSplitable(path: Path): Boolean = {
+    // If aggregate is pushed down, only the file footer will be read once,
+    // so file should not be split across multiple tasks.
+    pushedAggregate.isEmpty
+  }
 
   override def readSchema(): StructType = {
     // If aggregate is pushed down, schema has already been pruned in `ParquetScanBuilder`
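
For context, a minimal self-contained sketch of why gating isSplitable on pushedAggregate.isEmpty matters. This is not Spark's actual planner code: FileSplit, splitFile, and maxSplitBytes are hypothetical names used only to illustrate how an isSplitable check typically gates file splitting during scan planning.

    // Minimal sketch, assuming a simplified planner model; these names are
    // illustrative and do not exist in Spark.
    case class FileSplit(path: String, start: Long, length: Long)

    def splitFile(
        path: String,
        fileLength: Long,
        isSplitable: String => Boolean,
        maxSplitBytes: Long): Seq[FileSplit] = {
      if (isSplitable(path)) {
        // A splittable file is chopped into byte ranges, one per task.
        (0L until fileLength by maxSplitBytes).map { start =>
          FileSplit(path, start, math.min(maxSplitBytes, fileLength - start))
        }
      } else {
        // With aggregate push down only the footer is read, so the whole file
        // must map to exactly one task; multiple splits would each re-read the
        // same footer and duplicate the max/min/count results.
        Seq(FileSplit(path, 0L, fileLength))
      }
    }

This is also why the `if (file.start != 0) return null` guard in ParquetPartitionReaderFactory can be removed: once the file is no longer splittable under aggregate push down, each file yields a single PartitionedFile starting at offset 0, so the "skip footer for non-zero offsets" bookkeeping has nothing left to do.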