diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 8152132ce6e89..f1d66968833cf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2636,6 +2636,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val BUCKET_SORTED_SCAN_ENABLED = + buildConf("spark.sql.sources.bucketing.sortedScan.enabled") + .doc("When true, the bucketed table scan will respect table sort columns, " + + "and read multiple sorted files per bucket in a sort-merge way to preserve ordering. " + + "Note: tasks might have more memory footprint and OOM with vectorized reader, " + + "because multiple rows or columnar batches from different files will be read at same time.") + .version("3.1.0") + .booleanConf + .createWithDefault(false) + /** * Holds information about keys that have been deprecated. * @@ -3222,6 +3232,8 @@ class SQLConf extends Serializable with Logging { def csvFilterPushDown: Boolean = getConf(CSV_FILTER_PUSHDOWN_ENABLED) + def bucketSortedScanEnabled: Boolean = getConf(SQLConf.BUCKET_SORTED_SCAN_ENABLED) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index e2bf132a2e18f..ea9a789567645 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.util.truncatedString @@ -146,6 +147,22 @@ case class RowDataSourceScanExec( tableIdentifier = None) } +/** + * The mode for scanning a list of file partitions. + * + * [[RegularMode]]: Scan files sequentially one by one, and scan each file batch by batch, + * or row by row. + * [[SortedBucketMode]]: Scan files together at same time in a sort-merge way + * (for sorted bucket files only). + */ +sealed abstract class ScanMode + +case object RegularMode extends ScanMode + +case class SortedBucketMode(sortOrdering: Ordering[InternalRow]) extends ScanMode { + override def toString: String = "SortedBucketMode" +} + /** * Physical plan node for scanning data from HadoopFsRelations. * @@ -170,9 +187,18 @@ case class FileSourceScanExec( // Note that some vals referring the file-based relation are lazy intentionally // so that this plan can be canonicalized on executor side too. See SPARK-23731. override lazy val supportsColumnar: Boolean = { - relation.fileFormat.supportBatch(relation.sparkSession, schema) + relation.fileFormat.supportBatch(relation.sparkSession, schema) && + scanMode == RegularMode } + private lazy val scanMode: ScanMode = + if (conf.bucketSortedScanEnabled && outputOrdering.nonEmpty && !singleFilePartitions) { + val sortOrdering = new LazilyGeneratedOrdering(outputOrdering, output) + SortedBucketMode(sortOrdering) + } else { + RegularMode + } + private lazy val needsUnsafeRowConversion: Boolean = { if (relation.fileFormat.isInstanceOf[ParquetSource]) { sqlContext.conf.parquetVectorizedReaderEnabled @@ -181,6 +207,14 @@ case class FileSourceScanExec( } } + private lazy val singleFilePartitions: Boolean = { + val files = selectedPartitions.flatMap(partition => partition.files) + val bucketToFilesGrouping = + files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file)) + bucketToFilesGrouping.forall(p => p._2.length <= 1) + } + + override def vectorTypes: Option[Seq[String]] = relation.fileFormat.vectorTypes( requiredSchema = requiredSchema, @@ -294,18 +328,19 @@ case class FileSourceScanExec( sortColumns.nonEmpty && !hasPartitionsAvailableAtRunTime - val sortOrder = if (shouldCalculateSortOrder) { - // In case of bucketing, its possible to have multiple files belonging to the - // same bucket in a given relation. Each of these files are locally sorted - // but those files combined together are not globally sorted. Given that, - // the RDD partition will not be sorted even if the relation has sort columns set - // Current solution is to check if all the buckets have a single file in it - - val files = selectedPartitions.flatMap(partition => partition.files) - val bucketToFilesGrouping = - files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file)) - val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1) - + // In case of bucketing, its possible to have multiple files belonging to the + // same bucket in a given relation. Each of these files are locally sorted + // but those files combined together are not globally sorted. Given that, + // the RDD partition will not be sorted even if the relation has sort columns set. + // + // 1. With configuration "spark.sql.sources.bucketing.sortedScan.enabled" being enabled, + // output ordering is preserved by reading those sorted files in sort-merge way. + // + // 2. With configuration "spark.sql.legacy.bucketedTableScan.outputOrdering" being enabled, + // output ordering is preserved if each bucket has no more than one file. + val sortOrder = if (conf.bucketSortedScanEnabled) { + sortColumns.map(attribute => SortOrder(attribute, Ascending)) + } else if (shouldCalculateSortOrder) { if (singleFilePartitions) { // TODO Currently Spark does not support writing columns sorting in descending order // so using Ascending order. This can be fixed in future @@ -341,7 +376,8 @@ case class FileSourceScanExec( "PartitionFilters" -> seqToString(partitionFilters), "PushedFilters" -> seqToString(pushedDownFilters), "DataFilters" -> seqToString(dataFilters), - "Location" -> locationDesc) + "Location" -> locationDesc, + "ScanMode" -> scanMode.toString) val withSelectedBucketsCount = relation.bucketSpec.map { spec => val numSelectedBuckets = optionalBucketSet.map { b => @@ -543,10 +579,14 @@ case class FileSourceScanExec( } val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId => - FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty)) - } + FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty)) } - new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions) + scanMode match { + case SortedBucketMode(sortOrdering) => + new FileSortedMergeScanRDD(fsRelation.sparkSession, readFile, filePartitions, sortOrdering) + case RegularMode => + new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanIterators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanIterators.scala new file mode 100644 index 0000000000000..50e3f69bc6db7 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanIterators.scala @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.{FileNotFoundException, IOException} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.parquet.io.ParquetDecodingException + +import org.apache.spark.{Partition => RDDPartition, SparkUpgradeException, TaskContext} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.executor.InputMetrics +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.InputFileBlockHolder +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.{QueryExecutionException, RowIterator} +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.NextIterator + +/** + * Holds common logic for iterators to scan files + */ +abstract class BaseFileScanIterator( + split: RDDPartition, + context: TaskContext, + ignoreCorruptFiles: Boolean, + ignoreMissingFiles: Boolean, + readFunction: PartitionedFile => Iterator[InternalRow]) + extends Iterator[Object] + with AutoCloseable + with Logging { + + protected val inputMetrics: InputMetrics = context.taskMetrics().inputMetrics + private val existingBytesRead = inputMetrics.bytesRead + + // Find a function that will return the FileSystem bytes read by this thread. Do this before + // apply readFunction, because it might read some bytes. + private val getBytesReadCallback = + SparkHadoopUtil.get.getFSBytesReadOnThreadCallback() + + // We get our input bytes from thread-local Hadoop FileSystem statistics. + // If we do a coalesce, however, we are likely to compute multiple partitions in the same + // task and in the same thread, in which case we need to avoid override values written by + // previous partitions (SPARK-13071). + protected def incTaskInputMetricsBytesRead(): Unit = { + inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback()) + } + + private[this] val files = split.asInstanceOf[FilePartition].files.toIterator + protected[this] var currentFile: PartitionedFile = null + protected[this] var currentIterator: Iterator[Object] = null + + override def hasNext: Boolean = { + // Kill the task in case it has been marked as killed. This logic is from + // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order + // to avoid performance overhead. + context.killTaskIfInterrupted() + (currentIterator != null && currentIterator.hasNext) || nextIterator() + } + + override def next(): Object + + private def readFile(file: PartitionedFile): Iterator[InternalRow] = { + try { + readFunction(file) + } catch { + case e: FileNotFoundException => + throw new FileNotFoundException( + e.getMessage + "\n" + + "It is possible the underlying files have been updated. " + + "You can explicitly invalidate the cache in Spark by " + + "running 'REFRESH TABLE tableName' command in SQL or " + + "by recreating the Dataset/DataFrame involved.") + } + } + + /** Advances to the next file. Returns true if a new non-empty iterator is available. */ + protected def nextIterator(): Boolean = { + if (files.hasNext) { + currentFile = files.next() + logInfo(s"Reading File $currentFile") + // Sets InputFileBlockHolder for the file block's information + InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length) + + if (ignoreMissingFiles || ignoreCorruptFiles) { + currentIterator = new NextIterator[Object] { + private val file = currentFile + // The readFunction may read some bytes before consuming the iterator, e.g., + // vectorized Parquet reader. Here we use lazy val to delay the creation of + // iterator so that we will throw exception in `getNext`. + private lazy val internalIter = readFile(file) + + override def getNext(): AnyRef = { + try { + if (internalIter.hasNext) { + internalIter.next() + } else { + finished = true + null + } + } catch { + case e: FileNotFoundException if ignoreMissingFiles => + logWarning(s"Skipped missing file: $currentFile", e) + finished = true + null + // Throw FileNotFoundException even if `ignoreCorruptFiles` is true + case e: FileNotFoundException if !ignoreMissingFiles => throw e + case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles => + logWarning( + s"Skipped the rest of the content in the corrupted file: $currentFile", e) + finished = true + null + } + } + + override def close(): Unit = {} + } + } else { + currentIterator = readFile(currentFile) + } + + try { + context.killTaskIfInterrupted() + (currentIterator != null && currentIterator.hasNext) || nextIterator() + } catch { + case e: SchemaColumnConvertNotSupportedException => + val message = "Parquet column cannot be converted in " + + s"file ${currentFile.filePath}. Column: ${e.getColumn}, " + + s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}" + throw new QueryExecutionException(message, e) + case e: ParquetDecodingException => + if (e.getCause.isInstanceOf[SparkUpgradeException]) { + throw e.getCause + } else if (e.getMessage.contains("Can not read value at")) { + val message = "Encounter error while reading parquet files. " + + "One possible cause: Parquet column cannot be converted in the " + + "corresponding files. Details: " + throw new QueryExecutionException(message, e) + } + throw e + } + } else { + currentFile = null + InputFileBlockHolder.unset() + false + } + } + + override def close(): Unit = { + incTaskInputMetricsBytesRead() + InputFileBlockHolder.unset() + } +} + +/** + * Iterator to scan files all together at the same time, + * and read row by row based on `sortOrdering` in sort-merge way. + * It uses standard scala priority queue to decide read order. + * This iterator is used for reading sorted bucketed table only. + * + * @param sortOrdering The order to read rows in multiple files + */ +class FileSortedBucketScanIterator( + split: RDDPartition, + context: TaskContext, + ignoreCorruptFiles: Boolean, + ignoreMissingFiles: Boolean, + readFunction: PartitionedFile => Iterator[InternalRow], + sortOrdering: Ordering[InternalRow]) + extends BaseFileScanIterator(split, context, ignoreCorruptFiles, ignoreMissingFiles, + readFunction) { + + // The priority queue to keep the latest row from each file + private val rowHeap = new mutable.PriorityQueue[IteratorWithRow]()( + // Reverse the order as priority queue de-queues the highest priority one + Ordering.by[IteratorWithRow, InternalRow](_.getRow)(sortOrdering).reverse) + private var heapInitialized: Boolean = false + protected var currentIteratorWithRow: IteratorWithRow = _ + + override def hasNext: Boolean = { + // Kill the task in case it has been marked as killed. This logic is from + // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order + // to avoid performance overhead. This is the same as `BaseFileScanIterator.hasNext()`. + context.killTaskIfInterrupted() + + if (!heapInitialized) { + initializeHeapWithFirstRows() + heapInitialized = true + } + rowHeap.nonEmpty + } + + override def next(): Object = { + currentIteratorWithRow = rowHeap.dequeue() + + // Make a copy of row because we need to enqueue next row (if there any) to the heap + val nextRow = currentIteratorWithRow.getRow.copy() + if (currentIteratorWithRow.advanceNext()) { + rowHeap.enqueue(currentIteratorWithRow) + } + + // Too costly to update every record + if (inputMetrics.recordsRead % + SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) { + incTaskInputMetricsBytesRead() + } + inputMetrics.incRecordsRead(1) + + // Set InputFileBlockHolder for the file block's information + currentFile = currentIteratorWithRow.getFile + InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length) + + nextRow + } + + private def initializeHeapWithFirstRows(): Unit = { + while (nextIterator()) { + require(currentIterator != null && currentFile != null, + "currentIterator and currentFile should not be null if nextIterator() returns true") + // In case of columnar batch, read row by row in one batch + val convertedIter: Iterator[InternalRow] = currentIterator.flatMap { + case batch: ColumnarBatch => batch.rowIterator().asScala + case row => Iterator.single(row.asInstanceOf[InternalRow]) + } + currentIteratorWithRow = new IteratorWithRow(convertedIter, currentFile) + if (currentIteratorWithRow.advanceNext()) { + rowHeap.enqueue(currentIteratorWithRow) + } + } + } +} + +/** + * A wrapper for iterator, its file and its current latest row. + * Designed to be instantiated once per each file in one thread, and reused. + */ +private[execution] class IteratorWithRow( + iterator: Iterator[InternalRow], + file: PartitionedFile) extends RowIterator { + private var row: InternalRow = _ + + override def advanceNext(): Boolean = { + if (iterator.hasNext) { + row = iterator.next() + true + } else { + row = null + false + } + } + + override def getRow: InternalRow = row + + def getFile: PartitionedFile = file +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSortedMergeScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSortedMergeScanRDD.scala new file mode 100644 index 0000000000000..49d4c0353397f --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSortedMergeScanRDD.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow + +class FileSortedMergeScanRDD( + @transient sparkSession: SparkSession, + readFunction: (PartitionedFile) => Iterator[InternalRow], + @transient filePartitions: Seq[FilePartition], + val sortOrdering: Ordering[InternalRow] +) extends FileScanRDD(sparkSession, readFunction, filePartitions) { + private val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles + private val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val iterator = new FileSortedBucketScanIterator(split, context, + ignoreCorruptFiles, ignoreMissingFiles, readFunction, sortOrdering) + // Register an on-task-completion callback to close the input stream. + context.addTaskCompletionListener[Unit](_ => iterator.close()) + + iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack. + } +} diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out index 36757863ffcb5..d13649354ae42 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out @@ -70,6 +70,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,0)] ReadSchema: struct +ScanMode: RegularMode (2) Filter Input [2]: [key#x, val#x] @@ -138,6 +139,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,0)] ReadSchema: struct +ScanMode: RegularMode (2) Filter Input [2]: [key#x, val#x] @@ -206,6 +208,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,0)] ReadSchema: struct +ScanMode: RegularMode (2) Filter Input [2]: [key#x, val#x] @@ -221,6 +224,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,0)] ReadSchema: struct +ScanMode: RegularMode (5) Filter Input [2]: [key#x, val#x] @@ -282,6 +286,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key)] ReadSchema: struct +ScanMode: RegularMode (2) Filter Input [2]: [key#x, val#x] @@ -297,6 +302,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp2] PushedFilters: [IsNotNull(key)] ReadSchema: struct +ScanMode: RegularMode (5) Filter Input [2]: [key#x, val#x] @@ -344,6 +350,7 @@ Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] ReadSchema: struct +ScanMode: RegularMode (2) Scan parquet default.explain_temp2 Output [2]: [key#x, val#x] @@ -351,6 +358,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp2] PushedFilters: [IsNotNull(key)] ReadSchema: struct +ScanMode: RegularMode (3) Filter Input [2]: [key#x, val#x] @@ -401,6 +409,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), IsNotNull(val), GreaterThan(val,3)] ReadSchema: struct +ScanMode: RegularMode (2) Filter Input [2]: [key#x, val#x] @@ -440,6 +449,7 @@ Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] ReadSchema: struct +ScanMode: RegularMode (2) Filter Input [2]: [key#x, val#x] @@ -468,6 +478,7 @@ Output: [] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] ReadSchema: struct<> +ScanMode: RegularMode (2) Project Output [1]: [(Subquery subquery#x, [id=#x] + Subquery subquery#x, [id=#x]) AS (scalarsubquery() + scalarsubquery())#x] @@ -507,6 +518,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,10)] ReadSchema: struct +ScanMode: RegularMode (2) Filter Input [2]: [key#x, val#x] @@ -522,6 +534,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,10)] ReadSchema: struct +ScanMode: RegularMode (5) Filter Input [2]: [key#x, val#x] @@ -581,6 +594,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,10)] ReadSchema: struct +ScanMode: RegularMode (2) Filter Input [2]: [key#x, val#x] @@ -614,6 +628,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,10)] ReadSchema: struct +ScanMode: RegularMode (8) Filter Input [2]: [key#x, val#x] @@ -704,6 +719,7 @@ Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] ReadSchema: struct +ScanMode: RegularMode (2) HashAggregate Input [2]: [key#x, val#x] @@ -749,6 +765,7 @@ Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp4] ReadSchema: struct +ScanMode: RegularMode (2) ObjectHashAggregate Input [2]: [key#x, val#x] @@ -796,6 +813,7 @@ Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp4] ReadSchema: struct +ScanMode: RegularMode (2) Sort Input [2]: [key#x, val#x] diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out index 2b07dac0e5d0a..4c5b8407bdf20 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out @@ -70,6 +70,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,0)] ReadSchema: struct +ScanMode: RegularMode (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] @@ -137,6 +138,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,0)] ReadSchema: struct +ScanMode: RegularMode (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] @@ -205,6 +207,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,0)] ReadSchema: struct +ScanMode: RegularMode (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] @@ -223,6 +226,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,0)] ReadSchema: struct +ScanMode: RegularMode (6) ColumnarToRow [codegen id : 2] Input [2]: [key#x, val#x] @@ -284,6 +288,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key)] ReadSchema: struct +ScanMode: RegularMode (2) ColumnarToRow [codegen id : 2] Input [2]: [key#x, val#x] @@ -302,6 +307,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp2] PushedFilters: [IsNotNull(key)] ReadSchema: struct +ScanMode: RegularMode (6) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] @@ -349,6 +355,7 @@ Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] ReadSchema: struct +ScanMode: RegularMode (2) ColumnarToRow [codegen id : 2] Input [2]: [key#x, val#x] @@ -359,6 +366,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp2] PushedFilters: [IsNotNull(key)] ReadSchema: struct +ScanMode: RegularMode (4) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] @@ -408,6 +416,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), IsNotNull(val), GreaterThan(val,3)] ReadSchema: struct +ScanMode: RegularMode (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] @@ -438,6 +447,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp2] PushedFilters: [IsNotNull(key), IsNotNull(val), EqualTo(val,2)] ReadSchema: struct +ScanMode: RegularMode (6) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] @@ -484,6 +494,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp3] PushedFilters: [IsNotNull(val), GreaterThan(val,0)] ReadSchema: struct +ScanMode: RegularMode (13) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] @@ -540,6 +551,7 @@ Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] ReadSchema: struct +ScanMode: RegularMode (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] @@ -566,6 +578,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp2] PushedFilters: [IsNotNull(val), GreaterThan(val,0)] ReadSchema: struct +ScanMode: RegularMode (5) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] @@ -612,6 +625,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp3] PushedFilters: [IsNotNull(val), GreaterThan(val,0)] ReadSchema: struct +ScanMode: RegularMode (12) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] @@ -661,6 +675,7 @@ Output: [] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] ReadSchema: struct<> +ScanMode: RegularMode (2) ColumnarToRow [codegen id : 1] Input: [] @@ -684,6 +699,7 @@ Output [1]: [key#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] ReadSchema: struct +ScanMode: RegularMode (5) ColumnarToRow [codegen id : 1] Input [1]: [key#x] @@ -739,6 +755,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,10)] ReadSchema: struct +ScanMode: RegularMode (2) ColumnarToRow [codegen id : 2] Input [2]: [key#x, val#x] @@ -757,6 +774,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,10)] ReadSchema: struct +ScanMode: RegularMode (6) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] @@ -811,6 +829,7 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,10)] ReadSchema: struct +ScanMode: RegularMode (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] @@ -910,6 +929,7 @@ Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] ReadSchema: struct +ScanMode: RegularMode (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] @@ -954,6 +974,7 @@ Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp4] ReadSchema: struct +ScanMode: RegularMode (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] @@ -1000,6 +1021,7 @@ Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp4] ReadSchema: struct +ScanMode: RegularMode (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index 812305ba24403..6caf972b6d1df 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper} import org.apache.spark.sql.catalyst.util -import org.apache.spark.sql.execution.{DataSourceScanExec, FileSourceScanExec, SparkPlan} +import org.apache.spark.sql.execution.{DataSourceScanExec, FileSourceScanExec, RegularMode, SparkPlan} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index df8ca33b893cc..48fe8816a8e6a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -376,6 +376,17 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { withBucket(df2.repartition(numPartitionsRight).write.format("parquet"), bucketSpecRight) .saveAsTable("bucketed_table2") + // Verify that we can collect the datasets when codegen is enabled, in order + // to test codegen and vectorized readers. + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { + val t1 = spark.table("bucketed_table1") + val t2 = spark.table("bucketed_table2") + t1.collect() + t2.collect() + t1.join(t2, joinCondition(t1, t2), joinType).collect() + } + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { val t1 = spark.table("bucketed_table1") @@ -506,50 +517,70 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { test("check sort and shuffle when bucket and sort columns are join keys") { // In case of bucketing, its possible to have multiple files belonging to the // same bucket in a given relation. Each of these files are locally sorted - // but those files combined together are not globally sorted. Given that, - // the RDD partition will not be sorted even if the relation has sort columns set - // Therefore, we still need to keep the Sort in both sides. + // but those files combined together are not globally sorted. With configuration + // "spark.sql.sources.bucketing.sortedScan.enabled" being enabled, sort ordering + // is preserved by reading those sorted files in sort-merge way. val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) - val bucketedTableTestSpecLeft1 = BucketedTableTestSpec( - bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) - val bucketedTableTestSpecRight1 = BucketedTableTestSpec( - bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) - testBucketing( - bucketedTableTestSpecLeft = bucketedTableTestSpecLeft1, - bucketedTableTestSpecRight = bucketedTableTestSpecRight1, - joinCondition = joinCondition(Seq("i", "j")) - ) + Seq(true, false).foreach(sortedScanEnabled => { + withSQLConf(SQLConf.BUCKET_SORTED_SCAN_ENABLED.key -> sortedScanEnabled.toString) { + val bucketedTableTestSpecLeft1 = BucketedTableTestSpec( + bucketSpec, numPartitions = 50, expectedShuffle = false, + expectedSort = !sortedScanEnabled) + val bucketedTableTestSpecRight1 = BucketedTableTestSpec( + bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft1, + bucketedTableTestSpecRight = bucketedTableTestSpecRight1, + joinCondition = joinCondition(Seq("i", "j")) + ) + } + }) - val bucketedTableTestSpecLeft2 = BucketedTableTestSpec( - bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) - val bucketedTableTestSpecRight2 = BucketedTableTestSpec( - bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) - testBucketing( - bucketedTableTestSpecLeft = bucketedTableTestSpecLeft2, - bucketedTableTestSpecRight = bucketedTableTestSpecRight2, - joinCondition = joinCondition(Seq("i", "j")) - ) + Seq(true, false).foreach(sortedScanEnabled => { + withSQLConf(SQLConf.BUCKET_SORTED_SCAN_ENABLED.key -> sortedScanEnabled.toString) { + val bucketedTableTestSpecLeft2 = BucketedTableTestSpec( + bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) + val bucketedTableTestSpecRight2 = BucketedTableTestSpec( + bucketSpec, numPartitions = 50, expectedShuffle = false, + expectedSort = !sortedScanEnabled) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft2, + bucketedTableTestSpecRight = bucketedTableTestSpecRight2, + joinCondition = joinCondition(Seq("i", "j")) + ) + } + }) - val bucketedTableTestSpecLeft3 = BucketedTableTestSpec( - bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) - val bucketedTableTestSpecRight3 = BucketedTableTestSpec( - bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) - testBucketing( - bucketedTableTestSpecLeft = bucketedTableTestSpecLeft3, - bucketedTableTestSpecRight = bucketedTableTestSpecRight3, - joinCondition = joinCondition(Seq("i", "j")) - ) + Seq(true, false).foreach(sortedScanEnabled => { + withSQLConf(SQLConf.BUCKET_SORTED_SCAN_ENABLED.key -> sortedScanEnabled.toString) { + val bucketedTableTestSpecLeft3 = BucketedTableTestSpec( + bucketSpec, numPartitions = 50, expectedShuffle = false, + expectedSort = !sortedScanEnabled) + val bucketedTableTestSpecRight3 = BucketedTableTestSpec( + bucketSpec, numPartitions = 50, expectedShuffle = false, + expectedSort = !sortedScanEnabled) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft3, + bucketedTableTestSpecRight = bucketedTableTestSpecRight3, + joinCondition = joinCondition(Seq("i", "j")) + ) + } + }) - val bucketedTableTestSpecLeft4 = BucketedTableTestSpec( - bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) - val bucketedTableTestSpecRight4 = BucketedTableTestSpec( - bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) - testBucketing( - bucketedTableTestSpecLeft = bucketedTableTestSpecLeft4, - bucketedTableTestSpecRight = bucketedTableTestSpecRight4, - joinCondition = joinCondition(Seq("i", "j")) - ) + Seq(true, false).foreach(sortedScanEnabled => { + withSQLConf(SQLConf.BUCKET_SORTED_SCAN_ENABLED.key -> sortedScanEnabled.toString) { + val bucketedTableTestSpecLeft4 = BucketedTableTestSpec( + bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) + val bucketedTableTestSpecRight4 = BucketedTableTestSpec( + bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft4, + bucketedTableTestSpecRight = bucketedTableTestSpecRight4, + joinCondition = joinCondition(Seq("i", "j")) + ) + } + }) } test("avoid shuffle and sort when sort columns are a super set of join keys") {