Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnComments;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnInvariants;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnMappingMode;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnsMetadata;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnsNullability;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.isAppendOnly;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeSchemaAsJson;
Expand Down Expand Up @@ -684,6 +685,8 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
deltaLakeColumns,
partitionColumns,
columnComments,
deltaLakeColumns.stream().collect(toImmutableMap(DeltaLakeColumnHandle::getName, ignored -> true)),
deltaLakeColumns.stream().collect(toImmutableMap(DeltaLakeColumnHandle::getName, ignored -> ImmutableMap.of())),
configurationForNewTable(checkpointInterval),
CREATE_TABLE_OPERATION,
session,
Expand Down Expand Up @@ -950,6 +953,8 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
handle.getInputColumns(),
handle.getPartitionedBy(),
ImmutableMap.of(),
handle.getInputColumns().stream().collect(toImmutableMap(DeltaLakeColumnHandle::getName, ignored -> true)),
handle.getInputColumns().stream().collect(toImmutableMap(DeltaLakeColumnHandle::getName, ignored -> ImmutableMap.of())),
configurationForNewTable(handle.getCheckpointInterval()),
CREATE_TABLE_AS_OPERATION,
session,
Expand Down Expand Up @@ -1027,6 +1032,8 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table
columns,
partitionColumns,
getColumnComments(handle.getMetadataEntry()),
getColumnsNullability(handle.getMetadataEntry()),
getColumnsMetadata(handle.getMetadataEntry()),
handle.getMetadataEntry().getConfiguration(),
SET_TBLPROPERTIES_OPERATION,
session,
Expand Down Expand Up @@ -1072,6 +1079,8 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl
columns,
partitionColumns,
columnComments.buildOrThrow(),
getColumnsNullability(deltaLakeTableHandle.getMetadataEntry()),
getColumnsMetadata(deltaLakeTableHandle.getMetadataEntry()),
deltaLakeTableHandle.getMetadataEntry().getConfiguration(),
CHANGE_COLUMN_OPERATION,
session,
Expand Down Expand Up @@ -1109,6 +1118,14 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
columnComments.put(newColumnMetadata.getName(), newColumnMetadata.getComment());
}

ImmutableMap.Builder<String, Boolean> columnNullability = ImmutableMap.builder();
columnNullability.putAll(getColumnsNullability(handle.getMetadataEntry()));
columnNullability.put(newColumnMetadata.getName(), true);

ImmutableMap.Builder<String, Map<String, Object>> columnMetadata = ImmutableMap.builder();
columnMetadata.putAll(getColumnsMetadata(handle.getMetadataEntry()));
columnMetadata.put(newColumnMetadata.getName(), ImmutableMap.of());

TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, handle.getLocation());
appendTableEntries(
commitVersion,
Expand All @@ -1117,6 +1134,8 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
columnsBuilder.build(),
partitionColumns,
columnComments.buildOrThrow(),
columnNullability.buildOrThrow(),
columnMetadata.buildOrThrow(),
handle.getMetadataEntry().getConfiguration(),
ADD_COLUMN_OPERATION,
session,
Expand All @@ -1137,6 +1156,8 @@ private static void appendTableEntries(
List<DeltaLakeColumnHandle> columns,
List<String> partitionColumnNames,
Map<String, String> columnComments,
Map<String, Boolean> columnNullability,
Map<String, Map<String, Object>> columnMetadata,
Map<String, String> configuration,
String operation,
ConnectorSession session,
Expand Down Expand Up @@ -1168,7 +1189,7 @@ private static void appendTableEntries(
null,
comment.orElse(null),
new Format("parquet", ImmutableMap.of()),
serializeSchemaAsJson(columns, columnComments),
serializeSchemaAsJson(columns, columnComments, columnNullability, columnMetadata),
partitionColumnNames,
ImmutableMap.copyOf(configuration),
createdTime));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.deltalake.transactionlog;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -128,38 +129,61 @@ public static List<DeltaLakeColumnHandle> extractPartitionColumns(List<DeltaLake
.collect(toImmutableList());
}

public static String serializeSchemaAsJson(List<DeltaLakeColumnHandle> columns, Map<String, String> columnComments)
public static String serializeSchemaAsJson(
List<DeltaLakeColumnHandle> columns,
Map<String, String> columnComments,
Map<String, Boolean> columnNullability,
Map<String, Map<String, Object>> columnMetadata)
{
try {
return OBJECT_MAPPER.writeValueAsString(serializeStructType(columns, columnComments));
return OBJECT_MAPPER.writeValueAsString(serializeStructType(columns, columnComments, columnNullability, columnMetadata));
}
catch (JsonProcessingException e) {
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, getLocation(e), "Failed to encode Delta Lake schema", e);
}
}

private static Map<String, Object> serializeStructType(List<DeltaLakeColumnHandle> columns, Map<String, String> columnComments)
private static Map<String, Object> serializeStructType(
List<DeltaLakeColumnHandle> columns,
Map<String, String> columnComments,
Map<String, Boolean> columnNullability,
Map<String, Map<String, Object>> columnMetadata)
{
ImmutableMap.Builder<String, Object> schema = ImmutableMap.builder();

schema.put("fields", columns.stream().map(column -> serializeStructField(column.getName(), column.getType(), columnComments.get(column.getName()))).collect(toImmutableList()));
schema.put("fields", columns.stream()
.map(column -> {
String columnName = column.getName();
return serializeStructField(
column.getName(),
column.getType(),
columnComments.get(columnName),
columnNullability.get(columnName),
columnMetadata.get(columnName));
})
.collect(toImmutableList()));
schema.put("type", "struct");

return schema.buildOrThrow();
}

