diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index ec23a9c41a02..7a227045687e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.commons.lang.StringUtils
 
+import org.apache.spark.Partition
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
@@ -27,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
-import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.execution.datasources.{FilePartition, HadoopFsRelation}
 import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetSource}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
@@ -147,6 +148,7 @@ private[sql] case class RowDataSourceScanExec(
     rdd: RDD[InternalRow],
     @transient relation: BaseRelation,
     override val outputPartitioning: Partitioning,
+    override val outputOrdering: Seq[SortOrder],
     override val metadata: Map[String, String],
     override val metastoreTableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with CodegenSupport {
@@ -221,6 +223,7 @@ private[sql] case class BatchedDataSourceScanExec(
     rdd: RDD[InternalRow],
     @transient relation: BaseRelation,
     override val outputPartitioning: Partitioning,
+    override val outputOrdering: Seq[SortOrder],
     override val metadata: Map[String, String],
     override val metastoreTableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with CodegenSupport {
@@ -338,26 +341,43 @@ private[sql] object DataSourceScanExec {
       rdd: RDD[InternalRow],
       relation: BaseRelation,
       metadata: Map[String, String] = Map.empty,
-      metastoreTableIdentifier: Option[TableIdentifier] = None): DataSourceScanExec = {
-    val outputPartitioning = {
-      val bucketSpec = relation match {
-        // TODO: this should be closer to bucket planning.
-        case r: HadoopFsRelation
-          if r.sparkSession.sessionState.conf.bucketingEnabled => r.bucketSpec
-        case _ => None
-      }
+      metastoreTableIdentifier: Option[TableIdentifier] = None,
+      partitions: Seq[Partition] = Seq()): DataSourceScanExec = {
+    val (outputPartitioning, outputOrdering) = {
 
       def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
         throw new AnalysisException(s"bucket column $colName not found in existing columns " +
           s"(${output.map(_.name).mkString(", ")})")
       }
 
+      val (bucketSpec, partitionColumns) = relation match {
+        // TODO: this should be closer to bucket planning.
+        case r: HadoopFsRelation if r.sparkSession.sessionState.conf.bucketingEnabled =>
+          (r.bucketSpec, r.partitionSchema.fieldNames.toSeq.map(toAttribute))
+        case _ => (None, Seq())
+      }
+
       bucketSpec.map { spec =>
         val numBuckets = spec.numBuckets
         val bucketColumns = spec.bucketColumnNames.map(toAttribute)
-        HashPartitioning(bucketColumns, numBuckets)
+
+        val singleFilePartitions = partitions.forall {
+          case f: FilePartition => f.files.length <= 1
+          case _ => false
+        }
+
+        // In case of bucketing, it's possible to have multiple files belonging to the
+        // same bucket in a given table partition. Each of these files is locally sorted,
+        // but those files combined together are not globally sorted. Given that,
+        // the RDD partition will not be sorted even if the table has sort columns set.
+        val sortOrder = if (singleFilePartitions) {
+          spec.sortColumnNames.map(c => SortOrder(toAttribute(c), Ascending))
+        } else {
+          Nil
+        }
+        (HashPartitioning(bucketColumns ++ partitionColumns, numBuckets), sortOrder)
       }.getOrElse {
-        UnknownPartitioning(0)
+        (UnknownPartitioning(0), Nil)
       }
     }
 
@@ -365,10 +385,22 @@ private[sql] object DataSourceScanExec {
       case r: HadoopFsRelation
         if r.fileFormat.supportBatch(r.sparkSession, StructType.fromAttributes(output)) =>
         BatchedDataSourceScanExec(
-          output, rdd, relation, outputPartitioning, metadata, metastoreTableIdentifier)
+          output,
+          rdd,
+          relation,
+          outputPartitioning,
+          outputOrdering,
+          metadata,
+          metastoreTableIdentifier)
       case _ =>
         RowDataSourceScanExec(
-          output, rdd, relation, outputPartitioning, metadata, metastoreTableIdentifier)
+          output,
+          rdd,
+          relation,
+          outputPartitioning,
+          outputOrdering,
+          metadata,
+          metastoreTableIdentifier)
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 350508c1d9f4..69f7654ce5c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -210,7 +210,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
             plannedPartitions),
           files,
           meta,
-          table)
+          table,
+          plannedPartitions)
 
       val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
       val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 446571aa8409..a3e22b5b6730 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -236,7 +236,12 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
     children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
       if (requiredOrdering.nonEmpty) {
         // If child.outputOrdering is [a, b] and requiredOrdering is [a], we do not need to sort.
-        if (requiredOrdering != child.outputOrdering.take(requiredOrdering.length)) {
+        val orderingMatched = requiredOrdering.zip(child.outputOrdering).forall {
+          case (requiredOrder, childOutputOrder) =>
+            requiredOrder == childOutputOrder
+        }
+
+        if (!orderingMatched) {
           SortExec(requiredOrdering, global = false, child = child)
         } else {
           child
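
Illustrative usage sketch, not part of the patch: a minimal Scala program showing what the change is meant to enable. The table names, bucket count, and row counts below are hypothetical, and the exact plan shape depends on session configuration (bucketing enabled, sort-merge join chosen, one file per bucket). With the bucketed scan now exposing outputOrdering, EnsureRequirements should skip inserting SortExec below the join in this scenario.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.SortExec

    object BucketedJoinNoSortSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("bucketed-join-no-sort").getOrCreate()

        // Write two tables bucketed and sorted on "key" into the same number of buckets.
        spark.range(0, 1000).selectExpr("id AS key", "id AS value")
          .write.bucketBy(8, "key").sortBy("key").saveAsTable("bucketed_t1")
        spark.range(0, 1000).selectExpr("id AS key", "id AS value")
          .write.bucketBy(8, "key").sortBy("key").saveAsTable("bucketed_t2")

        val joined = spark.table("bucketed_t1").join(spark.table("bucketed_t2"), "key")

        // With outputOrdering propagated from the bucketed, sorted scan, EnsureRequirements
        // should not add SortExec nodes under the sort-merge join when every RDD partition
        // reads at most one file per bucket.
        val numSorts = joined.queryExecution.executedPlan.collect { case s: SortExec => s }.size
        println(s"SortExec nodes in the physical plan: $numSorts")

        spark.stop()
      }
    }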