diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 04792d1dfc5a..604d5e83dd93 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -172,6 +172,7 @@ import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractColumnMetadata; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractPartitionColumns; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getCheckConstraints; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnComments; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnInvariants; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnMappingMode; @@ -255,7 +256,10 @@ public class DeltaLakeMetadata public static final String CHANGE_COLUMN_OPERATION = "CHANGE COLUMN"; public static final String ISOLATION_LEVEL = "WriteSerializable"; private static final int READER_VERSION = 1; + // The required writer version used by tables created by Trino private static final int WRITER_VERSION = 2; + // The highest writer version Trino supports writing to + private static final int MAX_WRITER_VERSION = 3; // Matches the dummy column Databricks stores in the metastore private static final List DUMMY_DATA_COLUMNS = ImmutableList.of( new Column("col", HiveType.toHiveType(new ArrayType(VarcharType.createUnboundedVarcharType())), Optional.empty())); @@ -1245,6 +1249,9 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto if (!columnInvariants.isEmpty()) { throw new TrinoException(NOT_SUPPORTED, "Inserts are not supported for tables with delta invariants"); } + if (!getCheckConstraints(table.getMetadataEntry()).isEmpty()) { + throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported"); + } checkSupportedWriterVersion(session, table.getSchemaTableName()); List inputColumns = columns.stream() @@ -1375,6 +1382,9 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable String fileSystem = new Path(handle.getLocation()).toUri().getScheme(); throw new TrinoException(NOT_SUPPORTED, format("Deletes are not supported on the %s filesystem", fileSystem)); } + if (!getCheckConstraints(handle.getMetadataEntry()).isEmpty()) { + throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported"); + } checkSupportedWriterVersion(session, handle.getSchemaTableName()); return DeltaLakeTableHandle.forDelete( @@ -1435,6 +1445,9 @@ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTable if (!columnInvariants.isEmpty()) { throw new TrinoException(NOT_SUPPORTED, "Updates are not supported for tables with delta invariants"); } + if (!getCheckConstraints(handle.getMetadataEntry()).isEmpty()) { + throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported"); + } checkSupportedWriterVersion(session, handle.getSchemaTableName()); List updatedColumnHandles = updatedColumns.stream() @@ -1502,6 +1515,9 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT if (!getColumnInvariants(handle.getMetadataEntry()).isEmpty()) { throw new TrinoException(NOT_SUPPORTED, "Updates are not supported for tables with delta invariants"); } + if (!getCheckConstraints(handle.getMetadataEntry()).isEmpty()) { + throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported"); + } checkSupportedWriterVersion(session, handle.getSchemaTableName()); ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle); @@ -1803,7 +1819,7 @@ private boolean allowWrite(ConnectorSession session, DeltaLakeTableHandle tableH private void checkSupportedWriterVersion(ConnectorSession session, SchemaTableName schemaTableName) { int requiredWriterVersion = metastore.getProtocol(session, metastore.getSnapshot(schemaTableName, session)).getMinWriterVersion(); - if (requiredWriterVersion > WRITER_VERSION) { + if (requiredWriterVersion > MAX_WRITER_VERSION) { throw new TrinoException( NOT_SUPPORTED, format("Table %s requires Delta Lake writer version %d which is not supported", schemaTableName, requiredWriterVersion)); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java index b14ebc02aa3b..e36ff01fd76a 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java @@ -397,6 +397,13 @@ private static String getInvariants(JsonNode node) return invariants == null ? null : invariants.asText(); } + public static Map getCheckConstraints(MetadataEntry metadataEntry) + { + return metadataEntry.getConfiguration().entrySet().stream() + .filter(entry -> entry.getKey().startsWith("delta.constraints.")) + .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); + } + public static Map> getColumnsMetadata(MetadataEntry metadataEntry) { return getColumnProperties(metadataEntry, node -> OBJECT_MAPPER.convertValue(node.get("metadata"), new TypeReference<>(){})); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java index a9532abae90a..fbbe319d2d04 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java @@ -69,14 +69,14 @@ public void testAddColumnUnsupportedWriterVersion() onDelta().executeQuery(format("" + "CREATE TABLE default.%s (col int) " + "USING DELTA LOCATION 's3://%s/%s'" + - "TBLPROPERTIES ('delta.minWriterVersion'='3')", + "TBLPROPERTIES ('delta.minWriterVersion'='4')", tableName, bucketName, tableDirectory)); try { assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " ADD COLUMN new_col int")) - .hasMessageMatching(".* Table .* requires Delta Lake writer version 3 which is not supported"); + .hasMessageMatching(".* Table .* requires Delta Lake writer version 4 which is not supported"); } finally { onDelta().executeQuery("DROP TABLE default." + tableName); @@ -180,14 +180,14 @@ public void testCommentOnTableUnsupportedWriterVersion() onDelta().executeQuery(format("" + "CREATE TABLE default.%s (col int) " + "USING DELTA LOCATION 's3://%s/%s'" + - "TBLPROPERTIES ('delta.minWriterVersion'='3')", + "TBLPROPERTIES ('delta.minWriterVersion'='4')", tableName, bucketName, tableDirectory)); try { assertQueryFailure(() -> onTrino().executeQuery("COMMENT ON TABLE delta.default." + tableName + " IS 'test comment'")) - .hasMessageMatching(".* Table .* requires Delta Lake writer version 3 which is not supported"); + .hasMessageMatching(".* Table .* requires Delta Lake writer version 4 which is not supported"); } finally { onTrino().executeQuery("DROP TABLE delta.default." + tableName); @@ -224,14 +224,14 @@ public void testCommentOnColumnUnsupportedWriterVersion() onDelta().executeQuery(format("" + "CREATE TABLE default.%s (col int) " + "USING DELTA LOCATION 's3://%s/%s'" + - "TBLPROPERTIES ('delta.minWriterVersion'='3')", + "TBLPROPERTIES ('delta.minWriterVersion'='4')", tableName, bucketName, tableDirectory)); try { assertQueryFailure(() -> onTrino().executeQuery("COMMENT ON COLUMN delta.default." + tableName + ".col IS 'test column comment'")) - .hasMessageMatching(".* Table .* requires Delta Lake writer version 3 which is not supported"); + .hasMessageMatching(".* Table .* requires Delta Lake writer version 4 which is not supported"); } finally { onTrino().executeQuery("DROP TABLE delta.default." + tableName); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java index 51dab7b4c77e..51812b13651f 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java @@ -334,11 +334,11 @@ public void testCheckConstraintsCompatibility() .containsOnly(row(1, 1)); assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (2, 2)")) - .hasMessageMatching(".*Table default." + tableName + " requires Delta Lake writer version 3 which is not supported"); + .hasMessageContaining("Writing to tables with CHECK constraints is not supported"); assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM delta.default." + tableName + " WHERE a_number = 1")) - .hasMessageMatching(".*Table default." + tableName + " requires Delta Lake writer version 3 which is not supported"); + .hasMessageContaining("Writing to tables with CHECK constraints is not supported"); assertQueryFailure(() -> onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a_number = 10 WHERE id = 1")) - .hasMessageMatching(".*Table default." + tableName + " requires Delta Lake writer version 3 which is not supported"); + .hasMessageContaining("Writing to tables with CHECK constraints is not supported"); assertThat(onTrino().executeQuery("SELECT id, a_number FROM " + tableName)) .containsOnly(row(1, 1)); @@ -469,4 +469,51 @@ public Object[][] compressionCodecs() {"GZIP"}, }; } + + @Test(groups = {DELTA_LAKE_OSS, DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS}) + public void testWritesToTableWithCheckConstraintFails() + { + String tableName = "test_writes_into_table_with_check_constraint_" + randomTableSuffix(); + try { + onDelta().executeQuery("CREATE TABLE default." + tableName + " (a INT, b INT) " + + "USING DELTA " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'"); + onDelta().executeQuery("ALTER TABLE default." + tableName + " ADD CONSTRAINT aIsPositive CHECK (a > 0)"); + + assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 2)")) + .hasMessageContaining("Writing to tables with CHECK constraints is not supported"); + assertQueryFailure(() -> onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a = 3 WHERE b = 3")) + .hasMessageContaining("Writing to tables with CHECK constraints is not supported"); + assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM delta.default." + tableName + " WHERE a = 3")) + .hasMessageContaining("Writing to tables with CHECK constraints is not supported"); + assertQueryFailure(() -> onTrino().executeQuery("MERGE INTO delta.default." + tableName + " t USING delta.default." + tableName + " s " + + "ON (t.a = s.a) WHEN MATCHED THEN UPDATE SET b = 42")) + .hasMessageContaining("Writing to tables with CHECK constraints is not supported"); + } + finally { + onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + } + } + + @Test(groups = {DELTA_LAKE_OSS, DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS}) + public void testMetadataOperationsRetainCheckConstraints() + { + String tableName = "test_metadata_operations_retain_check_constraints_" + randomTableSuffix(); + try { + onDelta().executeQuery("CREATE TABLE default." + tableName + " (a INT, b INT) " + + "USING DELTA " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'"); + onDelta().executeQuery("ALTER TABLE default." + tableName + " ADD CONSTRAINT aIsPositive CHECK (a > 0)"); + + onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " ADD COLUMN c INT"); + onTrino().executeQuery("COMMENT ON COLUMN delta.default." + tableName + ".c IS 'example column comment'"); + onTrino().executeQuery("COMMENT ON TABLE delta.default." + tableName + " IS 'example table comment'"); + + assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 2, 3)")) + .hasMessageContaining("Writing to tables with CHECK constraints is not supported"); + } + finally { + onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + } + } }