diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 1c038d41325b..355ecda5442a 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -174,6 +174,7 @@ import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnComments; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnInvariants; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnMappingMode; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnsMetadata; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnsNullability; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.isAppendOnly; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeSchemaAsJson; @@ -684,6 +685,8 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe deltaLakeColumns, partitionColumns, columnComments, + deltaLakeColumns.stream().collect(toImmutableMap(DeltaLakeColumnHandle::getName, ignored -> true)), + deltaLakeColumns.stream().collect(toImmutableMap(DeltaLakeColumnHandle::getName, ignored -> ImmutableMap.of())), configurationForNewTable(checkpointInterval), CREATE_TABLE_OPERATION, session, @@ -950,6 +953,8 @@ public Optional finishCreateTable( handle.getInputColumns(), handle.getPartitionedBy(), ImmutableMap.of(), + handle.getInputColumns().stream().collect(toImmutableMap(DeltaLakeColumnHandle::getName, ignored -> true)), + handle.getInputColumns().stream().collect(toImmutableMap(DeltaLakeColumnHandle::getName, ignored -> ImmutableMap.of())), configurationForNewTable(handle.getCheckpointInterval()), CREATE_TABLE_AS_OPERATION, session, @@ -1027,6 +1032,8 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table columns, partitionColumns, getColumnComments(handle.getMetadataEntry()), + getColumnsNullability(handle.getMetadataEntry()), + getColumnsMetadata(handle.getMetadataEntry()), handle.getMetadataEntry().getConfiguration(), SET_TBLPROPERTIES_OPERATION, session, @@ -1072,6 +1079,8 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl columns, partitionColumns, columnComments.buildOrThrow(), + getColumnsNullability(deltaLakeTableHandle.getMetadataEntry()), + getColumnsMetadata(deltaLakeTableHandle.getMetadataEntry()), deltaLakeTableHandle.getMetadataEntry().getConfiguration(), CHANGE_COLUMN_OPERATION, session, @@ -1109,6 +1118,14 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle columnComments.put(newColumnMetadata.getName(), newColumnMetadata.getComment()); } + ImmutableMap.Builder columnNullability = ImmutableMap.builder(); + columnNullability.putAll(getColumnsNullability(handle.getMetadataEntry())); + columnNullability.put(newColumnMetadata.getName(), true); + + ImmutableMap.Builder> columnMetadata = ImmutableMap.builder(); + columnMetadata.putAll(getColumnsMetadata(handle.getMetadataEntry())); + columnMetadata.put(newColumnMetadata.getName(), ImmutableMap.of()); + TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, handle.getLocation()); appendTableEntries( commitVersion, @@ -1117,6 +1134,8 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle columnsBuilder.build(), partitionColumns, columnComments.buildOrThrow(), + columnNullability.buildOrThrow(), + columnMetadata.buildOrThrow(), handle.getMetadataEntry().getConfiguration(), ADD_COLUMN_OPERATION, session, @@ -1137,6 +1156,8 @@ private static void appendTableEntries( List columns, List partitionColumnNames, Map columnComments, + Map columnNullability, + Map> columnMetadata, Map configuration, String operation, ConnectorSession session, @@ -1168,7 +1189,7 @@ private static void appendTableEntries( null, comment.orElse(null), new Format("parquet", ImmutableMap.of()), - serializeSchemaAsJson(columns, columnComments), + serializeSchemaAsJson(columns, columnComments, columnNullability, columnMetadata), partitionColumnNames, ImmutableMap.copyOf(configuration), createdTime)); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java index 06a6311b3d25..b14ebc02aa3b 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java @@ -14,6 +14,7 @@ package io.trino.plugin.deltalake.transactionlog; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; @@ -128,38 +129,61 @@ public static List extractPartitionColumns(List columns, Map columnComments) + public static String serializeSchemaAsJson( + List columns, + Map columnComments, + Map columnNullability, + Map> columnMetadata) { try { - return OBJECT_MAPPER.writeValueAsString(serializeStructType(columns, columnComments)); + return OBJECT_MAPPER.writeValueAsString(serializeStructType(columns, columnComments, columnNullability, columnMetadata)); } catch (JsonProcessingException e) { throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, getLocation(e), "Failed to encode Delta Lake schema", e); } } - private static Map serializeStructType(List columns, Map columnComments) + private static Map serializeStructType( + List columns, + Map columnComments, + Map columnNullability, + Map> columnMetadata) { ImmutableMap.Builder schema = ImmutableMap.builder(); - schema.put("fields", columns.stream().map(column -> serializeStructField(column.getName(), column.getType(), columnComments.get(column.getName()))).collect(toImmutableList())); + schema.put("fields", columns.stream() + .map(column -> { + String columnName = column.getName(); + return serializeStructField( + column.getName(), + column.getType(), + columnComments.get(columnName), + columnNullability.get(columnName), + columnMetadata.get(columnName)); + }) + .collect(toImmutableList())); schema.put("type", "struct"); return schema.buildOrThrow(); } - private static Map serializeStructField(String name, Type type, @Nullable String comment) + private static Map serializeStructField(String name, Type type, @Nullable String comment, @Nullable Boolean nullable, @Nullable Map metadata) { ImmutableMap.Builder fieldContents = ImmutableMap.builder(); - ImmutableMap.Builder metadata = ImmutableMap.builder(); + ImmutableMap.Builder columnMetadata = ImmutableMap.builder(); if (comment != null) { - metadata.put("comment", comment); + columnMetadata.put("comment", comment); + } + if (metadata != null) { + metadata.entrySet().stream() + .filter(entry -> !entry.getKey().equals("comment")) + .forEach(entry -> columnMetadata.put(entry.getKey(), entry.getValue())); } - fieldContents.put("metadata", metadata.buildOrThrow()); + fieldContents.put("metadata", columnMetadata.buildOrThrow()); fieldContents.put("name", name); - fieldContents.put("nullable", true); // TODO: Is column nullability configurable in Trino? + fieldContents.put("nullable", nullable != null ? nullable : true); fieldContents.put("type", serializeColumnType(type)); return fieldContents.buildOrThrow(); @@ -207,7 +231,8 @@ private static Map serializeStructType(RowType rowType) ImmutableMap.Builder fields = ImmutableMap.builder(); fields.put("type", "struct"); - fields.put("fields", rowType.getFields().stream().map(field -> serializeStructField(field.getName().orElse(null), field.getType(), null)).collect(toImmutableList())); + fields.put("fields", rowType.getFields().stream() + .map(field -> serializeStructField(field.getName().orElse(null), field.getType(), null, null, null)).collect(toImmutableList())); return fields.buildOrThrow(); } @@ -372,6 +397,11 @@ private static String getInvariants(JsonNode node) return invariants == null ? null : invariants.asText(); } + public static Map> getColumnsMetadata(MetadataEntry metadataEntry) + { + return getColumnProperties(metadataEntry, node -> OBJECT_MAPPER.convertValue(node.get("metadata"), new TypeReference<>(){})); + } + public static Map getColumnProperties(MetadataEntry metadataEntry, Function extractor) { return Optional.ofNullable(metadataEntry.getSchemaString()) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorSmokeTest.java index 1459c6f4dc5e..edcea550ea66 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorSmokeTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorSmokeTest.java @@ -170,6 +170,26 @@ public void testReadingTableWithDeltaColumnInvariant() .hasMessageContaining("Updates are not supported for tables with delta invariants"); } + @Test + public void testSchemaEvolutionOnTableWithColumnInvariant() + { + String tableName = "test_schema_evolution_on_table_with_column_invariant_" + randomTableSuffix(); + hiveMinioDataLake.copyResources("databricks/invariants", tableName); + getQueryRunner().execute(format("CREATE TABLE %s (ignored int) WITH (location = '%s')", + tableName, + getLocationForTable(bucketName, tableName))); + + assertThatThrownBy(() -> query("INSERT INTO invariants VALUES(2)")) + .hasMessageContaining("Inserts are not supported for tables with delta invariants"); + + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INT"); + assertUpdate("COMMENT ON COLUMN " + tableName + ".c IS 'example column comment'"); + assertUpdate("COMMENT ON TABLE " + tableName + " IS 'example table comment'"); + + assertThatThrownBy(() -> query("INSERT INTO " + tableName + " VALUES(2, 2)")) + .hasMessageContaining("Inserts are not supported for tables with delta invariants"); + } + @DataProvider public static Object[][] writesLockInvalidContentsValuesProvider() { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestDeltaLakeSchemaSupport.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestDeltaLakeSchemaSupport.java index 23a4d19e51ff..b07f5025a81f 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestDeltaLakeSchemaSupport.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestDeltaLakeSchemaSupport.java @@ -205,7 +205,7 @@ public void testSerializeSchemaAsJson() URL expected = getResource("io/trino/plugin/deltalake/transactionlog/schema/nested_schema.json"); ObjectMapper objectMapper = new ObjectMapper(); - String jsonEncoding = serializeSchemaAsJson(ImmutableList.of(arrayColumn, structColumn, mapColumn), ImmutableMap.of()); + String jsonEncoding = serializeSchemaAsJson(ImmutableList.of(arrayColumn, structColumn, mapColumn), ImmutableMap.of(), ImmutableMap.of(), ImmutableMap.of()); assertEquals(objectMapper.readTree(jsonEncoding), objectMapper.readTree(expected)); } @@ -223,7 +223,7 @@ public void testRoundTripComplexSchema() .map(metadata -> new DeltaLakeColumnHandle(metadata.getName(), metadata.getType(), metadata.getName(), metadata.getType(), REGULAR)) .collect(toImmutableList()); ObjectMapper objectMapper = new ObjectMapper(); - assertEquals(objectMapper.readTree(serializeSchemaAsJson(columnHandles, ImmutableMap.of())), objectMapper.readTree(json)); + assertEquals(objectMapper.readTree(serializeSchemaAsJson(columnHandles, ImmutableMap.of(), ImmutableMap.of(), ImmutableMap.of())), objectMapper.readTree(json)); } @Test(dataProvider = "supportedTypes")