diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConnector.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConnector.java index 1a0dbf151ed1..531ced3f0aea 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConnector.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConnector.java @@ -21,6 +21,7 @@ import io.trino.plugin.hive.HiveTransactionHandle; import io.trino.spi.connector.Connector; import io.trino.spi.connector.ConnectorAccessControl; +import io.trino.spi.connector.ConnectorCapabilities; import io.trino.spi.connector.ConnectorMetadata; import io.trino.spi.connector.ConnectorNodePartitioningProvider; import io.trino.spi.connector.ConnectorPageSinkProvider; @@ -41,6 +42,8 @@ import java.util.Set; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Sets.immutableEnumSet; +import static io.trino.spi.connector.ConnectorCapabilities.NOT_NULL_COLUMN_CONSTRAINT; import static io.trino.spi.transaction.IsolationLevel.READ_COMMITTED; import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports; import static java.util.Objects.requireNonNull; @@ -214,4 +217,10 @@ public final void shutdown() { lifeCycleManager.stop(); } + + @Override + public Set getCapabilities() + { + return immutableEnumSet(NOT_NULL_COLUMN_CONSTRAINT); + } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 18861427027b..c9cfd141c3b0 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -428,8 +428,9 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) table; String location = metastore.getTableLocation(tableHandle.getSchemaTableName(), session); Map columnComments = getColumnComments(tableHandle.getMetadataEntry()); + Map columnsNullability = getColumnsNullability(tableHandle.getMetadataEntry()); List columns = getColumns(tableHandle.getMetadataEntry()).stream() - .map(column -> getColumnMetadata(column, columnComments.get(column.getName()))) + .map(column -> getColumnMetadata(column, columnComments.get(column.getName()), columnsNullability.getOrDefault(column.getName(), true))) .collect(toImmutableList()); ImmutableMap.Builder properties = ImmutableMap.builder() @@ -474,7 +475,10 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable { DeltaLakeTableHandle table = (DeltaLakeTableHandle) tableHandle; DeltaLakeColumnHandle column = (DeltaLakeColumnHandle) columnHandle; - return getColumnMetadata(column, getColumnComments(table.getMetadataEntry()).get(column.getName())); + return getColumnMetadata( + column, + getColumnComments(table.getMetadataEntry()).get(column.getName()), + getColumnsNullability(table.getMetadataEntry()).getOrDefault(column.getName(), true)); } /** @@ -536,8 +540,9 @@ public Iterator streamTableColumns(ConnectorSession sessio // intentionally skip case when table snapshot is present but it lacks metadata portion return metastore.getMetadata(metastore.getSnapshot(table, session), session).stream().map(metadata -> { Map columnComments = getColumnComments(metadata); + Map columnsNullability = getColumnsNullability(metadata); List columnMetadata = getColumns(metadata).stream() - .map(column -> getColumnMetadata(column, columnComments.get(column.getName()))) + .map(column -> getColumnMetadata(column, columnComments.get(column.getName()), columnsNullability.getOrDefault(column.getName(), true))) .collect(toImmutableList()); return TableColumnsMetadata.forTable(table, columnMetadata); }); @@ -686,6 +691,8 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe Map columnComments = tableMetadata.getColumns().stream() .filter(column -> column.getComment() != null) .collect(toImmutableMap(ColumnMetadata::getName, ColumnMetadata::getComment)); + Map columnsNullability = tableMetadata.getColumns().stream() + .collect(toImmutableMap(ColumnMetadata::getName, ColumnMetadata::isNullable)); TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, targetPath.toString()); appendTableEntries( 0, @@ -694,7 +701,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe deltaLakeColumns, partitionColumns, columnComments, - deltaLakeColumns.stream().collect(toImmutableMap(DeltaLakeColumnHandle::getName, ignored -> true)), + columnsNullability, deltaLakeColumns.stream().collect(toImmutableMap(DeltaLakeColumnHandle::getName, ignored -> ImmutableMap.of())), configurationForNewTable(checkpointInterval), CREATE_TABLE_OPERATION, @@ -1126,6 +1133,9 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle if (newColumnMetadata.getComment() != null) { columnComments.put(newColumnMetadata.getName(), newColumnMetadata.getComment()); } + ImmutableMap.Builder columnsNullability = ImmutableMap.builder(); + columnsNullability.putAll(getColumnsNullability(handle.getMetadataEntry())); + columnsNullability.put(newColumnMetadata.getName(), newColumnMetadata.isNullable()); ImmutableMap.Builder columnNullability = ImmutableMap.builder(); columnNullability.putAll(getColumnsNullability(handle.getMetadataEntry())); @@ -1236,11 +1246,6 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto String fileSystem = new Path(table.getLocation()).toUri().getScheme(); throw new TrinoException(NOT_SUPPORTED, format("Inserts are not supported on the %s filesystem", fileSystem)); } - Map columnNullabilities = getColumnsNullability(table.getMetadataEntry()); - boolean nonNullableColumnsExist = columnNullabilities.values().stream().anyMatch(nullability -> !nullability); - if (nonNullableColumnsExist) { - throw new TrinoException(NOT_SUPPORTED, "Inserts are not supported for tables with non-nullable columns"); - } Map columnInvariants = getColumnInvariants(table.getMetadataEntry()); if (!columnInvariants.isEmpty()) { throw new TrinoException(NOT_SUPPORTED, "Inserts are not supported for tables with delta invariants"); @@ -1431,11 +1436,7 @@ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTable String fileSystem = new Path(handle.getLocation()).toUri().getScheme(); throw new TrinoException(NOT_SUPPORTED, format("Updates are not supported on the %s filesystem", fileSystem)); } - Map columnNullabilities = getColumnsNullability(handle.getMetadataEntry()); - boolean nonNullableColumnsExist = columnNullabilities.values().stream().anyMatch(nullability -> !nullability); - if (nonNullableColumnsExist) { - throw new TrinoException(NOT_SUPPORTED, "Updates are not supported for tables with non-nullable columns"); - } + Map columnInvariants = getColumnInvariants(handle.getMetadataEntry()); if (!columnInvariants.isEmpty()) { throw new TrinoException(NOT_SUPPORTED, "Updates are not supported for tables with delta invariants"); @@ -2548,13 +2549,14 @@ public DeltaLakeMetastore getMetastore() return metastore; } - private static ColumnMetadata getColumnMetadata(DeltaLakeColumnHandle column, @Nullable String comment) + private static ColumnMetadata getColumnMetadata(DeltaLakeColumnHandle column, @Nullable String comment, boolean nullability) { return ColumnMetadata.builder() .setName(column.getName()) .setType(column.getType()) .setHidden(column.getColumnType() == SYNTHESIZED) .setComment(Optional.ofNullable(comment)) + .setNullable(nullability) .build(); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java index a3982e684411..0a8122a2851b 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java @@ -104,8 +104,8 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) switch (connectorBehavior) { case SUPPORTS_DELETE: case SUPPORTS_ROW_LEVEL_DELETE: - return true; case SUPPORTS_UPDATE: + case SUPPORTS_NOT_NULL_CONSTRAINT: return true; case SUPPORTS_MERGE: return true; @@ -116,13 +116,18 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) case SUPPORTS_DROP_COLUMN: case SUPPORTS_RENAME_COLUMN: case SUPPORTS_RENAME_SCHEMA: - case SUPPORTS_NOT_NULL_CONSTRAINT: return false; default: return super.hasBehavior(connectorBehavior); } } + @Override + protected String errorMessageForInsertIntoNotNullColumn(String columnName) + { + return "NULL value not allowed for NOT NULL column: " + columnName; + } + @Override protected void verifyConcurrentUpdateFailurePermissible(Exception e) { @@ -770,6 +775,25 @@ public Object[][] targetAndSourceWithDifferentPartitioning() }; } + @Test + public void testTableWithNonNullableColumns() + { + String tableName = "test_table_with_non_nullable_columns_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + "(col1 INTEGER NOT NULL, col2 INTEGER, col3 INTEGER)"); + assertUpdate("INSERT INTO " + tableName + " VALUES(1, 10, 100)", 1); + assertUpdate("INSERT INTO " + tableName + " VALUES(2, 20, 200)", 1); + assertThatThrownBy(() -> query("INSERT INTO " + tableName + " VALUES(null, 30, 300)")) + .hasMessageContaining("NULL value not allowed for NOT NULL column: col1"); + + //TODO this should fail https://github.com/trinodb/trino/issues/13434 + assertUpdate("INSERT INTO " + tableName + " VALUES(TRY(5/0), 40, 400)", 1); + //TODO these 2 should fail https://github.com/trinodb/trino/issues/13435 + assertUpdate("UPDATE " + tableName + " SET col2 = NULL where col3 = 100", 1); + assertUpdate("UPDATE " + tableName + " SET col2 = TRY(5/0) where col3 = 200", 1); + + assertQuery("SELECT * FROM " + tableName, "VALUES(1, null, 100), (2, null, 200), (null, 40, 400)"); + } + @Override protected String createSchemaSql(String schemaName) { diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java index 1fdd1d811609..7cebee2cbbd5 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java @@ -406,34 +406,6 @@ public void verifyCompressionCodecsDataProvider() .collect(toImmutableList()))); } - @Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS}) - public void testPreventingWritesToTableWithNotNullableColumns() - { - String tableName = "test_preventing_inserts_into_table_with_not_nullable_columns_" + randomTableSuffix(); - - try { - onDelta().executeQuery("CREATE TABLE default." + tableName + "( " + - " id INT NOT NULL, " + - " a_number INT) " + - "USING DELTA " + - "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'"); - - onDelta().executeQuery("INSERT INTO " + tableName + " VALUES(1,1)"); - assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO " + tableName + " VALUES (2, 2)")) - .hasMessageContaining("Inserts are not supported for tables with non-nullable columns"); - assertThat(onTrino().executeQuery("SELECT * FROM " + tableName)) - .containsOnly(row(1, 1)); - onDelta().executeQuery("UPDATE " + tableName + " SET a_number = 2 WHERE id = 1"); - assertQueryFailure(() -> onTrino().executeQuery("UPDATE " + tableName + " SET a_number = 3 WHERE id = 1")) - .hasMessageContaining("Updates are not supported for tables with non-nullable columns"); - assertThat(onTrino().executeQuery("SELECT * FROM " + tableName)) - .containsOnly(row(1, 2)); - } - finally { - onDelta().executeQuery("DROP TABLE IF EXISTS " + tableName); - } - } - @DataProvider public Object[][] compressionCodecs() { diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java index 16df2ee0973c..8dee06830e5e 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java @@ -25,6 +25,7 @@ import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure; import static io.trino.tempto.assertions.QueryAssert.assertThat; import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_73; import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.hive.util.TemporaryHiveTable.randomTableSuffix; import static io.trino.tests.product.utils.QueryExecutors.onDelta; @@ -204,6 +205,87 @@ public void testCaseDeleteEntirePartition(String partitionColumn) } } + @Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS}) + public void testTrinoRespectsDatabricksSettingNonNullableColumn() + { + String tableName = "test_databricks_table_with_nonnullable_columns_" + randomTableSuffix(); + + onDelta().executeQuery(format( + "CREATE TABLE default.%1$s (non_nullable_col INT NOT NULL, nullable_col INT) USING DELTA LOCATION '%2$s%1$s'", + tableName, + getBaseLocation())); + + try { + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 2)"); + assertQueryFailure(() -> onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (null, 4)")) + .hasMessageContaining("NOT NULL constraint violated for column: non_nullable_col"); + assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (null, 5)")) + .hasMessageContaining("NULL value not allowed for NOT NULL column: non_nullable_col"); + + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)) + .containsOnly(row(1, 2)); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .containsOnly(row(1, 2)); + } + finally { + onDelta().executeQuery("DROP TABLE default." + tableName); + } + } + + @Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS}) + public void testDatabricksRespectsTrinoSettingNonNullableColumn() + { + String tableName = "test_trino_table_with_nonnullable_columns_" + randomTableSuffix(); + + onTrino().executeQuery("CREATE TABLE delta.default.\"" + tableName + "\" " + + "(non_nullable_col INT NOT NULL, nullable_col INT) " + + "WITH (location = 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "')"); + + try { + onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 2)"); + assertQueryFailure(() -> onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (null, 4)")) + .hasMessageContaining("NOT NULL constraint violated for column: non_nullable_col"); + assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (null, 5)")) + .hasMessageContaining("NULL value not allowed for NOT NULL column: non_nullable_col"); + + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)) + .containsOnly(row(1, 2)); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .containsOnly(row(1, 2)); + } + finally { + onTrino().executeQuery("DROP TABLE delta.default." + tableName); + } + } + + @Test(groups = {DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS}) + public void testInsertingIntoDatabricksTableWithAddedNotNullConstraint() + { + String tableName = "test_databricks_table_altered_after_initial_write_" + randomTableSuffix(); + + onDelta().executeQuery(format( + "CREATE TABLE default.%1$s (non_nullable_col INT, nullable_col INT) USING DELTA LOCATION '%2$s%1$s'", + tableName, + getBaseLocation())); + + try { + onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 2)"); + onDelta().executeQuery("ALTER TABLE default." + tableName + " ALTER COLUMN non_nullable_col SET NOT NULL"); + assertQueryFailure(() -> onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (null, 4)")) + .hasMessageContaining("NOT NULL constraint violated for column: non_nullable_col"); + assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (null, 5)")) + .hasMessageContaining("NULL value not allowed for NOT NULL column: non_nullable_col"); + + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)) + .containsOnly(row(1, 2)); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .containsOnly(row(1, 2)); + } + finally { + onDelta().executeQuery("DROP TABLE default." + tableName); + } + } + @DataProvider(name = "partition_column_names") public static Object[][] partitionColumns() {