Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2744,6 +2744,16 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

// Feature flag for ordering-preserving bucketed scans: when enabled, the scan
// sort-merges the (locally sorted) files of each bucket instead of concatenating
// them, so the table's sort columns yield a truly sorted output partition.
val BUCKET_SORTED_SCAN_ENABLED =
  buildConf("spark.sql.sources.bucketing.sortedScan.enabled")
    .doc("When true, the bucketed table scan will respect table sort columns, " +
      "and read multiple sorted files per bucket in a sort-merge way to preserve ordering. " +
      "Note: tasks might have a larger memory footprint and OOM with the vectorized " +
      "reader, because multiple rows or columnar batches from different files will be " +
      "read at the same time.")
    .version("3.1.0")
    .booleanConf
    .createWithDefault(false)

/**
* Holds information about keys that have been deprecated.
*
Expand Down Expand Up @@ -3364,6 +3374,8 @@ class SQLConf extends Serializable with Logging {

def truncateTrashEnabled: Boolean = getConf(SQLConf.TRUNCATE_TRASH_ENABLED)

// Whether bucketed table scans should preserve output ordering by reading each
// bucket's sorted files in a sort-merge fashion. Backed by
// SQLConf.BUCKET_SORTED_SCAN_ENABLED (default: false).
def bucketSortedScanEnabled: Boolean = getConf(SQLConf.BUCKET_SORTED_SCAN_ENABLED)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.truncatedString
Expand Down Expand Up @@ -172,9 +173,20 @@ case class FileSourceScanExec(
// Note that some vals referring the file-based relation are lazy intentionally
// so that this plan can be canonicalized on executor side too. See SPARK-23731.
override lazy val supportsColumnar: Boolean = {
// Columnar output requires both that the file format can produce batches for this
// schema AND that the scan runs in plain BatchMode: a sorted-bucket scan merges
// individual rows from several files, so it cannot emit whole columnar batches.
relation.fileFormat.supportBatch(relation.sparkSession, schema) &&
scanMode == BatchMode
}

// Selects how this scan reads its files:
//  - SortedBucketMode: when the sorted-scan conf is on and the plan declares an
//    output ordering, read each bucket's files in a sort-merge way using a
//    generated ordering over the output attributes, preserving sort order.
//  - BatchMode: vectorized, columnar-batch reading when the format supports it.
//  - RowMode: fallback row-at-a-time reading.
// Note: supportBatch is also consulted by supportsColumnar above; the two must
// stay consistent.
private lazy val scanMode: ScanMode =
if (conf.bucketSortedScanEnabled && outputOrdering.nonEmpty) {
val sortOrdering = new LazilyGeneratedOrdering(outputOrdering, output)
SortedBucketMode(sortOrdering)
} else if (relation.fileFormat.supportBatch(relation.sparkSession, schema)) {
BatchMode
} else {
RowMode
}

private lazy val needsUnsafeRowConversion: Boolean = {
if (relation.fileFormat.isInstanceOf[ParquetSource]) {
SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
Expand Down Expand Up @@ -301,29 +313,34 @@ case class FileSourceScanExec(
sortColumns.nonEmpty &&
!hasPartitionsAvailableAtRunTime

val sortOrder = if (shouldCalculateSortOrder) {
// In case of bucketing, its possible to have multiple files belonging to the
// same bucket in a given relation. Each of these files are locally sorted
// but those files combined together are not globally sorted. Given that,
// the RDD partition will not be sorted even if the relation has sort columns set
// Current solution is to check if all the buckets have a single file in it

val files = selectedPartitions.flatMap(partition => partition.files)
val bucketToFilesGrouping =
files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)

// TODO SPARK-24528 Sort order is currently ignored if buckets are coalesced.
if (singleFilePartitions && optionalNumCoalescedBuckets.isEmpty) {
// TODO Currently Spark does not support writing columns sorting in descending order
// so using Ascending order. This can be fixed in future
// In case of bucketing, it's possible to have multiple files belonging to the
// same bucket in a given relation. Each of these files is locally sorted,
// but those files combined together are not globally sorted. Given that,
// the RDD partition will not be sorted even if the relation has sort columns set.
//
// 1. With configuration "spark.sql.sources.bucketing.sortedScan.enabled" being enabled,
// output ordering is preserved by reading those sorted files in sort-merge way.
//
// 2. With configuration "spark.sql.legacy.bucketedTableScan.outputOrdering" being enabled,
// output ordering is preserved if each bucket has no more than one file.
val sortOrder = if (conf.bucketSortedScanEnabled) {
sortColumns.map(attribute => SortOrder(attribute, Ascending))
} else if (shouldCalculateSortOrder) {
val files = selectedPartitions.flatMap(partition => partition.files)
val bucketToFilesGrouping =
files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)

if (singleFilePartitions && optionalNumCoalescedBuckets.isEmpty) {
// TODO Currently Spark does not support writing columns sorting in descending order
// so using Ascending order. This can be fixed in future
sortColumns.map(attribute => SortOrder(attribute, Ascending))
} else {
Nil
}
} else {
Nil
}
} else {
Nil
}
(partitioning, sortOrder)
} else {
(UnknownPartitioning(0), Nil)
Expand Down Expand Up @@ -563,7 +580,7 @@ case class FileSourceScanExec(
}
}

new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions, scanMode)
}

/**
Expand Down Expand Up @@ -604,7 +621,7 @@ case class FileSourceScanExec(
val partitions =
FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)

new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
new FileScanRDD(fsRelation.sparkSession, readFile, partitions, scanMode)
}

// Filters unused DynamicPruningExpression expressions - one which has been replaced
Expand Down
Loading