Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.functions.struct
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -92,13 +93,14 @@ abstract class UpdateSuiteBase
condition: Option[String],
setClauses: String,
expectedResults: Seq[Row],
tableName: Option[String] = None): Unit = {
tableName: Option[String] = None,
prefix: String = ""): Unit = {
executeUpdate(tableName.getOrElse(s"delta.`$tempPath`"), setClauses, where = condition.orNull)
checkAnswer(
tableName
.map(readDeltaTable(_))
.getOrElse(readDeltaTableByPath(tempPath))
.select("key", "value"),
.select(s"${prefix}key", s"${prefix}value"),
expectedResults)
}

Expand Down Expand Up @@ -663,7 +665,7 @@ abstract class UpdateSuiteBase

test("schema pruning on finding files to update") {
append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value"))
// spark.conf.set("spark.sql.adaptive.enabled", "false")

val executedPlans = DeltaTestUtils.withPhysicalPlansCaptured(spark) {
checkUpdate(condition = Some("key = 2"), setClauses = "key = 1, value = 3",
expectedResults = Row(1, 3) :: Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil)
Expand All @@ -681,6 +683,26 @@ abstract class UpdateSuiteBase
))
}

// Verifies which columns the file-source scan reads when locating files to
// update through a nested (struct) column. Nested schema pruning is not
// applied for this plan today, so the scan still reads the full struct.
test("nested schema pruning on finding files to update") {
  val source = Seq((2, 2), (1, 4), (1, 1), (0, 3))
    .toDF("key", "value")
    .select(struct("key", "value").alias("nested"))
  append(source)

  val capturedPlans = DeltaTestUtils.withPhysicalPlansCaptured(spark) {
    checkUpdate(
      condition = Some("nested.key = 2"),
      setClauses = "nested.key = 1, nested.value = 3",
      expectedResults = Row(1, 3) :: Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil,
      prefix = "nested.")
  }

  val fileScans = capturedPlans.flatMap(_.collect { case scan: FileSourceScanExec => scan })
  // Currently nested schemas can't be pruned, but Spark 3.4 loosens some of the
  // restrictions on non-deterministic expressions, and this should be pruned to
  // just "nested STRUCT<key: int>" after upgrading.
  assert(fileScans.head.schema == StructType.fromDDL("nested STRUCT<key: int, value: int>"))
}

/**
* @param function the unsupported function.
* @param functionType The type of the unsupported expression to be tested.
Expand Down