diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/UpdatablePageSource.java b/core/trino-spi/src/main/java/io/trino/spi/connector/UpdatablePageSource.java index fc877b7cb0e5..b84c75952d37 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/UpdatablePageSource.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/UpdatablePageSource.java @@ -29,6 +29,12 @@ default void deleteRows(Block rowIds) throw new UnsupportedOperationException("This connector does not support row-level delete"); } + /** + * Write updated rows to the PageSource. + * @param page Contains values for all updated columns, as well as the $row_id column. The order of these Blocks can be derived from columnValueAndRowIdChannels. + * @param columnValueAndRowIdChannels The index of this list matches the index columns have in updatedColumns parameter of {@link ConnectorMetadata#beginUpdate} + * The value at each index is the channel number in the given page. The last element of this list is always the channel number for the $row_id column. 
+ */ default void updateRows(Page page, List columnValueAndRowIdChannels) { throw new UnsupportedOperationException("This connector does not support row update"); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java index afb6b9ad72be..fbd63b7b2c55 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java @@ -541,6 +541,10 @@ public synchronized void replaceTable(String databaseName, String tableName, Tab throw new TrinoException(HIVE_METASTORE_ERROR, "Replacement table must have same name"); } + if (isIcebergTable(table) && !Objects.equals(table.getParameters().get("metadata_location"), newTable.getParameters().get("previous_metadata_location"))) { + throw new TrinoException(HIVE_METASTORE_ERROR, "Cannot update Iceberg table: supplied previous location does not match current location"); + } + Path tableMetadataDirectory = getTableMetadataDirectory(table); writeSchemaFile(TABLE, tableMetadataDirectory, tableCodec, new TableMetadata(currentVersion, newTable), true); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java index a9cbcb5072ac..4110f4fbad84 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java @@ -294,6 +294,20 @@ public void testUpdateRowConcurrently() .hasMessage("Hive update is only supported for ACID transactional tables"); } + @Override + public void testUpdateWithPredicates() + { + assertThatThrownBy(super::testUpdateWithPredicates) + .hasMessage("Hive update is only supported for ACID transactional tables"); + } + + @Override 
+ public void testUpdateAllValues() + { + assertThatThrownBy(super::testUpdateAllValues) + .hasMessage("Hive update is only supported for ACID transactional tables"); + } + @Override public void testExplainAnalyzeWithDeleteWithSubquery() { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java index a50832fdeea6..e8abc130a6ab 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java @@ -34,6 +34,10 @@ public class IcebergColumnHandle implements ColumnHandle { + // Iceberg reserved row ids begin at INTEGER.MAX_VALUE and count down. Starting with MIN_VALUE here to avoid conflicts. + public static final int TRINO_UPDATE_ROW_ID_COLUMN_ID = Integer.MIN_VALUE; + public static final String TRINO_UPDATE_ROW_ID_COLUMN_NAME = "$row_id"; + private final ColumnIdentity baseColumnIdentity; private final Type baseType; // The list of field ids to indicate the projected part of the top-level column represented by baseColumnIdentity @@ -149,6 +153,12 @@ public boolean isRowPositionColumn() return id == ROW_POSITION.fieldId(); } + @JsonIgnore + public boolean isUpdateRowIdColumn() + { + return id == TRINO_UPDATE_ROW_ID_COLUMN_ID; + } + /** * Marker column used by the Iceberg DeleteFilter to indicate rows which are deleted by equality deletes. 
*/ diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 4bc321262b65..19c5b0d3ef20 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -86,7 +86,6 @@ import org.apache.iceberg.BaseTable; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; -import org.apache.iceberg.FileContent; import org.apache.iceberg.FileMetadata; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.IsolationLevel; @@ -105,13 +104,16 @@ import org.apache.iceberg.TableScan; import org.apache.iceberg.Transaction; import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.Deque; @@ -145,6 +147,8 @@ import static io.trino.plugin.hive.HiveApplyProjectionUtil.replaceWithNewVariables; import static io.trino.plugin.hive.util.HiveUtil.isStructuralType; import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression; +import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_UPDATE_ROW_ID_COLUMN_ID; +import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_UPDATE_ROW_ID_COLUMN_NAME; import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnHandle; import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnMetadata; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR; @@ -162,6 +166,7 @@ import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY; 
import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning; import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue; +import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle; import static io.trino.plugin.iceberg.IcebergUtil.getColumns; import static io.trino.plugin.iceberg.IcebergUtil.getFileFormat; import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys; @@ -177,7 +182,6 @@ import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.DELETE_ORPHAN_FILES; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.EXPIRE_SNAPSHOTS; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.OPTIMIZE; -import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.connector.RetryMode.NO_RETRIES; import static java.lang.String.format; @@ -269,6 +273,7 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa name.getTableType(), snapshotId, SchemaParser.toJson(table.schema()), + PartitionSpecParser.toJson(table.spec()), table.operations().current().formatVersion(), TupleDomain.all(), TupleDomain.all(), @@ -276,7 +281,8 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa Optional.ofNullable(nameMappingJson), table.location(), table.properties(), - NO_RETRIES); + NO_RETRIES, + ImmutableList.of()); } @Override @@ -1279,7 +1285,60 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable @Override public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHandle, Collection fragments) { + finishWrite(session, (IcebergTableHandle) tableHandle, fragments, false); + } + + @Override + public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return IcebergUtil.getColumnHandle(ROW_POSITION, typeManager); + } + + @Override + public 
ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTableHandle tableHandle, List updatedColumns, RetryMode retryMode) + { + IcebergTableHandle table = (IcebergTableHandle) tableHandle; + if (table.getFormatVersion() < 2) { + throw new TrinoException(NOT_SUPPORTED, "Iceberg table updates require at least format version 2"); + } + verify(transaction == null, "transaction already set"); + transaction = catalog.loadTable(session, table.getSchemaTableName()).newTransaction(); + return table.withRetryMode(retryMode) + .withUpdatedColumns(updatedColumns.stream() + .map(IcebergColumnHandle.class::cast) + .collect(toImmutableList())); + } + + @Override + public void finishUpdate(ConnectorSession session, ConnectorTableHandle tableHandle, Collection fragments) + { + finishWrite(session, (IcebergTableHandle) tableHandle, fragments, true); + } + + @Override + public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle, List updatedColumns) + { + List unmodifiedColumns = new ArrayList<>(); + unmodifiedColumns.add(ROW_POSITION); + + // Include all the non-updated columns. These are needed when writing the new data file with updated column values. 
IcebergTableHandle table = (IcebergTableHandle) tableHandle; + Set updatedFields = updatedColumns.stream() + .map(IcebergColumnHandle.class::cast) + .map(IcebergColumnHandle::getId) + .collect(toImmutableSet()); + for (Types.NestedField column : SchemaParser.fromJson(table.getTableSchemaJson()).columns()) { + if (!updatedFields.contains(column.fieldId())) { + unmodifiedColumns.add(column); + } + } + + Types.NestedField icebergRowIdField = Types.NestedField.required(TRINO_UPDATE_ROW_ID_COLUMN_ID, TRINO_UPDATE_ROW_ID_COLUMN_NAME, Types.StructType.of(unmodifiedColumns)); + return getColumnHandle(icebergRowIdField, typeManager); + } + + private void finishWrite(ConnectorSession session, IcebergTableHandle table, Collection fragments, boolean runUpdateValidations) + { Table icebergTable = transaction.table(); List commitTasks = fragments.stream() @@ -1299,32 +1358,56 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan rowDelta.validateNoConflictingDataFiles(); } + if (runUpdateValidations) { + // Ensure a row that is updated by this commit was not deleted by a separate commit + rowDelta.validateDeletedFiles(); + rowDelta.validateNoConflictingDeleteFiles(); + } + ImmutableSet.Builder writtenFiles = ImmutableSet.builder(); ImmutableSet.Builder referencedDataFiles = ImmutableSet.builder(); for (CommitTaskData task : commitTasks) { - if (task.getContent() != FileContent.POSITION_DELETES) { - throw new TrinoException(GENERIC_INTERNAL_ERROR, "Iceberg finishDelete called with commit task that was not a position delete file"); - } PartitionSpec partitionSpec = PartitionSpecParser.fromJson(schema, task.getPartitionSpecJson()); Type[] partitionColumnTypes = partitionSpec.fields().stream() .map(field -> field.transform().getResultType(icebergTable.schema().findType(field.sourceId()))) .toArray(Type[]::new); - FileMetadata.Builder deleteBuilder = FileMetadata.deleteFileBuilder(partitionSpec) - .withPath(task.getPath()) - 
.withFormat(task.getFileFormat().toIceberg()) - .ofPositionDeletes() - .withFileSizeInBytes(task.getFileSizeInBytes()) - .withMetrics(task.getMetrics().metrics()); + switch (task.getContent()) { + case POSITION_DELETES: + FileMetadata.Builder deleteBuilder = FileMetadata.deleteFileBuilder(partitionSpec) + .withPath(task.getPath()) + .withFormat(task.getFileFormat().toIceberg()) + .ofPositionDeletes() + .withFileSizeInBytes(task.getFileSizeInBytes()) + .withMetrics(task.getMetrics().metrics()); + + if (!partitionSpec.fields().isEmpty()) { + String partitionDataJson = task.getPartitionDataJson() + .orElseThrow(() -> new VerifyException("No partition data for partitioned table")); + deleteBuilder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes)); + } - if (!partitionSpec.fields().isEmpty()) { - String partitionDataJson = task.getPartitionDataJson() - .orElseThrow(() -> new VerifyException("No partition data for partitioned table")); - deleteBuilder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes)); + rowDelta.addDeletes(deleteBuilder.build()); + writtenFiles.add(task.getPath()); + task.getReferencedDataFile().ifPresent(referencedDataFiles::add); + break; + case DATA: + DataFiles.Builder builder = DataFiles.builder(partitionSpec) + .withPath(task.getPath()) + .withFormat(task.getFileFormat().toIceberg()) + .withFileSizeInBytes(task.getFileSizeInBytes()) + .withMetrics(task.getMetrics().metrics()); + + if (!icebergTable.spec().fields().isEmpty()) { + String partitionDataJson = task.getPartitionDataJson() + .orElseThrow(() -> new VerifyException("No partition data for partitioned table")); + builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes)); + } + rowDelta.addRows(builder.build()); + writtenFiles.add(task.getPath()); + break; + default: + throw new UnsupportedOperationException("Unsupported task content: " + task.getContent()); } - - rowDelta.addDeletes(deleteBuilder.build()); 
- writtenFiles.add(task.getPath()); - task.getReferencedDataFile().ifPresent(referencedDataFiles::add); } // try to leave as little garbage as possible behind @@ -1333,17 +1416,16 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan } rowDelta.validateDataFilesExist(referencedDataFiles.build()); - rowDelta.commit(); - transaction.commitTransaction(); + try { + rowDelta.commit(); + transaction.commitTransaction(); + } + catch (ValidationException e) { + throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to commit Iceberg update to table: " + table.getSchemaTableName(), e); + } transaction = null; } - @Override - public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle) - { - return IcebergUtil.getColumnHandle(ROW_POSITION, typeManager); - } - @Override public void createView(ConnectorSession session, SchemaTableName viewName, ConnectorViewDefinition definition, boolean replace) { @@ -1452,6 +1534,7 @@ public Optional> applyFilter(C table.getTableType(), table.getSnapshotId(), table.getTableSchemaJson(), + table.getPartitionSpecJson(), table.getFormatVersion(), newUnenforcedConstraint, newEnforcedConstraint, @@ -1459,7 +1542,8 @@ public Optional> applyFilter(C table.getNameMappingJson(), table.getTableLocation(), table.getStorageProperties(), - table.getRetryMode()), + table.getRetryMode(), + table.getUpdatedColumns()), remainingConstraint.transformKeys(ColumnHandle.class::cast), false)); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java index 3059fc4b38d1..4cb070a43b4f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg; import com.google.common.collect.ImmutableList; +import 
com.google.common.collect.ImmutableMap; import io.airlift.slice.Slice; import io.trino.plugin.hive.ReaderProjectionsAdapter; import io.trino.plugin.iceberg.delete.IcebergPositionDeletePageSink; @@ -21,11 +22,15 @@ import io.trino.spi.Page; import io.trino.spi.TrinoException; import io.trino.spi.block.Block; +import io.trino.spi.block.ColumnarRow; +import io.trino.spi.block.RowBlock; import io.trino.spi.connector.ConnectorPageSource; import io.trino.spi.connector.UpdatablePageSource; import io.trino.spi.type.Type; +import org.apache.iceberg.Schema; import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.types.Types; import javax.annotation.Nullable; @@ -33,55 +38,104 @@ import java.io.UncheckedIOException; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.OptionalLong; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; import java.util.function.Supplier; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Throwables.throwIfInstanceOf; +import static com.google.common.base.Verify.verify; import static io.trino.plugin.base.util.Closables.closeAllSuppress; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA; +import static java.lang.String.format; import static java.util.Objects.requireNonNull; public class IcebergPageSource implements UpdatablePageSource { + private final Schema schema; private final Type[] columnTypes; private final int[] expectedColumnIndexes; private final ConnectorPageSource delegate; private final Optional projectionsAdapter; private final Optional> deleteFilter; private final Supplier positionDeleteSinkSupplier; + private final Supplier updatedRowPageSinkSupplier; + // An array with one element per field in the $row_id column. 
The value in the array points to the + // channel where the data can be read from. + private int[] updateRowIdChildColumnIndexes = new int[0]; + // The $row_id's index in 'expectedColumns', or -1 if there isn't one + private int updateRowIdColumnIndex = -1; + // Maps the Iceberg field ids of unmodified columns to their indexes in updateRowIdChildColumnIndexes + private Map icebergIdToRowIdColumnIndex = ImmutableMap.of(); + // Maps the Iceberg field ids of modified columns to their indexes in the updateColumns columnValueAndRowIdChannels array + private Map icebergIdToUpdatedColumnIndex = ImmutableMap.of(); @Nullable private IcebergPositionDeletePageSink positionDeleteSink; + @Nullable + private IcebergPageSink updatedRowPageSink; public IcebergPageSource( + Schema schema, List expectedColumns, List requiredColumns, + List readColumns, ConnectorPageSource delegate, Optional projectionsAdapter, Optional> deleteFilter, - Supplier positionDeleteSinkSupplier) + Supplier positionDeleteSinkSupplier, + Supplier updatedRowPageSinkSupplier, + List updatedColumns) { + this.schema = requireNonNull(schema, "schema is null"); // expectedColumns should contain columns which should be in the final Page // requiredColumns should include all expectedColumns as well as any columns needed by the DeleteFilter requireNonNull(expectedColumns, "expectedColumns is null"); requireNonNull(requiredColumns, "requiredColumns is null"); this.expectedColumnIndexes = new int[expectedColumns.size()]; for (int i = 0; i < expectedColumns.size(); i++) { - checkArgument(expectedColumns.get(i).equals(requiredColumns.get(i)), "Expected columns must be a prefix of required columns"); + IcebergColumnHandle expectedColumn = expectedColumns.get(i); + checkArgument(expectedColumn.equals(requiredColumns.get(i)), "Expected columns must be a prefix of required columns"); expectedColumnIndexes[i] = i; + + if (expectedColumn.isUpdateRowIdColumn()) { + this.updateRowIdColumnIndex = i; + + Map fieldIdToColumnIndex = 
mapFieldIdsToIndex(requiredColumns); + List rowIdFields = expectedColumn.getColumnIdentity().getChildren(); + ImmutableMap.Builder fieldIdToRowIdIndex = ImmutableMap.builder(); + this.updateRowIdChildColumnIndexes = new int[rowIdFields.size()]; + for (int columnIndex = 0; columnIndex < rowIdFields.size(); columnIndex++) { + int fieldId = rowIdFields.get(columnIndex).getId(); + updateRowIdChildColumnIndexes[columnIndex] = requireNonNull(fieldIdToColumnIndex.get(fieldId), () -> format("Column %s not found in requiredColumns", fieldId)); + fieldIdToRowIdIndex.put(fieldId, columnIndex); + } + this.icebergIdToRowIdColumnIndex = fieldIdToRowIdIndex.buildOrThrow(); + } } - this.columnTypes = requiredColumns.stream() + this.columnTypes = readColumns.stream() .map(IcebergColumnHandle::getType) .toArray(Type[]::new); this.delegate = requireNonNull(delegate, "delegate is null"); this.projectionsAdapter = requireNonNull(projectionsAdapter, "projectionsAdapter is null"); this.deleteFilter = requireNonNull(deleteFilter, "deleteFilter is null"); this.positionDeleteSinkSupplier = requireNonNull(positionDeleteSinkSupplier, "positionDeleteSinkSupplier is null"); + this.updatedRowPageSinkSupplier = requireNonNull(updatedRowPageSinkSupplier, "updatedRowPageSinkSupplier is null"); + requireNonNull(updatedColumns, "updatedColumns is null"); + if (!updatedColumns.isEmpty()) { + ImmutableMap.Builder icebergIdToUpdatedColumnIndex = ImmutableMap.builder(); + for (int columnIndex = 0; columnIndex < updatedColumns.size(); columnIndex++) { + IcebergColumnHandle updatedColumn = updatedColumns.get(columnIndex); + icebergIdToUpdatedColumnIndex.put(updatedColumn.getId(), columnIndex); + } + this.icebergIdToUpdatedColumnIndex = icebergIdToUpdatedColumnIndex.buildOrThrow(); + } } @Override @@ -113,9 +167,6 @@ public Page getNextPage() { try { Page dataPage = delegate.getNextPage(); - if (projectionsAdapter.isPresent()) { - dataPage = projectionsAdapter.get().adaptPage(dataPage); - } if 
(dataPage == null) { return null; } @@ -129,13 +180,20 @@ public Page getNextPage() positionsToKeep[positionsToKeepCount] = rowToKeep.getPosition(); positionsToKeepCount++; } - dataPage = dataPage.getPositions(positionsToKeep, 0, positionsToKeepCount).getColumns(expectedColumnIndexes); + dataPage = dataPage.getPositions(positionsToKeep, 0, positionsToKeepCount); } catch (IOException e) { throw new TrinoException(ICEBERG_BAD_DATA, "Failed to filter rows during merge-on-read operation", e); } } + if (projectionsAdapter.isPresent()) { + dataPage = projectionsAdapter.get().adaptPage(dataPage); + } + + dataPage = setUpdateRowIdBlock(dataPage); + dataPage = dataPage.getColumns(expectedColumnIndexes); + return dataPage; } catch (RuntimeException e) { @@ -145,22 +203,96 @@ public Page getNextPage() } } + /** + * The $row_id column used for updates is a composite column of at least one other column in the Page. + * The indexes of the columns needed for the $row_id are in the updateRowIdChildColumnIndexes array. + * @param page The raw Page from the Parquet/ORC reader. + * @return A Page where the $row_id channel has been populated. 
+ */ + private Page setUpdateRowIdBlock(Page page) + { + if (updateRowIdColumnIndex == -1) { + return page; + } + + Block[] rowIdFields = new Block[updateRowIdChildColumnIndexes.length]; + for (int childIndex = 0; childIndex < updateRowIdChildColumnIndexes.length; childIndex++) { + rowIdFields[childIndex] = page.getBlock(updateRowIdChildColumnIndexes[childIndex]); + } + + Block[] fullPage = new Block[page.getChannelCount()]; + for (int channel = 0; channel < page.getChannelCount(); channel++) { + if (channel == updateRowIdColumnIndex) { + fullPage[channel] = RowBlock.fromFieldBlocks(page.getPositionCount(), Optional.empty(), rowIdFields); + continue; + } + + fullPage[channel] = page.getBlock(channel); + } + + return new Page(page.getPositionCount(), fullPage); + } + @Override public void deleteRows(Block rowIds) { if (positionDeleteSink == null) { positionDeleteSink = positionDeleteSinkSupplier.get(); + verify(positionDeleteSink != null); } positionDeleteSink.appendPage(new Page(rowIds)); } + @Override + public void updateRows(Page page, List columnValueAndRowIdChannels) + { + int rowIdChannel = columnValueAndRowIdChannels.get(columnValueAndRowIdChannels.size() - 1); + List columnChannelMapping = columnValueAndRowIdChannels.subList(0, columnValueAndRowIdChannels.size() - 1); + + if (positionDeleteSink == null) { + positionDeleteSink = positionDeleteSinkSupplier.get(); + verify(positionDeleteSink != null); + } + if (updatedRowPageSink == null) { + updatedRowPageSink = updatedRowPageSinkSupplier.get(); + verify(updatedRowPageSink != null); + } + + ColumnarRow rowIdColumns = ColumnarRow.toColumnarRow(page.getBlock(rowIdChannel)); + positionDeleteSink.appendPage(new Page(rowIdColumns.getField(0))); + + List columns = schema.columns(); + Block[] fullPage = new Block[columns.size()]; + Set updatedColumnFieldIds = icebergIdToUpdatedColumnIndex.keySet(); + + for (int targetChannel = 0; targetChannel < columns.size(); targetChannel++) { + Types.NestedField column = 
columns.get(targetChannel); + if (updatedColumnFieldIds.contains(column.fieldId())) { + fullPage[targetChannel] = page.getBlock(columnChannelMapping.get(icebergIdToUpdatedColumnIndex.get(column.fieldId()))); + } + else { + // Unmodified column: copy its value out of the $row_id struct; field 0 of that struct is the row position, so unmodified columns occupy the later fields + fullPage[targetChannel] = rowIdColumns.getField(icebergIdToRowIdColumnIndex.get(column.fieldId())); + } + } + + updatedRowPageSink.appendPage(new Page(page.getPositionCount(), fullPage)); + } + @Override + public CompletableFuture> finish() + { + CompletableFuture> fragments = CompletableFuture.completedFuture(ImmutableList.of()); + BiFunction, Collection, Collection> combineSliceCollections = (collection1, collection2) -> + ImmutableList.builder().addAll(collection1).addAll(collection2).build(); + if (positionDeleteSink != null) { - return positionDeleteSink.finish(); + fragments = fragments.thenCombine(positionDeleteSink.finish(), combineSliceCollections); } - return CompletableFuture.completedFuture(ImmutableList.of()); + if (updatedRowPageSink != null) { + fragments = fragments.thenCombine(updatedRowPageSink.finish(), combineSliceCollections); + } + return fragments; } @Override @@ -169,6 +301,9 @@ public void abort() if (positionDeleteSink != null) { positionDeleteSink.abort(); } + if (updatedRowPageSink != null) { + updatedRowPageSink.abort(); + } } @Override @@ -202,4 +337,13 @@ protected void closeWithSuppression(Throwable throwable) { closeAllSuppress(throwable, this); } + + private static Map mapFieldIdsToIndex(List columns) + { + ImmutableMap.Builder fieldIdsToIndex = ImmutableMap.builder(); + for (int i = 0; i < columns.size(); i++) { + fieldIdsToIndex.put(columns.get(i).getId(), i); + } + return fieldIdsToIndex.buildOrThrow(); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index 1f0ba2ae3b81..730999b553ff 100644 --- 
a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -55,6 +55,7 @@ import io.trino.plugin.iceberg.delete.IcebergPositionDeletePageSink; import io.trino.plugin.iceberg.delete.TrinoDeleteFilter; import io.trino.plugin.iceberg.delete.TrinoRow; +import io.trino.spi.PageIndexerFactory; import io.trino.spi.TrinoException; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorPageSource; @@ -148,6 +149,7 @@ import static io.trino.plugin.iceberg.IcebergSessionProperties.isUseFileSizeFromMetadata; import static io.trino.plugin.iceberg.IcebergSplitManager.ICEBERG_DOMAIN_COMPACTION_THRESHOLD; import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue; +import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle; import static io.trino.plugin.iceberg.IcebergUtil.getColumns; import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider; import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys; @@ -166,6 +168,7 @@ import static java.util.stream.Collectors.mapping; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toUnmodifiableList; +import static org.apache.iceberg.MetadataColumns.ROW_POSITION; import static org.joda.time.DateTimeZone.UTC; public class IcebergPageSourceProvider @@ -179,6 +182,8 @@ public class IcebergPageSourceProvider private final FileIoProvider fileIoProvider; private final JsonCodec jsonCodec; private final IcebergFileWriterFactory fileWriterFactory; + private final PageIndexerFactory pageIndexerFactory; + private final int maxOpenPartitions; @Inject public IcebergPageSourceProvider( @@ -189,7 +194,9 @@ public IcebergPageSourceProvider( TypeManager typeManager, FileIoProvider fileIoProvider, JsonCodec jsonCodec, - IcebergFileWriterFactory fileWriterFactory) + IcebergFileWriterFactory fileWriterFactory, + 
PageIndexerFactory pageIndexerFactory, + IcebergConfig icebergConfig) { this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.fileFormatDataSourceStats = requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null"); @@ -199,6 +206,9 @@ public IcebergPageSourceProvider( this.fileIoProvider = requireNonNull(fileIoProvider, "fileIoProvider is null"); this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null"); this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null"); + this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null"); + requireNonNull(icebergConfig, "icebergConfig is null"); + this.maxOpenPartitions = icebergConfig.getMaxPartitionsPerWriter(); } @Override @@ -237,12 +247,27 @@ public ConnectorPageSource createPageSource( PartitionData partitionData = PartitionData.fromJson(split.getPartitionDataJson(), partitionColumnTypes); Map> partitionKeys = getPartitionKeys(partitionData, partitionSpec); - ImmutableList.Builder requiredColumnsBuilder = ImmutableList.builder(); - requiredColumnsBuilder.addAll(icebergColumns); + List requiredColumns = new ArrayList<>(icebergColumns); deleteFilterRequiredSchema.stream() .filter(column -> !icebergColumns.contains(column)) - .forEach(requiredColumnsBuilder::add); - List requiredColumns = requiredColumnsBuilder.build(); + .forEach(requiredColumns::add); + icebergColumns.stream() + .filter(IcebergColumnHandle::isUpdateRowIdColumn) + .findFirst().ifPresent(updateRowIdColumn -> { + Set alreadyRequiredColumnIds = requiredColumns.stream() + .map(IcebergColumnHandle::getId) + .collect(toImmutableSet()); + for (ColumnIdentity requiredColumnIdentity : updateRowIdColumn.getColumnIdentity().getChildren()) { + if (!alreadyRequiredColumnIds.contains(requiredColumnIdentity.getId())) { + if (requiredColumnIdentity.getId() == ROW_POSITION.fieldId()) { + requiredColumns.add(new IcebergColumnHandle(requiredColumnIdentity, 
BIGINT, ImmutableList.of(), BIGINT, Optional.empty())); + } + else { + requiredColumns.add(getColumnHandle(tableSchema.findField(requiredColumnIdentity.getId()), typeManager)); + } + } + } + }); TupleDomain effectivePredicate = table.getUnenforcedPredicate() .intersect(dynamicFilter.getCurrentPredicate().transformKeys(IcebergColumnHandle.class::cast)) @@ -271,10 +296,13 @@ public ConnectorPageSource createPageSource( column -> ((IcebergColumnHandle) column).getType(), IcebergPageSourceProvider::applyProjection)); + List readColumns = dataPageSource.getReaderColumns() + .map(readerColumns -> readerColumns.get().stream().map(IcebergColumnHandle.class::cast).collect(toList())) + .orElse(requiredColumns); DeleteFilter deleteFilter = new TrinoDeleteFilter( dummyFileScanTask, tableSchema, - requiredColumns, + readColumns, fileIO); Optional partition = partitionSpec.isUnpartitioned() ? Optional.empty() : Optional.of(partitionData); @@ -291,13 +319,32 @@ public ConnectorPageSource createPageSource( session, split.getFileFormat()); + Supplier updatedRowPageSinkSupplier = () -> new IcebergPageSink( + tableSchema, + PartitionSpecParser.fromJson(tableSchema, table.getPartitionSpecJson()), + locationProvider, + fileWriterFactory, + pageIndexerFactory, + hdfsEnvironment, + hdfsContext, + tableSchema.columns().stream().map(column -> getColumnHandle(column, typeManager)).collect(toImmutableList()), + jsonCodec, + session, + split.getFileFormat(), + table.getStorageProperties(), + maxOpenPartitions); + return new IcebergPageSource( + tableSchema, icebergColumns, requiredColumns, + readColumns, dataPageSource.get(), projectionsAdapter, Optional.of(deleteFilter).filter(filter -> filter.hasPosDeletes() || filter.hasEqDeletes()), - positionDeleteSink); + positionDeleteSink, + updatedRowPageSinkSupplier, + table.getUpdatedColumns()); } private ReaderPageSource createDataPageSource( @@ -444,6 +491,10 @@ else if (partitionKeys.containsKey(column.getId())) { else if (column.isPathColumn()) 
{ columnAdaptations.add(ColumnAdaptation.constantColumn(nativeValueToBlock(FILE_PATH.getType(), utf8Slice(path.toString())))); } + else if (column.isUpdateRowIdColumn()) { + // $row_id is a composite of multiple physical columns. It is assembled by the IcebergPageSource + columnAdaptations.add(ColumnAdaptation.nullColumn(column.getType())); + } else if (column.isRowPositionColumn()) { columnAdaptations.add(ColumnAdaptation.positionColumn()); } @@ -643,6 +694,11 @@ public IdBasedFieldMapperFactory(List columns) ImmutableMap.Builder> mapping = ImmutableMap.builder(); for (IcebergColumnHandle column : columns) { + if (column.isUpdateRowIdColumn()) { + // The update $row_id column contains fields which should not be accounted for in the mapping. + continue; + } + // Recursively compute subfield name to id mapping for every column populateMapping(column.getColumnIdentity(), mapping); } @@ -794,6 +850,14 @@ else if (partitionKeys.containsKey(column.getId())) { else if (column.isPathColumn()) { constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_PATH.getType(), utf8Slice(path.toString()))); } + else if (column.isUpdateRowIdColumn()) { + // $row_id is a composite of multiple physical columns, it is assembled by the IcebergPageSource + trinoTypes.add(column.getType()); + internalFields.add(Optional.empty()); + rowIndexChannels.add(false); + constantPopulatingPageSourceBuilder.addDelegateColumn(parquetSourceChannel); + parquetSourceChannel++; + } else if (column.isRowPositionColumn()) { trinoTypes.add(BIGINT); internalFields.add(Optional.empty()); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java index 016dd30bc480..e2076e529a2d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java @@ -16,6 
+16,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.airlift.units.DataSize; @@ -24,6 +25,7 @@ import io.trino.spi.connector.SchemaTableName; import io.trino.spi.predicate.TupleDomain; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; @@ -40,11 +42,15 @@ public class IcebergTableHandle private final TableType tableType; private final Optional snapshotId; private final String tableSchemaJson; + private final String partitionSpecJson; private final int formatVersion; private final String tableLocation; private final Map storageProperties; private final RetryMode retryMode; + // UPDATE only + private final List updatedColumns; + // Filter used during split generation and table scan, but not required to be strictly enforced by Iceberg Connector private final TupleDomain unenforcedPredicate; @@ -65,6 +71,7 @@ public IcebergTableHandle( @JsonProperty("tableType") TableType tableType, @JsonProperty("snapshotId") Optional snapshotId, @JsonProperty("tableSchemaJson") String tableSchemaJson, + @JsonProperty("partitionSpecJson") String partitionSpecJson, @JsonProperty("formatVersion") int formatVersion, @JsonProperty("unenforcedPredicate") TupleDomain unenforcedPredicate, @JsonProperty("enforcedPredicate") TupleDomain enforcedPredicate, @@ -72,7 +79,8 @@ public IcebergTableHandle( @JsonProperty("nameMappingJson") Optional nameMappingJson, @JsonProperty("tableLocation") String tableLocation, @JsonProperty("storageProperties") Map storageProperties, - @JsonProperty("retryMode") RetryMode retryMode) + @JsonProperty("retryMode") RetryMode retryMode, + @JsonProperty("updatedColumns") List updatedColumns) { this( schemaName, @@ -80,6 +88,7 @@ public IcebergTableHandle( 
tableType, snapshotId, tableSchemaJson, + partitionSpecJson, formatVersion, unenforcedPredicate, enforcedPredicate, @@ -88,6 +97,7 @@ public IcebergTableHandle( tableLocation, storageProperties, retryMode, + updatedColumns, false, Optional.empty()); } @@ -98,6 +108,7 @@ public IcebergTableHandle( TableType tableType, Optional snapshotId, String tableSchemaJson, + String partitionSpecJson, int formatVersion, TupleDomain unenforcedPredicate, TupleDomain enforcedPredicate, @@ -106,6 +117,7 @@ public IcebergTableHandle( String tableLocation, Map storageProperties, RetryMode retryMode, + List updatedColumns, boolean recordScannedFiles, Optional maxScannedFileSize) { @@ -114,6 +126,7 @@ public IcebergTableHandle( this.tableType = requireNonNull(tableType, "tableType is null"); this.snapshotId = requireNonNull(snapshotId, "snapshotId is null"); this.tableSchemaJson = requireNonNull(tableSchemaJson, "schemaJson is null"); + this.partitionSpecJson = requireNonNull(partitionSpecJson, "partitionSpecJson is null"); this.formatVersion = formatVersion; this.unenforcedPredicate = requireNonNull(unenforcedPredicate, "unenforcedPredicate is null"); this.enforcedPredicate = requireNonNull(enforcedPredicate, "enforcedPredicate is null"); @@ -122,6 +135,7 @@ public IcebergTableHandle( this.tableLocation = requireNonNull(tableLocation, "tableLocation is null"); this.storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null")); this.retryMode = requireNonNull(retryMode, "retryMode is null"); + this.updatedColumns = ImmutableList.copyOf(requireNonNull(updatedColumns, "updatedColumns is null")); this.recordScannedFiles = recordScannedFiles; this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null"); } @@ -156,6 +170,12 @@ public String getTableSchemaJson() return tableSchemaJson; } + @JsonProperty + public String getPartitionSpecJson() + { + return partitionSpecJson; + } + @JsonProperty public int 
getFormatVersion() { @@ -204,6 +224,12 @@ public RetryMode getRetryMode() return retryMode; } + @JsonProperty + public List getUpdatedColumns() + { + return updatedColumns; + } + @JsonIgnore public boolean isRecordScannedFiles() { @@ -234,6 +260,7 @@ public IcebergTableHandle withProjectedColumns(Set projecte tableType, snapshotId, tableSchemaJson, + partitionSpecJson, formatVersion, unenforcedPredicate, enforcedPredicate, @@ -242,6 +269,7 @@ public IcebergTableHandle withProjectedColumns(Set projecte tableLocation, storageProperties, retryMode, + updatedColumns, recordScannedFiles, maxScannedFileSize); } @@ -254,6 +282,29 @@ public IcebergTableHandle withRetryMode(RetryMode retryMode) tableType, snapshotId, tableSchemaJson, + partitionSpecJson, + formatVersion, + unenforcedPredicate, + enforcedPredicate, + projectedColumns, + nameMappingJson, + tableLocation, + storageProperties, + retryMode, + updatedColumns, + recordScannedFiles, + maxScannedFileSize); + } + + public IcebergTableHandle withUpdatedColumns(List updatedColumns) + { + return new IcebergTableHandle( + schemaName, + tableName, + tableType, + snapshotId, + tableSchemaJson, + partitionSpecJson, formatVersion, unenforcedPredicate, enforcedPredicate, @@ -262,6 +313,7 @@ public IcebergTableHandle withRetryMode(RetryMode retryMode) tableLocation, storageProperties, retryMode, + updatedColumns, recordScannedFiles, maxScannedFileSize); } @@ -274,6 +326,7 @@ public IcebergTableHandle forOptimize(boolean recordScannedFiles, DataSize maxSc tableType, snapshotId, tableSchemaJson, + partitionSpecJson, formatVersion, unenforcedPredicate, enforcedPredicate, @@ -282,6 +335,7 @@ public IcebergTableHandle forOptimize(boolean recordScannedFiles, DataSize maxSc tableLocation, storageProperties, retryMode, + updatedColumns, recordScannedFiles, Optional.of(maxScannedFileSize)); } @@ -303,6 +357,7 @@ public boolean equals(Object o) tableType == that.tableType && Objects.equals(snapshotId, that.snapshotId) && 
Objects.equals(tableSchemaJson, that.tableSchemaJson) && + Objects.equals(partitionSpecJson, that.partitionSpecJson) && formatVersion == that.formatVersion && Objects.equals(unenforcedPredicate, that.unenforcedPredicate) && Objects.equals(enforcedPredicate, that.enforcedPredicate) && @@ -310,6 +365,7 @@ public boolean equals(Object o) Objects.equals(nameMappingJson, that.nameMappingJson) && Objects.equals(tableLocation, that.tableLocation) && Objects.equals(retryMode, that.retryMode) && + Objects.equals(updatedColumns, that.updatedColumns) && Objects.equals(storageProperties, that.storageProperties) && Objects.equals(maxScannedFileSize, that.maxScannedFileSize); } @@ -317,8 +373,8 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(schemaName, tableName, tableType, snapshotId, tableSchemaJson, formatVersion, unenforcedPredicate, enforcedPredicate, - projectedColumns, nameMappingJson, tableLocation, storageProperties, retryMode, recordScannedFiles, maxScannedFileSize); + return Objects.hash(schemaName, tableName, tableType, snapshotId, tableSchemaJson, partitionSpecJson, formatVersion, unenforcedPredicate, enforcedPredicate, + projectedColumns, nameMappingJson, tableLocation, storageProperties, retryMode, updatedColumns, recordScannedFiles, maxScannedFileSize); } @Override diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java index 5432ca6d952c..fc7a47eb328b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java @@ -71,6 +71,12 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata) // todo privileges should not be replaced for an alter PrincipalPrivileges 
privileges = table.getOwner().map(MetastoreUtil::buildInitialPrivilegeSet).orElse(NO_PRIVILEGES); - metastore.replaceTable(database, tableName, table, privileges); + + try { + metastore.replaceTable(database, tableName, table, privileges); + } + catch (RuntimeException e) { + throw new CommitFailedException(e, "Failed to commit transaction to FileHiveMetastore"); + } } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/TrinoDeleteFilter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/TrinoDeleteFilter.java index d4bae5433baf..9e94637dca66 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/TrinoDeleteFilter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/TrinoDeleteFilter.java @@ -20,16 +20,17 @@ import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import java.util.List; -import java.util.Optional; -import java.util.Set; import static com.google.common.collect.ImmutableList.toImmutableList; -import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_UPDATE_ROW_ID_COLUMN_ID; +import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_UPDATE_ROW_ID_COLUMN_NAME; import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.MetadataColumns.FILE_PATH; +import static org.apache.iceberg.MetadataColumns.IS_DELETED; +import static org.apache.iceberg.MetadataColumns.ROW_POSITION; public class TrinoDeleteFilter extends DeleteFilter @@ -38,7 +39,7 @@ public class TrinoDeleteFilter public TrinoDeleteFilter(FileScanTask task, Schema tableSchema, List requestedColumns, FileIO fileIO) { - super(task, tableSchema, filterSchema(tableSchema, requestedColumns)); + super(task, tableSchema, toSchema(tableSchema, requestedColumns)); 
this.fileIO = requireNonNull(fileIO, "fileIO is null"); } @@ -54,43 +55,26 @@ protected InputFile getInputFile(String s) return fileIO.newInputFile(s); } - private static Schema filterSchema(Schema tableSchema, List requestedColumns) + private static Schema toSchema(Schema tableSchema, List requestedColumns) { - Set requestedFieldIds = requestedColumns.stream() - .map(IcebergColumnHandle::getId) - .collect(toImmutableSet()); - return new Schema(filterFieldList(tableSchema.columns(), requestedFieldIds)); + return new Schema(requestedColumns.stream().map(column -> toNestedField(tableSchema, column)).collect(toImmutableList())); } - private static List filterFieldList(List fields, Set requestedFieldIds) + private static Types.NestedField toNestedField(Schema tableSchema, IcebergColumnHandle columnHandle) { - return fields.stream() - .map(field -> filterField(field, requestedFieldIds)) - .filter(Optional::isPresent) - .map(Optional::get) - .collect(toImmutableList()); - } - - private static Optional filterField(Types.NestedField field, Set requestedFieldIds) - { - Type fieldType = field.type(); - if (requestedFieldIds.contains(field.fieldId())) { - return Optional.of(field); + if (columnHandle.isRowPositionColumn()) { + return ROW_POSITION; } - - if (fieldType.isStructType()) { - List requiredChildren = filterFieldList(fieldType.asStructType().fields(), requestedFieldIds); - if (requiredChildren.isEmpty()) { - return Optional.empty(); - } - return Optional.of(Types.NestedField.of( - field.fieldId(), - field.isOptional(), - field.name(), - Types.StructType.of(requiredChildren), - field.doc())); + if (columnHandle.isIsDeletedColumn()) { + return IS_DELETED; + } + if (columnHandle.isPathColumn()) { + return FILE_PATH; + } + if (columnHandle.isUpdateRowIdColumn()) { + return Types.NestedField.of(TRINO_UPDATE_ROW_ID_COLUMN_ID, false, TRINO_UPDATE_ROW_ID_COLUMN_NAME, Types.StructType.of()); } - return Optional.empty(); + return tableSchema.findField(columnHandle.getId()); } 
} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java index e117e2a3c4f1..cf2160084c4b 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java @@ -59,6 +59,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) return true; case SUPPORTS_DELETE: + case SUPPORTS_UPDATE: return true; default: return super.hasBehavior(connectorBehavior); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 698b1c88ecc4..a1977e643a52 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -155,12 +155,19 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) return false; case SUPPORTS_DELETE: + case SUPPORTS_UPDATE: return true; default: return super.hasBehavior(connectorBehavior); } } + @Override + protected void verifyConcurrentUpdateFailurePermissible(Exception e) + { + assertThat(e).hasMessageContaining("Failed to commit Iceberg update to table"); + } + @Test public void testDeleteOnV1Table() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergFailureRecoveryTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergFailureRecoveryTest.java index 7ff57fd39f65..ae44be3daafb 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergFailureRecoveryTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergFailureRecoveryTest.java @@ -136,7 +136,7 @@ 
public void testDelete() .withCleanupQuery(cleanupQuery) .experiencing(TASK_MANAGEMENT_REQUEST_TIMEOUT) .at(boundaryDistributedStage()) - .failsWithoutRetries(failure -> failure.hasMessageContaining("Encountered too many errors talking to a worker node")) + .failsWithoutRetries(failure -> failure.hasMessageFindingMatch("Encountered too many errors talking to a worker node|Error closing remote buffer")) .finishesSuccessfully(); assertThatQuery(deleteQuery) @@ -144,22 +144,92 @@ public void testDelete() .withCleanupQuery(cleanupQuery) .experiencing(TASK_GET_RESULTS_REQUEST_TIMEOUT) .at(boundaryDistributedStage()) - .failsWithoutRetries(failure -> failure.hasMessageContaining("Encountered too many errors talking to a worker node")) + .failsWithoutRetries(failure -> failure.hasMessageFindingMatch("Encountered too many errors talking to a worker node|Error closing remote buffer")) .finishesSuccessfully(); } + // Copied from BaseDeltaFailureRecoveryTest @Override public void testUpdate() { - assertThatThrownBy(super::testUpdate) - .hasMessageContaining("This connector does not support updates"); - } + // Test method is overriden because method from superclass assumes more complex plan for `UPDATE` query. + // Assertions do not play well if plan consists of just two fragments. - @Override - public void testUpdateWithSubquery() - { - assertThatThrownBy(super::testUpdateWithSubquery) - .hasMessageContaining("This connector does not support updates"); + Optional setupQuery = Optional.of("CREATE TABLE AS SELECT * FROM orders"); + Optional cleanupQuery = Optional.of("DROP TABLE
"); + String updateQuery = "UPDATE
SET shippriority = 101 WHERE custkey = 1"; + + assertThatQuery(updateQuery) + .withSetupQuery(setupQuery) + .withCleanupQuery(cleanupQuery) + .experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR)) + .at(boundaryCoordinatorStage()) + .failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE)); + + assertThatQuery(updateQuery) + .withSetupQuery(setupQuery) + .withCleanupQuery(cleanupQuery) + .experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR)) + .at(rootStage()) + .failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE)); + + assertThatQuery(updateQuery) + .withSetupQuery(setupQuery) + .withCleanupQuery(cleanupQuery) + .experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR)) + .at(leafStage()) + .failsWithoutRetries(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE)) + .finishesSuccessfully(); + + assertThatQuery(updateQuery) + .withSetupQuery(setupQuery) + .withCleanupQuery(cleanupQuery) + .experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR)) + .at(boundaryDistributedStage()) + .failsWithoutRetries(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE)) + .finishesSuccessfully(); + + // UPDATE plan is too simplistic for testing with `intermediateDistributedStage` + assertThatThrownBy(() -> + assertThatQuery(updateQuery) + .withSetupQuery(setupQuery) + .withCleanupQuery(cleanupQuery) + .experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR)) + .at(intermediateDistributedStage()) + .failsWithoutRetries(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE))) + .hasMessageContaining("stage not found"); + + assertThatQuery(updateQuery) + .withSetupQuery(setupQuery) + .withCleanupQuery(cleanupQuery) + .experiencing(TASK_MANAGEMENT_REQUEST_FAILURE) + .at(boundaryDistributedStage()) + .failsWithoutRetries(failure -> failure.hasMessageFindingMatch("Error 500 Internal Server Error|Error closing remote buffer, expected 204 got 500")) + 
.finishesSuccessfully(); + + assertThatQuery(updateQuery) + .withSetupQuery(setupQuery) + .withCleanupQuery(cleanupQuery) + .experiencing(TASK_GET_RESULTS_REQUEST_FAILURE) + .at(boundaryDistributedStage()) + .failsWithoutRetries(failure -> failure.hasMessageFindingMatch("Error 500 Internal Server Error|Error closing remote buffer, expected 204 got 500")) + .finishesSuccessfully(); + + assertThatQuery(updateQuery) + .withSetupQuery(setupQuery) + .withCleanupQuery(cleanupQuery) + .experiencing(TASK_MANAGEMENT_REQUEST_TIMEOUT) + .at(boundaryDistributedStage()) + .failsWithoutRetries(failure -> failure.hasMessageFindingMatch("Encountered too many errors talking to a worker node|Error closing remote buffer")) + .finishesSuccessfully(); + + assertThatQuery(updateQuery) + .withSetupQuery(setupQuery) + .withCleanupQuery(cleanupQuery) + .experiencing(TASK_GET_RESULTS_REQUEST_TIMEOUT) + .at(boundaryDistributedStage()) + .failsWithoutRetries(failure -> failure.hasMessageFindingMatch("Encountered too many errors talking to a worker node|Error closing remote buffer")) + .finishesSuccessfully(); } @Test(invocationCount = INVOCATION_COUNT) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConnectorSmokeTest.java index c306c902412b..95a6660d3b71 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConnectorSmokeTest.java @@ -14,8 +14,6 @@ package io.trino.plugin.iceberg; import io.trino.testing.QueryRunner; -import org.testng.SkipException; -import org.testng.annotations.Test; import static org.apache.iceberg.FileFormat.ORC; @@ -37,11 +35,4 @@ protected QueryRunner createQueryRunner() .setInitialTables(REQUIRED_TPCH_TABLES) .build(); } - - @Test - @Override - public void testDeleteRowsConcurrently() - { - throw new 
SkipException("The File Hive Metastore does not have strong concurrency guarantees"); - } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java index d3120e8b4081..0487f52fe8b9 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java @@ -20,6 +20,7 @@ import io.airlift.testing.TempFile; import io.trino.connector.CatalogName; import io.trino.metadata.TableHandle; +import io.trino.operator.GroupByHashPageIndexerFactory; import io.trino.orc.OrcWriteValidation; import io.trino.orc.OrcWriter; import io.trino.orc.OrcWriterOptions; @@ -43,7 +44,9 @@ import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.Type; +import io.trino.sql.gen.JoinCompiler; import io.trino.testing.TestingConnectorSession; +import io.trino.type.BlockTypeOperators; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; @@ -169,6 +172,7 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle TableType.DATA, Optional.empty(), SchemaParser.toJson(TABLE_SCHEMA), + PartitionSpecParser.toJson(PartitionSpec.unpartitioned()), 2, TupleDomain.withColumnDomains(ImmutableMap.of(KEY_ICEBERG_COLUMN_HANDLE, Domain.singleValue(INTEGER, (long) KEY_COLUMN_VALUE))), TupleDomain.all(), @@ -176,7 +180,8 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle Optional.empty(), outputFile.getParentFile().getAbsolutePath(), ImmutableMap.of(), - RetryMode.NO_RETRIES), + RetryMode.NO_RETRIES, + ImmutableList.of()), transaction); FileFormatDataSourceStats stats = new FileFormatDataSourceStats(); @@ -188,7 +193,9 @@ 
private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle TESTING_TYPE_MANAGER, new HdfsFileIoProvider(HDFS_ENVIRONMENT), new JsonCodecFactory().jsonCodec(CommitTaskData.class), - new IcebergFileWriterFactory(HDFS_ENVIRONMENT, TESTING_TYPE_MANAGER, new NodeVersion("trino_test"), stats, ORC_WRITER_CONFIG)); + new IcebergFileWriterFactory(HDFS_ENVIRONMENT, TESTING_TYPE_MANAGER, new NodeVersion("trino_test"), stats, ORC_WRITER_CONFIG), + new GroupByHashPageIndexerFactory(new JoinCompiler(TESTING_TYPE_MANAGER.getTypeOperators()), new BlockTypeOperators()), + icebergConfig); return provider.createPageSource( transaction, diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java index ee8df4cde150..7d3bbc38be09 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java @@ -40,6 +40,7 @@ import io.trino.spi.type.TestingTypeManager; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.QueryRunner; +import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Table; import org.apache.iceberg.types.Conversions; @@ -127,6 +128,7 @@ public void testIncompleteDynamicFilterTimeout() TableType.DATA, Optional.empty(), SchemaParser.toJson(nationTable.schema()), + PartitionSpecParser.toJson(nationTable.spec()), 1, TupleDomain.all(), TupleDomain.all(), @@ -134,7 +136,8 @@ public void testIncompleteDynamicFilterTimeout() Optional.empty(), nationTable.location(), nationTable.properties(), - NO_RETRIES); + NO_RETRIES, + ImmutableList.of()); IcebergSplitSource splitSource = new IcebergSplitSource( tableHandle, diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java index 920f0d2c988e..e908ca055e7f 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java @@ -171,7 +171,8 @@ public void testV2TableWithEqualityDelete() Table icebergTable = updateTableToV2(tableName); writeEqualityDeleteToNationTable(icebergTable); assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE regionkey != 1"); - assertQuery("SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey != 1"); + // natiokey is before the equality delete column in the table schema, comment is after + assertQuery("SELECT nationkey, comment FROM " + tableName, "SELECT nationkey, comment FROM nation WHERE regionkey != 1"); } @Test diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java index 899a7048605a..97e36eac258a 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java @@ -165,7 +165,8 @@ public void testProjectionPushdown() BIGINT, Optional.empty()); - IcebergTableHandle icebergTable = new IcebergTableHandle(SCHEMA_NAME, tableName, DATA, Optional.of(1L), "", 1, TupleDomain.all(), TupleDomain.all(), ImmutableSet.of(), Optional.empty(), "", ImmutableMap.of(), NO_RETRIES); + IcebergTableHandle icebergTable = new IcebergTableHandle(SCHEMA_NAME, tableName, DATA, Optional.of(1L), "", "", 1, + TupleDomain.all(), TupleDomain.all(), ImmutableSet.of(), Optional.empty(), "", ImmutableMap.of(), NO_RETRIES, ImmutableList.of()); TableHandle table = new TableHandle(new 
CatalogName(ICEBERG_CATALOG_NAME), icebergTable, new HiveTransactionHandle(false)); IcebergColumnHandle fullColumn = partialColumn.getBaseColumn(); @@ -229,7 +230,8 @@ public void testPredicatePushdown() PushPredicateIntoTableScan pushPredicateIntoTableScan = new PushPredicateIntoTableScan(tester().getPlannerContext(), tester().getTypeAnalyzer()); - IcebergTableHandle icebergTable = new IcebergTableHandle(SCHEMA_NAME, tableName, DATA, Optional.of(1L), "", 1, TupleDomain.all(), TupleDomain.all(), ImmutableSet.of(), Optional.empty(), "", ImmutableMap.of(), NO_RETRIES); + IcebergTableHandle icebergTable = new IcebergTableHandle(SCHEMA_NAME, tableName, DATA, Optional.of(1L), "", "", 1, + TupleDomain.all(), TupleDomain.all(), ImmutableSet.of(), Optional.empty(), "", ImmutableMap.of(), NO_RETRIES, ImmutableList.of()); TableHandle table = new TableHandle(new CatalogName(ICEBERG_CATALOG_NAME), icebergTable, new HiveTransactionHandle(false)); IcebergColumnHandle column = new IcebergColumnHandle(primitiveColumnIdentity(1, "a"), INTEGER, ImmutableList.of(), INTEGER, Optional.empty()); @@ -261,7 +263,8 @@ public void testColumnPruningProjectionPushdown() PruneTableScanColumns pruneTableScanColumns = new PruneTableScanColumns(tester().getMetadata()); - IcebergTableHandle icebergTable = new IcebergTableHandle(SCHEMA_NAME, tableName, DATA, Optional.empty(), "", 1, TupleDomain.all(), TupleDomain.all(), ImmutableSet.of(), Optional.empty(), "", ImmutableMap.of(), NO_RETRIES); + IcebergTableHandle icebergTable = new IcebergTableHandle(SCHEMA_NAME, tableName, DATA, Optional.empty(), "", "", 1, + TupleDomain.all(), TupleDomain.all(), ImmutableSet.of(), Optional.empty(), "", ImmutableMap.of(), NO_RETRIES, ImmutableList.of()); TableHandle table = new TableHandle(new CatalogName(ICEBERG_CATALOG_NAME), icebergTable, new HiveTransactionHandle(false)); IcebergColumnHandle columnA = new IcebergColumnHandle(primitiveColumnIdentity(0, "a"), INTEGER, ImmutableList.of(), INTEGER, 
Optional.empty()); @@ -304,7 +307,8 @@ public void testPushdownWithDuplicateExpressions() tester().getTypeAnalyzer(), new ScalarStatsCalculator(tester().getPlannerContext(), tester().getTypeAnalyzer())); - IcebergTableHandle icebergTable = new IcebergTableHandle(SCHEMA_NAME, tableName, DATA, Optional.of(1L), "", 1, TupleDomain.all(), TupleDomain.all(), ImmutableSet.of(), Optional.empty(), "", ImmutableMap.of(), NO_RETRIES); + IcebergTableHandle icebergTable = new IcebergTableHandle(SCHEMA_NAME, tableName, DATA, Optional.of(1L), "", "", 1, + TupleDomain.all(), TupleDomain.all(), ImmutableSet.of(), Optional.empty(), "", ImmutableMap.of(), NO_RETRIES, ImmutableList.of()); TableHandle table = new TableHandle(new CatalogName(ICEBERG_CATALOG_NAME), icebergTable, new HiveTransactionHandle(false)); IcebergColumnHandle bigintColumn = new IcebergColumnHandle(primitiveColumnIdentity(1, "just_bigint"), BIGINT, ImmutableList.of(), BIGINT, Optional.empty()); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveRedirectionToIceberg.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveRedirectionToIceberg.java index 94e788b4e32d..d8ccd4661886 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveRedirectionToIceberg.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveRedirectionToIceberg.java @@ -216,8 +216,10 @@ public void testUpdate() createIcebergTable(icebergTableName, true); - assertQueryFailure(() -> onTrino().executeQuery("UPDATE " + hiveTableName + " SET nationkey = nationkey + 100 WHERE regionkey = 1")) - .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): This connector does not support updates"); + assertThat(onTrino().executeQuery("UPDATE " + hiveTableName + " SET nationkey = nationkey + 100 WHERE regionkey = 1")).updatedRowsCountIsEqualTo(5); + assertResultsEqual( + onTrino().executeQuery("SELECT comment, nationkey FROM " + 
hiveTableName), + onTrino().executeQuery("SELECT comment, IF(regionkey = 1, nationkey + 100, nationkey) FROM tpch.tiny.nation")); onTrino().executeQuery("DROP TABLE " + icebergTableName); } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java index 31f2e74205e9..2e9b18a7d8b5 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java @@ -63,6 +63,7 @@ import static java.util.Arrays.asList; import static java.util.Locale.ENGLISH; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; /** @@ -1850,6 +1851,81 @@ public void testCleaningUpIcebergTableWithRowLevelDeletes(StorageFormat tableSto .containsOnly(row); } + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + public void testUpdateAfterSchemaEvolution() + { + String baseTableName = "test_update_after_schema_evolution_" + randomTableSuffix(); + String trinoTableName = trinoTableName(baseTableName); + String sparkTableName = sparkTableName(baseTableName); + onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); + + onSpark().executeQuery("CREATE TABLE " + sparkTableName + "(part_key INT, a INT, b INT, c INT) " + + "USING ICEBERG PARTITIONED BY (part_key) " + + "TBLPROPERTIES ('format-version'='2', 'write.delete.mode'='merge-on-read')"); + onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (1, 2, 3, 4), (11, 12, 13, 14)"); + + onSpark().executeQuery("ALTER TABLE " + sparkTableName + " DROP PARTITION FIELD part_key"); + onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ADD PARTITION FIELD a"); + + onSpark().executeQuery("ALTER TABLE " + 
sparkTableName + " DROP COLUMN b"); + onSpark().executeQuery("ALTER TABLE " + sparkTableName + " DROP COLUMN c"); + onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ADD COLUMN c INT"); + + List expected = ImmutableList.of(row(1, 2, null), row(11, 12, null)); + assertThat(onTrino().executeQuery("SELECT * FROM " + trinoTableName)).containsOnly(expected); + assertThat(onSpark().executeQuery("SELECT * FROM " + sparkTableName)).containsOnly(expected); + + // Because of the DROP/ADD on column c these two should be no-op updates + onTrino().executeQuery("UPDATE " + trinoTableName + " SET c = c + 1"); + onTrino().executeQuery("UPDATE " + trinoTableName + " SET a = a + 1 WHERE c = 4"); + assertThat(onTrino().executeQuery("SELECT * FROM " + trinoTableName)).containsOnly(expected); + assertThat(onSpark().executeQuery("SELECT * FROM " + sparkTableName)).containsOnly(expected); + + // Check the new data files are using the updated partition scheme + List filePaths = onTrino().executeQuery("SELECT DISTINCT file_path FROM " + TRINO_CATALOG + "." 
+ TEST_SCHEMA_NAME + ".\"" + baseTableName + "$files\"").column(1); + assertEquals( + filePaths.stream() + .map(String::valueOf) + .filter(path -> path.contains("/a=") && !path.contains("/part_key=")) + .count(), + 2); + + onSpark().executeQuery("DROP TABLE " + sparkTableName); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + public void testUpdateOnPartitionColumn() + { + String baseTableName = "test_update_on_partition_column" + randomTableSuffix(); + String trinoTableName = trinoTableName(baseTableName); + String sparkTableName = sparkTableName(baseTableName); + onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); + + onSpark().executeQuery("CREATE TABLE " + sparkTableName + "(a INT, b STRING) " + + "USING ICEBERG PARTITIONED BY (a) " + + "TBLPROPERTIES ('format-version'='2', 'write.delete.mode'='merge-on-read')"); + onTrino().executeQuery("INSERT INTO " + trinoTableName + " VALUES (1, 'first'), (1, 'second'), (2, 'third'), (2, 'forth'), (2, 'fifth')"); + + onTrino().executeQuery("UPDATE " + trinoTableName + " SET a = a + 1"); + List expected = ImmutableList.of(row(2, "first"), row(2, "second"), row(3, "third"), row(3, "forth"), row(3, "fifth")); + assertThat(onTrino().executeQuery("SELECT * FROM " + trinoTableName)).containsOnly(expected); + assertThat(onSpark().executeQuery("SELECT * FROM " + sparkTableName)).containsOnly(expected); + + onTrino().executeQuery("UPDATE " + trinoTableName + " SET a = a + (CASE b WHEN 'first' THEN 1 ELSE 0 END)"); + expected = ImmutableList.of(row(3, "first"), row(2, "second"), row(3, "third"), row(3, "forth"), row(3, "fifth")); + assertThat(onTrino().executeQuery("SELECT * FROM " + trinoTableName)).containsOnly(expected); + assertThat(onSpark().executeQuery("SELECT * FROM " + sparkTableName)).containsOnly(expected); + + // Test moving rows from one file into different partitions, compact first + onSpark().executeQuery("CALL " + SPARK_CATALOG + ".system.rewrite_data_files(table=>'" + TEST_SCHEMA_NAME + "." 
+ baseTableName + "', options => map('min-input-files','1'))"); + onTrino().executeQuery("UPDATE " + trinoTableName + " SET a = a + (CASE b WHEN 'forth' THEN -1 ELSE 1 END)"); + expected = ImmutableList.of(row(4, "first"), row(3, "second"), row(4, "third"), row(2, "forth"), row(4, "fifth")); + assertThat(onTrino().executeQuery("SELECT * FROM " + trinoTableName)).containsOnly(expected); + assertThat(onSpark().executeQuery("SELECT * FROM " + sparkTableName)).containsOnly(expected); + + onSpark().executeQuery("DROP TABLE " + sparkTableName); + } + private int calculateMetadataFilesForPartitionedTable(String tableName) { String dataFilePath = onTrino().executeQuery(format("SELECT file_path FROM iceberg.default.\"%s$files\" limit 1", tableName)).row(0).get(0).toString(); diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java index 2d7fa696e657..e9f4e4567973 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java @@ -2747,6 +2747,59 @@ protected void verifyConcurrentUpdateFailurePermissible(Exception e) throw new AssertionError("Unexpected concurrent update failure", e); } + @Test + public void testUpdateWithPredicates() + { + if (!hasBehavior(SUPPORTS_UPDATE)) { + // Note this change is a no-op, if actually run + assertQueryFails("UPDATE nation SET nationkey = nationkey + regionkey WHERE regionkey < 1", "This connector does not support updates"); + return; + } + + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_update_with_predicates", "(a INT, b INT, c INT)")) { + String tableName = table.getName(); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 2, 3), (11, 12, 13), (21, 22, 23)", 3); + assertUpdate("UPDATE " + tableName + " SET a = a - 1 WHERE c = 3", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (0, 2, 
3), (11, 12, 13), (21, 22, 23)"); + + assertUpdate("UPDATE " + tableName + " SET c = c + 1 WHERE a = 11", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (0, 2, 3), (11, 12, 14), (21, 22, 23)"); + + assertUpdate("UPDATE " + tableName + " SET b = b * 2 WHERE b = 22", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (0, 2, 3), (11, 12, 14), (21, 44, 23)"); + } + + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_update_with_predicates_on_row_types", "(int_t INT, row_t ROW(f1 INT, f2 INT))")) { + String tableName = table.getName(); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, ROW(2, 3)), (11, ROW(12, 13)), (21, ROW(22, 23))", 3); + assertUpdate("UPDATE " + tableName + " SET int_t = int_t - 1 WHERE row_t.f2 = 3", 1); + assertQuery("SELECT int_t, row_t.f1, row_t.f2 FROM " + tableName, "VALUES (0, 2, 3), (11, 12, 13), (21, 22, 23)"); + + assertUpdate("UPDATE " + tableName + " SET row_t = ROW(row_t.f1, row_t.f2 + 1) WHERE int_t = 11", 1); + assertQuery("SELECT int_t, row_t.f1, row_t.f2 FROM " + tableName, "VALUES (0, 2, 3), (11, 12, 14), (21, 22, 23)"); + + assertUpdate("UPDATE " + tableName + " SET row_t = ROW(row_t.f1 * 2, row_t.f2) WHERE row_t.f1 = 22", 1); + assertQuery("SELECT int_t, row_t.f1, row_t.f2 FROM " + tableName, "VALUES (0, 2, 3), (11, 12, 14), (21, 44, 23)"); + } + } + + @Test + public void testUpdateAllValues() + { + if (!hasBehavior(SUPPORTS_UPDATE)) { + // Note this change is a no-op, if actually run + assertQueryFails("UPDATE nation SET nationkey = nationkey + regionkey WHERE regionkey < 1", "This connector does not support updates"); + return; + } + + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_update_all_columns", "(a INT, b INT, c INT)")) { + String tableName = table.getName(); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 2, 3), (11, 12, 13), (21, 22, 23)", 3); + assertUpdate("UPDATE " + tableName + " SET a = a + 1, b = b - 1, c = c * 2", 3); + assertQuery("SELECT 
* FROM " + tableName, "VALUES (2, 1, 6), (12, 11, 26), (22, 21, 46)"); + } + } + @Test public void testDropTable() {