From 350064578deb5dbd8544678dd93d88185df094a5 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Fri, 7 Oct 2022 14:45:01 +0900 Subject: [PATCH 1/2] Disallow writing to Delta Lake tables with generated columns --- .../plugin/deltalake/DeltaLakeMetadata.java | 13 ++++++++++ .../DeltaLakeSchemaSupport.java | 12 +++++++++ ...eltaLakeDatabricksInsertCompatibility.java | 25 +++++++++++++++++++ 3 files changed, 50 insertions(+) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index ffcf21c070fb..f2eabdb88d06 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -182,6 +182,7 @@ import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnMappingMode; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnsMetadata; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnsNullability; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getGeneratedColumnExpressions; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.isAppendOnly; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeSchemaAsJson; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeStatsAsJson; @@ -1259,6 +1260,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto if (!getCheckConstraints(table.getMetadataEntry()).isEmpty()) { throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported"); } + checkUnsupportedGeneratedColumns(table.getMetadataEntry()); checkSupportedWriterVersion(session, table.getSchemaTableName()); List inputColumns = columns.stream() @@ -1392,6 +1394,7 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable if (!getCheckConstraints(handle.getMetadataEntry()).isEmpty()) { throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported"); } + checkUnsupportedGeneratedColumns(handle.getMetadataEntry()); checkSupportedWriterVersion(session, handle.getSchemaTableName()); return DeltaLakeTableHandle.forDelete( @@ -1455,6 +1458,7 @@ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTable if (!getCheckConstraints(handle.getMetadataEntry()).isEmpty()) { throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported"); } + checkUnsupportedGeneratedColumns(handle.getMetadataEntry()); checkSupportedWriterVersion(session, handle.getSchemaTableName()); List updatedColumnHandles = updatedColumns.stream() @@ -1525,6 +1529,7 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT if (!getCheckConstraints(handle.getMetadataEntry()).isEmpty()) { throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported"); } + checkUnsupportedGeneratedColumns(handle.getMetadataEntry()); checkSupportedWriterVersion(session, handle.getSchemaTableName()); ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle); @@ -1823,6 +1828,14 @@ private boolean allowWrite(ConnectorSession session, DeltaLakeTableHandle tableH } } + private void checkUnsupportedGeneratedColumns(MetadataEntry metadataEntry) + { + Map columnGeneratedExpressions = getGeneratedColumnExpressions(metadataEntry); + if (!columnGeneratedExpressions.isEmpty()) { + throw new TrinoException(NOT_SUPPORTED, "Writing to tables with generated columns is not supported"); + } + } + private void checkSupportedWriterVersion(ConnectorSession session, SchemaTableName schemaTableName) { int requiredWriterVersion = metastore.getProtocol(session, metastore.getSnapshot(schemaTableName, session)).getMinWriterVersion(); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java index e8a45327dd81..ed1e4dc00335 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java @@ -415,6 +415,18 @@ private static String getInvariants(JsonNode node) return invariants == null ? null : invariants.asText(); } + public static Map getGeneratedColumnExpressions(MetadataEntry metadataEntry) + { + return getColumnProperties(metadataEntry, DeltaLakeSchemaSupport::getGeneratedColumnExpressions); + } + + @Nullable + private static String getGeneratedColumnExpressions(JsonNode node) + { + JsonNode invariants = node.get("metadata").get("delta.generationExpression"); + return invariants == null ? null : invariants.asText(); + } + public static Map getCheckConstraints(MetadataEntry metadataEntry) { return metadataEntry.getConfiguration().entrySet().stream() diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java index 7888b02f7515..db244d30337f 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java @@ -516,4 +516,29 @@ public void testMetadataOperationsRetainCheckConstraints() onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); } } + + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS}) + public void testWritesToTableWithGeneratedColumnFails() + { + String tableName = "test_writes_into_table_with_generated_column_" + randomTableSuffix(); + try { + onDelta().executeQuery("CREATE TABLE default." + tableName + " (a INT, b BOOLEAN GENERATED ALWAYS AS (CAST(true AS BOOLEAN))) " + + "USING DELTA " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'"); + + // Disallowing all statements just in case though some statements may not unrelated to generated columns + assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, false)")) + .hasMessageContaining("Writing to tables with generated columns is not supported"); + assertQueryFailure(() -> onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a = 3 WHERE b = true")) + .hasMessageContaining("Writing to tables with generated columns is not supported"); + assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM delta.default." + tableName + " WHERE a = 3")) + .hasMessageContaining("Writing to tables with generated columns is not supported"); + assertQueryFailure(() -> onTrino().executeQuery("MERGE INTO delta.default." + tableName + " t USING delta.default." + tableName + " s " + + "ON (t.a = s.a) WHEN MATCHED THEN UPDATE SET b = false")) + .hasMessageContaining("Writing to tables with generated columns is not supported"); + } + finally { + onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + } + } } From 4c3aff3dcc711bf5c48abd2aea7bdc0e9605f5aa Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Fri, 7 Oct 2022 17:20:16 +0900 Subject: [PATCH 2/2] empty