@@ -2636,6 +2636,16 @@ object SQLConf {
    .booleanConf
    .createWithDefault(false)

  val BUCKET_SORTED_SCAN_ENABLED =
    buildConf("spark.sql.sources.bucketing.sortedScan.enabled")
      .doc("When true, a bucketed table scan will respect the table's sort columns, " +
        "and read multiple sorted files per bucket in a sort-merge way to preserve ordering. " +
        "Note: tasks may have a larger memory footprint and can OOM with the vectorized " +
        "reader, because rows or columnar batches from multiple files are read at the " +
        "same time.")
      .version("3.1.0")
      .booleanConf
      .createWithDefault(false)
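
For context, a minimal usage sketch of this flag (not part of the patch; the SparkSession `spark`, the DataFrame `df`, the table name and column are illustrative): write a table bucketed and sorted by a key, enable the flag, then read the table back.

// Hypothetical usage sketch, assuming an existing SparkSession `spark` and DataFrame `df`.
spark.conf.set("spark.sql.sources.bucketing.sortedScan.enabled", "true")
df.write
  .bucketBy(8, "key")    // hash-distribute rows into 8 buckets by "key"
  .sortBy("key")         // each written file is sorted by "key" within its bucket
  .saveAsTable("events") // illustrative table name
spark.table("events")    // with the flag on, per-bucket ordering is preserved at scan time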

  /**
   * Holds information about keys that have been deprecated.
   *
@@ -3222,6 +3232,8 @@ class SQLConf extends Serializable with Logging {

  def csvFilterPushDown: Boolean = getConf(CSV_FILTER_PUSHDOWN_ENABLED)

  def bucketSortedScanEnabled: Boolean = getConf(SQLConf.BUCKET_SORTED_SCAN_ENABLED)

  /** ********************** SQLConf functionality methods ************ */

  /** Set Spark SQL configuration properties. */
@@ -29,6 +29,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.truncatedString
@@ -146,6 +147,22 @@ case class RowDataSourceScanExec(
      tableIdentifier = None)
}

/**
 * The mode for scanning a list of file partitions.
 *
 * [[RegularMode]]: Scan files sequentially one by one, and scan each file batch by batch,
 * or row by row.
 * [[SortedBucketMode]]: Scan files together at the same time in a sort-merge way
 * (for sorted bucket files only).
 */
sealed abstract class ScanMode

case object RegularMode extends ScanMode

case class SortedBucketMode(sortOrdering: Ordering[InternalRow]) extends ScanMode {
  override def toString: String = "SortedBucketMode"
}
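
To make the sort-merge idea concrete, here is a self-contained sketch of the k-way merge that SortedBucketMode implies (illustrative only, not the FileSortedMergeScanRDD implementation added by this patch): each sorted file contributes a buffered iterator, and a priority queue keyed on each iterator's head element yields a globally sorted stream. It also shows why the config doc warns about memory: one element per file is buffered at all times.

import scala.collection.mutable

// Illustrative k-way merge: combine several locally sorted iterators into one
// globally sorted iterator. A sketch for exposition, not code from this patch.
def mergeSorted[A](iters: Seq[Iterator[A]])(implicit ord: Ordering[A]): Iterator[A] = {
  // PriorityQueue dequeues its maximum, so reverse the head-element ordering
  // to always pull the iterator whose current head is smallest.
  val queue = mutable.PriorityQueue.empty[BufferedIterator[A]](
    Ordering.by[BufferedIterator[A], A](_.head)(ord).reverse)
  iters.map(_.buffered).filter(_.hasNext).foreach(queue.enqueue(_))
  new Iterator[A] {
    override def hasNext: Boolean = queue.nonEmpty
    override def next(): A = {
      val it = queue.dequeue()
      val elem = it.next()
      if (it.hasNext) queue.enqueue(it) // re-insert with its new head element
      elem
    }
  }
}

// e.g. mergeSorted(Seq(Iterator(1, 4), Iterator(2, 3))) yields 1, 2, 3, 4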

/**
 * Physical plan node for scanning data from HadoopFsRelations.
 *
@@ -170,9 +187,18 @@ case class FileSourceScanExec(
  // Note that some vals referring to the file-based relation are lazy intentionally
  // so that this plan can be canonicalized on the executor side too. See SPARK-23731.
  override lazy val supportsColumnar: Boolean = {
    relation.fileFormat.supportBatch(relation.sparkSession, schema) &&
      scanMode == RegularMode
  }

  private lazy val scanMode: ScanMode =
    if (conf.bucketSortedScanEnabled && outputOrdering.nonEmpty && !singleFilePartitions) {
      val sortOrdering = new LazilyGeneratedOrdering(outputOrdering, output)
      SortedBucketMode(sortOrdering)
    } else {
      RegularMode
    }

  private lazy val needsUnsafeRowConversion: Boolean = {
    if (relation.fileFormat.isInstanceOf[ParquetSource]) {
      sqlContext.conf.parquetVectorizedReaderEnabled
@@ -181,6 +207,14 @@ }
    }
  }

  private lazy val singleFilePartitions: Boolean = {
    val files = selectedPartitions.flatMap(partition => partition.files)
    val bucketToFilesGrouping =
      files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
    bucketToFilesGrouping.forall(p => p._2.length <= 1)
  }
  override def vectorTypes: Option[Seq[String]] =
    relation.fileFormat.vectorTypes(
      requiredSchema = requiredSchema,
@@ -294,18 +328,19 @@ case class FileSourceScanExec(
          sortColumns.nonEmpty &&
          !hasPartitionsAvailableAtRunTime

      // In case of bucketing, it's possible to have multiple files belonging to the
      // same bucket in a given relation. Each of these files is locally sorted,
      // but those files combined together are not globally sorted. Given that,
      // the RDD partition will not be sorted even if the relation has sort columns set.
      //
      // 1. With the configuration "spark.sql.sources.bucketing.sortedScan.enabled" enabled,
      // output ordering is preserved by reading those sorted files in a sort-merge way.
      //
      // 2. With the configuration "spark.sql.legacy.bucketedTableScan.outputOrdering" enabled,
      // output ordering is preserved if each bucket has no more than one file.
      val sortOrder = if (conf.bucketSortedScanEnabled) {
        sortColumns.map(attribute => SortOrder(attribute, Ascending))
      } else if (shouldCalculateSortOrder) {
        if (singleFilePartitions) {
          // TODO: Currently Spark does not support writing columns sorted in descending
          // order, so Ascending order is used. This can be fixed in the future.
@@ -341,7 +376,8 @@ case class FileSourceScanExec(
"PartitionFilters" -> seqToString(partitionFilters),
"PushedFilters" -> seqToString(pushedDownFilters),
"DataFilters" -> seqToString(dataFilters),
"Location" -> locationDesc)
"Location" -> locationDesc,
"ScanMode" -> scanMode.toString)

val withSelectedBucketsCount = relation.bucketSpec.map { spec =>
val numSelectedBuckets = optionalBucketSet.map { b =>
@@ -543,10 +579,14 @@ case class FileSourceScanExec(
    }

    val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
      FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
    }

    scanMode match {
      case SortedBucketMode(sortOrdering) =>
        new FileSortedMergeScanRDD(fsRelation.sparkSession, readFile, filePartitions, sortOrdering)
      case RegularMode =>
        new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
    }
  }
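
As a hedged end-to-end illustration of what this dispatch enables (table names are made up; this is not a test from the patch): with the new flag on, a sort-merge join over two tables bucketed and sorted on the join key should plan without extra Sort nodes above the scans, even when buckets contain multiple files.

spark.conf.set("spark.sql.sources.bucketing.sortedScan.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1") // force a sort-merge join
val joined = spark.table("bucketed_t1").join(spark.table("bucketed_t2"), "key")
joined.explain() // expect SortMergeJoin with no Sort above either file scan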
