diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index dca421a09da62..c6d18aafb0c2f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2744,6 +2744,16 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val BUCKET_SORTED_SCAN_ENABLED =
+    buildConf("spark.sql.sources.bucketing.sortedScan.enabled")
+      .doc("When true, the bucketed table scan will respect table sort columns, " +
+        "and read multiple sorted files per bucket in a sort-merge way to preserve ordering. " +
+        "Note: tasks might have a larger memory footprint and may OOM with the vectorized " +
+        "reader, because rows or columnar batches from multiple files are read at the same time.")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * Holds information about keys that have been deprecated.
    *
@@ -3364,6 +3374,8 @@ class SQLConf extends Serializable with Logging {
 
   def truncateTrashEnabled: Boolean = getConf(SQLConf.TRUNCATE_TRASH_ENABLED)
 
+  def bucketSortedScanEnabled: Boolean = getConf(SQLConf.BUCKET_SORTED_SCAN_ENABLED)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 2e7b6fe5f923d..6451d63e0a300 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.util.truncatedString
@@ -172,9 +173,20 @@ case class FileSourceScanExec(
   // Note that some vals referring the file-based relation are lazy intentionally
   // so that this plan can be canonicalized on executor side too. See SPARK-23731.
   override lazy val supportsColumnar: Boolean = {
-    relation.fileFormat.supportBatch(relation.sparkSession, schema)
+    relation.fileFormat.supportBatch(relation.sparkSession, schema) &&
+      scanMode == BatchMode
   }
 
+  private lazy val scanMode: ScanMode =
+    if (conf.bucketSortedScanEnabled && outputOrdering.nonEmpty) {
+      val sortOrdering = new LazilyGeneratedOrdering(outputOrdering, output)
+      SortedBucketMode(sortOrdering)
+    } else if (relation.fileFormat.supportBatch(relation.sparkSession, schema)) {
+      BatchMode
+    } else {
+      RowMode
+    }
+
   private lazy val needsUnsafeRowConversion: Boolean = {
     if (relation.fileFormat.isInstanceOf[ParquetSource]) {
       SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
@@ -301,29 +313,34 @@ case class FileSourceScanExec(
           sortColumns.nonEmpty && !hasPartitionsAvailableAtRunTime
 
-        val sortOrder = if (shouldCalculateSortOrder) {
-          // In case of bucketing, its possible to have multiple files belonging to the
-          // same bucket in a given relation. Each of these files are locally sorted
-          // but those files combined together are not globally sorted. Given that,
-          // the RDD partition will not be sorted even if the relation has sort columns set
-          // Current solution is to check if all the buckets have a single file in it
-
-          val files = selectedPartitions.flatMap(partition => partition.files)
-          val bucketToFilesGrouping =
-            files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
-          val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
-
-          // TODO SPARK-24528 Sort order is currently ignored if buckets are coalesced.
-          if (singleFilePartitions && optionalNumCoalescedBuckets.isEmpty) {
-            // TODO Currently Spark does not support writing columns sorting in descending order
-            // so using Ascending order. This can be fixed in future
+        // In case of bucketing, it's possible to have multiple files belonging to the
+        // same bucket in a given relation. Each of these files is locally sorted,
+        // but those files combined together are not globally sorted. Given that,
+        // the RDD partition will not be sorted even if the relation has sort columns set.
+        //
+        // 1. With the configuration "spark.sql.sources.bucketing.sortedScan.enabled" enabled,
+        // output ordering is preserved by reading those sorted files in a sort-merge way.
+        //
+        // 2. With the configuration "spark.sql.legacy.bucketedTableScan.outputOrdering" enabled,
+        // output ordering is preserved if each bucket has no more than one file.
+        val sortOrder = if (conf.bucketSortedScanEnabled) {
           sortColumns.map(attribute => SortOrder(attribute, Ascending))
+        } else if (shouldCalculateSortOrder) {
+          val files = selectedPartitions.flatMap(partition => partition.files)
+          val bucketToFilesGrouping =
+            files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
+          val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
+
+          if (singleFilePartitions && optionalNumCoalescedBuckets.isEmpty) {
+            // TODO Currently Spark does not support writing columns sorted in descending order,
+            // so Ascending order is used. This can be fixed in the future.
+            sortColumns.map(attribute => SortOrder(attribute, Ascending))
+          } else {
+            Nil
+          }
         } else {
           Nil
         }
-        } else {
-          Nil
-        }
 
         (partitioning, sortOrder)
       } else {
         (UnknownPartitioning(0), Nil)
@@ -563,7 +580,7 @@ case class FileSourceScanExec(
       }
     }
 
-    new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+    new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions, scanMode)
   }
 
   /**
@@ -604,7 +621,7 @@ case class FileSourceScanExec(
     val partitions =
      FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)
 
-    new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
+    new FileScanRDD(fsRelation.sparkSession, readFile, partitions, scanMode)
   }
 
   // Filters unused DynamicPruningExpression expressions - one which has been replaced
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanIterators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanIterators.scala
new file mode 100644
index 0000000000000..96865a7dc3450
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanIterators.scala
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.{FileNotFoundException, IOException}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.parquet.io.ParquetDecodingException
+
+import org.apache.spark.{Partition => RDDPartition, SparkUpgradeException, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.InputMetrics
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.InputFileBlockHolder
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.{QueryExecutionException, RowIterator}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.NextIterator
+
+/**
+ * Holds common logic for iterators that scan files.
+ */
+abstract class BaseFileScanIterator(
+    split: RDDPartition,
+    context: TaskContext,
+    ignoreCorruptFiles: Boolean,
+    ignoreMissingFiles: Boolean,
+    readFunction: PartitionedFile => Iterator[InternalRow])
+  extends Iterator[Object]
+  with AutoCloseable
+  with Logging {
+
+  protected val inputMetrics: InputMetrics = context.taskMetrics().inputMetrics
+  private val existingBytesRead = inputMetrics.bytesRead
+
+  // Find a function that will return the FileSystem bytes read by this thread. Do this before
+  // applying readFunction, because it might read some bytes.
+  private val getBytesReadCallback =
+    SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+
+  // We get our input bytes from thread-local Hadoop FileSystem statistics.
+  // If we do a coalesce, however, we are likely to compute multiple partitions in the same
+  // task and in the same thread, in which case we need to avoid override values written by
+  // previous partitions (SPARK-13071).
+  protected def incTaskInputMetricsBytesRead(): Unit = {
+    inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
+  }
+
+  private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
+  protected[this] var currentFile: PartitionedFile = null
+  protected[this] var currentIterator: Iterator[Object] = null
+
+  override def hasNext: Boolean = {
+    // Kill the task in case it has been marked as killed. This logic is from
+    // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order
+    // to avoid performance overhead.
+    context.killTaskIfInterrupted()
+    (currentIterator != null && currentIterator.hasNext) || nextIterator()
+  }
+
+  override def next(): Object
+
+  private def readFile(file: PartitionedFile): Iterator[InternalRow] = {
+    try {
+      readFunction(file)
+    } catch {
+      case e: FileNotFoundException =>
+        throw new FileNotFoundException(
+          e.getMessage + "\n" +
+            "It is possible the underlying files have been updated. " +
+            "You can explicitly invalidate the cache in Spark by " +
+            "running 'REFRESH TABLE tableName' command in SQL or " +
+            "by recreating the Dataset/DataFrame involved.")
+    }
+  }
+
+  /** Advances to the next file. Returns true if a new non-empty iterator is available. */
+  protected def nextIterator(): Boolean = {
+    if (files.hasNext) {
+      currentFile = files.next()
+      logInfo(s"Reading File $currentFile")
+      // Sets InputFileBlockHolder for the file block's information
+      InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)
+
+      if (ignoreMissingFiles || ignoreCorruptFiles) {
+        currentIterator = new NextIterator[Object] {
+          private val file = currentFile
+          // The readFunction may read some bytes before consuming the iterator, e.g.,
+          // vectorized Parquet reader. Here we use lazy val to delay the creation of
+          // iterator so that we will throw exception in `getNext`.
+          private lazy val internalIter = readFile(file)
+
+          override def getNext(): AnyRef = {
+            try {
+              if (internalIter.hasNext) {
+                internalIter.next()
+              } else {
+                finished = true
+                null
+              }
+            } catch {
+              case e: FileNotFoundException if ignoreMissingFiles =>
+                logWarning(s"Skipped missing file: $currentFile", e)
+                finished = true
+                null
+              // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+              case e: FileNotFoundException if !ignoreMissingFiles => throw e
+              case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
+                logWarning(
+                  s"Skipped the rest of the content in the corrupted file: $currentFile", e)
+                finished = true
+                null
+            }
+          }
+
+          override def close(): Unit = {}
+        }
+      } else {
+        currentIterator = readFile(currentFile)
+      }
+
+      try {
+        context.killTaskIfInterrupted()
+        (currentIterator != null && currentIterator.hasNext) || nextIterator()
+      } catch {
+        case e: SchemaColumnConvertNotSupportedException =>
+          val message = "Parquet column cannot be converted in " +
+            s"file ${currentFile.filePath}. Column: ${e.getColumn}, " +
+            s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}"
+          throw new QueryExecutionException(message, e)
+        case e: ParquetDecodingException =>
+          if (e.getCause.isInstanceOf[SparkUpgradeException]) {
+            throw e.getCause
+          } else if (e.getMessage.contains("Can not read value at")) {
+            val message = "Encounter error while reading parquet files. " +
+              "One possible cause: Parquet column cannot be converted in the " +
+              "corresponding files. Details: "
+            throw new QueryExecutionException(message, e)
+          }
+          throw e
+      }
+    } else {
+      currentFile = null
+      InputFileBlockHolder.unset()
+      false
+    }
+  }
+
+  override def close(): Unit = {
+    incTaskInputMetricsBytesRead()
+    InputFileBlockHolder.unset()
+  }
+}
+
+/**
+ * Iterator to scan files row by row.
+ */
+class FileRowScanIterator(
+    split: RDDPartition,
+    context: TaskContext,
+    ignoreCorruptFiles: Boolean,
+    ignoreMissingFiles: Boolean,
+    readFunction: PartitionedFile => Iterator[InternalRow])
+  extends BaseFileScanIterator(split, context, ignoreCorruptFiles, ignoreMissingFiles,
+    readFunction) {
+
+  override def next(): Object = {
+    val nextRow = currentIterator.next()
+
+    // Too costly to update every record
+    if (inputMetrics.recordsRead %
+        SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+      incTaskInputMetricsBytesRead()
+    }
+    inputMetrics.incRecordsRead(1)
+    nextRow
+  }
+}
+
+/**
+ * Iterator to scan files batch by batch.
+ */
+class FileBatchScanIterator(
+    split: RDDPartition,
+    context: TaskContext,
+    ignoreCorruptFiles: Boolean,
+    ignoreMissingFiles: Boolean,
+    readFunction: PartitionedFile => Iterator[InternalRow])
+  extends BaseFileScanIterator(split, context, ignoreCorruptFiles, ignoreMissingFiles,
+    readFunction) {
+
+  override def next(): Object = {
+    val nextBatch = currentIterator.next()
+    incTaskInputMetricsBytesRead()
+    inputMetrics.incRecordsRead(nextBatch.asInstanceOf[ColumnarBatch].numRows())
+    nextBatch
+  }
+}
+
+/**
+ * Iterator that opens all files of the partition at the same time and reads rows
+ * from them in a sort-merge way, based on `sortOrdering`.
+ * It uses a standard Scala priority queue to decide the read order.
+ * This iterator is used for reading sorted bucketed tables only.
+ *
+ * @param sortOrdering The ordering used to read rows across multiple files
+ */
+class FileSortedBucketScanIterator(
+    split: RDDPartition,
+    context: TaskContext,
+    ignoreCorruptFiles: Boolean,
+    ignoreMissingFiles: Boolean,
+    readFunction: PartitionedFile => Iterator[InternalRow],
+    sortOrdering: Ordering[InternalRow])
+  extends BaseFileScanIterator(split, context, ignoreCorruptFiles, ignoreMissingFiles,
+    readFunction) {
+
+  // The priority queue to keep the latest row from each file
+  private val rowHeap = new mutable.PriorityQueue[IteratorWithRow]()(
+    // Reverse the order, as the priority queue dequeues the highest-priority element first
+    Ordering.by[IteratorWithRow, InternalRow](_.getRow)(sortOrdering).reverse)
+  private var heapInitialized: Boolean = false
+  protected var currentIteratorWithRow: IteratorWithRow = _
+
+  override def hasNext: Boolean = {
+    // Kill the task in case it has been marked as killed. This logic is from
+    // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order
+    // to avoid performance overhead. This is the same as `BaseFileScanIterator.hasNext()`.
+    context.killTaskIfInterrupted()
+
+    if (!heapInitialized) {
+      initializeHeapWithFirstRows()
+      heapInitialized = true
+    }
+    rowHeap.nonEmpty
+  }
+
+  override def next(): Object = {
+    currentIteratorWithRow = rowHeap.dequeue()
+
+    // Make a copy of the row, because we may need to enqueue the next row (if any) to the heap
+    val nextRow = currentIteratorWithRow.getRow.copy()
+    if (currentIteratorWithRow.advanceNext()) {
+      rowHeap.enqueue(currentIteratorWithRow)
+    }
+
+    // Too costly to update every record
+    if (inputMetrics.recordsRead %
+        SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+      incTaskInputMetricsBytesRead()
+    }
+    inputMetrics.incRecordsRead(1)
+
+    // Set InputFileBlockHolder for the file block's information
+    currentFile = currentIteratorWithRow.getFile
+    InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)
+
+    nextRow
+  }
+
+  private def initializeHeapWithFirstRows(): Unit = {
+    while (nextIterator()) {
+      require(currentIterator != null && currentFile != null,
+        "currentIterator and currentFile should not be null if nextIterator() returns true")
+      // In case of columnar batches, read each batch row by row
+      val convertedIter: Iterator[InternalRow] = currentIterator.flatMap {
+        case batch: ColumnarBatch => batch.rowIterator().asScala
+        case row => Iterator.single(row.asInstanceOf[InternalRow])
+      }
+      currentIteratorWithRow = new IteratorWithRow(convertedIter, currentFile)
+      if (currentIteratorWithRow.advanceNext()) {
+        rowHeap.enqueue(currentIteratorWithRow)
+      }
+    }
+  }
+}
+
+/**
+ * A wrapper for an iterator, its file, and its latest row.
+ * Designed to be instantiated once per file in one thread, and reused.
+ */
+private[execution] class IteratorWithRow(
+    iterator: Iterator[InternalRow],
+    file: PartitionedFile) extends RowIterator {
+  private var row: InternalRow = _
+
+  override def advanceNext(): Boolean = {
+    if (iterator.hasNext) {
+      row = iterator.next()
+      true
+    } else {
+      row = null
+      false
+    }
+  }
+
+  override def getRow: InternalRow = row
+
+  def getFile: PartitionedFile = file
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index fc59336d6107c..62e95ceaae00a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -17,18 +17,10 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.io.{FileNotFoundException, IOException}
-
-import org.apache.parquet.io.ParquetDecodingException
-
-import org.apache.spark.{Partition => RDDPartition, SparkUpgradeException, TaskContext}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
+import org.apache.spark.{Partition => RDDPartition, TaskContext}
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.NextIterator
 
 /**
  * A part (i.e. "block") of a single file that should be read, along with partition column values
@@ -51,154 +43,53 @@ case class PartitionedFile(
   }
 }
 
+/**
+ * The mode for scanning a list of file partitions.
+ *
+ * [[RowMode]]: Scan files sequentially one by one, and scan each file row by row.
+ * [[BatchMode]]: Scan files sequentially one by one, and scan each file batch by batch.
+ * [[SortedBucketMode]]: Scan all files at the same time in a sort-merge way
+ *                       (for sorted bucket files only).
+ */
+sealed abstract class ScanMode
+
+case object RowMode extends ScanMode
+
+case object BatchMode extends ScanMode
+
+case class SortedBucketMode(sortOrdering: Ordering[InternalRow]) extends ScanMode {
+  override def toString: String = "SortedBucketMode"
+}
+
 /**
  * An RDD that scans a list of file partitions.
+ *
+ * @param scanMode the mode for scanning files. Based on the scan mode, the corresponding
+ *                 subclass of [[BaseFileScanIterator]] will be used to scan files.
  */
 class FileScanRDD(
     @transient private val sparkSession: SparkSession,
     readFunction: (PartitionedFile) => Iterator[InternalRow],
-    @transient val filePartitions: Seq[FilePartition])
+    @transient val filePartitions: Seq[FilePartition],
+    val scanMode: ScanMode)
   extends RDD[InternalRow](sparkSession.sparkContext, Nil) {
 
   private val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
   private val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles
 
   override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {
-    val iterator = new Iterator[Object] with AutoCloseable {
-      private val inputMetrics = context.taskMetrics().inputMetrics
-      private val existingBytesRead = inputMetrics.bytesRead
-
-      // Find a function that will return the FileSystem bytes read by this thread. Do this before
-      // apply readFunction, because it might read some bytes.
-      private val getBytesReadCallback =
-        SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
-
-      // We get our input bytes from thread-local Hadoop FileSystem statistics.
-      // If we do a coalesce, however, we are likely to compute multiple partitions in the same
-      // task and in the same thread, in which case we need to avoid override values written by
-      // previous partitions (SPARK-13071).
-      private def incTaskInputMetricsBytesRead(): Unit = {
-        inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
-      }
-
-      private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
-      private[this] var currentFile: PartitionedFile = null
-      private[this] var currentIterator: Iterator[Object] = null
-
-      def hasNext: Boolean = {
-        // Kill the task in case it has been marked as killed. This logic is from
-        // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order
-        // to avoid performance overhead.
-        context.killTaskIfInterrupted()
-        (currentIterator != null && currentIterator.hasNext) || nextIterator()
-      }
-      def next(): Object = {
-        val nextElement = currentIterator.next()
-        // TODO: we should have a better separation of row based and batch based scan, so that we
-        // don't need to run this `if` for every record.
-        val preNumRecordsRead = inputMetrics.recordsRead
-        if (nextElement.isInstanceOf[ColumnarBatch]) {
-          incTaskInputMetricsBytesRead()
-          inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
-        } else {
-          // too costly to update every record
-          if (inputMetrics.recordsRead %
-              SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
-            incTaskInputMetricsBytesRead()
-          }
-          inputMetrics.incRecordsRead(1)
-        }
-        nextElement
-      }
-
-      private def readCurrentFile(): Iterator[InternalRow] = {
-        try {
-          readFunction(currentFile)
-        } catch {
-          case e: FileNotFoundException =>
-            throw new FileNotFoundException(
-              e.getMessage + "\n" +
-                "It is possible the underlying files have been updated. " +
-                "You can explicitly invalidate the cache in Spark by " +
-                "running 'REFRESH TABLE tableName' command in SQL or " +
-                "by recreating the Dataset/DataFrame involved.")
-        }
-      }
-
-      /** Advances to the next file. Returns true if a new non-empty iterator is available. */
-      private def nextIterator(): Boolean = {
-        if (files.hasNext) {
-          currentFile = files.next()
-          logInfo(s"Reading File $currentFile")
-          // Sets InputFileBlockHolder for the file block's information
-          InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)
-
-          if (ignoreMissingFiles || ignoreCorruptFiles) {
-            currentIterator = new NextIterator[Object] {
-              // The readFunction may read some bytes before consuming the iterator, e.g.,
-              // vectorized Parquet reader. Here we use lazy val to delay the creation of
-              // iterator so that we will throw exception in `getNext`.
-              private lazy val internalIter = readCurrentFile()
-
-              override def getNext(): AnyRef = {
-                try {
-                  if (internalIter.hasNext) {
-                    internalIter.next()
-                  } else {
-                    finished = true
-                    null
-                  }
-                } catch {
-                  case e: FileNotFoundException if ignoreMissingFiles =>
-                    logWarning(s"Skipped missing file: $currentFile", e)
-                    finished = true
-                    null
-                  // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
-                  case e: FileNotFoundException if !ignoreMissingFiles => throw e
-                  case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
-                    logWarning(
-                      s"Skipped the rest of the content in the corrupted file: $currentFile", e)
-                    finished = true
-                    null
-                }
-              }
-
-              override def close(): Unit = {}
-            }
-          } else {
-            currentIterator = readCurrentFile()
-          }
-
-          try {
-            hasNext
-          } catch {
-            case e: SchemaColumnConvertNotSupportedException =>
-              val message = "Parquet column cannot be converted in " +
-                s"file ${currentFile.filePath}. Column: ${e.getColumn}, " +
-                s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}"
-              throw new QueryExecutionException(message, e)
-            case e: ParquetDecodingException =>
-              if (e.getCause.isInstanceOf[SparkUpgradeException]) {
-                throw e.getCause
-              } else if (e.getMessage.contains("Can not read value at")) {
-                val message = "Encounter error while reading parquet files. " +
-                  "One possible cause: Parquet column cannot be converted in the " +
-                  "corresponding files. Details: "
-                throw new QueryExecutionException(message, e)
-              }
-              throw e
-          }
-        } else {
-          currentFile = null
-          InputFileBlockHolder.unset()
-          false
-        }
-      }
-
-      override def close(): Unit = {
-        incTaskInputMetricsBytesRead()
-        InputFileBlockHolder.unset()
-      }
+    val iterator = scanMode match {
+      case RowMode => new FileRowScanIterator(split, context, ignoreCorruptFiles,
+        ignoreMissingFiles, readFunction)
+
+      case BatchMode => new FileBatchScanIterator(split, context, ignoreCorruptFiles,
+        ignoreMissingFiles, readFunction)
+
+      case SortedBucketMode(sortOrdering) => new FileSortedBucketScanIterator(split, context,
+        ignoreCorruptFiles, ignoreMissingFiles, readFunction, sortOrdering)
+
+      case x => throw new IllegalArgumentException(
+        s"${getClass.getSimpleName} does not support $x as the ScanMode")
     }
 
     // Register an on-task-completion callback to close the input stream.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index a808546745817..467449812f41f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -290,7 +290,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession with Pre
     val fakeRDD = new FileScanRDD(
       spark,
       (file: PartitionedFile) => Iterator.empty,
-      Seq(partition)
+      Seq(partition),
+      RowMode
     )
 
     assertResult(Set("host0", "host1", "host2")) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 98886d271e977..18826d34732ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -525,50 +525,70 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
   test("check sort and shuffle when bucket and sort columns are join keys") {
     // In case of bucketing, its possible to have multiple files belonging to the
     // same bucket in a given relation. Each of these files are locally sorted
-    // but those files combined together are not globally sorted. Given that,
-    // the RDD partition will not be sorted even if the relation has sort columns set
-    // Therefore, we still need to keep the Sort in both sides.
+    // but those files combined together are not globally sorted. With the configuration
+    // "spark.sql.sources.bucketing.sortedScan.enabled" enabled, sort ordering
+    // is preserved by reading those sorted files in a sort-merge way.
     val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
-    val bucketedTableTestSpecLeft1 = BucketedTableTestSpec(
-      bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
-    val bucketedTableTestSpecRight1 = BucketedTableTestSpec(
-      bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
-    testBucketing(
-      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft1,
-      bucketedTableTestSpecRight = bucketedTableTestSpecRight1,
-      joinCondition = joinCondition(Seq("i", "j"))
-    )
+    Seq(true, false).foreach(sortedScanEnabled => {
+      withSQLConf(SQLConf.BUCKET_SORTED_SCAN_ENABLED.key -> sortedScanEnabled.toString) {
+        val bucketedTableTestSpecLeft1 = BucketedTableTestSpec(
+          bucketSpec, numPartitions = 50, expectedShuffle = false,
+          expectedSort = !sortedScanEnabled)
+        val bucketedTableTestSpecRight1 = BucketedTableTestSpec(
+          bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+        testBucketing(
+          bucketedTableTestSpecLeft = bucketedTableTestSpecLeft1,
+          bucketedTableTestSpecRight = bucketedTableTestSpecRight1,
+          joinCondition = joinCondition(Seq("i", "j"))
+        )
+      }
+    })
 
-    val bucketedTableTestSpecLeft2 = BucketedTableTestSpec(
-      bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
-    val bucketedTableTestSpecRight2 = BucketedTableTestSpec(
-      bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
-    testBucketing(
-      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft2,
-      bucketedTableTestSpecRight = bucketedTableTestSpecRight2,
-      joinCondition = joinCondition(Seq("i", "j"))
-    )
+    Seq(true, false).foreach(sortedScanEnabled => {
+      withSQLConf(SQLConf.BUCKET_SORTED_SCAN_ENABLED.key -> sortedScanEnabled.toString) {
+        val bucketedTableTestSpecLeft2 = BucketedTableTestSpec(
+          bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+        val bucketedTableTestSpecRight2 = BucketedTableTestSpec(
+          bucketSpec, numPartitions = 50, expectedShuffle = false,
+          expectedSort = !sortedScanEnabled)
+        testBucketing(
+          bucketedTableTestSpecLeft = bucketedTableTestSpecLeft2,
+          bucketedTableTestSpecRight = bucketedTableTestSpecRight2,
+          joinCondition = joinCondition(Seq("i", "j"))
+        )
+      }
+    })
 
-    val bucketedTableTestSpecLeft3 = BucketedTableTestSpec(
-      bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
-    val bucketedTableTestSpecRight3 = BucketedTableTestSpec(
-      bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
-    testBucketing(
-      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft3,
-      bucketedTableTestSpecRight = bucketedTableTestSpecRight3,
-      joinCondition = joinCondition(Seq("i", "j"))
-    )
+    Seq(true, false).foreach(sortedScanEnabled => {
+      withSQLConf(SQLConf.BUCKET_SORTED_SCAN_ENABLED.key -> sortedScanEnabled.toString) {
+        val bucketedTableTestSpecLeft3 = BucketedTableTestSpec(
+          bucketSpec, numPartitions = 50, expectedShuffle = false,
+          expectedSort = !sortedScanEnabled)
+        val bucketedTableTestSpecRight3 = BucketedTableTestSpec(
+          bucketSpec, numPartitions = 50, expectedShuffle = false,
+          expectedSort = !sortedScanEnabled)
+        testBucketing(
+          bucketedTableTestSpecLeft = bucketedTableTestSpecLeft3,
+          bucketedTableTestSpecRight = bucketedTableTestSpecRight3,
+          joinCondition = joinCondition(Seq("i", "j"))
+        )
+      }
+    })
 
-    val bucketedTableTestSpecLeft4 = BucketedTableTestSpec(
-      bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
-    val bucketedTableTestSpecRight4 = BucketedTableTestSpec(
-      bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
-    testBucketing(
-      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft4,
-      bucketedTableTestSpecRight = bucketedTableTestSpecRight4,
-      joinCondition = joinCondition(Seq("i", "j"))
-    )
+    Seq(true, false).foreach(sortedScanEnabled => {
+      withSQLConf(SQLConf.BUCKET_SORTED_SCAN_ENABLED.key -> sortedScanEnabled.toString) {
+        val bucketedTableTestSpecLeft4 = BucketedTableTestSpec(
+          bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+        val bucketedTableTestSpecRight4 = BucketedTableTestSpec(
+          bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+        testBucketing(
+          bucketedTableTestSpecLeft = bucketedTableTestSpecLeft4,
+          bucketedTableTestSpecRight = bucketedTableTestSpecRight4,
+          joinCondition = joinCondition(Seq("i", "j"))
+        )
+      }
+    })
   }
 
   test("avoid shuffle and sort when sort columns are a super set of join keys") {
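
Reviewer note: below is a minimal usage sketch of the new flag, not part of the patch; the
table name `t` and the sample data are hypothetical. With the flag enabled, a self-join on
the bucket-plus-sort columns should plan a SortMergeJoin with no Sort above either scan,
even though each bucket holds multiple sorted files:

  spark.conf.set("spark.sql.sources.bucketing.sortedScan.enabled", "true")
  // 10 input partitions writing into 8 buckets => multiple sorted files per bucket
  spark.range(0, 1000, 1, 10)
    .selectExpr("id AS i", "id % 13 AS j")
    .write.bucketBy(8, "i", "j").sortBy("i", "j")
    .saveAsTable("t")
  // The plan should show SortMergeJoin without Sort nodes above the two scans
  spark.table("t").join(spark.table("t"), Seq("i", "j")).explain()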