Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnMappingMode;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnsMetadata;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnsNullability;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getGeneratedColumnExpressions;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.isAppendOnly;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeSchemaAsJson;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeStatsAsJson;
Expand Down Expand Up @@ -1259,6 +1260,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
if (!getCheckConstraints(table.getMetadataEntry()).isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported");
}
checkUnsupportedGeneratedColumns(table.getMetadataEntry());
checkSupportedWriterVersion(session, table.getSchemaTableName());

List<DeltaLakeColumnHandle> inputColumns = columns.stream()
Expand Down Expand Up @@ -1392,6 +1394,7 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
if (!getCheckConstraints(handle.getMetadataEntry()).isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported");
}
checkUnsupportedGeneratedColumns(handle.getMetadataEntry());
checkSupportedWriterVersion(session, handle.getSchemaTableName());

return DeltaLakeTableHandle.forDelete(
Expand Down Expand Up @@ -1455,6 +1458,7 @@ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTable
if (!getCheckConstraints(handle.getMetadataEntry()).isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported");
}
checkUnsupportedGeneratedColumns(handle.getMetadataEntry());
checkSupportedWriterVersion(session, handle.getSchemaTableName());

List<DeltaLakeColumnHandle> updatedColumnHandles = updatedColumns.stream()
Expand Down Expand Up @@ -1525,6 +1529,7 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
if (!getCheckConstraints(handle.getMetadataEntry()).isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported");
}
checkUnsupportedGeneratedColumns(handle.getMetadataEntry());
checkSupportedWriterVersion(session, handle.getSchemaTableName());

ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle);
Expand Down Expand Up @@ -1823,6 +1828,14 @@ private boolean allowWrite(ConnectorSession session, DeltaLakeTableHandle tableH
}
}

private void checkUnsupportedGeneratedColumns(MetadataEntry metadataEntry)
{
Map<String, String> columnGeneratedExpressions = getGeneratedColumnExpressions(metadataEntry);
if (!columnGeneratedExpressions.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with generated columns is not supported");
}
}

private void checkSupportedWriterVersion(ConnectorSession session, SchemaTableName schemaTableName)
{
int requiredWriterVersion = metastore.getProtocol(session, metastore.getSnapshot(schemaTableName, session)).getMinWriterVersion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,18 @@ private static String getInvariants(JsonNode node)
return invariants == null ? null : invariants.asText();
}

public static Map<String, String> getGeneratedColumnExpressions(MetadataEntry metadataEntry)
{
return getColumnProperties(metadataEntry, DeltaLakeSchemaSupport::getGeneratedColumnExpressions);
}

@Nullable
private static String getGeneratedColumnExpressions(JsonNode node)
{
JsonNode invariants = node.get("metadata").get("delta.generationExpression");
return invariants == null ? null : invariants.asText();
}

public static Map<String, String> getCheckConstraints(MetadataEntry metadataEntry)
{
return metadataEntry.getConfiguration().entrySet().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,4 +516,29 @@ public void testMetadataOperationsRetainCheckConstraints()
onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName);
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS})
public void testWritesToTableWithGeneratedColumnFails()
{
String tableName = "test_writes_into_table_with_generated_column_" + randomTableSuffix();
try {
onDelta().executeQuery("CREATE TABLE default." + tableName + " (a INT, b BOOLEAN GENERATED ALWAYS AS (CAST(true AS BOOLEAN))) " +
"USING DELTA " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'");

// Disallowing all statements just in case though some statements may not unrelated to generated columns
assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, false)"))
.hasMessageContaining("Writing to tables with generated columns is not supported");
assertQueryFailure(() -> onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a = 3 WHERE b = true"))
.hasMessageContaining("Writing to tables with generated columns is not supported");
assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM delta.default." + tableName + " WHERE a = 3"))
.hasMessageContaining("Writing to tables with generated columns is not supported");
assertQueryFailure(() -> onTrino().executeQuery("MERGE INTO delta.default." + tableName + " t USING delta.default." + tableName + " s " +
"ON (t.a = s.a) WHEN MATCHED THEN UPDATE SET b = false"))
.hasMessageContaining("Writing to tables with generated columns is not supported");
}
finally {
onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName);
}
}
}