Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractColumnMetadata;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractPartitionColumns;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getCheckConstraints;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnComments;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnInvariants;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnMappingMode;
Expand Down Expand Up @@ -255,7 +256,10 @@ public class DeltaLakeMetadata
public static final String CHANGE_COLUMN_OPERATION = "CHANGE COLUMN";
public static final String ISOLATION_LEVEL = "WriteSerializable";
private static final int READER_VERSION = 1;
// The required writer version used by tables created by Trino
private static final int WRITER_VERSION = 2;
// The highest writer version Trino supports writing to
private static final int MAX_WRITER_VERSION = 3;
// Matches the dummy column Databricks stores in the metastore
private static final List<Column> DUMMY_DATA_COLUMNS = ImmutableList.of(
new Column("col", HiveType.toHiveType(new ArrayType(VarcharType.createUnboundedVarcharType())), Optional.empty()));
Expand Down Expand Up @@ -1245,6 +1249,9 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
if (!columnInvariants.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Inserts are not supported for tables with delta invariants");
}
if (!getCheckConstraints(table.getMetadataEntry()).isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported");
}
checkSupportedWriterVersion(session, table.getSchemaTableName());

List<DeltaLakeColumnHandle> inputColumns = columns.stream()
Expand Down Expand Up @@ -1375,6 +1382,9 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
String fileSystem = new Path(handle.getLocation()).toUri().getScheme();
throw new TrinoException(NOT_SUPPORTED, format("Deletes are not supported on the %s filesystem", fileSystem));
}
if (!getCheckConstraints(handle.getMetadataEntry()).isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported");
}
checkSupportedWriterVersion(session, handle.getSchemaTableName());

