/**
 * Guice binding annotation for the {@link java.util.concurrent.ExecutorService} used to
 * process {@code table_changes()} splits of copy-on-write Iceberg tables.
 * The executor is provided and sized in {@code IcebergExecutorModule} from
 * {@code iceberg.table-changes-processor-threads}.
 */
@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@BindingAnnotation
public @interface ForIcebergCopyOnWriteTableChangesProcessor {}
// Size of the executor that processes table_changes() splits for copy-on-write tables;
// 0 disables the dedicated pool (work then runs on the calling thread).
@Min(0)
public int getTableChangesProcessorThreads()
{
    return tableChangesProcessorThreads;
}

@Config("iceberg.table-changes-processor-threads")
@ConfigDescription("Number of threads to use for table changes processing for Copy-On-Write tables")
public IcebergConfig setTableChangesProcessorThreads(String tableChangesProcessorThreads)
{
    // Accepts either an absolute count or airlift ThreadCount syntax such as "2C"
    // (multiples of available cores), matching splitManagerThreads/planningThreads.
    this.tableChangesProcessorThreads = ThreadCount.valueOf(tableChangesProcessorThreads).getThreadCount();
    return this;
}
closingBinder(binder).registerExecutor(Key.get(ExecutorService.class, ForIcebergCopyOnWriteTableChangesProcessor.class)); } @Singleton @@ -96,4 +97,17 @@ public ExecutorService createFileDeleteExecutor(CatalogName catalogName, Iceberg config.getFileDeleteThreads(), daemonThreadsNamed("iceberg-file-delete-" + catalogName + "-%s")); } + + @Provides + @Singleton + @ForIcebergCopyOnWriteTableChangesProcessor + public ExecutorService createIcebergCopyOnWriteTableChangesExecutor(CatalogName catalogName, IcebergConfig config) + { + if (config.getPlanningThreads() == 0) { + return newDirectExecutorService(); + } + return newFixedThreadPool( + config.getTableChangesProcessorThreads(), + daemonThreadsNamed("iceberg-copy-on-write-table-changes-" + catalogName + "-%s")); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index 844878304d2f..c555b8229ed5 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -66,6 +66,7 @@ import org.apache.iceberg.ManifestReader; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RowLevelOperationMode; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotUpdate; @@ -187,11 +188,13 @@ import static java.util.Comparator.comparing; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ; import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; import static 
org.apache.iceberg.TableProperties.FORMAT_VERSION; +import static org.apache.iceberg.TableProperties.MERGE_MODE; import static org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED; import static org.apache.iceberg.TableProperties.METADATA_PREVIOUS_VERSIONS_MAX; import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED; @@ -1271,4 +1274,10 @@ public static ManifestReader> readerForManifest(Manifes case DELETES -> ManifestFiles.readDeleteManifest(manifest, fileIO, specsById); }; } + + public static RowLevelOperationMode rowLevelOperationMode(Table table) + { + // Trino uses MoR by default unlike Spark + return RowLevelOperationMode.fromName(table.properties().getOrDefault(MERGE_MODE, MERGE_ON_READ.modeName())); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/AbstractTableChangesSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/AbstractTableChangesSplitSource.java new file mode 100644 index 000000000000..95aa4bd1bf38 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/AbstractTableChangesSplitSource.java @@ -0,0 +1,112 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.functions.tablechanges; + +import io.trino.plugin.iceberg.IcebergFileFormat; +import io.trino.plugin.iceberg.PartitionData; +import io.trino.spi.SplitWeight; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorSplit; +import io.trino.spi.connector.ConnectorSplitSource; +import io.trino.spi.type.DateTimeEncoding; +import org.apache.iceberg.AddedRowsScanTask; +import org.apache.iceberg.ChangelogScanTask; +import org.apache.iceberg.DeletedDataFileScanTask; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.SplittableScanTask; +import org.apache.iceberg.Table; + +import java.util.Iterator; + +import static com.google.common.collect.Iterators.singletonIterator; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static io.trino.spi.type.TimeZoneKey.UTC_KEY; +import static java.util.Objects.requireNonNull; + +public abstract class AbstractTableChangesSplitSource + implements ConnectorSplitSource +{ + private final Table icebergTable; + + public AbstractTableChangesSplitSource(Table icebergTable) + { + this.icebergTable = requireNonNull(icebergTable, "icebergTable is null"); + } + + @SuppressWarnings("unchecked") + protected static Iterator splitIfPossible(ChangelogScanTask wholeFileScan, long targetSplitSize) + { + if (wholeFileScan instanceof AddedRowsScanTask) { + return ((SplittableScanTask) wholeFileScan).split(targetSplitSize).iterator(); + } + + if (wholeFileScan instanceof DeletedDataFileScanTask) { + return ((SplittableScanTask) wholeFileScan).split(targetSplitSize).iterator(); + } + + return singletonIterator(wholeFileScan); + } + + protected ConnectorSplit toIcebergSplit(ChangelogScanTask task) + { + // TODO: Support DeletedRowsScanTask (requires https://github.com/apache/iceberg/pull/6182) + if (task instanceof AddedRowsScanTask addedRowsScanTask) { + return toSplit(addedRowsScanTask); + } + else if (task instanceof DeletedDataFileScanTask 
deletedDataFileScanTask) { + return toSplit(deletedDataFileScanTask); + } + else { + throw new TrinoException(NOT_SUPPORTED, "ChangelogScanTask type is not supported:" + task); + } + } + + protected TableChangesInternalSplit toSplit(AddedRowsScanTask task) + { + return new TableChangesInternalSplit( + TableChangesInternalSplit.ChangeType.ADDED_FILE, + task.commitSnapshotId(), + DateTimeEncoding.packDateTimeWithZone(icebergTable.snapshot(task.commitSnapshotId()).timestampMillis(), UTC_KEY), + task.changeOrdinal(), + task.file().location(), + task.start(), + task.length(), + task.file().fileSizeInBytes(), + task.file().recordCount(), + IcebergFileFormat.fromIceberg(task.file().format()), + PartitionSpecParser.toJson(task.spec()), + PartitionData.toJson(task.file().partition()), + SplitWeight.standard(), + icebergTable.io().properties()); + } + + protected TableChangesInternalSplit toSplit(DeletedDataFileScanTask task) + { + return new TableChangesInternalSplit( + TableChangesInternalSplit.ChangeType.DELETED_FILE, + task.commitSnapshotId(), + DateTimeEncoding.packDateTimeWithZone(icebergTable.snapshot(task.commitSnapshotId()).timestampMillis(), UTC_KEY), + task.changeOrdinal(), + task.file().location(), + task.start(), + task.length(), + task.file().fileSizeInBytes(), + task.file().recordCount(), + IcebergFileFormat.fromIceberg(task.file().format()), + PartitionSpecParser.toJson(task.spec()), + PartitionData.toJson(task.file().partition()), + SplitWeight.standard(), + icebergTable.io().properties()); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/CopyOnWriteTableChangesFunctionProcessor.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/CopyOnWriteTableChangesFunctionProcessor.java new file mode 100644 index 000000000000..1a58be661fa6 --- /dev/null +++ 
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/CopyOnWriteTableChangesFunctionProcessor.java @@ -0,0 +1,326 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.functions.tablechanges; + +import com.google.common.collect.ImmutableList; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import io.trino.plugin.iceberg.IcebergColumnHandle; +import io.trino.plugin.iceberg.IcebergPageSourceProvider; +import io.trino.spi.Page; +import io.trino.spi.PageBuilder; +import io.trino.spi.block.Block; +import io.trino.spi.block.BlockBuilder; +import io.trino.spi.block.RunLengthEncodedBlock; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorSplit; +import io.trino.spi.function.table.TableFunctionProcessorState; +import io.trino.spi.function.table.TableFunctionSplitProcessor; +import io.trino.spi.type.Type; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Verify.verify; +import static 
com.google.common.collect.Iterables.getOnlyElement; +import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; +import static io.airlift.concurrent.MoreFutures.getFutureValue; +import static io.trino.plugin.iceberg.IcebergColumnHandle.DATA_CHANGE_TYPE_ID; +import static io.trino.spi.function.table.TableFunctionProcessorState.Blocked.blocked; +import static io.trino.spi.function.table.TableFunctionProcessorState.Finished.FINISHED; +import static io.trino.spi.function.table.TableFunctionProcessorState.Processed.produced; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.CompletableFuture.runAsync; + +public class CopyOnWriteTableChangesFunctionProcessor + implements TableFunctionSplitProcessor +{ + private final TableFunctionSplitProcessor delegate; + + public CopyOnWriteTableChangesFunctionProcessor( + ConnectorSession session, + TableChangesFunctionHandle functionHandle, + TableChangesSplit split, + IcebergPageSourceProvider icebergPageSourceProvider, + ExecutorService executor) + { + requireNonNull(session, "session is null"); + requireNonNull(functionHandle, "functionHandle is null"); + requireNonNull(split, "split is null"); + requireNonNull(icebergPageSourceProvider, "icebergPageSourceProvider is null"); + + List splits = split.splits(); + checkArgument(!splits.isEmpty(), "splits is empty"); + + if (splits.size() == 1) { + this.delegate = new InternalTableChangesFunctionProcessor(session, functionHandle, (TableChangesInternalSplit) getOnlyElement(splits), icebergPageSourceProvider); + } + else { + this.delegate = new RemoveCarryoverRowsTableFunctionSplitProcessor(session, functionHandle, splits, icebergPageSourceProvider, executor); + } + } + + @Override + public TableFunctionProcessorState process() + { + return delegate.process(); + } + + @Override + public void close() + throws IOException + { + delegate.close(); + } + + private static class 
RemoveCarryoverRowsTableFunctionSplitProcessor + implements TableFunctionSplitProcessor + { + private static final Slice INSERT = Slices.utf8Slice("insert"); + private static final Slice DELETE = Slices.utf8Slice("delete"); + + private final Map rows = new ConcurrentHashMap<>(); + private final int dataChangeTypeChannel; + private final int[] channels; + private final Type[] types; + + private final CompletableFuture processFuture; + private boolean finished; + + public RemoveCarryoverRowsTableFunctionSplitProcessor( + ConnectorSession session, + TableChangesFunctionHandle functionHandle, + List splits, + IcebergPageSourceProvider icebergPageSourceProvider, + ExecutorService executor) + { + this.dataChangeTypeChannel = dataChangeTypeChannel(functionHandle.columns()); + this.channels = IntStream.range(0, functionHandle.columns().size()) + .filter(channel -> channel != dataChangeTypeChannel) + .toArray(); + this.types = dataTypes(functionHandle.columns()); + ImmutableList.Builder> futures = ImmutableList.builder(); + for (ConnectorSplit split : splits) { + TableChangesInternalSplit internalSplit = (TableChangesInternalSplit) split; + InternalTableChangesFunctionProcessor processor = new InternalTableChangesFunctionProcessor(session, functionHandle, internalSplit, icebergPageSourceProvider); + futures.add(executor.submit(() -> processAndRemoveCarryoverRows(processor))); + } + + processFuture = runAsync( + () -> { + for (Future future : futures.build()) { + getFutureValue(future); + } + }, + newDirectExecutorService()); + } + + private static int dataChangeTypeChannel(List columns) + { + for (int i = 0; i < columns.size(); i++) { + if (columns.get(i).getId() == DATA_CHANGE_TYPE_ID) { + return i; + } + } + throw new IllegalArgumentException("Data change type id column is missing from: " + columns); + } + + private static Type[] dataTypes(List columns) + { + return columns.stream() + .filter(column -> column.getId() != DATA_CHANGE_TYPE_ID) + 
.map(IcebergColumnHandle::getType) + .toArray(Type[]::new); + } + + private void processAndRemoveCarryoverRows(TableFunctionSplitProcessor processor) + { + TableFunctionProcessorState processState = processor.process(); + if (processState == FINISHED) { + return; + } + + verify(processState instanceof TableFunctionProcessorState.Processed, "Unexpected state %s", processState); + TableFunctionProcessorState.Processed processedState = (TableFunctionProcessorState.Processed) processState; + Page result = processedState.getResult(); + if (result == null || result.getPositionCount() == 0) { + processAndRemoveCarryoverRows(processor); + return; + } + + Block dataChangeTypeBlock = result.getBlock(dataChangeTypeChannel); + Page dataPage = result.getColumns(channels); + for (int position = 0; position < result.getPositionCount(); position++) { + Row row = new Row(types, dataPage, position); + rows.computeIfAbsent(row, _ -> new AtomicInteger()); + rows.get(row).addAndGet(computeDelta(dataChangeTypeBlock, position)); + } + + processAndRemoveCarryoverRows(processor); + } + + private static class Row + { + private final Object[] values; + + public Row(Type[] types, Page page, int position) + { + requireNonNull(page, "page is null"); + checkArgument(types.length == page.getChannelCount(), "mismatched types for page"); + values = new Object[types.length]; + for (int i = 0; i < values.length; i++) { + Type type = types[i]; + + Class javaType = type.getJavaType(); + if (javaType == boolean.class) { + values[i] = type.getBoolean(page.getBlock(i), position); + } + else if (javaType == long.class) { + values[i] = type.getLong(page.getBlock(i), position); + } + else if (javaType == double.class) { + values[i] = type.getDouble(page.getBlock(i), position); + } + else if (javaType == Slice.class) { + values[i] = type.getSlice(page.getBlock(i), position); + } + else { + values[i] = type.getObject(page.getBlock(i), position); + } + } + } + + public int size() + { + return values.length; + } 
+ + public Object getValue(int index) + { + return values[index]; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Row row = (Row) o; + return Objects.deepEquals(values, row.values); + } + + @Override + public int hashCode() + { + return Arrays.hashCode(values); + } + } + + private static int computeDelta(Block dataChangeTypeBlock, int position) + { + Slice slice = VARCHAR.getSlice(dataChangeTypeBlock, position); + + if (INSERT.equals(slice)) { + return 1; + } + if (DELETE.equals(slice)) { + return -1; + } + + throw new IllegalStateException("Unexpected data change type: " + slice.toStringUtf8()); + } + + @Override + public TableFunctionProcessorState process() + { + if (!processFuture.isDone()) { + return blocked(processFuture); + } + + if (finished) { + return FINISHED; + } + + // extract page + PageBuilder pageBuilder = new PageBuilder(Arrays.asList(types)); + int expectedEntries = rows.values().stream() + .mapToInt(AtomicInteger::get) + .map(Math::abs) + .sum(); + BlockBuilder dataChangeTypeBlockBuilder = VARCHAR.createBlockBuilder(null, expectedEntries); + for (Map.Entry entry : rows.entrySet()) { + Row row = entry.getKey(); + AtomicInteger count = entry.getValue(); + if (count.get() == 0) { + continue; + } + + Slice dataChange = count.get() > 0 ? 
INSERT : DELETE; + int positionCount = Math.abs(count.get()); + + verify(row.size() == types.length); + + pageBuilder.declarePositions(positionCount); + for (int channel = 0; channel < types.length; channel++) { + Type type = types[channel]; + pageBuilder.getBlockBuilder(channel) + .appendBlockRange(RunLengthEncodedBlock.create(type, row.getValue(channel), positionCount), 0, positionCount); + } + dataChangeTypeBlockBuilder.appendBlockRange(RunLengthEncodedBlock.create(VARCHAR, dataChange, positionCount), 0, positionCount); + } + + rows.clear(); + Page page = pageBuilder.build(); + + verify(dataChangeTypeChannel < types.length, "Unexpected data change type channel: %s, rest columns size: %s", dataChangeTypeChannel, types.length); + Block[] blocks = new Block[types.length + 1]; + for (int i = 0; i < types.length; i++) { + if (i < dataChangeTypeChannel) { + blocks[i] = page.getBlock(i); + } + else if (i == dataChangeTypeChannel) { + blocks[i] = dataChangeTypeBlockBuilder.build(); + blocks[i + 1] = page.getBlock(i); + } + else { + blocks[i + 1] = page.getBlock(i); + } + } + finished = true; + return produced(new Page(blocks)); + } + + @Override + public void close() + { + rows.clear(); + processFuture.cancel(true); + } + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/CopyOnWriteTableChangesSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/CopyOnWriteTableChangesSplitSource.java new file mode 100644 index 000000000000..4cb047f38d04 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/CopyOnWriteTableChangesSplitSource.java @@ -0,0 +1,162 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.functions.tablechanges; + +import com.google.common.collect.ImmutableList; +import com.google.common.io.Closer; +import io.trino.plugin.iceberg.PartitionData; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorSplit; +import org.apache.iceberg.ChangelogScanTask; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.IncrementalChangelogScan; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.StructLikeWrapper; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; + +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static java.util.Collections.emptyIterator; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE; + +public class CopyOnWriteTableChangesSplitSource + extends AbstractTableChangesSplitSource +{ + private final Table icebergTable; + private final IncrementalChangelogScan tableScan; + 
private final long targetSplitSize; + private final Closer closer = Closer.create(); + + private CloseableIterable changelogScanIterable; + private CloseableIterator changelogScanIterator; + private Iterator fileTasksIterator = emptyIterator(); + private final Queue splits = new ArrayDeque<>(); + + private volatile boolean ready; + + public CopyOnWriteTableChangesSplitSource( + Table icebergTable, + IncrementalChangelogScan tableScan) + { + super(icebergTable); + this.icebergTable = requireNonNull(icebergTable, "table is null"); + this.tableScan = requireNonNull(tableScan, "tableScan is null"); + this.targetSplitSize = tableScan.targetSplitSize(); + } + + @Override + public CompletableFuture getNextBatch(int maxSize) + { + if (changelogScanIterable == null) { + try { + this.changelogScanIterable = closer.register(tableScan.planFiles()); + this.changelogScanIterator = closer.register(changelogScanIterable.iterator()); + + CompletableFuture.runAsync(this::drainChangelogScanIterator); + } + catch (UnsupportedOperationException e) { + throw new TrinoException(NOT_SUPPORTED, "Table uses features which are not yet supported by the table_changes function", e); + } + } + + if (!ready) { + return completedFuture(new ConnectorSplitBatch(ImmutableList.of(), isFinished())); + } + + List resultSplits = new ArrayList<>(maxSize); + while (resultSplits.size() < maxSize && !splits.isEmpty()) { + resultSplits.add(splits.remove()); + } + + return completedFuture(new ConnectorSplitBatch(resultSplits, isFinished())); + } + + private void drainChangelogScanIterator() + { + PartitionSpec spec = icebergTable.spec(); + Types.StructType partitionType = spec.partitionType(); + Map> changelogScanMap = new HashMap<>(); + + while (fileTasksIterator.hasNext() || changelogScanIterator.hasNext()) { + if (!fileTasksIterator.hasNext()) { + ChangelogScanTask wholeFileTask = changelogScanIterator.next(); + fileTasksIterator = splitIfPossible(wholeFileTask, targetSplitSize); + continue; + } + + 
ChangelogScanTask next = fileTasksIterator.next(); + ContentScanTask scanTask = (ContentScanTask) next; + StructLikeWrapper partition = StructLikeWrapper.forType(partitionType).set(scanTask.file().partition()); + + changelogScanMap.computeIfAbsent(new PartitionWithOrdinal(partition, next.changeOrdinal()), _ -> new ArrayList<>()) + .add(toIcebergSplit(next)); + } + + String partitionSchemaJson = PartitionSpecParser.toJson(spec); + for (Map.Entry> entry : changelogScanMap.entrySet()) { + PartitionWithOrdinal partitionWithOrdinal = entry.getKey(); + List changelogSplits = entry.getValue(); + splits.add(new TableChangesSplit( + COPY_ON_WRITE, + PartitionData.toJson(partitionWithOrdinal.partition().get()), + partitionSchemaJson, + ImmutableList.copyOf(changelogSplits))); + } + + ready = true; + } + + @Override + public boolean isFinished() + { + return ready && splits.isEmpty(); + } + + @Override + public void close() + { + try { + splits.clear(); + closer.close(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private record PartitionWithOrdinal(StructLikeWrapper partition, int changeOrdinal) + { + public PartitionWithOrdinal + { + requireNonNull(partition, "partition is null"); + } + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/InternalTableChangesFunctionProcessor.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/InternalTableChangesFunctionProcessor.java new file mode 100644 index 000000000000..7fd0190c15b9 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/InternalTableChangesFunctionProcessor.java @@ -0,0 +1,189 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.functions.tablechanges; + +import com.google.common.collect.ImmutableList; +import io.trino.plugin.iceberg.IcebergColumnHandle; +import io.trino.plugin.iceberg.IcebergPageSourceProvider; +import io.trino.plugin.iceberg.PartitionData; +import io.trino.spi.Page; +import io.trino.spi.block.Block; +import io.trino.spi.block.RunLengthEncodedBlock; +import io.trino.spi.connector.ConnectorPageSource; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.DynamicFilter; +import io.trino.spi.connector.SourcePage; +import io.trino.spi.function.table.TableFunctionProcessorState; +import io.trino.spi.function.table.TableFunctionSplitProcessor; +import io.trino.spi.predicate.TupleDomain; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.mapping.NameMappingParser; + +import java.io.IOException; +import java.util.Optional; + +import static io.airlift.slice.Slices.utf8Slice; +import static io.trino.plugin.iceberg.IcebergColumnHandle.DATA_CHANGE_ORDINAL_ID; +import static io.trino.plugin.iceberg.IcebergColumnHandle.DATA_CHANGE_TIMESTAMP_ID; +import static io.trino.plugin.iceberg.IcebergColumnHandle.DATA_CHANGE_TYPE_ID; +import static io.trino.plugin.iceberg.IcebergColumnHandle.DATA_CHANGE_VERSION_ID; +import static io.trino.spi.function.table.TableFunctionProcessorState.Finished.FINISHED; +import static 
io.trino.spi.function.table.TableFunctionProcessorState.Processed.produced; +import static io.trino.spi.predicate.Utils.nativeValueToBlock; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS; +import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; +import static java.util.Objects.requireNonNull; + +public class InternalTableChangesFunctionProcessor + implements TableFunctionSplitProcessor +{ + private static final Page EMPTY_PAGE = new Page(0); + + private final ConnectorPageSource pageSource; + private final int[] delegateColumnMap; + private final Optional changeTypeIndex; + private final Block changeTypeValue; + private final Optional changeVersionIndex; + private final Block changeVersionValue; + private final Optional changeTimestampIndex; + private final Block changeTimestampValue; + private final Optional changeOrdinalIndex; + private final Block changeOrdinalValue; + + public InternalTableChangesFunctionProcessor( + ConnectorSession session, + TableChangesFunctionHandle functionHandle, + TableChangesInternalSplit split, + IcebergPageSourceProvider icebergPageSourceProvider) + { + requireNonNull(session, "session is null"); + requireNonNull(functionHandle, "functionHandle is null"); + requireNonNull(split, "split is null"); + requireNonNull(icebergPageSourceProvider, "icebergPageSourceProvider is null"); + + Schema tableSchema = SchemaParser.fromJson(functionHandle.tableSchemaJson()); + PartitionSpec partitionSpec = PartitionSpecParser.fromJson(tableSchema, split.partitionSpecJson()); + org.apache.iceberg.types.Type[] partitionColumnTypes = partitionSpec.fields().stream() + .map(field -> field.transform().getResultType(tableSchema.findType(field.sourceId()))) + .toArray(org.apache.iceberg.types.Type[]::new); + + int delegateColumnIndex = 0; + int[] delegateColumnMap = new int[functionHandle.columns().size()]; 
+ Optional changeTypeIndex = Optional.empty(); + Optional changeVersionIndex = Optional.empty(); + Optional changeTimestampIndex = Optional.empty(); + Optional changeOrdinalIndex = Optional.empty(); + for (int columnIndex = 0; columnIndex < functionHandle.columns().size(); columnIndex++) { + IcebergColumnHandle column = functionHandle.columns().get(columnIndex); + if (column.getId() == DATA_CHANGE_TYPE_ID) { + changeTypeIndex = Optional.of(columnIndex); + delegateColumnMap[columnIndex] = -1; + } + else if (column.getId() == DATA_CHANGE_VERSION_ID) { + changeVersionIndex = Optional.of(columnIndex); + delegateColumnMap[columnIndex] = -1; + } + else if (column.getId() == DATA_CHANGE_TIMESTAMP_ID) { + changeTimestampIndex = Optional.of(columnIndex); + delegateColumnMap[columnIndex] = -1; + } + else if (column.getId() == DATA_CHANGE_ORDINAL_ID) { + changeOrdinalIndex = Optional.of(columnIndex); + delegateColumnMap[columnIndex] = -1; + } + else { + delegateColumnMap[columnIndex] = delegateColumnIndex; + delegateColumnIndex++; + } + } + + this.pageSource = icebergPageSourceProvider.createPageSource( + session, + functionHandle.columns(), + tableSchema, + partitionSpec, + PartitionData.fromJson(split.partitionDataJson(), partitionColumnTypes), + ImmutableList.of(), + DynamicFilter.EMPTY, + TupleDomain.all(), + TupleDomain.all(), + split.path(), + split.start(), + split.length(), + split.fileSize(), + split.fileRecordCount(), + split.partitionDataJson(), + split.fileFormat(), + split.fileIoProperties(), + 0, + functionHandle.nameMappingJson().map(NameMappingParser::fromJson)); + this.delegateColumnMap = delegateColumnMap; + + this.changeTypeIndex = changeTypeIndex; + this.changeTypeValue = nativeValueToBlock(createUnboundedVarcharType(), utf8Slice(split.changeType().getTableValue())); + + this.changeVersionIndex = changeVersionIndex; + this.changeVersionValue = nativeValueToBlock(BIGINT, split.snapshotId()); + + this.changeTimestampIndex = changeTimestampIndex; + 
this.changeTimestampValue = nativeValueToBlock(TIMESTAMP_TZ_MILLIS, split.snapshotTimestamp()); + + this.changeOrdinalIndex = changeOrdinalIndex; + this.changeOrdinalValue = nativeValueToBlock(INTEGER, (long) split.changeOrdinal()); + } + + @Override + public TableFunctionProcessorState process() + { + if (pageSource.isFinished()) { + return FINISHED; + } + + SourcePage dataPage = pageSource.getNextSourcePage(); + if (dataPage == null) { + return TableFunctionProcessorState.Processed.produced(EMPTY_PAGE); + } + + Block[] blocks = new Block[delegateColumnMap.length]; + for (int targetChannel = 0; targetChannel < delegateColumnMap.length; targetChannel++) { + int delegateIndex = delegateColumnMap[targetChannel]; + if (delegateIndex != -1) { + blocks[targetChannel] = dataPage.getBlock(delegateIndex); + } + } + + changeTypeIndex.ifPresent(columnChannel -> + blocks[columnChannel] = RunLengthEncodedBlock.create(changeTypeValue, dataPage.getPositionCount())); + changeVersionIndex.ifPresent(columnChannel -> + blocks[columnChannel] = RunLengthEncodedBlock.create(changeVersionValue, dataPage.getPositionCount())); + changeTimestampIndex.ifPresent(columnChannel -> + blocks[columnChannel] = RunLengthEncodedBlock.create(changeTimestampValue, dataPage.getPositionCount())); + changeOrdinalIndex.ifPresent(columnChannel -> + blocks[columnChannel] = RunLengthEncodedBlock.create(changeOrdinalValue, dataPage.getPositionCount())); + + return produced(new Page(dataPage.getPositionCount(), blocks)); + } + + @Override + public void close() + throws IOException + { + pageSource.close(); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/MergeOnReadTableChangesFunctionProcessor.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/MergeOnReadTableChangesFunctionProcessor.java new file mode 100644 index 000000000000..557847897bbc --- /dev/null +++ 
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/MergeOnReadTableChangesFunctionProcessor.java @@ -0,0 +1,58 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.functions.tablechanges; + +import io.trino.plugin.iceberg.IcebergPageSourceProvider; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.function.table.TableFunctionProcessorState; +import io.trino.spi.function.table.TableFunctionSplitProcessor; + +import java.io.IOException; + +import static com.google.common.collect.Iterables.getOnlyElement; +import static java.util.Objects.requireNonNull; + +public class MergeOnReadTableChangesFunctionProcessor + implements TableFunctionSplitProcessor +{ + private final TableFunctionSplitProcessor delegate; + + public MergeOnReadTableChangesFunctionProcessor( + ConnectorSession session, + TableChangesFunctionHandle functionHandle, + TableChangesSplit tableChangesSplit, + IcebergPageSourceProvider icebergPageSourceProvider) + { + requireNonNull(session, "session is null"); + requireNonNull(functionHandle, "functionHandle is null"); + requireNonNull(tableChangesSplit, "tableChangesSplit is null"); + requireNonNull(icebergPageSourceProvider, "icebergPageSourceProvider is null"); + + TableChangesInternalSplit split = (TableChangesInternalSplit) getOnlyElement(tableChangesSplit.splits()); + this.delegate = new InternalTableChangesFunctionProcessor(session, functionHandle, split, 
icebergPageSourceProvider); + } + + @Override + public TableFunctionProcessorState process() + { + return delegate.process(); + } + + @Override + public void close() + throws IOException + { + delegate.close(); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/MergeOnReadTableChangesSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/MergeOnReadTableChangesSplitSource.java new file mode 100644 index 000000000000..71473283ffe1 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/MergeOnReadTableChangesSplitSource.java @@ -0,0 +1,113 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.functions.tablechanges; + +import com.google.common.collect.ImmutableList; +import com.google.common.io.Closer; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorSplit; +import org.apache.iceberg.ChangelogScanTask; +import org.apache.iceberg.IncrementalChangelogScan; +import org.apache.iceberg.RowLevelOperationMode; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static java.util.Collections.emptyIterator; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.CompletableFuture.completedFuture; + +public class MergeOnReadTableChangesSplitSource + extends AbstractTableChangesSplitSource +{ + private final IncrementalChangelogScan tableScan; + private final long targetSplitSize; + private final Closer closer = Closer.create(); + + private CloseableIterable changelogScanIterable; + private CloseableIterator changelogScanIterator; + private Iterator fileTasksIterator = emptyIterator(); + + public MergeOnReadTableChangesSplitSource( + Table icebergTable, + IncrementalChangelogScan tableScan) + { + super(icebergTable); + this.tableScan = requireNonNull(tableScan, "tableScan is null"); + this.targetSplitSize = tableScan.targetSplitSize(); + } + + @Override + public CompletableFuture getNextBatch(int maxSize) + { + if (changelogScanIterable == null) { + try { + this.changelogScanIterable = closer.register(tableScan.planFiles()); + this.changelogScanIterator = closer.register(changelogScanIterable.iterator()); + } + catch (UnsupportedOperationException e) { + throw new TrinoException(NOT_SUPPORTED, "Table uses features which are not 
yet supported by the table_changes function", e); + } + } + + List splits = new ArrayList<>(maxSize); + while (splits.size() < maxSize && (fileTasksIterator.hasNext() || changelogScanIterator.hasNext())) { + if (!fileTasksIterator.hasNext()) { + ChangelogScanTask wholeFileTask = changelogScanIterator.next(); + fileTasksIterator = splitIfPossible(wholeFileTask, targetSplitSize); + continue; + } + + ChangelogScanTask next = fileTasksIterator.next(); + splits.add(toIcebergSplit(next)); + } + return completedFuture(new ConnectorSplitBatch(splits, isFinished())); + } + + @Override + public boolean isFinished() + { + return changelogScanIterator != null && !changelogScanIterator.hasNext() && !fileTasksIterator.hasNext(); + } + + @Override + public void close() + { + try { + closer.close(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + protected ConnectorSplit toIcebergSplit(ChangelogScanTask task) + { + TableChangesInternalSplit icebergSplit = (TableChangesInternalSplit) super.toIcebergSplit(task); + return new TableChangesSplit( + RowLevelOperationMode.MERGE_ON_READ, + icebergSplit.partitionDataJson(), + icebergSplit.partitionSpecJson(), + ImmutableList.of(icebergSplit)); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessor.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessor.java index cbed46f2a570..78155d2bd073 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessor.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessor.java @@ -13,177 +13,42 @@ */ package io.trino.plugin.iceberg.functions.tablechanges; -import com.google.common.collect.ImmutableList; -import io.trino.plugin.iceberg.IcebergColumnHandle; import 
io.trino.plugin.iceberg.IcebergPageSourceProvider; -import io.trino.plugin.iceberg.PartitionData; -import io.trino.spi.Page; -import io.trino.spi.block.Block; -import io.trino.spi.block.RunLengthEncodedBlock; -import io.trino.spi.connector.ConnectorPageSource; import io.trino.spi.connector.ConnectorSession; -import io.trino.spi.connector.DynamicFilter; -import io.trino.spi.connector.SourcePage; import io.trino.spi.function.table.TableFunctionProcessorState; import io.trino.spi.function.table.TableFunctionSplitProcessor; -import io.trino.spi.predicate.TupleDomain; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.PartitionSpecParser; -import org.apache.iceberg.Schema; -import org.apache.iceberg.SchemaParser; -import org.apache.iceberg.mapping.NameMappingParser; import java.io.IOException; -import java.util.Optional; - -import static io.airlift.slice.Slices.utf8Slice; -import static io.trino.plugin.iceberg.IcebergColumnHandle.DATA_CHANGE_ORDINAL_ID; -import static io.trino.plugin.iceberg.IcebergColumnHandle.DATA_CHANGE_TIMESTAMP_ID; -import static io.trino.plugin.iceberg.IcebergColumnHandle.DATA_CHANGE_TYPE_ID; -import static io.trino.plugin.iceberg.IcebergColumnHandle.DATA_CHANGE_VERSION_ID; -import static io.trino.spi.function.table.TableFunctionProcessorState.Finished.FINISHED; -import static io.trino.spi.function.table.TableFunctionProcessorState.Processed.produced; -import static io.trino.spi.predicate.Utils.nativeValueToBlock; -import static io.trino.spi.type.BigintType.BIGINT; -import static io.trino.spi.type.IntegerType.INTEGER; -import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS; -import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; -import static java.util.Objects.requireNonNull; +import java.util.concurrent.ExecutorService; public class TableChangesFunctionProcessor implements TableFunctionSplitProcessor { - private static final Page EMPTY_PAGE = new Page(0); - - private final 
ConnectorPageSource pageSource; - private final int[] delegateColumnMap; - private final Optional changeTypeIndex; - private final Block changeTypeValue; - private final Optional changeVersionIndex; - private final Block changeVersionValue; - private final Optional changeTimestampIndex; - private final Block changeTimestampValue; - private final Optional changeOrdinalIndex; - private final Block changeOrdinalValue; + private final TableFunctionSplitProcessor delegate; public TableChangesFunctionProcessor( ConnectorSession session, TableChangesFunctionHandle functionHandle, TableChangesSplit split, - IcebergPageSourceProvider icebergPageSourceProvider) + IcebergPageSourceProvider icebergPageSourceProvider, + ExecutorService tableFunctionProcessorExecutor) { - requireNonNull(session, "session is null"); - requireNonNull(functionHandle, "functionHandle is null"); - requireNonNull(split, "split is null"); - requireNonNull(icebergPageSourceProvider, "icebergPageSourceProvider is null"); - - Schema tableSchema = SchemaParser.fromJson(functionHandle.tableSchemaJson()); - PartitionSpec partitionSpec = PartitionSpecParser.fromJson(tableSchema, split.partitionSpecJson()); - org.apache.iceberg.types.Type[] partitionColumnTypes = partitionSpec.fields().stream() - .map(field -> field.transform().getResultType(tableSchema.findType(field.sourceId()))) - .toArray(org.apache.iceberg.types.Type[]::new); - - int delegateColumnIndex = 0; - int[] delegateColumnMap = new int[functionHandle.columns().size()]; - Optional changeTypeIndex = Optional.empty(); - Optional changeVersionIndex = Optional.empty(); - Optional changeTimestampIndex = Optional.empty(); - Optional changeOrdinalIndex = Optional.empty(); - for (int columnIndex = 0; columnIndex < functionHandle.columns().size(); columnIndex++) { - IcebergColumnHandle column = functionHandle.columns().get(columnIndex); - if (column.getId() == DATA_CHANGE_TYPE_ID) { - changeTypeIndex = Optional.of(columnIndex); - 
delegateColumnMap[columnIndex] = -1; - } - else if (column.getId() == DATA_CHANGE_VERSION_ID) { - changeVersionIndex = Optional.of(columnIndex); - delegateColumnMap[columnIndex] = -1; - } - else if (column.getId() == DATA_CHANGE_TIMESTAMP_ID) { - changeTimestampIndex = Optional.of(columnIndex); - delegateColumnMap[columnIndex] = -1; - } - else if (column.getId() == DATA_CHANGE_ORDINAL_ID) { - changeOrdinalIndex = Optional.of(columnIndex); - delegateColumnMap[columnIndex] = -1; - } - else { - delegateColumnMap[columnIndex] = delegateColumnIndex; - delegateColumnIndex++; - } - } - - this.pageSource = icebergPageSourceProvider.createPageSource( - session, - functionHandle.columns(), - tableSchema, - partitionSpec, - PartitionData.fromJson(split.partitionDataJson(), partitionColumnTypes), - ImmutableList.of(), - DynamicFilter.EMPTY, - TupleDomain.all(), - TupleDomain.all(), - split.path(), - split.start(), - split.length(), - split.fileSize(), - split.fileRecordCount(), - split.partitionDataJson(), - split.fileFormat(), - split.fileIoProperties(), - 0, - functionHandle.nameMappingJson().map(NameMappingParser::fromJson)); - this.delegateColumnMap = delegateColumnMap; - - this.changeTypeIndex = changeTypeIndex; - this.changeTypeValue = nativeValueToBlock(createUnboundedVarcharType(), utf8Slice(split.changeType().getTableValue())); - - this.changeVersionIndex = changeVersionIndex; - this.changeVersionValue = nativeValueToBlock(BIGINT, split.snapshotId()); - - this.changeTimestampIndex = changeTimestampIndex; - this.changeTimestampValue = nativeValueToBlock(TIMESTAMP_TZ_MILLIS, split.snapshotTimestamp()); - - this.changeOrdinalIndex = changeOrdinalIndex; - this.changeOrdinalValue = nativeValueToBlock(INTEGER, (long) split.changeOrdinal()); + this.delegate = switch (split.operationMode()) { + case COPY_ON_WRITE -> new CopyOnWriteTableChangesFunctionProcessor(session, functionHandle, split, icebergPageSourceProvider, tableFunctionProcessorExecutor); + case MERGE_ON_READ -> 
new MergeOnReadTableChangesFunctionProcessor(session, functionHandle, split, icebergPageSourceProvider); + }; } @Override public TableFunctionProcessorState process() { - if (pageSource.isFinished()) { - return FINISHED; - } - - SourcePage dataPage = pageSource.getNextSourcePage(); - if (dataPage == null) { - return TableFunctionProcessorState.Processed.produced(EMPTY_PAGE); - } - - Block[] blocks = new Block[delegateColumnMap.length]; - for (int targetChannel = 0; targetChannel < delegateColumnMap.length; targetChannel++) { - int delegateIndex = delegateColumnMap[targetChannel]; - if (delegateIndex != -1) { - blocks[targetChannel] = dataPage.getBlock(delegateIndex); - } - } - - changeTypeIndex.ifPresent(columnChannel -> - blocks[columnChannel] = RunLengthEncodedBlock.create(changeTypeValue, dataPage.getPositionCount())); - changeVersionIndex.ifPresent(columnChannel -> - blocks[columnChannel] = RunLengthEncodedBlock.create(changeVersionValue, dataPage.getPositionCount())); - changeTimestampIndex.ifPresent(columnChannel -> - blocks[columnChannel] = RunLengthEncodedBlock.create(changeTimestampValue, dataPage.getPositionCount())); - changeOrdinalIndex.ifPresent(columnChannel -> - blocks[columnChannel] = RunLengthEncodedBlock.create(changeOrdinalValue, dataPage.getPositionCount())); - - return produced(new Page(dataPage.getPositionCount(), blocks)); + return delegate.process(); } @Override public void close() throws IOException { - pageSource.close(); + delegate.close(); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessorProviderFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessorProviderFactory.java index a3846e4ddbc4..33f30a830037 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessorProviderFactory.java +++ 
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessorProviderFactory.java @@ -15,6 +15,7 @@ import com.google.inject.Inject; import io.trino.plugin.base.classloader.ClassLoaderSafeTableFunctionSplitProcessor; +import io.trino.plugin.iceberg.ForIcebergCopyOnWriteTableChangesProcessor; import io.trino.plugin.iceberg.IcebergPageSourceProvider; import io.trino.plugin.iceberg.IcebergPageSourceProviderFactory; import io.trino.spi.connector.ConnectorSession; @@ -24,27 +25,31 @@ import io.trino.spi.function.table.TableFunctionProcessorProviderFactory; import io.trino.spi.function.table.TableFunctionSplitProcessor; +import java.util.concurrent.ExecutorService; + import static java.util.Objects.requireNonNull; public class TableChangesFunctionProcessorProviderFactory implements TableFunctionProcessorProviderFactory { private final IcebergPageSourceProviderFactory icebergPageSourceProviderFactory; + private final ExecutorService tableChangesProcessorExecutor; @Inject - public TableChangesFunctionProcessorProviderFactory(IcebergPageSourceProviderFactory icebergPageSourceProviderFactory) + public TableChangesFunctionProcessorProviderFactory(IcebergPageSourceProviderFactory icebergPageSourceProviderFactory, @ForIcebergCopyOnWriteTableChangesProcessor ExecutorService tableChangesProcessorExecutor) { this.icebergPageSourceProviderFactory = requireNonNull(icebergPageSourceProviderFactory, "icebergPageSourceProviderFactory is null"); + this.tableChangesProcessorExecutor = requireNonNull(tableChangesProcessorExecutor, "tableChangesProcessorExecutor is null"); } @Override public TableFunctionProcessorProvider createTableFunctionProcessorProvider() { IcebergPageSourceProvider pageSourceProvider = (IcebergPageSourceProvider) icebergPageSourceProviderFactory.createPageSourceProvider(); - return new TableChangesFunctionProcessorProvider(pageSourceProvider); + return new TableChangesFunctionProcessorProvider(pageSourceProvider, 
tableChangesProcessorExecutor); } - private record TableChangesFunctionProcessorProvider(IcebergPageSourceProvider icebergPageSourceProvider) + private record TableChangesFunctionProcessorProvider(IcebergPageSourceProvider icebergPageSourceProvider, ExecutorService tableChangesProcessorExecutor) implements TableFunctionProcessorProvider { @Override @@ -55,7 +60,8 @@ public TableFunctionSplitProcessor getSplitProcessor(ConnectorSession session, C session, (TableChangesFunctionHandle) handle, (TableChangesSplit) split, - icebergPageSourceProvider), + icebergPageSourceProvider, + tableChangesProcessorExecutor), getClass().getClassLoader()); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesInternalSplit.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesInternalSplit.java new file mode 100644 index 000000000000..2e010746c406 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesInternalSplit.java @@ -0,0 +1,102 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.functions.tablechanges; + +import com.google.common.collect.ImmutableMap; +import io.airlift.slice.SizeOf; +import io.trino.plugin.iceberg.IcebergFileFormat; +import io.trino.spi.SplitWeight; +import io.trino.spi.connector.ConnectorSplit; + +import java.util.Map; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static io.airlift.slice.SizeOf.estimatedSizeOf; +import static java.util.Objects.requireNonNull; + +public record TableChangesInternalSplit( + ChangeType changeType, + long snapshotId, + long snapshotTimestamp, + int changeOrdinal, + String path, + long start, + long length, + long fileSize, + long fileRecordCount, + IcebergFileFormat fileFormat, + String partitionSpecJson, + String partitionDataJson, + SplitWeight splitWeight, + Map fileIoProperties) implements ConnectorSplit +{ + private static final int INSTANCE_SIZE = SizeOf.instanceSize(TableChangesInternalSplit.class); + + public TableChangesInternalSplit + { + requireNonNull(changeType, "changeType is null"); + requireNonNull(path, "path is null"); + requireNonNull(fileFormat, "fileFormat is null"); + requireNonNull(partitionSpecJson, "partitionSpecJson is null"); + requireNonNull(partitionDataJson, "partitionDataJson is null"); + requireNonNull(splitWeight, "splitWeight is null"); + fileIoProperties = ImmutableMap.copyOf(requireNonNull(fileIoProperties, "fileIoProperties is null")); + } + + @Override + public SplitWeight getSplitWeight() + { + return splitWeight; + } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(path) + + estimatedSizeOf(partitionSpecJson) + + estimatedSizeOf(partitionDataJson) + + splitWeight.getRetainedSizeInBytes() + + estimatedSizeOf(fileIoProperties, SizeOf::estimatedSizeOf, SizeOf::estimatedSizeOf); + } + + @Override + public String toString() + { + return toStringHelper(this) + .addValue(path) + .add("start", start) + .add("length", length) + .add("records", 
fileRecordCount) + .toString(); + } + + public enum ChangeType { + ADDED_FILE("insert"), + DELETED_FILE("delete"), + POSITIONAL_DELETE("delete"); + + private final String tableValue; + + ChangeType(String tableValue) + { + this.tableValue = tableValue; + } + + public String getTableValue() + { + return tableValue; + } + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesSplit.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesSplit.java index bc6f905e35d3..166253c33cf5 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesSplit.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesSplit.java @@ -13,90 +13,51 @@ */ package io.trino.plugin.iceberg.functions.tablechanges; -import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableList; import io.airlift.slice.SizeOf; -import io.trino.plugin.iceberg.IcebergFileFormat; -import io.trino.spi.SplitWeight; import io.trino.spi.connector.ConnectorSplit; +import org.apache.iceberg.RowLevelOperationMode; -import java.util.Map; +import java.util.List; import static com.google.common.base.MoreObjects.toStringHelper; import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Objects.requireNonNull; public record TableChangesSplit( - ChangeType changeType, - long snapshotId, - long snapshotTimestamp, - int changeOrdinal, - String path, - long start, - long length, - long fileSize, - long fileRecordCount, - IcebergFileFormat fileFormat, - String partitionSpecJson, + RowLevelOperationMode operationMode, String partitionDataJson, - SplitWeight splitWeight, - Map fileIoProperties) implements ConnectorSplit + String partitionSpecJson, + List splits) + implements ConnectorSplit { private static final int INSTANCE_SIZE = SizeOf.instanceSize(TableChangesSplit.class); public 
TableChangesSplit { - requireNonNull(changeType, "changeType is null"); - requireNonNull(path, "path is null"); - requireNonNull(fileFormat, "fileFormat is null"); - requireNonNull(partitionSpecJson, "partitionSpecJson is null"); + requireNonNull(operationMode, "operationMode is null"); requireNonNull(partitionDataJson, "partitionDataJson is null"); - requireNonNull(splitWeight, "splitWeight is null"); - fileIoProperties = ImmutableMap.copyOf(requireNonNull(fileIoProperties, "fileIoProperties is null")); - } - - @Override - public SplitWeight getSplitWeight() - { - return splitWeight; + requireNonNull(partitionSpecJson, "partitionSpecJson is null"); + splits = ImmutableList.copyOf(splits); } @Override public long getRetainedSizeInBytes() { - return INSTANCE_SIZE - + estimatedSizeOf(path) - + estimatedSizeOf(partitionSpecJson) - + estimatedSizeOf(partitionDataJson) - + splitWeight.getRetainedSizeInBytes() - + estimatedSizeOf(fileIoProperties, SizeOf::estimatedSizeOf, SizeOf::estimatedSizeOf); + return INSTANCE_SIZE + + estimatedSizeOf(partitionDataJson) + + estimatedSizeOf(partitionSpecJson) + + splits.stream().map(ConnectorSplit::getRetainedSizeInBytes).mapToLong(Long::longValue).sum(); } @Override public String toString() { return toStringHelper(this) - .addValue(path) - .add("start", start) - .add("length", length) - .add("records", fileRecordCount) + .add("operationMode", operationMode) + .add("partitionDataJson", partitionDataJson) + .add("partitionSpecJson", partitionSpecJson) + .add("splits", splits) .toString(); } - - public enum ChangeType { - ADDED_FILE("insert"), - DELETED_FILE("delete"), - POSITIONAL_DELETE("delete"); - - private final String tableValue; - - ChangeType(String tableValue) - { - this.tableValue = tableValue; - } - - public String getTableValue() - { - return tableValue; - } - } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesSplitSource.java 
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesSplitSource.java index 0ff3415d5273..38834e0f51e1 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesSplitSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesSplitSource.java @@ -13,166 +13,45 @@ */ package io.trino.plugin.iceberg.functions.tablechanges; -import com.google.common.io.Closer; -import io.trino.plugin.iceberg.IcebergFileFormat; -import io.trino.plugin.iceberg.PartitionData; -import io.trino.spi.SplitWeight; -import io.trino.spi.TrinoException; -import io.trino.spi.connector.ConnectorSplit; import io.trino.spi.connector.ConnectorSplitSource; -import io.trino.spi.type.DateTimeEncoding; -import org.apache.iceberg.AddedRowsScanTask; -import org.apache.iceberg.ChangelogScanTask; -import org.apache.iceberg.DeletedDataFileScanTask; import org.apache.iceberg.IncrementalChangelogScan; -import org.apache.iceberg.PartitionSpecParser; -import org.apache.iceberg.SplittableScanTask; import org.apache.iceberg.Table; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.io.CloseableIterator; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; import java.util.concurrent.CompletableFuture; -import static com.google.common.collect.Iterators.singletonIterator; -import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; -import static io.trino.spi.type.TimeZoneKey.UTC_KEY; -import static java.util.Collections.emptyIterator; -import static java.util.Objects.requireNonNull; -import static java.util.concurrent.CompletableFuture.completedFuture; +import static io.trino.plugin.iceberg.IcebergUtil.rowLevelOperationMode; public class TableChangesSplitSource implements ConnectorSplitSource { - private final Table icebergTable; - private final 
IncrementalChangelogScan tableScan; - private final long targetSplitSize; - private final Closer closer = Closer.create(); - - private CloseableIterable changelogScanIterable; - private CloseableIterator changelogScanIterator; - private Iterator fileTasksIterator = emptyIterator(); + private final ConnectorSplitSource delegate; public TableChangesSplitSource( Table icebergTable, IncrementalChangelogScan tableScan) { - this.icebergTable = requireNonNull(icebergTable, "table is null"); - this.tableScan = requireNonNull(tableScan, "tableScan is null"); - this.targetSplitSize = tableScan.targetSplitSize(); + // TODO: handle the splits according to how data was written in each snapshot + this.delegate = switch (rowLevelOperationMode(icebergTable)) { + case COPY_ON_WRITE -> new CopyOnWriteTableChangesSplitSource(icebergTable, tableScan); + case MERGE_ON_READ -> new MergeOnReadTableChangesSplitSource(icebergTable, tableScan); + }; } @Override public CompletableFuture getNextBatch(int maxSize) { - if (changelogScanIterable == null) { - try { - this.changelogScanIterable = closer.register(tableScan.planFiles()); - this.changelogScanIterator = closer.register(changelogScanIterable.iterator()); - } - catch (UnsupportedOperationException e) { - throw new TrinoException(NOT_SUPPORTED, "Table uses features which are not yet supported by the table_changes function", e); - } - } - - List splits = new ArrayList<>(maxSize); - while (splits.size() < maxSize && (fileTasksIterator.hasNext() || changelogScanIterator.hasNext())) { - if (!fileTasksIterator.hasNext()) { - ChangelogScanTask wholeFileTask = changelogScanIterator.next(); - fileTasksIterator = splitIfPossible(wholeFileTask, targetSplitSize); - continue; - } - - ChangelogScanTask next = fileTasksIterator.next(); - splits.add(toIcebergSplit(next)); - } - return completedFuture(new ConnectorSplitBatch(splits, isFinished())); + return delegate.getNextBatch(maxSize); } @Override public boolean isFinished() { - return 
changelogScanIterator != null && !changelogScanIterator.hasNext() && !fileTasksIterator.hasNext(); + return delegate.isFinished(); } @Override public void close() { - try { - closer.close(); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - @SuppressWarnings("unchecked") - private static Iterator splitIfPossible(ChangelogScanTask wholeFileScan, long targetSplitSize) - { - if (wholeFileScan instanceof AddedRowsScanTask) { - return ((SplittableScanTask) wholeFileScan).split(targetSplitSize).iterator(); - } - - if (wholeFileScan instanceof DeletedDataFileScanTask) { - return ((SplittableScanTask) wholeFileScan).split(targetSplitSize).iterator(); - } - - return singletonIterator(wholeFileScan); - } - - private ConnectorSplit toIcebergSplit(ChangelogScanTask task) - { - // TODO: Support DeletedRowsScanTask (requires https://github.com/apache/iceberg/pull/6182) - if (task instanceof AddedRowsScanTask addedRowsScanTask) { - return toSplit(addedRowsScanTask); - } - else if (task instanceof DeletedDataFileScanTask deletedDataFileScanTask) { - return toSplit(deletedDataFileScanTask); - } - else { - throw new TrinoException(NOT_SUPPORTED, "ChangelogScanTask type is not supported:" + task); - } - } - - private TableChangesSplit toSplit(AddedRowsScanTask task) - { - return new TableChangesSplit( - TableChangesSplit.ChangeType.ADDED_FILE, - task.commitSnapshotId(), - DateTimeEncoding.packDateTimeWithZone(icebergTable.snapshot(task.commitSnapshotId()).timestampMillis(), UTC_KEY), - task.changeOrdinal(), - task.file().location(), - task.start(), - task.length(), - task.file().fileSizeInBytes(), - task.file().recordCount(), - IcebergFileFormat.fromIceberg(task.file().format()), - PartitionSpecParser.toJson(task.spec()), - PartitionData.toJson(task.file().partition()), - SplitWeight.standard(), - icebergTable.io().properties()); - } - - private TableChangesSplit toSplit(DeletedDataFileScanTask task) - { - return new TableChangesSplit( - 
TableChangesSplit.ChangeType.DELETED_FILE, - task.commitSnapshotId(), - DateTimeEncoding.packDateTimeWithZone(icebergTable.snapshot(task.commitSnapshotId()).timestampMillis(), UTC_KEY), - task.changeOrdinal(), - task.file().location(), - task.start(), - task.length(), - task.file().fileSizeInBytes(), - task.file().recordCount(), - IcebergFileFormat.fromIceberg(task.file().format()), - PartitionSpecParser.toJson(task.spec()), - PartitionData.toJson(task.file().partition()), - SplitWeight.standard(), - icebergTable.io().properties()); + delegate.close(); } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java index a410e74a24f8..17116bd5fe5f 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java @@ -78,6 +78,7 @@ public void testDefaults() .setQueryPartitionFilterRequiredSchemas(ImmutableSet.of()) .setSplitManagerThreads(Integer.toString(Math.min(Runtime.getRuntime().availableProcessors() * 2, 32))) .setPlanningThreads(Integer.toString(Math.min(Runtime.getRuntime().availableProcessors(), 16))) + .setTableChangesProcessorThreads(Integer.toString(Math.min(Runtime.getRuntime().availableProcessors(), 16))) .setFileDeleteThreads(Integer.toString(Runtime.getRuntime().availableProcessors() * 2)) .setAllowedExtraProperties(ImmutableList.of()) .setIncrementalRefreshEnabled(true) @@ -125,6 +126,7 @@ public void testExplicitPropertyMappings() .put("iceberg.query-partition-filter-required-schemas", "bronze,silver") .put("iceberg.split-manager-threads", "42") .put("iceberg.planning-threads", "42") + .put("iceberg.table-changes-processor-threads", "42") .put("iceberg.file-delete-threads", "42") .put("iceberg.allowed-extra-properties", "propX,propY") .put("iceberg.incremental-refresh-enabled", "false") @@ -168,6 +170,7 @@ public 
void testExplicitPropertyMappings() .setQueryPartitionFilterRequiredSchemas(ImmutableSet.of("bronze", "silver")) .setSplitManagerThreads("42") .setPlanningThreads("42") + .setTableChangesProcessorThreads("42") .setFileDeleteThreads("42") .setAllowedExtraProperties(ImmutableList.of("propX", "propY")) .setIncrementalRefreshEnabled(false) diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java index bc2523a58b7b..849bbd6c12f5 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java @@ -2199,6 +2199,49 @@ public enum CreateMode CREATE_TABLE_WITH_NO_DATA_AND_INSERT } + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + public void testTableChangesForCopyOnWriteTables(StorageFormat storageFormat) + { + String baseTableName = toLowerCase("test_table_changes_" + storageFormat + randomNameSuffix()); + String sparkTableName = sparkTableName(baseTableName); + + String snapshotTrinoTableName = trinoTableName("\"" + baseTableName + "$snapshots\""); + + onSpark().executeQuery(format( + "CREATE TABLE %s (" + + "x BIGINT, " + + "y STRING, " + + "part STRING) " + + " USING ICEBERG" + + " TBLPROPERTIES ('write.format.default' = '%s', 'write.delete.mode' = 'copy-on-write')", + sparkTableName, + storageFormat)); + + onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (5, 'a', 'p1'), (4, 'b', 'p1')"); + onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (5, 'a', 'p2'), (4, 'b', 'p2')"); + + long snapshotAfterInsert = getCurrentSnapshotId(snapshotTrinoTableName); + + onSpark().executeQuery("UPDATE " + sparkTableName + " SET y = 'updated' WHERE x = 5"); + + long 
snapshotAfterUpdate = getCurrentSnapshotId(snapshotTrinoTableName); + + assertThat(onTrino().executeQuery("SELECT x, y, _change_type, _change_version_id, _change_ordinal " + + "FROM TABLE(iceberg.system.table_changes('%s', '%s', %s, %s))".formatted(TEST_SCHEMA_NAME, baseTableName, snapshotAfterInsert, snapshotAfterUpdate))) + .containsOnly( + row(5, "a", "delete", snapshotAfterUpdate, 0), + row(5, "a", "delete", snapshotAfterUpdate, 0), + row(5, "updated", "insert", snapshotAfterUpdate, 0), + row(5, "updated", "insert", snapshotAfterUpdate, 0)); + } + + private long getCurrentSnapshotId(String snapshotsTableName) + { + List row = onTrino().executeQuery("SELECT snapshot_id FROM " + snapshotsTableName + " ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES").row(0); + assertThat(row).hasSize(1); + return ((Long) row.getFirst()).longValue(); + } + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") public void testSparkReadsTrinoTableAfterCleaningUp(StorageFormat storageFormat) {