Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.plugin.hive.HiveTransactionHandle;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorAccessControl;
import io.trino.spi.connector.ConnectorCapabilities;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
import io.trino.spi.connector.ConnectorPageSinkProvider;
Expand All @@ -41,6 +42,8 @@
import java.util.Set;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Sets.immutableEnumSet;
import static io.trino.spi.connector.ConnectorCapabilities.NOT_NULL_COLUMN_CONSTRAINT;
import static io.trino.spi.transaction.IsolationLevel.READ_COMMITTED;
import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -214,4 +217,10 @@ public final void shutdown()
{
lifeCycleManager.stop();
}

@Override
public Set<ConnectorCapabilities> getCapabilities()
{
return immutableEnumSet(NOT_NULL_COLUMN_CONSTRAINT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,9 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) table;
String location = metastore.getTableLocation(tableHandle.getSchemaTableName(), session);
Map<String, String> columnComments = getColumnComments(tableHandle.getMetadataEntry());
Map<String, Boolean> columnsNullability = getColumnsNullability(tableHandle.getMetadataEntry());
List<ColumnMetadata> columns = getColumns(tableHandle.getMetadataEntry()).stream()
.map(column -> getColumnMetadata(column, columnComments.get(column.getName())))
.map(column -> getColumnMetadata(column, columnComments.get(column.getName()), columnsNullability.getOrDefault(column.getName(), true)))
.collect(toImmutableList());

ImmutableMap.Builder<String, Object> properties = ImmutableMap.<String, Object>builder()
Expand Down Expand Up @@ -474,7 +475,10 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
{
DeltaLakeTableHandle table = (DeltaLakeTableHandle) tableHandle;
DeltaLakeColumnHandle column = (DeltaLakeColumnHandle) columnHandle;
return getColumnMetadata(column, getColumnComments(table.getMetadataEntry()).get(column.getName()));
return getColumnMetadata(
column,
getColumnComments(table.getMetadataEntry()).get(column.getName()),
getColumnsNullability(table.getMetadataEntry()).getOrDefault(column.getName(), true));
}

/**
Expand Down Expand Up @@ -536,8 +540,9 @@ public Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession sessio
// intentionally skip case when table snapshot is present but it lacks metadata portion
return metastore.getMetadata(metastore.getSnapshot(table, session), session).stream().map(metadata -> {
Map<String, String> columnComments = getColumnComments(metadata);
Map<String, Boolean> columnsNullability = getColumnsNullability(metadata);
List<ColumnMetadata> columnMetadata = getColumns(metadata).stream()
.map(column -> getColumnMetadata(column, columnComments.get(column.getName())))
.map(column -> getColumnMetadata(column, columnComments.get(column.getName()), columnsNullability.getOrDefault(column.getName(), true)))
.collect(toImmutableList());
return TableColumnsMetadata.forTable(table, columnMetadata);
});
Expand Down Expand Up @@ -686,6 +691,8 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
Map<String, String> columnComments = tableMetadata.getColumns().stream()
.filter(column -> column.getComment() != null)
.collect(toImmutableMap(ColumnMetadata::getName, ColumnMetadata::getComment));
Map<String, Boolean> columnsNullability = tableMetadata.getColumns().stream()
.collect(toImmutableMap(ColumnMetadata::getName, ColumnMetadata::isNullable));
TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, targetPath.toString());
appendTableEntries(
0,
Expand All @@ -694,7 +701,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
deltaLakeColumns,
partitionColumns,
columnComments,
deltaLakeColumns.stream().collect(toImmutableMap(DeltaLakeColumnHandle::getName, ignored -> true)),
columnsNullability,
deltaLakeColumns.stream().collect(toImmutableMap(DeltaLakeColumnHandle::getName, ignored -> ImmutableMap.of())),
configurationForNewTable(checkpointInterval),
CREATE_TABLE_OPERATION,
Expand Down Expand Up @@ -1126,6 +1133,9 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
if (newColumnMetadata.getComment() != null) {
columnComments.put(newColumnMetadata.getName(), newColumnMetadata.getComment());
}
ImmutableMap.Builder<String, Boolean> columnsNullability = ImmutableMap.builder();
columnsNullability.putAll(getColumnsNullability(handle.getMetadataEntry()));
columnsNullability.put(newColumnMetadata.getName(), newColumnMetadata.isNullable());

ImmutableMap.Builder<String, Boolean> columnNullability = ImmutableMap.builder();
columnNullability.putAll(getColumnsNullability(handle.getMetadataEntry()));
Expand Down Expand Up @@ -1236,11 +1246,6 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
String fileSystem = new Path(table.getLocation()).toUri().getScheme();
throw new TrinoException(NOT_SUPPORTED, format("Inserts are not supported on the %s filesystem", fileSystem));
}
Map<String, Boolean> columnNullabilities = getColumnsNullability(table.getMetadataEntry());
boolean nonNullableColumnsExist = columnNullabilities.values().stream().anyMatch(nullability -> !nullability);
if (nonNullableColumnsExist) {
throw new TrinoException(NOT_SUPPORTED, "Inserts are not supported for tables with non-nullable columns");
}
Map<String, String> columnInvariants = getColumnInvariants(table.getMetadataEntry());
if (!columnInvariants.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Inserts are not supported for tables with delta invariants");
Expand Down Expand Up @@ -1431,11 +1436,7 @@ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTable
String fileSystem = new Path(handle.getLocation()).toUri().getScheme();
throw new TrinoException(NOT_SUPPORTED, format("Updates are not supported on the %s filesystem", fileSystem));
}
Map<String, Boolean> columnNullabilities = getColumnsNullability(handle.getMetadataEntry());
boolean nonNullableColumnsExist = columnNullabilities.values().stream().anyMatch(nullability -> !nullability);
if (nonNullableColumnsExist) {
throw new TrinoException(NOT_SUPPORTED, "Updates are not supported for tables with non-nullable columns");
}

Map<String, String> columnInvariants = getColumnInvariants(handle.getMetadataEntry());
if (!columnInvariants.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Updates are not supported for tables with delta invariants");
Expand Down Expand Up @@ -2548,13 +2549,14 @@ public DeltaLakeMetastore getMetastore()
return metastore;
}

private static ColumnMetadata getColumnMetadata(DeltaLakeColumnHandle column, @Nullable String comment)
private static ColumnMetadata getColumnMetadata(DeltaLakeColumnHandle column, @Nullable String comment, boolean nullability)
{
return ColumnMetadata.builder()
.setName(column.getName())
.setType(column.getType())
.setHidden(column.getColumnType() == SYNTHESIZED)
.setComment(Optional.ofNullable(comment))
.setNullable(nullability)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
switch (connectorBehavior) {
case SUPPORTS_DELETE:
case SUPPORTS_ROW_LEVEL_DELETE:
return true;
case SUPPORTS_UPDATE:
case SUPPORTS_NOT_NULL_CONSTRAINT:
return true;
case SUPPORTS_MERGE:
return true;
Expand All @@ -116,13 +116,18 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
case SUPPORTS_DROP_COLUMN:
case SUPPORTS_RENAME_COLUMN:
case SUPPORTS_RENAME_SCHEMA:
case SUPPORTS_NOT_NULL_CONSTRAINT:
return false;
default:
return super.hasBehavior(connectorBehavior);
}
}

@Override
protected String errorMessageForInsertIntoNotNullColumn(String columnName)
{
return "NULL value not allowed for NOT NULL column: " + columnName;
}

@Override
protected void verifyConcurrentUpdateFailurePermissible(Exception e)
{
Expand Down Expand Up @@ -770,6 +775,25 @@ public Object[][] targetAndSourceWithDifferentPartitioning()
};
}

@Test
public void testTableWithNonNullableColumns()
{
String tableName = "test_table_with_non_nullable_columns_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + "(col1 INTEGER NOT NULL, col2 INTEGER, col3 INTEGER)");
assertUpdate("INSERT INTO " + tableName + " VALUES(1, 10, 100)", 1);
assertUpdate("INSERT INTO " + tableName + " VALUES(2, 20, 200)", 1);
assertThatThrownBy(() -> query("INSERT INTO " + tableName + " VALUES(null, 30, 300)"))
.hasMessageContaining("NULL value not allowed for NOT NULL column: col1");

//TODO this should fail https://github.com/trinodb/trino/issues/13434
assertUpdate("INSERT INTO " + tableName + " VALUES(TRY(5/0), 40, 400)", 1);
//TODO these 2 should fail https://github.com/trinodb/trino/issues/13435
assertUpdate("UPDATE " + tableName + " SET col2 = NULL where col3 = 100", 1);
assertUpdate("UPDATE " + tableName + " SET col2 = TRY(5/0) where col3 = 200", 1);

assertQuery("SELECT * FROM " + tableName, "VALUES(1, null, 100), (2, null, 200), (null, 40, 400)");
}

@Override
protected String createSchemaSql(String schemaName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,34 +406,6 @@ public void verifyCompressionCodecsDataProvider()
.collect(toImmutableList())));
}

@Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS})
public void testPreventingWritesToTableWithNotNullableColumns()
{
String tableName = "test_preventing_inserts_into_table_with_not_nullable_columns_" + randomTableSuffix();

try {
onDelta().executeQuery("CREATE TABLE default." + tableName + "( " +
" id INT NOT NULL, " +
" a_number INT) " +
"USING DELTA " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'");

onDelta().executeQuery("INSERT INTO " + tableName + " VALUES(1,1)");
assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO " + tableName + " VALUES (2, 2)"))
.hasMessageContaining("Inserts are not supported for tables with non-nullable columns");
assertThat(onTrino().executeQuery("SELECT * FROM " + tableName))
.containsOnly(row(1, 1));
onDelta().executeQuery("UPDATE " + tableName + " SET a_number = 2 WHERE id = 1");
assertQueryFailure(() -> onTrino().executeQuery("UPDATE " + tableName + " SET a_number = 3 WHERE id = 1"))
.hasMessageContaining("Updates are not supported for tables with non-nullable columns");
assertThat(onTrino().executeQuery("SELECT * FROM " + tableName))
.containsOnly(row(1, 2));
}
finally {
onDelta().executeQuery("DROP TABLE IF EXISTS " + tableName);
}
}

@DataProvider
public Object[][] compressionCodecs()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure;
import static io.trino.tempto.assertions.QueryAssert.assertThat;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_73;
import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS;
import static io.trino.tests.product.hive.util.TemporaryHiveTable.randomTableSuffix;
import static io.trino.tests.product.utils.QueryExecutors.onDelta;
Expand Down Expand Up @@ -204,6 +205,87 @@ public void testCaseDeleteEntirePartition(String partitionColumn)
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS})
public void testTrinoRespectsDatabricksSettingNonNullableColumn()
{
String tableName = "test_databricks_table_with_nonnullable_columns_" + randomTableSuffix();

onDelta().executeQuery(format(
"CREATE TABLE default.%1$s (non_nullable_col INT NOT NULL, nullable_col INT) USING DELTA LOCATION '%2$s%1$s'",
tableName,
getBaseLocation()));

try {
onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 2)");
assertQueryFailure(() -> onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (null, 4)"))
.hasMessageContaining("NOT NULL constraint violated for column: non_nullable_col");
assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (null, 5)"))
.hasMessageContaining("NULL value not allowed for NOT NULL column: non_nullable_col");

assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
.containsOnly(row(1, 2));
assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
.containsOnly(row(1, 2));
}
finally {
onDelta().executeQuery("DROP TABLE default." + tableName);
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS})
public void testDatabricksRespectsTrinoSettingNonNullableColumn()
{
String tableName = "test_trino_table_with_nonnullable_columns_" + randomTableSuffix();

onTrino().executeQuery("CREATE TABLE delta.default.\"" + tableName + "\" " +
"(non_nullable_col INT NOT NULL, nullable_col INT) " +
"WITH (location = 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "')");

try {
onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 2)");
assertQueryFailure(() -> onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (null, 4)"))
.hasMessageContaining("NOT NULL constraint violated for column: non_nullable_col");
assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (null, 5)"))
.hasMessageContaining("NULL value not allowed for NOT NULL column: non_nullable_col");

assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
.containsOnly(row(1, 2));
assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
.containsOnly(row(1, 2));
}
finally {
onTrino().executeQuery("DROP TABLE delta.default." + tableName);
}
}

@Test(groups = {DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS})
public void testInsertingIntoDatabricksTableWithAddedNotNullConstraint()
{
String tableName = "test_databricks_table_altered_after_initial_write_" + randomTableSuffix();

onDelta().executeQuery(format(
"CREATE TABLE default.%1$s (non_nullable_col INT, nullable_col INT) USING DELTA LOCATION '%2$s%1$s'",
tableName,
getBaseLocation()));

try {
onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 2)");
onDelta().executeQuery("ALTER TABLE default." + tableName + " ALTER COLUMN non_nullable_col SET NOT NULL");
assertQueryFailure(() -> onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (null, 4)"))
.hasMessageContaining("NOT NULL constraint violated for column: non_nullable_col");
assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (null, 5)"))
.hasMessageContaining("NULL value not allowed for NOT NULL column: non_nullable_col");

assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
.containsOnly(row(1, 2));
assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
.containsOnly(row(1, 2));
}
finally {
onDelta().executeQuery("DROP TABLE default." + tableName);
}
}

@DataProvider(name = "partition_column_names")
public static Object[][] partitionColumns()
{
Expand Down