private static Map<String, Object> serializeStructField(String name, Type type, @Nullable String comment)
private static Map<String, Object> serializeStructField(String name, Type type, @Nullable String comment, @Nullable Boolean nullable, @Nullable Map<String, Object> metadata)
{
ImmutableMap.Builder<String, Object> fieldContents = ImmutableMap.builder();

ImmutableMap.Builder<String, Object> metadata = ImmutableMap.builder();
ImmutableMap.Builder<String, Object> columnMetadata = ImmutableMap.builder();
if (comment != null) {
metadata.put("comment", comment);
columnMetadata.put("comment", comment);
}
if (metadata != null) {
metadata.entrySet().stream()
.filter(entry -> !entry.getKey().equals("comment"))
.forEach(entry -> columnMetadata.put(entry.getKey(), entry.getValue()));
}

fieldContents.put("metadata", metadata.buildOrThrow());
fieldContents.put("metadata", columnMetadata.buildOrThrow());
fieldContents.put("name", name);
fieldContents.put("nullable", true); // TODO: Is column nullability configurable in Trino?
fieldContents.put("nullable", nullable != null ? nullable : true);
fieldContents.put("type", serializeColumnType(type));

return fieldContents.buildOrThrow();
Expand Down Expand Up @@ -207,7 +231,8 @@ private static Map<String, Object> serializeStructType(RowType rowType)
ImmutableMap.Builder<String, Object> fields = ImmutableMap.builder();

fields.put("type", "struct");
fields.put("fields", rowType.getFields().stream().map(field -> serializeStructField(field.getName().orElse(null), field.getType(), null)).collect(toImmutableList()));
fields.put("fields", rowType.getFields().stream()
.map(field -> serializeStructField(field.getName().orElse(null), field.getType(), null, null, null)).collect(toImmutableList()));

return fields.buildOrThrow();
}
Expand Down Expand Up @@ -372,6 +397,11 @@ private static String getInvariants(JsonNode node)
return invariants == null ? null : invariants.asText();
}

public static Map<String, Map<String, Object>> getColumnsMetadata(MetadataEntry metadataEntry)
{
return getColumnProperties(metadataEntry, node -> OBJECT_MAPPER.convertValue(node.get("metadata"), new TypeReference<>(){}));
}

public static <T> Map<String, T> getColumnProperties(MetadataEntry metadataEntry, Function<JsonNode, T> extractor)
{
return Optional.ofNullable(metadataEntry.getSchemaString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,26 @@ public void testReadingTableWithDeltaColumnInvariant()
.hasMessageContaining("Updates are not supported for tables with delta invariants");
}

@Test
public void testSchemaEvolutionOnTableWithColumnInvariant()
{
String tableName = "test_schema_evolution_on_table_with_column_invariant_" + randomTableSuffix();
hiveMinioDataLake.copyResources("databricks/invariants", tableName);
getQueryRunner().execute(format("CREATE TABLE %s (ignored int) WITH (location = '%s')",
tableName,
getLocationForTable(bucketName, tableName)));

assertThatThrownBy(() -> query("INSERT INTO invariants VALUES(2)"))
.hasMessageContaining("Inserts are not supported for tables with delta invariants");

assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INT");
assertUpdate("COMMENT ON COLUMN " + tableName + ".c IS 'example column comment'");
assertUpdate("COMMENT ON TABLE " + tableName + " IS 'example table comment'");

assertThatThrownBy(() -> query("INSERT INTO " + tableName + " VALUES(2, 2)"))
.hasMessageContaining("Inserts are not supported for tables with delta invariants");
}

@DataProvider
public static Object[][] writesLockInvalidContentsValuesProvider()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public void testSerializeSchemaAsJson()
URL expected = getResource("io/trino/plugin/deltalake/transactionlog/schema/nested_schema.json");
ObjectMapper objectMapper = new ObjectMapper();

String jsonEncoding = serializeSchemaAsJson(ImmutableList.of(arrayColumn, structColumn, mapColumn), ImmutableMap.of());
String jsonEncoding = serializeSchemaAsJson(ImmutableList.of(arrayColumn, structColumn, mapColumn), ImmutableMap.of(), ImmutableMap.of(), ImmutableMap.of());
assertEquals(objectMapper.readTree(jsonEncoding), objectMapper.readTree(expected));
}

Expand All @@ -223,7 +223,7 @@ public void testRoundTripComplexSchema()
.map(metadata -> new DeltaLakeColumnHandle(metadata.getName(), metadata.getType(), metadata.getName(), metadata.getType(), REGULAR))
.collect(toImmutableList());
ObjectMapper objectMapper = new ObjectMapper();
assertEquals(objectMapper.readTree(serializeSchemaAsJson(columnHandles, ImmutableMap.of())), objectMapper.readTree(json));
assertEquals(objectMapper.readTree(serializeSchemaAsJson(columnHandles, ImmutableMap.of(), ImmutableMap.of(), ImmutableMap.of())), objectMapper.readTree(json));
}

@Test(dataProvider = "supportedTypes")
Expand Down