Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spark] Column mapping removal: support tables with deletion vectors, column constraints and generated columns. #2753

Merged
merged 5 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ trait DeltaColumnMappingBase extends DeltaLogging {
// No change.
(oldMode == newMode) ||
// Downgrade allowed with a flag.
(removalAllowed && (oldMode == NameMapping && newMode == NoMapping)) ||
(removalAllowed && (oldMode != NoMapping && newMode == NoMapping)) ||
// Upgrade always allowed.
(oldMode == NoMapping && newMode == NameMapping)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,19 @@

package org.apache.spark.sql.delta.columnmapping

import org.apache.spark.sql.delta.DeltaAnalysisException
import org.apache.spark.sql.delta.DeltaColumnMappingUnsupportedException
import org.apache.spark.sql.delta.DeltaConfigs
import org.apache.spark.sql.delta.DeltaLog
import io.delta.tables.DeltaTable

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.schema.DeltaInvariantViolationException
import org.apache.spark.sql.delta.sources.DeltaSQLConf._

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.TableIdentifier

/**
* Test removing column mapping from a table.
*/
class RemoveColumnMappingSuite
extends RemoveColumnMappingSuiteUtils
{
class RemoveColumnMappingSuite extends RemoveColumnMappingSuiteUtils {

test("column mapping cannot be removed without the feature flag") {
withSQLConf(ALLOW_COLUMN_MAPPING_REMOVAL.key -> "false") {
Expand All @@ -48,6 +47,16 @@ class RemoveColumnMappingSuite
}
}

// Removing the column-mapping property from a table that never had column
// mapping enabled (mode 'none') must be a no-op and must not throw.
test("table without column mapping enabled") {
sql(s"""CREATE TABLE $testTableName
|USING delta
|TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'none')
|AS SELECT 1 as a
|""".stripMargin)

// Succeeds silently: there is no mapping metadata to strip from the table.
// (Helper is defined in the suite utils; useUnset = true issues UNSET TBLPROPERTIES.)
unsetColumnMappingProperty(useUnset = true)
}

test("invalid column names") {
val invalidColName1 = colName("col1")
val invalidColName2 = colName("col2")
Expand All @@ -58,7 +67,7 @@ class RemoveColumnMappingSuite
|""".stripMargin)
val e = intercept[DeltaAnalysisException] {
// Try to remove column mapping.
sql(s"ALTER TABLE $testTableName SET TBLPROPERTIES ('delta.columnMapping.mode' = 'none')")
unsetColumnMappingProperty(useUnset = true)
}
assert(e.errorClass
.contains("DELTA_INVALID_COLUMN_NAMES_WHEN_REMOVING_COLUMN_MAPPING"))
Expand Down Expand Up @@ -217,4 +226,91 @@ class RemoveColumnMappingSuite
assert(sql(s"SELECT $secondColumn FROM $testTableName WHERE $secondColumn IS NOT NULL").count()
== 0)
}

// Column mapping removal must work on tables that carry deletion vectors:
// the rewrite has to materialize/preserve the DV-masked rows correctly.
test("remove column mapping from a table with deletion vectors") {
sql(
s"""CREATE TABLE $testTableName
|USING delta
|TBLPROPERTIES (
| '${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name',
| '${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = true)
|AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
| FROM RANGE(0, $totalRows, 1, $numFiles)
|""".stripMargin)
// Delete every other row so the table actually contains deletion vectors
// before the mapping-removal rewrite runs.
sql(s"DELETE FROM $testTableName WHERE $logicalColumnName % 2 = 0")
// Shared helper: removes the mapping property and verifies the surviving
// data and table metadata after the rewrite.
testRemovingColumnMapping()
}

// Generated-column expressions must survive column mapping removal and keep
// producing correct values for rows inserted after the rewrite.
test("remove column mapping from a table with a generated column") {
// Note: generated-column expressions reference logical column names, and
// renaming a referenced column is forbidden while the expression exists.
DeltaTable.create(spark)
.tableName(testTableName)
.addColumn(logicalColumnName, "LONG")
.addColumn(
DeltaTable.columnBuilder(secondColumn)
.dataType("LONG")
// secondColumn is always computed as logicalColumnName + 1.
.generatedAlwaysAs(s"$logicalColumnName + 1")
.build())
.property(DeltaConfigs.COLUMN_MAPPING_MODE.key, "name")
.execute()
// Insert data into the table; only the base column is supplied, the
// generated column is filled in by Delta.
spark.range(totalRows)
.selectExpr(s"id as $logicalColumnName")
.writeTo(testTableName)
.append()
val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName = testTableName))
// Sanity check: the generated column is registered before the removal.
assert(GeneratedColumn.getGeneratedColumns(deltaLog.update()).head.name == secondColumn)
testRemovingColumnMapping()
// Verify the generated column definition is still there after the rewrite.
assert(GeneratedColumn.getGeneratedColumns(deltaLog.update()).head.name == secondColumn)
// Insert more rows (ids shifted by totalRows so all values stay distinct).
spark.range(totalRows)
.selectExpr(s"id + $totalRows as $logicalColumnName")
.writeTo(testTableName)
.append()
// Verify the generated column values are correct for both the original
// rows (0 until totalRows) and the post-removal inserts.
checkAnswer(sql(s"SELECT $logicalColumnName, $secondColumn FROM $testTableName"),
(0 until totalRows * 2).map(i => Row(i, i + 1)))
}

// CHECK constraints must survive column mapping removal and remain enforced
// on writes performed after the rewrite.
test("column constraints are preserved") {
// Note: constraints reference logical column names, and renaming a
// referenced column is forbidden until the constraint is dropped.
sql(
s"""CREATE TABLE $testTableName
|USING delta
|TBLPROPERTIES (
| '${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name')
|AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
| FROM RANGE(0, $totalRows, 1, $numFiles)
|""".stripMargin)
val constraintName = "secondcolumnaddone"
// Invariant the data already satisfies: secondColumn == logicalColumnName + 1.
val constraintExpr = s"$secondColumn = $logicalColumnName + 1"
sql(s"ALTER TABLE $testTableName ADD CONSTRAINT " +
s"$constraintName CHECK ($constraintExpr)")
val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName = testTableName))
// Constraints are stored as table properties under the
// 'delta.constraints.<name>' key; confirm it is present before removal.
assert(deltaLog.update().metadata.configuration(s"delta.constraints.$constraintName") ==
constraintExpr)
testRemovingColumnMapping()
// Verify the constraint property is still there after the rewrite.
assert(deltaLog.update().metadata.configuration(s"delta.constraints.$constraintName") ==
constraintExpr)
// Verify the constraint is still enforced: (0, 0) violates 0 = 0 + 1.
intercept[DeltaInvariantViolationException] {
sql(s"INSERT INTO $testTableName VALUES (0, 0)")
}
}

// Removal must also work when the table uses 'id' column mapping mode
// (not just 'name' mode) — exercises the relaxed downgrade check.
test("remove column mapping in id mode") {
sql(
s"""CREATE TABLE $testTableName
|USING delta
|TBLPROPERTIES (
| '${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'id')
|AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
| FROM RANGE(0, $totalRows, 1, $numFiles)
|""".stripMargin)
// Shared helper removes the property and validates data/metadata afterwards.
testRemovingColumnMapping()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf._
import com.fasterxml.jackson.databind.ObjectMapper

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.functions.col

Expand Down Expand Up @@ -52,12 +54,9 @@ trait RemoveColumnMappingSuiteUtils extends QueryTest with DeltaColumnMappingSui

import testImplicits._

protected def testRemovingColumnMapping(
unsetTableProperty: Boolean = false): Any = {
protected def testRemovingColumnMapping(unsetTableProperty: Boolean = false): Any = {
// Verify the input data is as expected.
checkAnswer(
spark.table(tableName = testTableName).select(logicalColumnName),
spark.range(totalRows).select(col("id").as(logicalColumnName)))
val originalData = spark.table(tableName = testTableName).select(logicalColumnName).collect()
// Add a schema comment and verify it is preserved after the rewrite.
val comment = "test comment"
sql(s"ALTER TABLE $testTableName ALTER COLUMN $logicalColumnName COMMENT '$comment'")
Expand All @@ -73,10 +72,11 @@ trait RemoveColumnMappingSuiteUtils extends QueryTest with DeltaColumnMappingSui
unsetColumnMappingProperty(useUnset = unsetTableProperty)

verifyRewrite(
unsetTableProperty,
unsetTableProperty = unsetTableProperty,
deltaLog,
originalFiles,
startingVersion)
startingVersion,
originalData = originalData)
// Verify the schema comment is preserved after the rewrite.
assert(deltaLog.update().schema.head.getComment().get == comment,
"Should preserve the schema comment.")
Expand All @@ -90,10 +90,11 @@ trait RemoveColumnMappingSuiteUtils extends QueryTest with DeltaColumnMappingSui
unsetTableProperty: Boolean,
deltaLog: DeltaLog,
originalFiles: Array[AddFile],
startingVersion: Long): Unit = {
startingVersion: Long,
originalData: Array[Row]): Unit = {
checkAnswer(
spark.table(tableName = testTableName).select(logicalColumnName),
spark.range(totalRows).select(col("id").as(logicalColumnName)))
originalData)

val newSnapshot = deltaLog.update()
assert(newSnapshot.version - startingVersion == 1, "Should rewrite the table in one commit.")
Expand Down
Loading