From 90d2a10b1af3ba4efd60a4abc4c95801a3399e23 Mon Sep 17 00:00:00 2001 From: Sabir Date: Thu, 14 Mar 2024 22:30:21 +0100 Subject: [PATCH 1/5] . --- .../spark/sql/delta/DeltaColumnMapping.scala | 2 +- .../RemoveColumnMappingSuite.scala | 108 ++++++++++++++++-- .../RemoveColumnMappingSuiteUtils.scala | 19 +-- 3 files changed, 111 insertions(+), 18 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala index 928b6500bfd..73cfac97721 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala @@ -96,7 +96,7 @@ trait DeltaColumnMappingBase extends DeltaLogging { // No change. (oldMode == newMode) || // Downgrade allowed with a flag. - (removalAllowed && (oldMode == NameMapping && newMode == NoMapping)) || + (removalAllowed && (oldMode != NoMapping && newMode == NoMapping)) || // Upgrade always allowed. (oldMode == NoMapping && newMode == NameMapping) } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala index 3e5fd78421d..f6b371f17f3 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala @@ -16,20 +16,17 @@ package org.apache.spark.sql.delta.columnmapping -import org.apache.spark.sql.delta.DeltaAnalysisException -import org.apache.spark.sql.delta.DeltaColumnMappingUnsupportedException -import org.apache.spark.sql.delta.DeltaConfigs -import org.apache.spark.sql.delta.DeltaLog +import org.apache.spark.sql.delta._ +import org.apache.spark.sql.delta.schema.DeltaInvariantViolationException import org.apache.spark.sql.delta.sources.DeltaSQLConf._ +import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.TableIdentifier /** * Test removing column mapping from a table. */ -class RemoveColumnMappingSuite - extends RemoveColumnMappingSuiteUtils - { +class RemoveColumnMappingSuite extends RemoveColumnMappingSuiteUtils { test("column mapping cannot be removed without the feature flag") { withSQLConf(ALLOW_COLUMN_MAPPING_REMOVAL.key -> "false") { @@ -48,6 +45,16 @@ class RemoveColumnMappingSuite } } + test("table without column mapping enabled") { + sql(s"""CREATE TABLE $testTableName + |USING delta + |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'none') + |AS SELECT 1 as a + |""".stripMargin) + + unsetColumnMappingProperty(useUnset = true) + } + test("invalid column names") { val invalidColName1 = colName("col1") val invalidColName2 = colName("col2") @@ -58,7 +65,7 @@ class RemoveColumnMappingSuite |""".stripMargin) val e = intercept[DeltaAnalysisException] { // Try to remove column mapping. - sql(s"ALTER TABLE $testTableName SET TBLPROPERTIES ('delta.columnMapping.mode' = 'none')") + unsetColumnMappingProperty(useUnset = true) } assert(e.errorClass .contains("DELTA_INVALID_COLUMN_NAMES_WHEN_REMOVING_COLUMN_MAPPING")) @@ -217,4 +224,89 @@ class RemoveColumnMappingSuite assert(sql(s"SELECT $secondColumn FROM $testTableName WHERE $secondColumn IS NOT NULL").count() == 0) } + + test("remove column mapping from a table with deletion vectors") { + sql( + s"""CREATE TABLE $testTableName + |USING delta + |TBLPROPERTIES ( + | '${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name', + | '${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = true) + |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn + | FROM RANGE(0, $totalRows, 1, $numFiles) + |""".stripMargin) + sql(s"DELETE FROM $testTableName WHERE $logicalColumnName % 2 = 0") + testRemovingColumnMapping() + } + + test("remove column mapping from a table with a generated column") { + // Note: generate expressions are using logical column names and renaming referenced columns + // is forbidden. + sql( + s"""CREATE TABLE $testTableName ($logicalColumnName BIGINT, + | $secondColumn BIGINT GENERATED ALWAYS AS ($logicalColumnName + 1) + |) + |USING delta + |TBLPROPERTIES ( + | '${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name') + |""".stripMargin) + // Insert data into the table. + spark.range(totalRows) + .selectExpr(s"id as $logicalColumnName") + .writeTo(testTableName) + .append() + val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName = testTableName)) + assert(GeneratedColumn.getGeneratedColumns(deltaLog.update()).head.name == secondColumn) + testRemovingColumnMapping() + // Verify the generated column is still there. + assert(GeneratedColumn.getGeneratedColumns(deltaLog.update()).head.name == secondColumn) + // Insert more rows. + spark.range(totalRows) + .selectExpr(s"id + $totalRows as $logicalColumnName") + .writeTo(testTableName) + .append() + // Verify the generated column values are correct. + checkAnswer(sql(s"SELECT $logicalColumnName, $secondColumn FROM $testTableName"), + (0 until totalRows * 2).map(i => Row(i, i + 1))) + } + + test("column constraints are preserved") { + // Note: constraints are using logical column names and renaming is forbidden until + // constraint is dropped. + sql( + s"""CREATE TABLE $testTableName + |USING delta + |TBLPROPERTIES ( + | '${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name') + |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn + | FROM RANGE(0, $totalRows, 1, $numFiles) + |""".stripMargin) + val constraintName = "secondcolumnaddone" + val constraintExpr = s"$secondColumn = $logicalColumnName + 1" + sql(s"ALTER TABLE $testTableName ADD CONSTRAINT " + + s"$constraintName CHECK ($constraintExpr)") + val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName = testTableName)) + assert(deltaLog.update().metadata.configuration(s"delta.constraints.$constraintName") == + constraintExpr) + testRemovingColumnMapping() + // Verify the constraint is still there. + assert(deltaLog.update().metadata.configuration(s"delta.constraints.$constraintName") == + constraintExpr) + // Verify the constraint is still enforced. + intercept[DeltaInvariantViolationException] { + sql(s"INSERT INTO $testTableName VALUES (0, 0)") + } + } + + test("remove column mapping in id mode") { + sql( + s"""CREATE TABLE $testTableName + |USING delta + |TBLPROPERTIES ( + | '${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'id') + |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn + | FROM RANGE(0, $totalRows, 1, $numFiles) + |""".stripMargin) + testRemovingColumnMapping() + } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuiteUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuiteUtils.scala index ff8cc2f3f61..4081c3608ad 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuiteUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuiteUtils.scala @@ -23,7 +23,9 @@ import org.apache.spark.sql.delta.schema.SchemaMergingUtils import org.apache.spark.sql.delta.sources.DeltaSQLConf._ import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.spark.sql.DataFrame import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.functions.col @@ -52,12 +54,9 @@ trait RemoveColumnMappingSuiteUtils extends QueryTest with DeltaColumnMappingSui import testImplicits._ - protected def testRemovingColumnMapping( - unsetTableProperty: Boolean = false): Any = { + protected def testRemovingColumnMapping(unsetTableProperty: Boolean = false): Any = { // Verify the input data is as expected. - checkAnswer( - spark.table(tableName = testTableName).select(logicalColumnName), - spark.range(totalRows).select(col("id").as(logicalColumnName))) + val originalData = spark.table(tableName = testTableName).select(logicalColumnName).collect() // Add a schema comment and verify it is preserved after the rewrite. val comment = "test comment" sql(s"ALTER TABLE $testTableName ALTER COLUMN $logicalColumnName COMMENT '$comment'") @@ -73,10 +72,11 @@ trait RemoveColumnMappingSuiteUtils extends QueryTest with DeltaColumnMappingSui unsetColumnMappingProperty(useUnset = unsetTableProperty) verifyRewrite( - unsetTableProperty, + unsetTableProperty = unsetTableProperty, deltaLog, originalFiles, - startingVersion) + startingVersion, + originalData = originalData) // Verify the schema comment is preserved after the rewrite. assert(deltaLog.update().schema.head.getComment().get == comment, "Should preserve the schema comment.") @@ -90,10 +90,11 @@ trait RemoveColumnMappingSuiteUtils extends QueryTest with DeltaColumnMappingSui unsetTableProperty: Boolean, deltaLog: DeltaLog, originalFiles: Array[AddFile], - startingVersion: Long): Unit = { + startingVersion: Long, + originalData: Array[Row]): Unit = { checkAnswer( spark.table(tableName = testTableName).select(logicalColumnName), - spark.range(totalRows).select(col("id").as(logicalColumnName))) + originalData) val newSnapshot = deltaLog.update() assert(newSnapshot.version - startingVersion == 1, "Should rewrite the table in one commit.") From 6b7e6821d03d51a83e42d5e7e1e9e41f50e482bf Mon Sep 17 00:00:00 2001 From: Sabir Date: Fri, 15 Mar 2024 12:40:39 +0100 Subject: [PATCH 2/5] . --- .../sql/delta/columnmapping/RemoveColumnMappingSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala index f6b371f17f3..c5de30686aa 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala @@ -248,7 +248,9 @@ class RemoveColumnMappingSuite extends RemoveColumnMappingSuiteUtils { |) |USING delta |TBLPROPERTIES ( - | '${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name') + | '${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name' + | 'delta.minReaderVersion' = '2', + | 'delta.minWriterVersion' = '5') |""".stripMargin) // Insert data into the table. spark.range(totalRows) From 56bccf215ad238e925831420064fb684ca2449f4 Mon Sep 17 00:00:00 2001 From: Sabir Date: Fri, 15 Mar 2024 15:03:04 +0100 Subject: [PATCH 3/5] retrigger jenkins tests From 4bd01b5a97c2ec07e6671fec44b612393f7ec02f Mon Sep 17 00:00:00 2001 From: Sabir Date: Sun, 17 Mar 2024 13:38:50 +0100 Subject: [PATCH 4/5] . --- .../sql/delta/columnmapping/RemoveColumnMappingSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala index c5de30686aa..0652d1c249e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala @@ -248,7 +248,7 @@ class RemoveColumnMappingSuite extends RemoveColumnMappingSuiteUtils { |) |USING delta |TBLPROPERTIES ( - | '${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name' + | '${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name', | 'delta.minReaderVersion' = '2', | 'delta.minWriterVersion' = '5') |""".stripMargin) From ca3757c26fe6dbbc2b40658f9ed843ac49f8313f Mon Sep 17 00:00:00 2001 From: Sabir Date: Mon, 18 Mar 2024 14:56:50 +0100 Subject: [PATCH 5/5] . --- .../RemoveColumnMappingSuite.scala | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala index 0652d1c249e..6aca88c004e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala @@ -16,6 +16,8 @@ package org.apache.spark.sql.delta.columnmapping +import io.delta.tables.DeltaTable + import org.apache.spark.sql.delta._ import org.apache.spark.sql.delta.schema.DeltaInvariantViolationException import org.apache.spark.sql.delta.sources.DeltaSQLConf._ @@ -242,16 +244,16 @@ class RemoveColumnMappingSuite extends RemoveColumnMappingSuiteUtils { test("remove column mapping from a table with a generated column") { // Note: generate expressions are using logical column names and renaming referenced columns // is forbidden. - sql( - s"""CREATE TABLE $testTableName ($logicalColumnName BIGINT, - | $secondColumn BIGINT GENERATED ALWAYS AS ($logicalColumnName + 1) - |) - |USING delta - |TBLPROPERTIES ( - | '${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name', - | 'delta.minReaderVersion' = '2', - | 'delta.minWriterVersion' = '5') - |""".stripMargin) + DeltaTable.create(spark) + .tableName(testTableName) + .addColumn(logicalColumnName, "LONG") + .addColumn( + DeltaTable.columnBuilder(secondColumn) + .dataType("LONG") + .generatedAlwaysAs(s"$logicalColumnName + 1") + .build()) + .property(DeltaConfigs.COLUMN_MAPPING_MODE.key, "name") + .execute() // Insert data into the table. spark.range(totalRows) .selectExpr(s"id as $logicalColumnName")