return DeltaLakeTableHandle.forDelete(
Expand Down Expand Up @@ -1435,6 +1445,9 @@ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTable
if (!columnInvariants.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Updates are not supported for tables with delta invariants");
}
if (!getCheckConstraints(handle.getMetadataEntry()).isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported");
}
checkSupportedWriterVersion(session, handle.getSchemaTableName());

List<DeltaLakeColumnHandle> updatedColumnHandles = updatedColumns.stream()
Expand Down Expand Up @@ -1502,6 +1515,9 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
if (!getColumnInvariants(handle.getMetadataEntry()).isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Updates are not supported for tables with delta invariants");
}
if (!getCheckConstraints(handle.getMetadataEntry()).isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported");
}
checkSupportedWriterVersion(session, handle.getSchemaTableName());

ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle);
Expand Down Expand Up @@ -1803,7 +1819,7 @@ private boolean allowWrite(ConnectorSession session, DeltaLakeTableHandle tableH
private void checkSupportedWriterVersion(ConnectorSession session, SchemaTableName schemaTableName)
{
int requiredWriterVersion = metastore.getProtocol(session, metastore.getSnapshot(schemaTableName, session)).getMinWriterVersion();
if (requiredWriterVersion > WRITER_VERSION) {
if (requiredWriterVersion > MAX_WRITER_VERSION) {
throw new TrinoException(
NOT_SUPPORTED,
format("Table %s requires Delta Lake writer version %d which is not supported", schemaTableName, requiredWriterVersion));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,13 @@ private static String getInvariants(JsonNode node)
return invariants == null ? null : invariants.asText();
}

/**
 * Returns the CHECK constraints recorded for a table.
 * <p>
 * Delta Lake stores CHECK constraints in the table's metadata configuration as
 * entries whose keys start with {@code delta.constraints.}; the map value is
 * presumably the constraint's SQL expression (e.g. {@code a > 0}) — confirm
 * against the Delta protocol. An empty map means the table has no CHECK
 * constraints, which callers use to decide whether writes are allowed.
 */
public static Map<String, String> getCheckConstraints(MetadataEntry metadataEntry)
{
return metadataEntry.getConfiguration().entrySet().stream()
.filter(entry -> entry.getKey().startsWith("delta.constraints."))
.collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
}

public static Map<String, Map<String, Object>> getColumnsMetadata(MetadataEntry metadataEntry)
{
return getColumnProperties(metadataEntry, node -> OBJECT_MAPPER.convertValue(node.get("metadata"), new TypeReference<>(){}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ public void testAddColumnUnsupportedWriterVersion()
onDelta().executeQuery(format("" +
"CREATE TABLE default.%s (col int) " +
"USING DELTA LOCATION 's3://%s/%s'" +
"TBLPROPERTIES ('delta.minWriterVersion'='3')",
"TBLPROPERTIES ('delta.minWriterVersion'='4')",
tableName,
bucketName,
tableDirectory));

try {
assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " ADD COLUMN new_col int"))
.hasMessageMatching(".* Table .* requires Delta Lake writer version 3 which is not supported");
.hasMessageMatching(".* Table .* requires Delta Lake writer version 4 which is not supported");
}
finally {
onDelta().executeQuery("DROP TABLE default." + tableName);
Expand Down Expand Up @@ -180,14 +180,14 @@ public void testCommentOnTableUnsupportedWriterVersion()
onDelta().executeQuery(format("" +
"CREATE TABLE default.%s (col int) " +
"USING DELTA LOCATION 's3://%s/%s'" +
"TBLPROPERTIES ('delta.minWriterVersion'='3')",
"TBLPROPERTIES ('delta.minWriterVersion'='4')",
tableName,
bucketName,
tableDirectory));

try {
assertQueryFailure(() -> onTrino().executeQuery("COMMENT ON TABLE delta.default." + tableName + " IS 'test comment'"))
.hasMessageMatching(".* Table .* requires Delta Lake writer version 3 which is not supported");
.hasMessageMatching(".* Table .* requires Delta Lake writer version 4 which is not supported");
}
finally {
onTrino().executeQuery("DROP TABLE delta.default." + tableName);
Expand Down Expand Up @@ -224,14 +224,14 @@ public void testCommentOnColumnUnsupportedWriterVersion()
onDelta().executeQuery(format("" +
"CREATE TABLE default.%s (col int) " +
"USING DELTA LOCATION 's3://%s/%s'" +
"TBLPROPERTIES ('delta.minWriterVersion'='3')",
"TBLPROPERTIES ('delta.minWriterVersion'='4')",
tableName,
bucketName,
tableDirectory));

try {
assertQueryFailure(() -> onTrino().executeQuery("COMMENT ON COLUMN delta.default." + tableName + ".col IS 'test column comment'"))
.hasMessageMatching(".* Table .* requires Delta Lake writer version 3 which is not supported");
.hasMessageMatching(".* Table .* requires Delta Lake writer version 4 which is not supported");
}
finally {
onTrino().executeQuery("DROP TABLE delta.default." + tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,11 +334,11 @@ public void testCheckConstraintsCompatibility()
.containsOnly(row(1, 1));

assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (2, 2)"))
.hasMessageMatching(".*Table default." + tableName + " requires Delta Lake writer version 3 which is not supported");
.hasMessageContaining("Writing to tables with CHECK constraints is not supported");
assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM delta.default." + tableName + " WHERE a_number = 1"))
.hasMessageMatching(".*Table default." + tableName + " requires Delta Lake writer version 3 which is not supported");
.hasMessageContaining("Writing to tables with CHECK constraints is not supported");
assertQueryFailure(() -> onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a_number = 10 WHERE id = 1"))
.hasMessageMatching(".*Table default." + tableName + " requires Delta Lake writer version 3 which is not supported");
.hasMessageContaining("Writing to tables with CHECK constraints is not supported");

assertThat(onTrino().executeQuery("SELECT id, a_number FROM " + tableName))
.containsOnly(row(1, 1));
Expand Down Expand Up @@ -469,4 +469,51 @@ public Object[][] compressionCodecs()
{"GZIP"},
};
}

@Test(groups = {DELTA_LAKE_OSS, DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS})
public void testWritesToTableWithCheckConstraintFails()
{
String tableName = "test_writes_into_table_with_check_constraint_" + randomTableSuffix();
try {
onDelta().executeQuery("CREATE TABLE default." + tableName + " (a INT, b INT) " +
"USING DELTA " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'");
onDelta().executeQuery("ALTER TABLE default." + tableName + " ADD CONSTRAINT aIsPositive CHECK (a > 0)");

assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 2)"))
.hasMessageContaining("Writing to tables with CHECK constraints is not supported");
assertQueryFailure(() -> onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a = 3 WHERE b = 3"))
.hasMessageContaining("Writing to tables with CHECK constraints is not supported");
assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM delta.default." + tableName + " WHERE a = 3"))
.hasMessageContaining("Writing to tables with CHECK constraints is not supported");
Comment on lines +487 to +488
Copy link
Copy Markdown
Member

@ebyhr ebyhr Sep 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we want to disable DELETE when a table has check constraints? Is there any situation deleting rows violate check constraints?

I'm fine with disallowing it because allowing only DELETE looks not useful. Just asking to understand the context.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be safe to enable as long as the check constraints are only row by row. I think I'd opt to leave it disabled for now though and we can enable it when we come back and take a closer look at supporting check constraints.

assertQueryFailure(() -> onTrino().executeQuery("MERGE INTO delta.default." + tableName + " t USING delta.default." + tableName + " s " +
"ON (t.a = s.a) WHEN MATCHED THEN UPDATE SET b = 42"))
.hasMessageContaining("Writing to tables with CHECK constraints is not supported");
}
finally {
onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName);
}
}

@Test(groups = {DELTA_LAKE_OSS, DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS})
public void testMetadataOperationsRetainCheckConstraints()
{
String tableName = "test_metadata_operations_retain_check_constraints_" + randomTableSuffix();
try {
onDelta().executeQuery("CREATE TABLE default." + tableName + " (a INT, b INT) " +
"USING DELTA " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'");
onDelta().executeQuery("ALTER TABLE default." + tableName + " ADD CONSTRAINT aIsPositive CHECK (a > 0)");

onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " ADD COLUMN c INT");
onTrino().executeQuery("COMMENT ON COLUMN delta.default." + tableName + ".c IS 'example column comment'");
onTrino().executeQuery("COMMENT ON TABLE delta.default." + tableName + " IS 'example table comment'");

assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 2, 3)"))
.hasMessageContaining("Writing to tables with CHECK constraints is not supported");
Copy link
Copy Markdown
Member

@homar homar Sep 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are missing a test for MERGE here ;)

}
finally {
onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName);
}
}
}