sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

@@ -103,10 +103,7 @@ object FileFormatWriter extends Logging {
         .map(FileSourceMetadataAttribute.cleanupFileSourceMetadataInformation))
     val dataColumns = finalOutputSpec.outputColumns.filterNot(partitionSet.contains)
 
-    val hasEmpty2Null = plan.find {
-      case p: ProjectExec => V1WritesUtils.hasEmptyToNull(p.projectList)
-      case _ => false
-    }.isDefined
+    val hasEmpty2Null = plan.exists(p => V1WritesUtils.hasEmptyToNull(p.expressions))
     val empty2NullPlan = if (hasEmpty2Null) {
       plan
     } else {
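
Note: the old probe walked the physical plan with find(...).isDefined but only
inspected ProjectExec nodes, so an Empty2Null expression that the optimizer had
folded into another operator went undetected. The new form asks every node for
its expressions. A minimal sketch of the difference, using toy classes rather
than Spark's TreeNode API:

    sealed trait PlanNode {
      def children: Seq[PlanNode]
      def expressions: Seq[String]
      def find(f: PlanNode => Boolean): Option[PlanNode] =
        if (f(this)) Some(this) else children.view.flatMap(_.find(f)).headOption
      def exists(f: PlanNode => Boolean): Boolean =
        f(this) || children.exists(_.exists(f))
    }
    case class Project(expressions: Seq[String], child: PlanNode) extends PlanNode {
      def children: Seq[PlanNode] = Seq(child)
    }
    case class Aggregate(expressions: Seq[String], child: PlanNode) extends PlanNode {
      def children: Seq[PlanNode] = Seq(child)
    }
    case object Scan extends PlanNode {
      def children: Seq[PlanNode] = Nil
      def expressions: Seq[String] = Nil
    }

    object ExistsDemo extends App {
      // The marker sits in an Aggregate, not a Project.
      val plan = Aggregate(Seq("empty2null(k)"), Scan)
      val oldCheck = plan.find {
        case p: Project => p.expressions.exists(_.contains("empty2null"))
        case _ => false
      }.isDefined
      val newCheck = plan.exists(_.expressions.exists(_.contains("empty2null")))
      assert(!oldCheck && newCheck) // the Project-only probe misses it
    }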
@@ -150,14 +147,7 @@ object FileFormatWriter extends Logging {
     // the sort order doesn't matter
     // Use the output ordering from the original plan before adding the empty2null projection.
     val actualOrdering = plan.outputOrdering.map(_.child)
-    val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
-      false
-    } else {
-      requiredOrdering.zip(actualOrdering).forall {
-        case (requiredOrder, childOutputOrder) =>
-          requiredOrder.semanticEquals(childOutputOrder)
-      }
-    }
+    val orderingMatched = V1WritesUtils.isOrderingMatched(requiredOrdering, actualOrdering)
 
     SQLExecution.checkSQLExecutionId(sparkSession)
 
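
Note: the inlined comparison becomes V1WritesUtils.isOrderingMatched (defined
later in this diff), so this physical-plan call site and the logical-plan call
site in V1Writes.prepareQuery share one implementation. The contract: the
required ordering must be a semantic prefix of the ordering the plan already
produces. A toy model, with plain string equality standing in for
Expression.semanticEquals:

    object OrderingMatchDemo extends App {
      def isOrderingMatched(required: Seq[String], output: Seq[String]): Boolean =
        required.length <= output.length &&
          required.zip(output).forall { case (r, o) => r == o }

      assert(isOrderingMatched(Seq("k"), Seq("k", "i")))   // prefix: reuse the existing order
      assert(!isOrderingMatched(Seq("k", "i"), Seq("k")))  // output too short: sort needed
      assert(!isOrderingMatched(Seq("i"), Seq("k", "i")))  // not a prefix: sort needed
    }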
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala

@@ -47,6 +47,9 @@ trait V1WriteCommand extends DataWritingCommand {
  * A rule that adds logical sorts to V1 data writing commands.
  */
 object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
+
+  import V1WritesUtils._
+
   override def apply(plan: LogicalPlan): LogicalPlan = {
     if (conf.plannedWriteEnabled) {
       plan.transformDown {
@@ -65,10 +68,11 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
   }
 
   private def prepareQuery(write: V1WriteCommand, query: LogicalPlan): LogicalPlan = {
-    val empty2NullPlan = if (hasEmptyToNull(query)) {
+    val hasEmpty2Null = query.exists(p => hasEmptyToNull(p.expressions))
+    val empty2NullPlan = if (hasEmpty2Null) {
       query
     } else {
-      val projectList = V1WritesUtils.convertEmptyToNull(query.output, write.partitionColumns)
+      val projectList = convertEmptyToNull(query.output, write.partitionColumns)
       if (projectList.isEmpty) query else Project(projectList, query)
     }
     assert(empty2NullPlan.output.length == query.output.length)
@@ -80,26 +84,13 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
     }.asInstanceOf[SortOrder])
     val outputOrdering = query.outputOrdering
     // Check if the ordering is already matched to ensure the idempotency of the rule.
-    val orderingMatched = if (requiredOrdering.length > outputOrdering.length) {
-      false
-    } else {
-      requiredOrdering.zip(outputOrdering).forall {
-        case (requiredOrder, outputOrder) => requiredOrder.semanticEquals(outputOrder)
-      }
-    }
+    val orderingMatched = isOrderingMatched(requiredOrdering, outputOrdering)
     if (orderingMatched) {
       empty2NullPlan
     } else {
       Sort(requiredOrdering, global = false, empty2NullPlan)
     }
   }
-
-  private def hasEmptyToNull(plan: LogicalPlan): Boolean = {
-    plan.find {
-      case p: Project => V1WritesUtils.hasEmptyToNull(p.projectList)
-      case _ => false
-    }.isDefined
-  }
 }
 
 object V1WritesUtils {
@@ -209,4 +200,16 @@ object V1WritesUtils {
   def hasEmptyToNull(expressions: Seq[Expression]): Boolean = {
     expressions.exists(_.exists(_.isInstanceOf[Empty2Null]))
   }
+
+  def isOrderingMatched(
+      requiredOrdering: Seq[Expression],
+      outputOrdering: Seq[Expression]): Boolean = {
+    if (requiredOrdering.length > outputOrdering.length) {
+      false
+    } else {
+      requiredOrdering.zip(outputOrdering).forall {
+        case (requiredOrder, outputOrder) => requiredOrder.semanticEquals(outputOrder)
+      }
+    }
+  }
 }
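
Note: hasEmptyToNull is two nested exists calls: the outer scans a node's
expression list, the inner walks each expression tree, so an Empty2Null
wrapped inside another expression (an Alias, say) is still found. A minimal
model of the idiom, not Spark code:

    sealed trait Expr {
      def children: Seq[Expr]
      def exists(f: Expr => Boolean): Boolean = f(this) || children.exists(_.exists(f))
    }
    case class Attr(name: String) extends Expr { def children: Seq[Expr] = Nil }
    case class Alias(child: Expr, name: String) extends Expr { def children: Seq[Expr] = Seq(child) }
    case class Empty2Null(child: Expr) extends Expr { def children: Seq[Expr] = Seq(child) }

    object HasEmptyToNullDemo extends App {
      def hasEmptyToNull(expressions: Seq[Expr]): Boolean =
        expressions.exists(_.exists(_.isInstanceOf[Empty2Null]))

      // Found even when nested under an Alias, as in "empty2null(k) AS k".
      assert(hasEmptyToNull(Seq(Attr("i"), Alias(Empty2Null(Attr("k")), "k"))))
      assert(!hasEmptyToNull(Seq(Attr("i"), Attr("k"))))
    }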
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala

@@ -18,32 +18,32 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.{QueryTest, Row}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 import org.apache.spark.sql.util.QueryExecutionListener
 
-abstract class V1WriteCommandSuiteBase extends QueryTest with SQLTestUtils {
+trait V1WriteCommandSuiteBase extends SQLTestUtils {
 
   import testImplicits._
 
   setupTestData()
 
-  protected override def beforeAll(): Unit = {
+  override def beforeAll(): Unit = {
     super.beforeAll()
     (0 to 20).map(i => (i, i % 5, (i % 10).toString))
       .toDF("i", "j", "k")
       .write
       .saveAsTable("t0")
   }
 
-  protected override def afterAll(): Unit = {
+  override def afterAll(): Unit = {
     sql("drop table if exists t0")
     super.afterAll()
   }
 
-  protected def withPlannedWrite(testFunc: Boolean => Any): Unit = {
+  def withPlannedWrite(testFunc: Boolean => Any): Unit = {
     Seq(true, false).foreach { enabled =>
       withSQLConf(SQLConf.PLANNED_WRITE_ENABLED.key -> enabled.toString) {
         testFunc(enabled)
@@ -87,19 +87,16 @@ abstract class V1WriteCommandSuiteBase extends QueryTest with SQLTestUtils {
        s"Expect hasLogicalSort: $hasLogicalSort, Actual: ${optimizedPlan.isInstanceOf[Sort]}")
 
      // Check empty2null conversion.
-     val projection = optimizedPlan.collectFirst {
-       case p: Project
-         if p.projectList.exists(_.exists(_.isInstanceOf[V1WritesUtils.Empty2Null])) => p
-     }
-     assert(projection.isDefined == hasEmpty2Null,
-       s"Expect hasEmpty2Null: $hasEmpty2Null, Actual: ${projection.isDefined}")
+     val empty2nullExpr = optimizedPlan.exists(p => V1WritesUtils.hasEmptyToNull(p.expressions))
+     assert(empty2nullExpr == hasEmpty2Null,
+       s"Expect hasEmpty2Null: $hasEmpty2Null, Actual: $empty2nullExpr. Plan:\n$optimizedPlan")
     }
 
     spark.listenerManager.unregister(listener)
   }
 }
 
-class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSession {
+class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1WriteCommandSuiteBase {
 
   import testImplicits._
 
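
Note: V1WriteCommandSuiteBase changes from an abstract class (which pinned the
superclass to QueryTest) to a trait, so each concrete suite can pick its own
base: this file mixes it with SharedSparkSession, while V1WriteHiveCommandSuite
below mixes it with TestHiveSingleton. A minimal sketch of the pattern, with
toy traits standing in for the real test bases:

    trait SessionProvider { def sessionKind: String }
    trait SharedSession extends SessionProvider { def sessionKind: String = "local" }
    trait HiveSession extends SessionProvider { def sessionKind: String = "hive" }

    // Shared test logic no longer extends a fixed superclass; it only
    // requires that whoever mixes it in also provides a session.
    trait WriteSuiteBase { self: SessionProvider =>
      def describe: String = s"write tests against a $sessionKind session"
    }

    class CoreSuite extends SharedSession with WriteSuiteBase
    class HiveSuite extends HiveSession with WriteSuiteBase

    object MixinDemo extends App {
      println(new CoreSuite().describe) // write tests against a local session
      println(new HiveSuite().describe) // write tests against a hive session
    }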
@@ -277,4 +274,21 @@
       }
     }
   }
+
+  test("v1 write with empty2null in aggregate") {
+    withPlannedWrite { enabled =>
+      withTable("t") {
+        executeAndCheckOrdering(
+          hasLogicalSort = enabled, orderingMatched = enabled, hasEmpty2Null = enabled) {
+          sql(
+            """
+              |CREATE TABLE t USING PARQUET
+              |PARTITIONED BY (k) AS
+              |SELECT SUM(i) AS i, SUM(j) AS j, k
+              |FROM t0 WHERE i > 0 GROUP BY k
+              |""".stripMargin)
+        }
+      }
+    }
+  }
 }
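
Note: this new test is the regression case for the exists-based checks above.
With planned writes enabled, V1Writes adds a Project with empty2null(k) on top
of the query, but the optimizer can then collapse that projection into the
Aggregate beneath it, so no Project node survives into the optimized plan and
the old collectFirst { case p: Project ... } probe reported no conversion. An
illustrative (hand-written, not actual Spark output) shape of the optimized
write query for the CTAS above:

    Sort [empty2null(k) ASC NULLS FIRST], false
    +- Aggregate [k], [sum(i) AS i, sum(j) AS j, empty2null(k) AS k]
       +- Filter (i > 0)
          +- Relation t0 [i, j, k]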
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala

@@ -17,10 +17,12 @@
 
 package org.apache.spark.sql.hive.execution.command
 
+import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.execution.datasources.V1WriteCommandSuiteBase
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
-class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with TestHiveSingleton {
+class V1WriteHiveCommandSuite
+  extends QueryTest with TestHiveSingleton with V1WriteCommandSuiteBase {
 
   test("create hive table as select - no partition column") {
     withPlannedWrite { enabled =>