@@ -156,7 +156,12 @@ case class CreateDataSourceTableAsSelectCommand(

override def requiredOrdering: Seq[SortOrder] = {
val options = table.storage.properties
V1WritesUtils.getSortOrder(outputColumns, partitionColumns, table.bucketSpec, options)
val originSortedColumns = query.outputOrdering.flatMap(_.child match {
case attr: Attribute => Some(attr)
case _ => None
})
V1WritesUtils.getSortOrder(originSortedColumns, outputColumns,
partitionColumns, table.bucketSpec, options)
}

override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
@@ -74,9 +74,14 @@ case class InsertIntoHadoopFsRelationCommand(
staticPartitions.size < partitionColumns.length
}

override def requiredOrdering: Seq[SortOrder] =
V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec, options,
staticPartitions.size)
override def requiredOrdering: Seq[SortOrder] = {
val originSortedColumns = query.outputOrdering.flatMap(_.child match {
case attr: Attribute => Some(attr)
case _ => None
})
V1WritesUtils.getSortOrder(originSortedColumns, outputColumns,
partitionColumns, bucketSpec, options, staticPartitions.size)
}

override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
// Most formats don't do well with duplicate columns, so lets not allow that
@@ -158,6 +158,7 @@ object V1WritesUtils {
}

def getSortOrder(
originSortColumns: Seq[Attribute],
outputColumns: Seq[Attribute],
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
@@ -166,6 +167,7 @@
require(partitionColumns.size >= numStaticPartitionCols)

val partitionSet = AttributeSet(partitionColumns)
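// The query's existing sort columns, restricted to the write's output columns.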
val originSortSet = outputColumns.filter(AttributeSet(originSortColumns).contains)
val dataColumns = outputColumns.filterNot(partitionSet.contains)
val writerBucketSpec = V1WritesUtils.getWriterBucketSpec(bucketSpec, dataColumns, options)
val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns)
@@ -178,7 +180,15 @@
} else {
// We should first sort by dynamic partition columns, then bucket id, and finally sorting
Contributor Author:

> Shouldn't the outer Sort("p") be left out in the first place, as the inner Sort("p", "sort_col") already provides this order? See #38358.

@EnricoMi There is a constraint on the order of the sorting fields. If the outer Sort("p") is not added in the first place, will this constraint be broken, and will it affect efficiency or anything else?

// columns.
(dynamicPartitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns)
val sortOrder = (dynamicPartitionColumns ++
writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns)
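// Expression ids of the attributes already present in the required partition/bucket ordering.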
val exprIdSet = sortOrder.flatMap({
case a: Attribute => Some(a.exprId)
case _ => None
}).toSet
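// Append any user-specified sort columns that the required ordering does not already cover.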
val residualSort = originSortSet.filterNot(s => sortOrder.contains(s)
|| exprIdSet.contains(s.exprId))
(sortOrder ++ residualSort)
.map(SortOrder(_, Ascending))
}
}
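To make the intent of the new residual-sort handling above concrete, here is a minimal, self-contained sketch. It models Catalyst attributes with a plain case class and a simplified required-ordering builder; all names and types are illustrative stand-ins, not the actual Spark internals.

```scala
// Simplified stand-in for a Catalyst attribute: a name plus a unique expression id.
case class Attr(name: String, exprId: Long)

object ResidualSortDemo extends App {
  // Build the write's required ordering: dynamic partition columns first, then bucket
  // sort columns, then any user-specified sort columns not already covered.
  def requiredWriteOrdering(
      dynamicPartitionCols: Seq[Attr],
      bucketSortCols: Seq[Attr],
      userSortCols: Seq[Attr]): Seq[Attr] = {
    val required = dynamicPartitionCols ++ bucketSortCols
    val coveredIds = required.map(_.exprId).toSet
    // Residual sort: keep the user's in-partition sort columns that the required
    // prefix does not already cover, so the user's ordering is not thrown away.
    val residual = userSortCols.filterNot(c => coveredIds.contains(c.exprId))
    required ++ residual
  }

  val p = Attr("p", 1L)
  val sortCol = Attr("sort_col", 2L)
  // sortWithinPartitions("p", "sort_col") followed by write.partitionBy("p"):
  // sort_col survives as a residual column, so the existing sort still satisfies
  // the required ordering and no extra Sort node has to be planned.
  println(requiredWriteOrdering(Seq(p), Nil, Seq(p, sortCol)))
  // prints: List(Attr(p,1), Attr(sort_col,2))
}
```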
@@ -220,6 +220,23 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
}
}
}

test("SPARK-40885: V1 write uses the sort with partitionBy operator") {
withTempPath { f =>
Seq((20, 30, "partition"), (15, 20, "partition"),
(30, 70, "partition"), (18, 40, "partition"))
.toDF("id", "sort_col", "p")
.repartition(1)
.sortWithinPartitions("p", "sort_col")
.write
.partitionBy("p")
Contributor:
I'm not very sure what the problem is. partitionBy("p") requires sorting the data by p per partition. It should already be satisfied and we don't need to add it.

EnricoMi (Contributor, Nov 14, 2022):

> partitionBy("p") requires sorting the data by p per partition.

Why does write.partitionBy("p") require sorting the data by p per partition? I understand why df.groupBy($"p") requires in-partition order by p (GroupedIterator). But write.partitionBy("p")?

Contributor:
> I'm not very sure what the problem is.

See #38356 (comment):

The V1Writes rule introduced in Spark 3.4 adds the empty2null to all nullable string partition columns. The modified V1WriteCommand then has a modified write.requiredOrdering but the old write.query.outputOrdering, which then do not match any more. In FileFormatWriter, the outer sort will be added because of that mismatch.
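To make the mismatch described above concrete, here is a deliberately simplified, self-contained sketch of an ordering-satisfaction check; the toy expression types and the exact-prefix rule are illustrative only, not Spark's actual SortOrder machinery.

```scala
// Toy expression model: plain columns and the empty2null wrapper that V1Writes
// adds around nullable string partition columns.
sealed trait Expr
case class Col(name: String) extends Expr
case class Empty2Null(child: Expr) extends Expr

object OrderingMismatchDemo extends App {
  // Required ordering is satisfied when it is an exact prefix of the child ordering
  // (a simplification of the real semantic-equality check).
  def satisfies(childOrdering: Seq[Expr], requiredOrdering: Seq[Expr]): Boolean =
    requiredOrdering.size <= childOrdering.size &&
      requiredOrdering.zip(childOrdering).forall { case (req, actual) => req == actual }

  val childOrdering = Seq(Col("p"), Col("sort_col"))  // from sortWithinPartitions("p", "sort_col")
  val requiredPlain = Seq(Col("p"))                   // required ordering on the raw partition column
  val requiredRewritten = Seq(Empty2Null(Col("p")))   // after the partition column is wrapped

  assert(satisfies(childOrdering, requiredPlain))      // no extra sort would be needed
  assert(!satisfies(childOrdering, requiredRewritten)) // mismatch: the writer adds an outer Sort
}
```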

Contributor:
Seems Project shouldn't extend OrderPreservingUnaryNode, as it needs to rewrite the child output ordering based on the project list. cc @ulysses-you @wangyum

Contributor:
Yeah, we can rewrite the ordering to preserve the attribute before the alias, like AliasAwareOutputExpression. But empty2null is special; it is not just an alias. Since we pull out the empty2null, we should tell Project that the ordering of empty2null(p) is the same as that of p. One idea may be to not pull out empty2null and leave it in its original place.
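As a rough illustration of the alias-aware idea mentioned here, and of why empty2null does not fit it, here is a simplified sketch; the expression model and rewrite rule are illustrative, not Catalyst's AliasAwareOutputExpression.

```scala
// Toy expression model: columns, aliases, and the empty2null wrapper.
sealed trait Expr
case class Col(name: String) extends Expr
case class Alias(child: Expr, name: String) extends Expr
case class Empty2Null(child: Expr) extends Expr

object AliasAwareOrderingDemo extends App {
  // Rewrite the child's output ordering in terms of the Project's output: an ordering
  // column survives if it is projected unchanged or under a plain alias. Only the
  // surviving prefix of the ordering remains valid after the projection.
  def projectOrdering(childOrdering: Seq[Expr], projectList: Seq[Expr]): Seq[Expr] =
    childOrdering.map { ord =>
      projectList.collectFirst {
        case c: Col if c == ord                 => c          // passed through unchanged
        case Alias(child, name) if child == ord => Col(name)  // ordering on child implies ordering on its alias
      }
    }.takeWhile(_.isDefined).flatten

  val childOrdering = Seq(Col("p"), Col("sort_col"))

  // Plain alias: the ordering survives the Project.
  println(projectOrdering(childOrdering, Seq(Alias(Col("p"), "p"), Col("sort_col"))))
  // prints: List(Col(p), Col(sort_col))

  // empty2null(p) AS p is not a plain alias, so this naive rule drops the ordering,
  // which is the gap discussed above (and, as the next comment notes, empty2null
  // does not strictly preserve the ordering of p anyway).
  println(projectOrdering(childOrdering, Seq(Alias(Empty2Null(Col("p")), "p"), Col("sort_col"))))
  // prints: List()
}
```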

Contributor:
Be aware that empty2null(p) does not preserve any ordering of p, e.g. it does not preserve NULL LAST or ordering by p and x.

Contributor:
we only add empty2null for string type partition columns, maybe we shouldn't optimize this case anyway.

Contributor:
Adding empty2null throws null values and empty values into the same partition, and the user has no way to make Spark treat them as distinct values. But changing this smells like a breaking change, unless some config allows bringing back the current behaviour.

.parquet(f.getAbsolutePath)
val sortColList = spark.read.parquet(f.getAbsolutePath)
.map(_.getInt(1)).collect().toList
val expectList = List(20, 30, 40, 70)
assert(sortColList == expectList)
}
}
}

/**
@@ -54,7 +54,12 @@ trait CreateHiveTableAsSelectBase extends V1WriteCommand with V1WritesHiveUtils

override def requiredOrdering: Seq[SortOrder] = {
val options = getOptionsWithHiveBucketWrite(tableDesc.bucketSpec)
V1WritesUtils.getSortOrder(outputColumns, partitionColumns, tableDesc.bucketSpec, options)
val originSortedColumns = query.outputOrdering.flatMap(_.child match {
case attr: Attribute => Some(attr)
case _ => None
})
V1WritesUtils.getSortOrder(originSortedColumns, outputColumns,
partitionColumns, tableDesc.bucketSpec, options)
}

override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
@@ -82,7 +82,12 @@ case class InsertIntoHiveTable(

override def requiredOrdering: Seq[SortOrder] = {
val options = getOptionsWithHiveBucketWrite(table.bucketSpec)
V1WritesUtils.getSortOrder(outputColumns, partitionColumns, table.bucketSpec, options)
val originSortedColumns = query.outputOrdering.flatMap(_.child match {
case attr: Attribute => Some(attr)
case _ => None
})
V1WritesUtils.getSortOrder(originSortedColumns, outputColumns,
partitionColumns, table.bucketSpec, options)
}

/**