diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index d406a694f8e9..62382d7dade0 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableTable; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; import io.airlift.json.JsonCodec; import io.airlift.log.Logger; @@ -36,6 +35,7 @@ import io.trino.plugin.base.filter.UtcConstraintExtractor; import io.trino.plugin.base.projection.ApplyProjectionUtil; import io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.AnalyzeMode; +import io.trino.plugin.deltalake.DeltaLakeTable.DeltaLakeColumn; import io.trino.plugin.deltalake.expression.ParsingException; import io.trino.plugin.deltalake.expression.SparkExpressionParser; import io.trino.plugin.deltalake.metastore.DeltaLakeMetastore; @@ -171,7 +171,6 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; -import static com.google.common.collect.Maps.filterKeys; import static com.google.common.collect.Sets.difference; import static com.google.common.primitives.Ints.max; import static io.trino.filesystem.Locations.appendPath; @@ -228,14 +227,9 @@ import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractPartitionColumns; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.generateColumnMetadata; -import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getCheckConstraints; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnComments; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnIdentities; -import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnInvariants; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnMappingMode; -import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnTypes; -import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnsMetadata; -import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnsNullability; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getExactColumnNames; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getGeneratedColumnExpressions; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getMaxColumnId; @@ -612,11 +606,8 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect MetadataEntry metadataEntry = tableHandle.getMetadataEntry(); ProtocolEntry protocolEntry = tableHandle.getProtocolEntry(); - List constraints = ImmutableList.builder() - .addAll(getCheckConstraints(metadataEntry, protocolEntry).values()) - .addAll(getColumnInvariants(metadataEntry, protocolEntry).values()) // The internal logic for column invariants in Delta Lake is same as check constraints - .build(); - List columns = getTableColumnMetadata(metadataEntry, protocolEntry); + List columns = getTableColumnMetadata(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry()); + DeltaLakeTable deltaTable = DeltaLakeTable.builder(metadataEntry, protocolEntry).build(); ImmutableMap.Builder properties = ImmutableMap.builder() .put(LOCATION_PROPERTY, tableHandle.getLocation()); @@ -641,7 +632,7 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect columns, properties.buildOrThrow(), Optional.ofNullable(metadataEntry.getDescription()), - constraints.stream() + deltaTable.constraints().stream() .map(constraint -> { try { return SparkExpressionParser.toTrinoExpression(constraint); @@ -655,13 +646,10 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect private List getTableColumnMetadata(MetadataEntry metadataEntry, ProtocolEntry protocolEntry) { - Map columnComments = getColumnComments(metadataEntry); - Map columnsNullability = getColumnsNullability(metadataEntry); - Map columnGenerations = getGeneratedColumnExpressions(metadataEntry); - List columns = getColumns(metadataEntry, protocolEntry).stream() - .map(column -> getColumnMetadata(column, columnComments.get(column.getBaseColumnName()), columnsNullability.getOrDefault(column.getBaseColumnName(), true), columnGenerations.get(column.getBaseColumnName()))) + DeltaLakeTable deltaTable = DeltaLakeTable.builder(metadataEntry, protocolEntry).build(); + return getColumns(metadataEntry, protocolEntry).stream() + .map(column -> getColumnMetadata(deltaTable, column)) .collect(toImmutableList()); - return columns; } @Override @@ -691,15 +679,21 @@ public Map getColumnHandles(ConnectorSession session, Conn public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) { DeltaLakeTableHandle table = (DeltaLakeTableHandle) tableHandle; - DeltaLakeColumnHandle column = (DeltaLakeColumnHandle) columnHandle; - if (column.getProjectionInfo().isPresent()) { - return getColumnMetadata(column, null, true, null); + DeltaLakeTable deltaTable = DeltaLakeTable.builder(table.getMetadataEntry(), table.getProtocolEntry()).build(); + return getColumnMetadata(deltaTable, (DeltaLakeColumnHandle) columnHandle); + } + + private static ColumnMetadata getColumnMetadata(DeltaLakeTable deltaTable, DeltaLakeColumnHandle column) + { + if (column.getProjectionInfo().isPresent() || column.getColumnType() == SYNTHESIZED) { + return getColumnMetadata(column, null, true, Optional.empty()); } + DeltaLakeColumn deltaColumn = deltaTable.findColumn(column.getBaseColumnName()); return getColumnMetadata( column, - getColumnComments(table.getMetadataEntry()).get(column.getBaseColumnName()), - getColumnsNullability(table.getMetadataEntry()).getOrDefault(column.getBaseColumnName(), true), - getGeneratedColumnExpressions(table.getMetadataEntry()).get(column.getBaseColumnName())); + deltaColumn.comment(), + deltaColumn.nullable(), + deltaColumn.generationExpression()); } /** @@ -763,12 +757,7 @@ public Iterator streamTableColumns(ConnectorSession sessio TableSnapshot snapshot = transactionLogAccess.loadSnapshot(session, table, tableLocation); MetadataEntry metadata = transactionLogAccess.getMetadataEntry(snapshot, session); ProtocolEntry protocol = transactionLogAccess.getProtocolEntry(session, snapshot); - Map columnComments = getColumnComments(metadata); - Map columnsNullability = getColumnsNullability(metadata); - Map columnGenerations = getGeneratedColumnExpressions(metadata); - List columnMetadata = getColumns(metadata, protocol).stream() - .map(column -> getColumnMetadata(column, columnComments.get(column.getColumnName()), columnsNullability.getOrDefault(column.getBaseColumnName(), true), columnGenerations.get(column.getBaseColumnName()))) - .collect(toImmutableList()); + List columnMetadata = getTableColumnMetadata(metadata, protocol); return Stream.of(TableColumnsMetadata.forTable(table, columnMetadata)); } catch (NotADeltaLakeTableException | IOException e) { @@ -910,25 +899,20 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe TrinoFileSystem fileSystem = fileSystemFactory.create(session); if (!fileSystem.listFiles(deltaLogDirectory).hasNext()) { validateTableColumns(tableMetadata); - - List partitionColumns = getPartitionedBy(tableMetadata.getProperties()); - ImmutableList.Builder columnNames = ImmutableList.builderWithExpectedSize(tableMetadata.getColumns().size()); - ImmutableMap.Builder columnTypes = ImmutableMap.builderWithExpectedSize(tableMetadata.getColumns().size()); - ImmutableMap.Builder> columnsMetadata = ImmutableMap.builderWithExpectedSize(tableMetadata.getColumns().size()); boolean containsTimestampType = false; + DeltaLakeTable.Builder deltaTable = DeltaLakeTable.builder(); for (ColumnMetadata column : tableMetadata.getColumns()) { - columnNames.add(column.getName()); - columnTypes.put(column.getName(), serializeColumnType(columnMappingMode, fieldId, column.getType())); - columnsMetadata.put(column.getName(), generateColumnMetadata(columnMappingMode, fieldId)); + deltaTable.addColumn( + column.getName(), + serializeColumnType(columnMappingMode, fieldId, column.getType()), + column.isNullable(), + column.getComment(), + generateColumnMetadata(columnMappingMode, fieldId)); if (!containsTimestampType) { containsTimestampType = containsTimestampType(column.getType()); } } - Map columnComments = tableMetadata.getColumns().stream() - .filter(column -> column.getComment() != null) - .collect(toImmutableMap(ColumnMetadata::getName, ColumnMetadata::getComment)); - Map columnsNullability = tableMetadata.getColumns().stream() - .collect(toImmutableMap(ColumnMetadata::getName, ColumnMetadata::isNullable)); + OptionalInt maxFieldId = OptionalInt.empty(); if (columnMappingMode == ID || columnMappingMode == NAME) { maxFieldId = OptionalInt.of(fieldId.get()); @@ -939,12 +923,8 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe 0, transactionLogWriter, randomUUID().toString(), - columnNames.build(), - partitionColumns, - columnTypes.buildOrThrow(), - columnComments, - columnsNullability, - columnsMetadata.buildOrThrow(), + serializeSchemaAsJson(deltaTable.build()), + getPartitionedBy(tableMetadata.getProperties()), configurationForNewTable(checkpointInterval, changeDataFeedEnabled, columnMappingMode, maxFieldId), CREATE_TABLE_OPERATION, session, @@ -1044,19 +1024,12 @@ public DeltaLakeOutputTableHandle beginCreateTable(ConnectorSession session, Con boolean usePhysicalName = columnMappingMode == ID || columnMappingMode == NAME; boolean containsTimestampType = false; int columnSize = tableMetadata.getColumns().size(); - ImmutableList.Builder columnNames = ImmutableList.builderWithExpectedSize(columnSize); - ImmutableMap.Builder columnTypes = ImmutableMap.builderWithExpectedSize(columnSize); - ImmutableMap.Builder columnNullabilities = ImmutableMap.builderWithExpectedSize(columnSize); - ImmutableMap.Builder> columnsMetadata = ImmutableMap.builderWithExpectedSize(columnSize); + DeltaLakeTable.Builder deltaTable = DeltaLakeTable.builder(); ImmutableList.Builder columnHandles = ImmutableList.builderWithExpectedSize(columnSize); for (ColumnMetadata column : tableMetadata.getColumns()) { - columnNames.add(column.getName()); - columnNullabilities.put(column.getName(), column.isNullable()); containsTimestampType |= containsTimestampType(column.getType()); - Object serializedType = serializeColumnType(columnMappingMode, fieldId, column.getType()); Type physicalType = deserializeType(typeManager, serializedType, usePhysicalName); - columnTypes.put(column.getName(), serializedType); OptionalInt id; String physicalName; @@ -1075,16 +1048,9 @@ public DeltaLakeOutputTableHandle beginCreateTable(ConnectorSession session, Con default -> throw new IllegalArgumentException("Unexpected column mapping mode: " + columnMappingMode); } columnHandles.add(toColumnHandle(column.getName(), column.getType(), id, physicalName, physicalType, partitionedBy)); - columnsMetadata.put(column.getName(), columnMetadata); + deltaTable.addColumn(column.getName(), serializedType, column.isNullable(), column.getComment(), columnMetadata); } - String schemaString = serializeSchemaAsJson( - columnNames.build(), - columnTypes.buildOrThrow(), - ImmutableMap.of(), - columnNullabilities.buildOrThrow(), - columnsMetadata.buildOrThrow()); - OptionalInt maxFieldId = OptionalInt.empty(); if (columnMappingMode == ID || columnMappingMode == NAME) { maxFieldId = OptionalInt.of(fieldId.get()); @@ -1099,7 +1065,7 @@ public DeltaLakeOutputTableHandle beginCreateTable(ConnectorSession session, Con external, tableMetadata.getComment(), getChangeDataFeedEnabled(tableMetadata.getProperties()), - schemaString, + serializeSchemaAsJson(deltaTable.build()), columnMappingMode, maxFieldId, protocolEntryForNewTable(containsTimestampType, tableMetadata.getProperties())); @@ -1308,8 +1274,6 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table ProtocolEntry protocolEntry = handle.getProtocolEntry(); checkUnsupportedWriterFeatures(protocolEntry); - ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle); - try { long commitVersion = handle.getReadVersion() + 1; @@ -1319,7 +1283,7 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table transactionLogWriter, handle.getMetadataEntry().getId(), handle.getMetadataEntry().getSchemaString(), - getPartitionedBy(tableMetadata.getProperties()), + handle.getMetadataEntry().getOriginalPartitionColumns(), handle.getMetadataEntry().getConfiguration(), SET_TBLPROPERTIES_OPERATION, session, @@ -1349,23 +1313,17 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl try { long commitVersion = deltaLakeTableHandle.getReadVersion() + 1; - ImmutableMap.Builder columnComments = ImmutableMap.builder(); - columnComments.putAll(getColumnComments(deltaLakeTableHandle.getMetadataEntry()).entrySet().stream() - .filter(e -> !e.getKey().equals(deltaLakeColumnHandle.getBaseColumnName())) - .collect(Collectors.toMap(Entry::getKey, Entry::getValue))); - comment.ifPresent(s -> columnComments.put(deltaLakeColumnHandle.getBaseColumnName(), s)); + DeltaLakeTable deltaTable = DeltaLakeTable.builder(deltaLakeTableHandle.getMetadataEntry(), deltaLakeTableHandle.getProtocolEntry()) + .setColumnComment(deltaLakeColumnHandle.getBaseColumnName(), comment.orElse(null)) + .build(); TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, deltaLakeTableHandle.getLocation()); appendTableEntries( commitVersion, transactionLogWriter, deltaLakeTableHandle.getMetadataEntry().getId(), - getExactColumnNames(deltaLakeTableHandle.getMetadataEntry()), + serializeSchemaAsJson(deltaTable), deltaLakeTableHandle.getMetadataEntry().getOriginalPartitionColumns(), - getColumnTypes(deltaLakeTableHandle.getMetadataEntry()), - columnComments.buildOrThrow(), - getColumnsNullability(deltaLakeTableHandle.getMetadataEntry()), - getColumnsMetadata(deltaLakeTableHandle.getMetadataEntry()), deltaLakeTableHandle.getMetadataEntry().getConfiguration(), CHANGE_COLUMN_OPERATION, session, @@ -1421,26 +1379,20 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle default -> throw new IllegalArgumentException("Unexpected column mapping mode: " + columnMappingMode); }; - List columnNames = ImmutableList.builder() - .addAll(getExactColumnNames(handle.getMetadataEntry())) - .add(newColumnMetadata.getName()) - .build(); ImmutableMap.Builder columnComments = ImmutableMap.builder(); columnComments.putAll(getColumnComments(handle.getMetadataEntry())); if (newColumnMetadata.getComment() != null) { columnComments.put(newColumnMetadata.getName(), newColumnMetadata.getComment()); } - ImmutableMap.Builder columnsNullability = ImmutableMap.builder(); - columnsNullability.putAll(getColumnsNullability(handle.getMetadataEntry())); - columnsNullability.put(newColumnMetadata.getName(), newColumnMetadata.isNullable()); - Map columnTypes = ImmutableMap.builderWithExpectedSize(columnNames.size()) - .putAll(getColumnTypes(handle.getMetadataEntry())) - .put(Map.entry(newColumnMetadata.getName(), serializeColumnType(columnMappingMode, maxColumnId, newColumnMetadata.getType()))) - .buildOrThrow(); - - ImmutableMap.Builder> columnMetadata = ImmutableMap.builder(); - columnMetadata.putAll(getColumnsMetadata(handle.getMetadataEntry())); - columnMetadata.put(newColumnMetadata.getName(), generateColumnMetadata(columnMappingMode, maxColumnId)); + + DeltaLakeTable deltaTable = DeltaLakeTable.builder(handle.getMetadataEntry(), handle.getProtocolEntry()) + .addColumn( + newColumnMetadata.getName(), + serializeColumnType(columnMappingMode, maxColumnId, newColumnMetadata.getType()), + newColumnMetadata.isNullable(), + newColumnMetadata.getComment(), + generateColumnMetadata(columnMappingMode, maxColumnId)) + .build(); Map configuration = new HashMap<>(handle.getMetadataEntry().getConfiguration()); if (columnMappingMode == ID || columnMappingMode == NAME) { @@ -1453,12 +1405,7 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle commitVersion, transactionLogWriter, handle.getMetadataEntry().getId(), - serializeSchemaAsJson( - columnNames, - columnTypes, - columnComments.buildOrThrow(), - columnsNullability.buildOrThrow(), - columnMetadata.buildOrThrow()), + serializeSchemaAsJson(deltaTable), handle.getMetadataEntry().getOriginalPartitionColumns(), configuration, ADD_COLUMN_OPERATION, @@ -1512,22 +1459,21 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandl Map physicalColumnNameMapping = columns.stream() .collect(toImmutableMap(DeltaLakeColumnMetadata::getName, DeltaLakeColumnMetadata::getPhysicalName)); - Map columnTypes = filterKeys(getColumnTypes(metadataEntry), name -> !name.equalsIgnoreCase(dropColumnName)); - Map columnComments = filterKeys(getColumnComments(metadataEntry), name -> !name.equalsIgnoreCase(dropColumnName)); - Map columnsNullability = filterKeys(getColumnsNullability(metadataEntry), name -> !name.equalsIgnoreCase(dropColumnName)); - Map> columnMetadata = filterKeys(getColumnsMetadata(metadataEntry), name -> !name.equalsIgnoreCase(dropColumnName)); + DeltaLakeTable deltaTable = DeltaLakeTable.builder(metadataEntry, protocolEntry) + .removeColumn(dropColumnName) + .build(); + if (deltaTable.columns().size() == partitionColumns.size()) { + throw new TrinoException(NOT_SUPPORTED, "Dropping the last non-partition column is unsupported"); + } + try { TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, table.getLocation()); appendTableEntries( commitVersion, transactionLogWriter, metadataEntry.getId(), - columnNames, - partitionColumns, - columnTypes, - columnComments, - columnsNullability, - columnMetadata, + serializeSchemaAsJson(deltaTable), + metadataEntry.getOriginalPartitionColumns(), metadataEntry.getConfiguration(), DROP_COLUMN_OPERATION, session, @@ -1585,33 +1531,17 @@ public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHan .map(columnName -> columnName.equalsIgnoreCase(sourceColumnName) ? newColumnName : columnName) .collect(toImmutableList()); - List columnNames = getExactColumnNames(metadataEntry).stream() - .map(name -> name.equalsIgnoreCase(sourceColumnName) ? newColumnName : name) - .collect(toImmutableList()); - Map columnTypes = getColumnTypes(metadataEntry).entrySet().stream() - .map(column -> column.getKey().equalsIgnoreCase(sourceColumnName) ? Map.entry(newColumnName, column.getValue()) : column) - .collect(toImmutableMap(Entry::getKey, Entry::getValue)); - Map columnComments = getColumnComments(metadataEntry).entrySet().stream() - .map(column -> column.getKey().equalsIgnoreCase(sourceColumnName) ? Map.entry(newColumnName, column.getValue()) : column) - .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); - Map columnsNullability = getColumnsNullability(metadataEntry).entrySet().stream() - .map(column -> column.getKey().equalsIgnoreCase(sourceColumnName) ? Map.entry(newColumnName, column.getValue()) : column) - .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); - Map> columnMetadata = getColumnsMetadata(metadataEntry).entrySet().stream() - .map(column -> column.getKey().equalsIgnoreCase(sourceColumnName) ? Map.entry(newColumnName, column.getValue()) : column) - .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); + DeltaLakeTable deltaTable = DeltaLakeTable.builder(metadataEntry, protocolEntry) + .renameColumn(sourceColumnName, newColumnName) + .build(); try { TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, table.getLocation()); appendTableEntries( commitVersion, transactionLogWriter, metadataEntry.getId(), - columnNames, + serializeSchemaAsJson(deltaTable), partitionColumns, - columnTypes, - columnComments, - columnsNullability, - columnMetadata, metadataEntry.getConfiguration(), RENAME_COLUMN_OPERATION, session, @@ -1638,21 +1568,17 @@ public void dropNotNullConstraint(ConnectorSession session, ConnectorTableHandle checkUnsupportedWriterFeatures(protocolEntry); checkSupportedWriterVersion(table); - Map columnsNullability = Maps.transformEntries( - getColumnsNullability(metadataEntry), - (name, nullable) -> name.equalsIgnoreCase(columnName) ? true : nullable); + DeltaLakeTable deltaTable = DeltaLakeTable.builder(metadataEntry, protocolEntry) + .dropNotNullConstraint(columnName) + .build(); try { TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, table.getLocation()); appendTableEntries( table.getReadVersion() + 1, transactionLogWriter, metadataEntry.getId(), - getExactColumnNames(metadataEntry), + serializeSchemaAsJson(deltaTable), metadataEntry.getOriginalPartitionColumns(), - getColumnTypes(metadataEntry), - getColumnComments(metadataEntry), - columnsNullability, - getColumnsMetadata(metadataEntry), metadataEntry.getConfiguration(), CHANGE_COLUMN_OPERATION, session, @@ -1665,35 +1591,6 @@ public void dropNotNullConstraint(ConnectorSession session, ConnectorTableHandle } } - private void appendTableEntries( - long commitVersion, - TransactionLogWriter transactionLogWriter, - String tableId, - List columnNames, - List partitionColumnNames, - Map columnTypes, - Map columnComments, - Map columnNullability, - Map> columnMetadata, - Map configuration, - String operation, - ConnectorSession session, - Optional comment, - ProtocolEntry protocolEntry) - { - appendTableEntries( - commitVersion, - transactionLogWriter, - tableId, - serializeSchemaAsJson(columnNames, columnTypes, columnComments, columnNullability, columnMetadata), - partitionColumnNames, - configuration, - operation, - session, - comment, - protocolEntry); - } - private void appendTableEntries( long commitVersion, TransactionLogWriter transactionLogWriter, @@ -3621,7 +3518,7 @@ public DeltaLakeMetastore getMetastore() return metastore; } - private static ColumnMetadata getColumnMetadata(DeltaLakeColumnHandle column, @Nullable String comment, boolean nullability, @Nullable String generation) + private static ColumnMetadata getColumnMetadata(DeltaLakeColumnHandle column, @Nullable String comment, boolean nullability, Optional generationExpression) { String columnName; Type columnType; @@ -3640,7 +3537,7 @@ private static ColumnMetadata getColumnMetadata(DeltaLakeColumnHandle column, @N .setHidden(column.getColumnType() == SYNTHESIZED) .setComment(Optional.ofNullable(comment)) .setNullable(nullability) - .setExtraInfo(generation == null ? Optional.empty() : Optional.of("generated: " + generation)) + .setExtraInfo(generationExpression.map(expression -> "generated: " + expression)) .build(); } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTable.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTable.java new file mode 100644 index 000000000000..b1c605cb903d --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTable.java @@ -0,0 +1,165 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake; + +import com.google.common.collect.ImmutableList; +import io.trino.plugin.deltalake.transactionlog.MetadataEntry; +import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; +import jakarta.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static com.google.common.collect.MoreCollectors.onlyElement; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getCheckConstraints; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnComments; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnInvariants; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnTypes; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnsMetadata; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnsNullability; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getExactColumnNames; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getGeneratedColumnExpressions; +import static java.util.Objects.requireNonNull; + +public record DeltaLakeTable(List columns, List constraints) +{ + public DeltaLakeTable + { + requireNonNull(columns, "columns is null"); + requireNonNull(constraints, "constraints is null"); + checkArgument(!columns.isEmpty(), "columns must not be empty"); + + columns = ImmutableList.copyOf(columns); + constraints = ImmutableList.copyOf(constraints); + } + + public DeltaLakeColumn findColumn(String name) + { + requireNonNull(name, "name is null"); + return columns.stream().filter(column -> column.name.equals(name)).collect(onlyElement()); + } + + public static Builder builder() + { + return new Builder(); + } + + public static Builder builder(MetadataEntry metadataEntry, ProtocolEntry protocolEntry) + { + return new Builder(metadataEntry, protocolEntry); + } + + public static class Builder + { + private final List columns = new ArrayList<>(); + private final List constraints = new ArrayList<>(); + + public Builder() {} + + public Builder(MetadataEntry metadataEntry, ProtocolEntry protocolEntry) + { + requireNonNull(metadataEntry, "metadataEntry is null"); + + Map columnTypes = getColumnTypes(metadataEntry); + Map columnsNullability = getColumnsNullability(metadataEntry); + Map columnComments = getColumnComments(metadataEntry); + Map> columnsMetadata = getColumnsMetadata(metadataEntry); + Map columnGenerations = getGeneratedColumnExpressions(metadataEntry); + + for (String columnName : getExactColumnNames(metadataEntry)) { + columns.add(new DeltaLakeColumn(columnName, + columnTypes.get(columnName), + columnsNullability.getOrDefault(columnName, true), + columnComments.get(columnName), + columnsMetadata.get(columnName), + Optional.ofNullable(columnGenerations.get(columnName)))); + } + + constraints.addAll(ImmutableList.builder() + .addAll(getCheckConstraints(metadataEntry, protocolEntry).values()) + .addAll(getColumnInvariants(metadataEntry, protocolEntry).values()) // The internal logic for column invariants in Delta Lake is same as check constraints + .build()); + } + + public Builder addColumn(String name, Object type, boolean nullable, @Nullable String comment, @Nullable Map metadata) + { + columns.add(new DeltaLakeColumn(name, type, nullable, comment, metadata, Optional.empty())); + return this; + } + + public Builder renameColumn(String source, String target) + { + checkArgument(columns.stream().noneMatch(column -> column.name.equalsIgnoreCase(target)), "Column already exists: %s", target); + + DeltaLakeColumn column = findColumn(source); + int index = columns.indexOf(column); + verify(index >= 0, "Unexpected column index"); + + DeltaLakeColumn newColumn = new DeltaLakeColumn(target, column.type, column.nullable, column.comment, column.metadata, column.generationExpression); + columns.set(index, newColumn); + return this; + } + + public Builder removeColumn(String name) + { + DeltaLakeColumn column = findColumn(name); + boolean removed = columns.remove(column); + checkState(removed, "Failed to remove '%s' from %s", name, columns); + return this; + } + + public Builder setColumnComment(String name, @Nullable String comment) + { + DeltaLakeColumn oldColumn = findColumn(name); + DeltaLakeColumn newColumn = new DeltaLakeColumn(oldColumn.name, oldColumn.type, oldColumn.nullable, comment, oldColumn.metadata, oldColumn.generationExpression); + columns.set(columns.indexOf(oldColumn), newColumn); + return this; + } + + public Builder dropNotNullConstraint(String name) + { + DeltaLakeColumn oldColumn = findColumn(name); + verify(!oldColumn.nullable, "Column '%s' is already nullable", name); + DeltaLakeColumn newColumn = new DeltaLakeColumn(oldColumn.name, oldColumn.type, true, oldColumn.comment, oldColumn.metadata, oldColumn.generationExpression); + columns.set(columns.indexOf(oldColumn), newColumn); + return this; + } + + private DeltaLakeColumn findColumn(String name) + { + requireNonNull(name, "name is null"); + return columns.stream().filter(column -> column.name.equals(name)).collect(onlyElement()); + } + + public DeltaLakeTable build() + { + return new DeltaLakeTable(columns, constraints); + } + } + + public record DeltaLakeColumn(String name, Object type, boolean nullable, @Nullable String comment, @Nullable Map metadata, Optional generationExpression) + { + public DeltaLakeColumn + { + checkArgument(!name.isEmpty(), "name is empty"); + requireNonNull(type, "type is null"); + requireNonNull(generationExpression, "generationExpression is null"); + } + } +} diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java index a7a21bf6e3a9..7416e53a668d 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java @@ -26,6 +26,7 @@ import io.airlift.json.ObjectMapperProvider; import io.trino.plugin.deltalake.DeltaLakeColumnHandle; import io.trino.plugin.deltalake.DeltaLakeColumnMetadata; +import io.trino.plugin.deltalake.DeltaLakeTable; import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeFileStatistics; import io.trino.plugin.hive.util.HiveUtil; import io.trino.spi.Location; @@ -197,52 +198,42 @@ public static List extractPartitionColumns(List columnNames, - Map columnTypes, - Map columnComments, - Map columnNullability, - Map> columnMetadata) + public static String serializeSchemaAsJson(DeltaLakeTable deltaTable) { try { - return OBJECT_MAPPER.writeValueAsString(serializeStructType(columnNames, columnTypes, columnComments, columnNullability, columnMetadata)); + return OBJECT_MAPPER.writeValueAsString(serializeStructType(deltaTable)); } catch (JsonProcessingException e) { throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, getLocation(e), "Failed to encode Delta Lake schema", e); } } - private static Map serializeStructType( - List columnNames, - Map columnTypes, - Map columnComments, - Map columnNullability, - Map> columnMetadata) + private static Map serializeStructType(DeltaLakeTable deltaTable) { // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#struct-type ImmutableMap.Builder schema = ImmutableMap.builder(); schema.put("type", "struct"); - schema.put("fields", columnNames.stream() - .map(columnName -> serializeStructField( - columnName, - columnTypes.get(columnName), - columnComments.get(columnName), - columnNullability.get(columnName), - columnMetadata.get(columnName))) + schema.put("fields", deltaTable.columns().stream() + .map(column -> serializeStructField( + column.name(), + column.type(), + column.comment(), + column.nullable(), + column.metadata())) .collect(toImmutableList())); return schema.buildOrThrow(); } - private static Map serializeStructField(String name, Object type, @Nullable String comment, @Nullable Boolean nullable, @Nullable Map metadata) + private static Map serializeStructField(String name, Object type, @Nullable String comment, boolean nullable, @Nullable Map metadata) { // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#struct-field ImmutableMap.Builder fieldContents = ImmutableMap.builder(); fieldContents.put("name", name); fieldContents.put("type", type); - fieldContents.put("nullable", nullable != null ? nullable : true); + fieldContents.put("nullable", nullable); ImmutableMap.Builder columnMetadata = ImmutableMap.builder(); if (comment != null) { @@ -306,7 +297,7 @@ private static Map serializeStructType(ColumnMappingMode columnM .map(field -> { Object fieldType = serializeColumnType(columnMappingMode, maxColumnId, field.getType()); Map metadata = generateColumnMetadata(columnMappingMode, maxColumnId); - return serializeStructField(field.getName().orElse(null), fieldType, null, null, metadata); + return serializeStructField(field.getName().orElse(null), fieldType, null, true, metadata); }) .collect(toImmutableList())); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java index 3a1b1b0cbf4b..7973946ec100 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java @@ -43,14 +43,12 @@ import java.time.Instant; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.OptionalInt; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static com.google.common.collect.ImmutableList.toImmutableList; -import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.io.MoreFiles.deleteRecursively; import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static io.airlift.concurrent.MoreFutures.getFutureValue; @@ -159,14 +157,11 @@ private static ConnectorPageSink createPageSink(String outputPath, DeltaLakeWrit { HiveTransactionHandle transaction = new HiveTransactionHandle(false); DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig(); - String schemaString = serializeSchemaAsJson( - getColumnHandles().stream().map(DeltaLakeColumnHandle::getColumnName).collect(toImmutableList()), - getColumnHandles().stream() - .map(column -> Map.entry(column.getColumnName(), serializeColumnType(NONE, new AtomicInteger(), column.getType()))) - .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)), - ImmutableMap.of(), - ImmutableMap.of(), - ImmutableMap.of()); + DeltaLakeTable.Builder deltaTable = DeltaLakeTable.builder(); + for (DeltaLakeColumnHandle column : getColumnHandles()) { + deltaTable.addColumn(column.getColumnName(), serializeColumnType(NONE, new AtomicInteger(), column.getType()), true, null, ImmutableMap.of()); + } + String schemaString = serializeSchemaAsJson(deltaTable.build()); DeltaLakeOutputTableHandle tableHandle = new DeltaLakeOutputTableHandle( SCHEMA_NAME, TABLE_NAME, diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestDeltaLakeSchemaSupport.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestDeltaLakeSchemaSupport.java index 6c41a669b538..5f0cc7893992 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestDeltaLakeSchemaSupport.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestDeltaLakeSchemaSupport.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableMap; import io.trino.plugin.deltalake.DeltaLakeColumnHandle; import io.trino.plugin.deltalake.DeltaLakeColumnMetadata; +import io.trino.plugin.deltalake.DeltaLakeTable; import io.trino.plugin.deltalake.TestingComplexTypeManager; import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode; import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeJsonFileStatistics; @@ -235,14 +236,12 @@ public void testSerializeSchemaAsJson() ObjectMapper objectMapper = new ObjectMapper(); List columnHandles = ImmutableList.of(arrayColumn, structColumn, mapColumn); - ImmutableList.Builder columnNames = ImmutableList.builderWithExpectedSize(columnHandles.size()); - ImmutableMap.Builder columnTypes = ImmutableMap.builderWithExpectedSize(columnHandles.size()); + DeltaLakeTable.Builder deltaTable = DeltaLakeTable.builder(); for (DeltaLakeColumnHandle column : columnHandles) { - columnNames.add(column.getColumnName()); - columnTypes.put(column.getColumnName(), serializeColumnType(ColumnMappingMode.NONE, new AtomicInteger(), column.getBaseType())); + deltaTable.addColumn(column.getColumnName(), serializeColumnType(ColumnMappingMode.NONE, new AtomicInteger(), column.getBaseType()), true, null, ImmutableMap.of()); } - String jsonEncoding = serializeSchemaAsJson(columnNames.build(), columnTypes.buildOrThrow(), ImmutableMap.of(), ImmutableMap.of(), ImmutableMap.of()); + String jsonEncoding = serializeSchemaAsJson(deltaTable.build()); assertThat(objectMapper.readTree(jsonEncoding)).isEqualTo(objectMapper.readTree(expected)); } @@ -257,15 +256,13 @@ public void testRoundTripComplexSchema() .map(DeltaLakeColumnMetadata::getColumnMetadata) .collect(toImmutableList()); - ImmutableList.Builder columnNames = ImmutableList.builderWithExpectedSize(schema.size()); - ImmutableMap.Builder columnTypes = ImmutableMap.builderWithExpectedSize(schema.size()); + DeltaLakeTable.Builder deltaTable = DeltaLakeTable.builder(); for (ColumnMetadata column : schema) { - columnNames.add(column.getName()); - columnTypes.put(column.getName(), serializeColumnType(ColumnMappingMode.NONE, new AtomicInteger(), column.getType())); + deltaTable.addColumn(column.getName(), serializeColumnType(ColumnMappingMode.NONE, new AtomicInteger(), column.getType()), true, null, ImmutableMap.of()); } ObjectMapper objectMapper = new ObjectMapper(); - String jsonEncoding = serializeSchemaAsJson(columnNames.build(), columnTypes.buildOrThrow(), ImmutableMap.of(), ImmutableMap.of(), ImmutableMap.of()); + String jsonEncoding = serializeSchemaAsJson(deltaTable.build()); assertThat(objectMapper.readTree(jsonEncoding)).isEqualTo(objectMapper.readTree(expected)); }