diff --git a/core/src/test/scala/org/apache/spark/sql/delta/UpdateSuiteBase.scala b/core/src/test/scala/org/apache/spark/sql/delta/UpdateSuiteBase.scala index d69ccba7a34..d88a410fd92 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/UpdateSuiteBase.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/UpdateSuiteBase.scala @@ -29,6 +29,7 @@ import org.scalatest.BeforeAndAfterEach import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.functions.struct import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} import org.apache.spark.sql.types._ @@ -92,13 +93,14 @@ abstract class UpdateSuiteBase condition: Option[String], setClauses: String, expectedResults: Seq[Row], - tableName: Option[String] = None): Unit = { + tableName: Option[String] = None, + prefix: String = ""): Unit = { executeUpdate(tableName.getOrElse(s"delta.`$tempPath`"), setClauses, where = condition.orNull) checkAnswer( tableName .map(readDeltaTable(_)) .getOrElse(readDeltaTableByPath(tempPath)) - .select("key", "value"), + .select(s"${prefix}key", s"${prefix}value"), expectedResults) } @@ -663,7 +665,7 @@ abstract class UpdateSuiteBase test("schema pruning on finding files to update") { append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")) - // spark.conf.set("spark.sql.adaptive.enabled", "false") + val executedPlans = DeltaTestUtils.withPhysicalPlansCaptured(spark) { checkUpdate(condition = Some("key = 2"), setClauses = "key = 1, value = 3", expectedResults = Row(1, 3) :: Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil) @@ -681,6 +683,26 @@ abstract class UpdateSuiteBase )) } + test("nested schema pruning on finding files to update") { + append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value") + .select(struct("key", "value").alias("nested"))) + + val executedPlans = DeltaTestUtils.withPhysicalPlansCaptured(spark) { + checkUpdate(condition = Some("nested.key = 2"), + setClauses = "nested.key = 1, nested.value = 3", + expectedResults = Row(1, 3) :: Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil, + prefix = "nested.") + } + + val scans = executedPlans.flatMap(_.collect { + case f: FileSourceScanExec => f + }) + // Currently nested schemas can't be pruned, but Spark 3.4 loosens some of the restrictions + // on non-determinstic expressions, and this should be pruned to just "nested STRUCT" + // after upgrading + assert(scans.head.schema == StructType.fromDDL("nested STRUCT")) + } + /** * @param function the unsupported function. * @param functionType The type of the unsupported expression to be tested.