diff --git a/plugin/trino-delta-lake/src/main/java/io/delta/kernel/internal/deletionvectors/RoaringBitmapArrays.java b/plugin/trino-delta-lake/src/main/java/io/delta/kernel/internal/deletionvectors/RoaringBitmapArrays.java deleted file mode 100644 index 1aca923c9833..000000000000 --- a/plugin/trino-delta-lake/src/main/java/io/delta/kernel/internal/deletionvectors/RoaringBitmapArrays.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.delta.kernel.internal.deletionvectors; - -import java.io.IOException; - -public final class RoaringBitmapArrays -{ - private RoaringBitmapArrays() {} - - public static RoaringBitmapArray readFrom(byte[] bytes) - throws IOException - { - return RoaringBitmapArray.readFrom(bytes); - } -} diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DataFileInfo.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DataFileInfo.java index 97a7593dde1c..709a157ac346 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DataFileInfo.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DataFileInfo.java @@ -13,9 +13,11 @@ */ package io.trino.plugin.deltalake; +import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeJsonFileStatistics; import java.util.List; +import java.util.Optional; import static 
java.util.Objects.requireNonNull; @@ -25,7 +27,8 @@ public record DataFileInfo( long creationTime, io.trino.plugin.deltalake.DataFileInfo.DataFileType dataFileType, List partitionValues, - DeltaLakeJsonFileStatistics statistics) + DeltaLakeJsonFileStatistics statistics, + Optional deletionVector) { public enum DataFileType { @@ -37,5 +40,6 @@ public enum DataFileType { requireNonNull(dataFileType, "dataFileType is null"); requireNonNull(statistics, "statistics is null"); + requireNonNull(deletionVector, "deletionVector is null"); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java index 28fe36a62603..2c7dbb82dd4a 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java @@ -14,6 +14,7 @@ package io.trino.plugin.deltalake; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import io.airlift.concurrent.MoreFutures; import io.airlift.json.JsonCodec; import io.airlift.slice.Slice; @@ -22,12 +23,19 @@ import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.TrinoInputFile; +import io.trino.parquet.ParquetDataSource; import io.trino.parquet.ParquetReaderOptions; +import io.trino.parquet.metadata.BlockMetadata; +import io.trino.parquet.metadata.ParquetMetadata; +import io.trino.parquet.reader.MetadataReader; import io.trino.parquet.writer.ParquetWriterOptions; +import io.trino.plugin.deltalake.delete.RoaringBitmapArray; +import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.plugin.hive.ReaderPageSource; import io.trino.plugin.hive.parquet.ParquetFileWriter; import 
io.trino.plugin.hive.parquet.ParquetPageSourceFactory; +import io.trino.plugin.hive.parquet.TrinoParquetDataSource; import io.trino.spi.Page; import io.trino.spi.TrinoException; import io.trino.spi.block.Block; @@ -42,11 +50,10 @@ import jakarta.annotation.Nullable; import org.apache.parquet.format.CompressionCodec; import org.joda.time.DateTimeZone; -import org.roaringbitmap.longlong.LongBitmapDataProvider; -import org.roaringbitmap.longlong.Roaring64Bitmap; import java.io.Closeable; import java.io.IOException; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -66,12 +73,17 @@ import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; import static io.trino.plugin.deltalake.DeltaLakeColumnType.SYNTHESIZED; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_WRITE; +import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_FILESYSTEM_ERROR; import static io.trino.plugin.deltalake.DeltaLakeMetadata.relativePath; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getCompressionCodec; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetWriterBlockSize; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetWriterPageSize; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetWriterPageValueCount; import static io.trino.plugin.deltalake.DeltaLakeTypes.toParquetType; +import static io.trino.plugin.deltalake.DeltaLakeWriter.readStatistics; +import static io.trino.plugin.deltalake.delete.DeletionVectors.readDeletionVectors; +import static io.trino.plugin.deltalake.delete.DeletionVectors.toFileName; +import static io.trino.plugin.deltalake.delete.DeletionVectors.writeDeletionVectors; import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.deserializePartitionValue; import static io.trino.spi.block.RowBlock.getRowFieldsFromBlock; import static 
io.trino.spi.predicate.Utils.nativeValueToBlock; @@ -113,6 +125,10 @@ public class DeltaLakeMergeSink private final int[] dataColumnsIndices; private final int[] dataAndRowIdColumnsIndices; private final DeltaLakeParquetSchemaMapping parquetSchemaMapping; + private final FileFormatDataSourceStats fileFormatDataSourceStats; + private final ParquetReaderOptions parquetReaderOptions; + private final boolean deletionVectorEnabled; + private final Map deletionVectors; @Nullable private DeltaLakeCdfPageSink cdfPageSink; @@ -132,7 +148,11 @@ public DeltaLakeMergeSink( int domainCompactionThreshold, Supplier cdfPageSinkSupplier, boolean cdfEnabled, - DeltaLakeParquetSchemaMapping parquetSchemaMapping) + DeltaLakeParquetSchemaMapping parquetSchemaMapping, + ParquetReaderOptions parquetReaderOptions, + FileFormatDataSourceStats fileFormatDataSourceStats, + boolean deletionVectorEnabled, + Map deletionVectors) { this.typeOperators = requireNonNull(typeOperators, "typeOperators is null"); this.session = requireNonNull(session, "session is null"); @@ -156,6 +176,10 @@ public DeltaLakeMergeSink( this.cdfPageSinkSupplier = requireNonNull(cdfPageSinkSupplier); this.cdfEnabled = cdfEnabled; this.parquetSchemaMapping = requireNonNull(parquetSchemaMapping, "parquetSchemaMapping is null"); + this.fileFormatDataSourceStats = requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null"); + this.parquetReaderOptions = requireNonNull(parquetReaderOptions, "parquetReaderOptions is null"); + this.deletionVectorEnabled = deletionVectorEnabled; + this.deletionVectors = ImmutableMap.copyOf(requireNonNull(deletionVectors, "deletionVectors is null")); dataColumnsIndices = new int[tableColumnCount]; dataAndRowIdColumnsIndices = new int[tableColumnCount + 1]; for (int i = 0; i < tableColumnCount; i++) { @@ -213,10 +237,10 @@ private void processDeletion(Page deletions, String cdfOperation) FileDeletion deletion = fileDeletions.computeIfAbsent(filePath, _ -> new 
FileDeletion(partitionValues)); if (cdfOperation.equals(UPDATE_PREIMAGE_CDF_LABEL)) { - deletion.rowsDeletedByUpdate().addLong(rowPosition); + deletion.rowsDeletedByUpdate().add(rowPosition); } else { - deletion.rowsDeletedByDelete().addLong(rowPosition); + deletion.rowsDeletedByDelete().add(rowPosition); } } } @@ -308,8 +332,14 @@ public CompletableFuture> finish() .map(Slices::wrappedBuffer) .forEach(fragments::add); - fileDeletions.forEach((path, deletion) -> - fragments.addAll(rewriteFile(path.toStringUtf8(), deletion))); + fileDeletions.forEach((path, deletion) -> { + if (deletionVectorEnabled) { + fragments.add(writeMergeResult(path, deletion)); + } + else { + fragments.addAll(rewriteFile(path.toStringUtf8(), deletion)); + } + }); if (cdfEnabled && cdfPageSink != null) { // cdf may be enabled but there may be no update/deletion so sink was not instantiated MoreFutures.getDone(cdfPageSink.finish()).stream() @@ -324,6 +354,82 @@ public CompletableFuture> finish() return completedFuture(fragments); } + private Slice writeMergeResult(Slice path, FileDeletion deletion) + { + RoaringBitmapArray deletedRows = loadDeletionVector(Location.of(path.toStringUtf8())); + deletedRows.or(deletion.rowsDeletedByDelete()); + deletedRows.or(deletion.rowsDeletedByUpdate()); + + TrinoInputFile inputFile = fileSystem.newInputFile(Location.of(path.toStringUtf8())); + try (ParquetDataSource dataSource = new TrinoParquetDataSource(inputFile, parquetReaderOptions, fileFormatDataSourceStats)) { + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + long rowCount = parquetMetadata.getBlocks().stream().map(BlockMetadata::rowCount).mapToLong(Long::longValue).sum(); + RoaringBitmapArray rowsRetained = new RoaringBitmapArray(); + rowsRetained.addRange(0, rowCount); + rowsRetained.andNot(deletedRows); + if (rowsRetained.isEmpty()) { + // No rows are retained in the file, so we don't need to write deletion vectors. 
+ return onlySourceFile(path.toStringUtf8(), deletion); + } + return writeDeletionVector(path.toStringUtf8(), inputFile.length(), inputFile.lastModified(), deletedRows, deletion, parquetMetadata, rowCount); + } + catch (IOException e) { + throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, "Error reading Parquet file: " + path, e); + } + } + + private Slice writeDeletionVector( + String sourcePath, + long length, + Instant lastModified, + RoaringBitmapArray deletedRows, + FileDeletion deletion, + ParquetMetadata parquetMetadata, + long rowCount) + { + String tablePath = rootTableLocation.toString(); + String sourceRelativePath = relativePath(tablePath, sourcePath); + + DeletionVectorEntry deletionVectorEntry; + try { + deletionVectorEntry = writeDeletionVectors(fileSystem, rootTableLocation, deletedRows); + } + catch (IOException e) { + throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Unable to write deletion vector file", e); + } + + try { + DataFileInfo newFileInfo = new DataFileInfo( + sourceRelativePath, + length, + lastModified.toEpochMilli(), + DATA, + deletion.partitionValues, + readStatistics(parquetMetadata, dataColumns, rowCount), + Optional.of(deletionVectorEntry)); + DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues, Optional.of(sourceRelativePath), Optional.of(newFileInfo)); + return utf8Slice(mergeResultJsonCodec.toJson(result)); + } + catch (Throwable e) { + try { + fileSystem.deleteFile(rootTableLocation.appendPath(toFileName(deletionVectorEntry.pathOrInlineDv()))); + } + catch (IOException ex) { + if (!e.equals(ex)) { + e.addSuppressed(ex); + } + } + throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Unable to write deletion vector file", e); + } + } + + private Slice onlySourceFile(String sourcePath, FileDeletion deletion) + { + String sourceRelativePath = relativePath(rootTableLocation.toString(), sourcePath); + DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), 
Optional.of(sourceRelativePath), Optional.empty()); + return utf8Slice(mergeResultJsonCodec.toJson(result)); + } + // In spite of the name "Delta" Lake, we must rewrite the entire file to delete rows. private List rewriteFile(String sourcePath, FileDeletion deletion) { @@ -395,11 +501,26 @@ private ParquetFileWriter createParquetFileWriter(Location path, List rewriteParquetFile(Location path, FileDeletion deletion, DeltaLakeWriter fileWriter) throws IOException { - LongBitmapDataProvider rowsDeletedByDelete = deletion.rowsDeletedByDelete(); - LongBitmapDataProvider rowsDeletedByUpdate = deletion.rowsDeletedByUpdate(); + RoaringBitmapArray rowsDeletedByDelete = deletion.rowsDeletedByDelete(); + RoaringBitmapArray rowsDeletedByUpdate = deletion.rowsDeletedByUpdate(); try (ConnectorPageSource connectorPageSource = createParquetPageSource(path).get()) { long filePosition = 0; while (!connectorPageSource.isFinished()) { @@ -410,8 +531,8 @@ private Optional rewriteParquetFile(Location path, FileDeletion de int positionCount = page.getPositionCount(); int[] retained = new int[positionCount]; - int[] deletedByDelete = new int[(int) rowsDeletedByDelete.getLongCardinality()]; - int[] deletedByUpdate = new int[(int) rowsDeletedByUpdate.getLongCardinality()]; + int[] deletedByDelete = new int[(int) rowsDeletedByDelete.cardinality()]; + int[] deletedByUpdate = new int[(int) rowsDeletedByUpdate.cardinality()]; int retainedCount = 0; int deletedByUpdateCount = 0; int deletedByDeleteCount = 0; @@ -529,8 +650,8 @@ public void abort() private static class FileDeletion { private final List partitionValues; - private final LongBitmapDataProvider rowsDeletedByDelete = new Roaring64Bitmap(); - private final LongBitmapDataProvider rowsDeletedByUpdate = new Roaring64Bitmap(); + private final RoaringBitmapArray rowsDeletedByDelete = new RoaringBitmapArray(); + private final RoaringBitmapArray rowsDeletedByUpdate = new RoaringBitmapArray(); private FileDeletion(List partitionValues) { @@ 
-544,12 +665,12 @@ public List partitionValues() return partitionValues; } - public LongBitmapDataProvider rowsDeletedByDelete() + public RoaringBitmapArray rowsDeletedByDelete() { return rowsDeletedByDelete; } - public LongBitmapDataProvider rowsDeletedByUpdate() + public RoaringBitmapArray rowsDeletedByUpdate() { return rowsDeletedByUpdate; } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeTableHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeTableHandle.java index 30206fe04b62..7dca73f08213 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeTableHandle.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeTableHandle.java @@ -13,20 +13,26 @@ */ package io.trino.plugin.deltalake; +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; import io.trino.spi.connector.ConnectorMergeTableHandle; import io.trino.spi.connector.ConnectorTableHandle; +import java.util.Map; + import static java.util.Objects.requireNonNull; public record DeltaLakeMergeTableHandle( DeltaLakeTableHandle tableHandle, - DeltaLakeInsertTableHandle insertTableHandle) + DeltaLakeInsertTableHandle insertTableHandle, + Map deletionVectors) implements ConnectorMergeTableHandle { public DeltaLakeMergeTableHandle { requireNonNull(tableHandle, "tableHandle is null"); requireNonNull(insertTableHandle, "insertTableHandle is null"); + deletionVectors = ImmutableMap.copyOf(requireNonNull(deletionVectors, "deletionVectors is null")); } @Override diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 46a968cf72bd..bb0204493525 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ 
b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -63,6 +63,7 @@ import io.trino.plugin.deltalake.transactionlog.AddFileEntry; import io.trino.plugin.deltalake.transactionlog.CdcEntry; import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry; +import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; import io.trino.plugin.deltalake.transactionlog.DeltaLakeComputedStatistics; import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport; import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode; @@ -267,6 +268,7 @@ import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getIsolationLevel; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getMaxColumnId; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.isAppendOnly; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.isDeletionVectorEnabled; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeColumnType; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeSchemaAsJson; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeStatsAsJson; @@ -2062,7 +2064,7 @@ private static void appendAddFileEntries(TransactionLogWriter transactionLogWrit Optional.of(serializeStatsAsJson(statisticsWithExactNames)), Optional.empty(), ImmutableMap.of(), - Optional.empty())); + info.deletionVector())); } } @@ -2418,7 +2420,32 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT DeltaLakeInsertTableHandle insertHandle = createInsertHandle(retryMode, handle, inputColumns); - return new DeltaLakeMergeTableHandle(handle, insertHandle); + Map deletionVectors = loadDeletionVectors(session, handle); + return new DeltaLakeMergeTableHandle(handle, insertHandle, deletionVectors); + } + + private Map 
loadDeletionVectors(ConnectorSession session, DeltaLakeTableHandle handle) + { + if (!isDeletionVectorEnabled(handle.getMetadataEntry(), handle.getProtocolEntry())) { + return ImmutableMap.of(); + } + + ImmutableMap.Builder deletionVectors = ImmutableMap.builder(); + try (Stream activeFiles = transactionLogAccess.getActiveFiles( + session, + getSnapshot(session, handle), + handle.getMetadataEntry(), + handle.getProtocolEntry(), + handle.getEnforcedPartitionConstraint(), + handle.getProjectedColumns().orElse(ImmutableSet.of()))) { + Iterator addFileEntryIterator = activeFiles.iterator(); + while (addFileEntryIterator.hasNext()) { + AddFileEntry addFileEntry = addFileEntryIterator.next(); + addFileEntry.getDeletionVector().ifPresent(deletionVector -> deletionVectors.put(addFileEntry.getPath(), deletionVector)); + } + } + // The latest deletion vector contains all the past deleted rows + return deletionVectors.buildKeepingLast(); } @Override diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java index d12ad3247cfb..2351c0614c2d 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java @@ -18,11 +18,14 @@ import io.airlift.json.JsonCodec; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.parquet.ParquetReaderOptions; import io.trino.plugin.deltalake.procedure.DeltaLakeTableExecuteHandle; import io.trino.plugin.deltalake.procedure.DeltaTableOptimizeHandle; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; +import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.plugin.hive.NodeVersion; +import 
io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.spi.PageIndexerFactory; import io.trino.spi.connector.ConnectorInsertTableHandle; import io.trino.spi.connector.ConnectorMergeSink; @@ -51,6 +54,7 @@ import static io.trino.plugin.deltalake.DeltaLakeParquetSchemas.createParquetSchemaMapping; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.changeDataFeedEnabled; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.isDeletionVectorEnabled; import static io.trino.spi.type.VarcharType.VARCHAR; import static java.util.Objects.requireNonNull; @@ -62,6 +66,8 @@ public class DeltaLakePageSinkProvider private final JsonCodec dataFileInfoCodec; private final JsonCodec mergeResultJsonCodec; private final DeltaLakeWriterStats stats; + private final ParquetReaderOptions parquetReaderOptions; + private final FileFormatDataSourceStats fileFormatDataSourceStats; private final int maxPartitionsPerWriter; private final DateTimeZone parquetDateTimeZone; private final TypeManager typeManager; @@ -75,7 +81,9 @@ public DeltaLakePageSinkProvider( JsonCodec dataFileInfoCodec, JsonCodec mergeResultJsonCodec, DeltaLakeWriterStats stats, + FileFormatDataSourceStats fileFormatDataSourceStats, DeltaLakeConfig deltaLakeConfig, + ParquetReaderConfig parquetReaderConfig, TypeManager typeManager, NodeVersion nodeVersion) { @@ -84,6 +92,8 @@ public DeltaLakePageSinkProvider( this.dataFileInfoCodec = dataFileInfoCodec; this.mergeResultJsonCodec = requireNonNull(mergeResultJsonCodec, "mergeResultJsonCodec is null"); this.stats = stats; + this.parquetReaderOptions = parquetReaderConfig.toParquetReaderOptions(); + this.fileFormatDataSourceStats = requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null"); this.maxPartitionsPerWriter = deltaLakeConfig.getMaxPartitionsPerWriter(); this.parquetDateTimeZone = 
deltaLakeConfig.getParquetDateTimeZone(); this.domainCompactionThreshold = deltaLakeConfig.getDomainCompactionThreshold(); @@ -185,7 +195,11 @@ public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transaction domainCompactionThreshold, () -> createCdfPageSink(merge, session), changeDataFeedEnabled(tableHandle.metadataEntry(), tableHandle.protocolEntry()).orElse(false), - parquetSchemaMapping); + parquetSchemaMapping, + parquetReaderOptions, + fileFormatDataSourceStats, + isDeletionVectorEnabled(tableHandle.metadataEntry(), tableHandle.protocolEntry()), + merge.deletionVectors()); } private DeltaLakeCdfPageSink createCdfPageSink( diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java index b4ddbe4e0472..f08ecc84f839 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java @@ -19,7 +19,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; -import io.delta.kernel.internal.deletionvectors.RoaringBitmapArray; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; @@ -31,6 +30,7 @@ import io.trino.parquet.reader.MetadataReader; import io.trino.plugin.deltalake.delete.PageFilter; import io.trino.plugin.deltalake.delete.PositionDeleteFilter; +import io.trino.plugin.deltalake.delete.RoaringBitmapArray; import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode; import io.trino.plugin.hive.FileFormatDataSourceStats; diff --git 
a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java index fc2299202043..0d2c05cb0762 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java @@ -166,7 +166,7 @@ private Stream getSplits( boolean splittable = // Delta Lake handles updates and deletes by copying entire data files, minus updates/deletes. Because of this we can only have one Split/UpdatablePageSource - // per file. TODO (https://github.com/trinodb/trino/issues/17063) use deletion vectors instead of copy-on-write and remove DeltaLakeTableHandle.writeType + // per file. TODO The connector already supports deletion vectors. Update this condition. tableHandle.getWriteType().isEmpty() && // When only partitioning columns projected, there is no point splitting the files mayAnyDataColumnProjected(tableHandle); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableProperties.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableProperties.java index ad5b7fae89f1..b06a2d5fcdf9 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableProperties.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableProperties.java @@ -42,6 +42,7 @@ public class DeltaLakeTableProperties public static final String CHECKPOINT_INTERVAL_PROPERTY = "checkpoint_interval"; public static final String CHANGE_DATA_FEED_ENABLED_PROPERTY = "change_data_feed_enabled"; public static final String COLUMN_MAPPING_MODE_PROPERTY = "column_mapping_mode"; + // TODO Add support for creating tables with deletion vectors private final List> tableProperties; diff --git 
a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java index d1fc6e964a6e..8f686205e239 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java @@ -182,22 +182,26 @@ public long getRowCount() public DataFileInfo getDataFileInfo() throws IOException { - Map dataColumnTypes = columnHandles.stream() - // Lowercase because the subsequent logic expects lowercase - .collect(toImmutableMap(column -> column.basePhysicalColumnName().toLowerCase(ENGLISH), DeltaLakeColumnHandle::basePhysicalType)); + Location path = rootTableLocation.appendPath(relativeFilePath); + FileMetaData fileMetaData = fileWriter.getFileMetadata(); + ParquetMetadata parquetMetadata = MetadataReader.createParquetMetadata(fileMetaData, new ParquetDataSourceId(path.toString())); + return new DataFileInfo( relativeFilePath, getWrittenBytes(), creationTime, dataFileType, partitionValues, - readStatistics(fileWriter.getFileMetadata(), rootTableLocation.appendPath(relativeFilePath), dataColumnTypes, rowCount)); + readStatistics(parquetMetadata, columnHandles, rowCount), + Optional.empty()); } - private static DeltaLakeJsonFileStatistics readStatistics(FileMetaData fileMetaData, Location path, Map typeForColumn, long rowCount) + public static DeltaLakeJsonFileStatistics readStatistics(ParquetMetadata parquetMetadata, List columnHandles, long rowCount) throws IOException { - ParquetMetadata parquetMetadata = MetadataReader.createParquetMetadata(fileMetaData, new ParquetDataSourceId(path.toString())); + Map typeForColumn = columnHandles.stream() + // Lowercase because the subsequent logic expects lowercase + .collect(toImmutableMap(column -> column.basePhysicalColumnName().toLowerCase(ENGLISH), DeltaLakeColumnHandle::basePhysicalType)); ImmutableMultimap.Builder 
metadataForColumn = ImmutableMultimap.builder(); for (BlockMetadata blockMetaData : parquetMetadata.getBlocks()) { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java index 83ec6783ba8c..7c22163f5c19 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java @@ -15,16 +15,18 @@ import com.google.common.base.CharMatcher; import io.delta.kernel.internal.deletionvectors.Base85Codec; -import io.delta.kernel.internal.deletionvectors.RoaringBitmapArray; -import io.delta.kernel.internal.deletionvectors.RoaringBitmapArrays; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoInputFile; import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; import io.trino.spi.TrinoException; +import org.roaringbitmap.RoaringBitmap; import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.OptionalInt; import java.util.UUID; import java.util.zip.CRC32; import java.util.zip.Checksum; @@ -32,12 +34,22 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static io.delta.kernel.internal.deletionvectors.Base85Codec.decodeUUID; +import static io.delta.kernel.internal.deletionvectors.Base85Codec.encodeUUID; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static java.lang.Math.toIntExact; +import static java.nio.ByteOrder.LITTLE_ENDIAN; +import static java.util.UUID.randomUUID; // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vector-format public 
final class DeletionVectors { + private static final int PORTABLE_ROARING_BITMAP_MAGIC_NUMBER = 1681511377; + private static final int MAGIC_NUMBER_BYTE_SIZE = 4; + private static final int BIT_MAP_COUNT_BYTE_SIZE = 8; + private static final int BIT_MAP_KEY_BYTE_SIZE = 4; + private static final int FORMAT_VERSION_V1 = 1; + private static final String UUID_MARKER = "u"; // relative path with random prefix on disk private static final String PATH_MARKER = "p"; // absolute path on disk private static final String INLINE_MARKER = "i"; // inline @@ -52,7 +64,7 @@ public static RoaringBitmapArray readDeletionVectors(TrinoFileSystem fileSystem, if (deletionVector.storageType().equals(UUID_MARKER)) { TrinoInputFile inputFile = fileSystem.newInputFile(location.appendPath(toFileName(deletionVector.pathOrInlineDv()))); byte[] buffer = readDeletionVector(inputFile, deletionVector.offset().orElseThrow(), deletionVector.sizeInBytes()); - return RoaringBitmapArrays.readFrom(buffer); + return deserializeDeletionVectors(buffer); } if (deletionVector.storageType().equals(INLINE_MARKER) || deletionVector.storageType().equals(PATH_MARKER)) { throw new TrinoException(NOT_SUPPORTED, "Unsupported storage type for deletion vector: " + deletionVector.storageType()); @@ -60,6 +72,48 @@ public static RoaringBitmapArray readDeletionVectors(TrinoFileSystem fileSystem, throw new IllegalArgumentException("Unexpected storage type: " + deletionVector.storageType()); } + public static DeletionVectorEntry writeDeletionVectors( + TrinoFileSystem fileSystem, + Location location, + RoaringBitmapArray deletedRows) + throws IOException + { + UUID uuid = randomUUID(); + String deletionVectorFilename = "deletion_vector_" + uuid + ".bin"; + String pathOrInlineDv = encodeUUID(uuid); + int sizeInBytes = MAGIC_NUMBER_BYTE_SIZE + BIT_MAP_COUNT_BYTE_SIZE + BIT_MAP_KEY_BYTE_SIZE + deletedRows.serializedSizeInBytes(); + long cardinality = deletedRows.cardinality(); + + checkArgument(sizeInBytes > 0, "sizeInBytes 
must be positive: %s", sizeInBytes); + checkArgument(cardinality > 0, "cardinality must be positive: %s", cardinality); + + OptionalInt offset; + byte[] data = serializeAsByteArray(deletedRows, sizeInBytes); + try (DataOutputStream output = new DataOutputStream(fileSystem.newOutputFile(location.appendPath(deletionVectorFilename)).create())) { + output.writeByte(FORMAT_VERSION_V1); + offset = OptionalInt.of(output.size()); + output.writeInt(sizeInBytes); + output.write(data); + output.writeInt(calculateChecksum(data)); + } + + return new DeletionVectorEntry(UUID_MARKER, pathOrInlineDv, offset, sizeInBytes, cardinality); + } + + private static byte[] serializeAsByteArray(RoaringBitmapArray bitmaps, int sizeInBytes) + { + ByteBuffer buffer = ByteBuffer.allocate(sizeInBytes).order(LITTLE_ENDIAN); + buffer.putInt(PORTABLE_ROARING_BITMAP_MAGIC_NUMBER); + buffer.putLong(bitmaps.length()); + for (int i = 0; i < bitmaps.length(); i++) { + buffer.putInt(i); // Bitmap index + RoaringBitmap bitmap = bitmaps.get(i); + bitmap.runOptimize(); + bitmap.serialize(buffer); + } + return buffer.array(); + } + public static String toFileName(String pathOrInlineDv) { int randomPrefixLength = pathOrInlineDv.length() - Base85Codec.ENCODED_UUID_LENGTH; @@ -98,4 +152,30 @@ private static int calculateChecksum(byte[] data) crc.update(data); return (int) crc.getValue(); } + + private static RoaringBitmapArray deserializeDeletionVectors(byte[] bytes) + throws IOException + { + ByteBuffer buffer = ByteBuffer.wrap(bytes).order(LITTLE_ENDIAN); + checkArgument(buffer.order() == LITTLE_ENDIAN, "Byte order must be little endian: %s", buffer.order()); + int magicNumber = buffer.getInt(); + if (magicNumber == PORTABLE_ROARING_BITMAP_MAGIC_NUMBER) { + int size = toIntExact(buffer.getLong()); + RoaringBitmapArray bitmaps = new RoaringBitmapArray(); + for (int i = 0; i < size; i++) { + int key = buffer.getInt(); + checkArgument(key >= 0, "key must not be negative: %s", key); + + RoaringBitmap bitmap = new 
RoaringBitmap(); + bitmap.deserialize(buffer); + bitmap.stream().forEach(bitmaps::add); + + // there seems to be no better way to ask how many bytes bitmap.deserialize has read + int consumedBytes = bitmap.serializedSizeInBytes(); + buffer.position(buffer.position() + consumedBytes); + } + return bitmaps; + } + throw new IllegalArgumentException("Unsupported magic number: " + magicNumber); + } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PositionDeleteFilter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PositionDeleteFilter.java index 62a7dcdc3945..d34c0188e606 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PositionDeleteFilter.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PositionDeleteFilter.java @@ -13,7 +13,6 @@ */ package io.trino.plugin.deltalake.delete; -import io.delta.kernel.internal.deletionvectors.RoaringBitmapArray; import io.trino.plugin.deltalake.DeltaLakeColumnHandle; import io.trino.spi.block.Block; diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/RoaringBitmapArray.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/RoaringBitmapArray.java new file mode 100644 index 000000000000..2029f1654bc9 --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/RoaringBitmapArray.java @@ -0,0 +1,166 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.plugin.deltalake.delete;

import com.google.common.primitives.UnsignedInts;
import org.roaringbitmap.RoaringBitmap;

import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Math.toIntExact;

/**
 * A simplified version of RoaringBitmapArray
 * <p>
 * Stores a set of non-negative {@code long} values as an array of 32-bit
 * {@link RoaringBitmap}s: the high 32 bits of a value select the bitmap
 * (its array index), the low 32 bits are stored in that bitmap as an
 * unsigned int. Not thread-safe.
 */
public final class RoaringBitmapArray
{
    // Largest storable value: high word up to Integer.MAX_VALUE - 1, low word all ones.
    // Must bitmask to avoid sign extension
    private static final long MAX_REPRESENTABLE_VALUE = (((long) Integer.MAX_VALUE - 1) << 32) | (((long) Integer.MIN_VALUE) & 0xFFFFFFFFL);
    // Size of the 4-byte key that precedes each bitmap in the serialized form
    private static final int INDIVIDUAL_BITMAP_KEY_SIZE = 4;

    private RoaringBitmap[] bitmaps = new RoaringBitmap[0];

    /**
     * Returns the bitmap holding the values whose high 32 bits equal {@code i}.
     */
    public RoaringBitmap get(int i)
    {
        return bitmaps[i];
    }

    /**
     * Returns whether {@code value} is present.
     */
    public boolean contains(long value)
    {
        checkArgument(value >= 0 && value <= MAX_REPRESENTABLE_VALUE, "Unsupported value: %s", value);

        int high = highBytes(value);
        if (high >= bitmaps.length) {
            // No bitmap allocated for this high word, so the value cannot be present
            return false;
        }
        int low = lowBytes(value);
        return bitmaps[high].contains(low);
    }

    /**
     * Returns true when no value is present. Allocated-but-empty bitmaps do not count.
     */
    public boolean isEmpty()
    {
        for (RoaringBitmap bitmap : bitmaps) {
            if (!bitmap.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns the number of allocated bitmaps (not the number of stored values).
     */
    public long length()
    {
        return bitmaps.length;
    }

    /**
     * Returns the total number of stored values across all bitmaps.
     */
    public long cardinality()
    {
        long sum = 0;
        for (RoaringBitmap bitmap : bitmaps) {
            sum += bitmap.getLongCardinality();
        }
        return sum;
    }

    /**
     * Returns the serialized size: each bitmap's own size plus its 4-byte key.
     */
    public int serializedSizeInBytes()
    {
        long size = 0;
        for (RoaringBitmap bitmap : bitmaps) {
            size += bitmap.serializedSizeInBytes() + INDIVIDUAL_BITMAP_KEY_SIZE;
        }
        return toIntExact(size);
    }

    /**
     * Adds a single value, growing the bitmap array if needed.
     */
    public void add(long value)
    {
        checkArgument(value >= 0 && value <= MAX_REPRESENTABLE_VALUE, "Unsupported value: %s", value);

        int high = highBytes(value);
        int low = lowBytes(value);

        if (high >= bitmaps.length) {
            extendBitmaps(high + 1);
        }
        RoaringBitmap highBitmap = bitmaps[high];
        // RoaringBitmap.add(int) treats the argument as an unsigned 32-bit value
        highBitmap.add(low);
    }

    /**
     * Adds all values in the inclusive range [rangeStart, rangeEnd].
     */
    public void addRange(long rangeStart, long rangeEnd)
    {
        checkArgument(rangeStart >= 0 && rangeStart <= rangeEnd, "Unsupported value: %s", rangeStart);
        if (bitmaps.length == 0) {
            extendBitmaps(1);
        }
        int startHigh = highBytes(rangeStart);
        int startLow = lowBytes(rangeStart);

        int endHigh = highBytes(rangeEnd);
        int endLow = lowBytes(rangeEnd);

        int lastHigh = endHigh;

        if (lastHigh >= bitmaps.length) {
            extendBitmaps(lastHigh + 1);
        }

        // Fill each affected bitmap: the first starts at startLow, the last stops at endLow,
        // every bitmap in between is filled completely
        int currentHigh = startHigh;
        while (currentHigh <= lastHigh) {
            long start = currentHigh == startHigh ? UnsignedInts.toLong(startLow) : 0L;
            // RoaringBitmap.add is exclusive the end boundary.
            long end = currentHigh == endHigh ? UnsignedInts.toLong(endLow) + 1L : 0xFFFFFFFFL + 1L;
            bitmaps[currentHigh].add(start, end);
            currentHigh += 1;
        }
    }

    /**
     * Unions {@code other} into this instance.
     */
    public void or(RoaringBitmapArray other)
    {
        if (bitmaps.length < other.bitmaps.length) {
            extendBitmaps(other.bitmaps.length);
        }
        for (int i = 0; i < other.bitmaps.length; i++) {
            bitmaps[i].or(other.bitmaps[i]);
        }
    }

    /**
     * Removes from this instance every value present in {@code other}.
     */
    public void andNot(RoaringBitmapArray other)
    {
        // Bitmaps beyond other's length have nothing to subtract and are left untouched
        int length = Math.min(bitmaps.length, other.bitmaps.length);
        for (int i = 0; i < length; i++) {
            bitmaps[i].andNot(other.bitmaps[i]);
        }
    }

    // Grows the bitmap array to newLength, filling the new slots with empty bitmaps
    private void extendBitmaps(int newLength)
    {
        if (bitmaps.length == 0 && newLength == 1) {
            // Fast path for the first value added
            bitmaps = new RoaringBitmap[] {new RoaringBitmap()};
            return;
        }
        RoaringBitmap[] newBitmaps = new RoaringBitmap[newLength];
        System.arraycopy(bitmaps, 0, newBitmaps, 0, bitmaps.length);
        for (int i = bitmaps.length; i < newLength; i++) {
            newBitmaps[i] = new RoaringBitmap();
        }
        bitmaps = newBitmaps;
    }

    // High 32 bits of the value (index of the owning bitmap)
    private static int highBytes(long value)
    {
        return (int) (value >> 32);
    }

    // Low 32 bits of the value (the entry stored inside a bitmap, as unsigned int)
    private static int lowBytes(long value)
    {
        return (int) value;
    }
}
b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java index a2ca4a25aa8e..2944d4370962 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java @@ -65,6 +65,7 @@ import static io.trino.plugin.deltalake.DeltaLakeMetadata.MAX_WRITER_VERSION; import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkValidTableHandle; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getVacuumMinRetention; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.DELETION_VECTORS_FEATURE_NAME; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.unsupportedWriterFeatures; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir; @@ -187,10 +188,15 @@ private void doVacuum( if (protocolEntry.minWriterVersion() > MAX_WRITER_VERSION) { throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %d writer version".formatted(protocolEntry.minWriterVersion())); } + Set writerFeatures = protocolEntry.writerFeatures().orElse(ImmutableSet.of()); Set unsupportedWriterFeatures = unsupportedWriterFeatures(protocolEntry.writerFeatures().orElse(ImmutableSet.of())); if (!unsupportedWriterFeatures.isEmpty()) { throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %s writer features".formatted(unsupportedWriterFeatures)); } + if (writerFeatures.contains(DELETION_VECTORS_FEATURE_NAME)) { + // TODO https://github.com/trinodb/trino/issues/22809 Add support for vacuuming tables with deletion vectors + throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %s writer features".formatted(DELETION_VECTORS_FEATURE_NAME)); + } String tableLocation = 
tableSnapshot.getTableLocation(); String transactionLogDir = getTransactionLogDir(tableLocation); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java index fb5745435482..7fbc16099ff9 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java @@ -106,7 +106,7 @@ private DeltaLakeSchemaSupport() {} private static final String CHANGE_DATA_FEED_FEATURE_NAME = "changeDataFeed"; private static final String CHECK_CONSTRAINTS_FEATURE_NAME = "checkConstraints"; private static final String COLUMN_MAPPING_FEATURE_NAME = "columnMapping"; - private static final String DELETION_VECTORS_FEATURE_NAME = "deletionVectors"; + public static final String DELETION_VECTORS_FEATURE_NAME = "deletionVectors"; private static final String ICEBERG_COMPATIBILITY_V1_FEATURE_NAME = "icebergCompatV1"; private static final String ICEBERG_COMPATIBILITY_V2_FEATURE_NAME = "icebergCompatV2"; private static final String IDENTITY_COLUMNS_FEATURE_NAME = "identityColumns"; @@ -132,6 +132,7 @@ private DeltaLakeSchemaSupport() {} .build(); private static final Set SUPPORTED_WRITER_FEATURES = ImmutableSet.builder() .add(APPEND_ONLY_FEATURE_NAME) + .add(DELETION_VECTORS_FEATURE_NAME) .add(INVARIANTS_FEATURE_NAME) .add(CHECK_CONSTRAINTS_FEATURE_NAME) .add(CHANGE_DATA_FEED_FEATURE_NAME) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java index 4ca401fd2954..113ecb13830e 100644 --- 
a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java @@ -176,9 +176,6 @@ public RowType getAddEntryType( addFields.add(RowType.field("size", BIGINT)); addFields.add(RowType.field("modificationTime", BIGINT)); addFields.add(RowType.field("dataChange", BOOLEAN)); - if (deletionVectorEnabled) { - addFields.add(RowType.field("deletionVector", DELETION_VECTORS_TYPE)); - } if (requireWriteStatsAsJson) { addFields.add(RowType.field("stats", VARCHAR)); } @@ -193,6 +190,9 @@ public RowType getAddEntryType( addFields.add(RowType.field("stats_parsed", RowType.from(statsColumns.build()))); } addFields.add(RowType.field("tags", stringMap)); + if (deletionVectorEnabled) { + addFields.add(RowType.field("deletionVector", DELETION_VECTORS_TYPE)); + } return RowType.from(addFields.build()); } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java index c4fc8d69c154..db89761d6ad8 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java @@ -23,6 +23,7 @@ import io.trino.plugin.deltalake.DeltaLakeColumnHandle; import io.trino.plugin.deltalake.DeltaLakeColumnMetadata; import io.trino.plugin.deltalake.transactionlog.AddFileEntry; +import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry; @@ -61,6 +62,7 @@ import static 
io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.toNullCounts; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractPartitionColumns; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.isDeletionVectorEnabled; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeStatsAsJson; import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHECKPOINT_WRITE_STATS_AS_JSON_PROPERTY; import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHECKPOINT_WRITE_STATS_AS_STRUCT_PROPERTY; @@ -246,6 +248,7 @@ private void writeAddFileEntry( boolean writeStatsAsJson, boolean writeStatsAsStruct) { + boolean deletionVectorEnabled = isDeletionVectorEnabled(metadataEntry, protocolEntry); pageBuilder.declarePosition(); RowBlockBuilder blockBuilder = (RowBlockBuilder) pageBuilder.getBlockBuilder(ADD_BLOCK_CHANNEL); blockBuilder.buildEntry(fieldBuilders -> { @@ -279,8 +282,12 @@ private void writeAddFileEntry( writeParsedStats(fieldBuilders.get(fieldId), entryType, addFileEntry, fieldId); fieldId++; } - writeStringMap(fieldBuilders.get(fieldId), entryType, fieldId, "tags", addFileEntry.getTags()); + fieldId++; + + if (deletionVectorEnabled) { + writeDeletionVector(fieldBuilders.get(fieldId), entryType, addFileEntry.getDeletionVector(), fieldId); + } }); // null for others @@ -384,6 +391,23 @@ private void writeParsedStats(BlockBuilder entryBlockBuilder, RowType entryType, }); } + private void writeDeletionVector(BlockBuilder entryBlockBuilder, RowType entryType, Optional deletionVector, int fieldId) + { + if (deletionVector.isEmpty()) { + entryBlockBuilder.appendNull(); + return; + } + + RowType type = getInternalRowType(entryType, fieldId, "deletionVector"); + ((RowBlockBuilder) entryBlockBuilder).buildEntry(builders -> { + 
writeString(builders.get(0), type, 0, "storageType", deletionVector.get().storageType()); + writeString(builders.get(1), type, 1, "pathOrInlineDv", deletionVector.get().pathOrInlineDv()); + writeLong(builders.get(2), type, 2, "offset", (long) deletionVector.get().offset().orElse(0)); + writeLong(builders.get(4), type, 4, "sizeInBytes", (long) deletionVector.get().sizeInBytes()); + writeLong(builders.get(5), type, 5, "cardinality", deletionVector.get().cardinality()); + }); + } + private void writeMinMaxMapAsFields(BlockBuilder blockBuilder, RowType type, int fieldId, String fieldName, Optional> values, boolean isJson) { RowType.Field valuesField = validateAndGetField(type, fieldId, fieldName); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java index 64c555fc6854..918aa503fbd7 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java @@ -98,7 +98,6 @@ public class TestDeltaLakeBasic new ResourceTable("allow_column_defaults", "deltalake/allow_column_defaults"), new ResourceTable("stats_with_minmax_nulls", "deltalake/stats_with_minmax_nulls"), new ResourceTable("no_column_stats", "databricks73/no_column_stats"), - new ResourceTable("deletion_vectors", "databricks122/deletion_vectors"), new ResourceTable("liquid_clustering", "deltalake/liquid_clustering"), new ResourceTable("timestamp_ntz", "databricks131/timestamp_ntz"), new ResourceTable("timestamp_ntz_partition", "databricks131/timestamp_ntz_partition"), @@ -1003,8 +1002,126 @@ public void testAllowColumnDefaults() */ @Test public void testDeletionVectors() + throws Exception + { + String tableName = "deletion_vectors" + randomNameSuffix(); + + Path tableLocation = catalogDir.resolve(tableName); + copyDirectoryContents(new 
File(Resources.getResource("databricks122/deletion_vectors").toURI()).toPath(), tableLocation); + assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri())); + + assertQuery("SELECT * FROM " + tableName, "VALUES (1, 11)"); + + assertUpdate("INSERT INTO " + tableName + " VALUES (3, 31), (3, 32)", 2); + assertQuery("SELECT * FROM " + tableName, "VALUES (1, 11), (3, 31), (3, 32)"); + + assertUpdate("DELETE FROM " + tableName + " WHERE a = 3 AND b = 31", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (1, 11), (3, 32)"); + + assertUpdate("UPDATE " + tableName + " SET a = -3 WHERE b = 32", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (1, 11), (-3, 32)"); + + assertUpdate("UPDATE " + tableName + " SET a = -3 WHERE b = 32", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (1, 11), (-3, 32)"); + + assertUpdate("MERGE INTO " + tableName + " t " + + "USING (SELECT * FROM (VALUES 1)) AS s(a) " + + "ON (t.a = s.a) " + + "WHEN MATCHED THEN UPDATE SET b = -11", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (1, -11), (-3, 32)"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testDeletionVectorsAllRows() + throws Exception + { + String tableName = "deletion_vectors" + randomNameSuffix(); + + Path tableLocation = catalogDir.resolve(tableName); + copyDirectoryContents(new File(Resources.getResource("databricks122/deletion_vectors").toURI()).toPath(), tableLocation); + assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri())); + + assertUpdate("INSERT INTO " + tableName + " VALUES (3, 31), (3, 32)", 2); + assertUpdate("DELETE FROM " + tableName + " WHERE a != 999", 3); + assertQueryReturnsEmptyResult("SELECT * FROM " + tableName); + + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 10), (2, 20)", 2); + assertUpdate("UPDATE " + tableName + " SET a 
= a + 10", 2); + assertQuery("SELECT * FROM " + tableName, "VALUES (11, 10), (12, 20)"); + + assertUpdate("MERGE INTO " + tableName + " t " + + "USING (SELECT * FROM (VALUES 11, 12)) AS s(a) " + + "ON (t.a = s.a) " + + "WHEN MATCHED AND t.a = 11 THEN UPDATE SET b = 100 " + + "WHEN MATCHED AND t.a = 12 THEN DELETE", 2); + assertQuery("SELECT * FROM " + tableName, "VALUES (11, 100)"); + + assertUpdate("TRUNCATE TABLE " + tableName); + assertQueryReturnsEmptyResult("SELECT * FROM " + tableName); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testDeletionVectorsLargeDelete() + throws Exception { - assertQuery("SELECT * FROM deletion_vectors", "VALUES (1, 11)"); + String tableName = "deletion_vectors" + randomNameSuffix(); + + Path tableLocation = catalogDir.resolve(tableName); + copyDirectoryContents(new File(Resources.getResource("databricks122/deletion_vectors_empty").toURI()).toPath(), tableLocation); + assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri())); + + assertUpdate("INSERT INTO " + tableName + " SELECT orderkey, custkey FROM tpch.tiny.orders", 15000); + assertUpdate("DELETE FROM " + tableName + " WHERE a != 1", 14999); + + assertThat(query("SELECT * FROM " + tableName)) + .matches("SELECT CAST(orderkey AS integer), CAST(custkey AS integer) FROM tpch.tiny.orders WHERE orderkey = 1"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testDeletionVectorsCheckPoint() + throws Exception + { + String tableName = "deletion_vectors" + randomNameSuffix(); + + Path tableLocation = catalogDir.resolve(tableName); + copyDirectoryContents(new File(Resources.getResource("databricks122/deletion_vectors_empty").toURI()).toPath(), tableLocation); + assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri())); + + for (int i = 0; i < 9; i++) { + 
assertUpdate("INSERT INTO " + tableName + " VALUES (" + i + ", " + i + ")", 1); + } + + assertThat(tableLocation.resolve("_delta_log/00000000000000000010.checkpoint.parquet")).doesNotExist(); + assertUpdate("DELETE FROM " + tableName + " WHERE a != 1", 8); + assertThat(tableLocation.resolve("_delta_log/00000000000000000010.checkpoint.parquet")).exists(); + + assertQuery("SELECT * FROM " + tableName, "VALUES (1, 1)"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testUnsupportedVacuumDeletionVectors() + throws Exception + { + String tableName = "deletion_vectors" + randomNameSuffix(); + + Path tableLocation = catalogDir.resolve(tableName); + copyDirectoryContents(new File(Resources.getResource("databricks122/deletion_vectors_empty").toURI()).toPath(), tableLocation); + assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri())); + + // TODO https://github.com/trinodb/trino/issues/22809 Add support for vacuuming tables with deletion vectors + assertQueryFails( + "CALL delta.system.vacuum('tpch', '" + tableName + "', '7d')", + "Cannot execute vacuum procedure with deletionVectors writer features"); + + assertUpdate("DROP TABLE " + tableName); } /** diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java index ccaf71b59618..4ebd501f3365 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java @@ -23,8 +23,10 @@ import io.trino.operator.FlatHashStrategyCompiler; import io.trino.operator.GroupByHashPageIndexerFactory; import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; +import io.trino.plugin.hive.FileFormatDataSourceStats; import 
io.trino.plugin.hive.HiveTransactionHandle; import io.trino.plugin.hive.NodeVersion; +import io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.spi.Page; import io.trino.spi.PageBuilder; import io.trino.spi.block.BlockBuilder; @@ -185,7 +187,9 @@ private static ConnectorPageSink createPageSink(String outputPath, DeltaLakeWrit JsonCodec.jsonCodec(DataFileInfo.class), JsonCodec.jsonCodec(DeltaLakeMergeResult.class), stats, + new FileFormatDataSourceStats(), deltaLakeConfig, + new ParquetReaderConfig(), new TestingTypeManager(), new NodeVersion("test-version")); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java index 0008eb41360b..c118ac743a08 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java @@ -14,7 +14,6 @@ package io.trino.plugin.deltalake.delete; import com.google.common.io.Resources; -import io.delta.kernel.internal.deletionvectors.RoaringBitmapArray; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestRoaringBitmapArray.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestRoaringBitmapArray.java new file mode 100644 index 000000000000..990d3a275cb6 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestRoaringBitmapArray.java @@ -0,0 +1,140 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.delete; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +final class TestRoaringBitmapArray +{ + @Test + void testIsEmpty() + { + RoaringBitmapArray bitmaps = new RoaringBitmapArray(); + assertThat(bitmaps.isEmpty()).isTrue(); + + bitmaps.add(0); + assertThat(bitmaps.isEmpty()).isFalse(); + } + + @Test + void testLength() + { + RoaringBitmapArray bitmaps = new RoaringBitmapArray(); + assertThat(bitmaps.length()).isZero(); + + bitmaps.add(0); + assertThat(bitmaps.length()).isEqualTo(1); + + bitmaps.add(1); + assertThat(bitmaps.length()).isEqualTo(1); + } + + @Test + void testCardinality() + { + RoaringBitmapArray bitmaps = new RoaringBitmapArray(); + assertThat(bitmaps.cardinality()).isZero(); + + bitmaps.add(0); + assertThat(bitmaps.cardinality()).isEqualTo(1); + + bitmaps.add(1); + assertThat(bitmaps.cardinality()).isEqualTo(2); + } + + @Test + void testSerializedSizeInBytes() + { + RoaringBitmapArray bitmaps = new RoaringBitmapArray(); + assertThat(bitmaps.serializedSizeInBytes()).isZero(); + + bitmaps.add(0); + assertThat(bitmaps.serializedSizeInBytes()).isEqualTo(22); + + bitmaps.add(1); + assertThat(bitmaps.serializedSizeInBytes()).isEqualTo(24); + } + + @Test + void testAdd() + { + RoaringBitmapArray bitmaps = new RoaringBitmapArray(); + bitmaps.add(0); + + assertThat(bitmaps.contains(0)).isTrue(); + assertThat(bitmaps.contains(1)).isFalse(); + + bitmaps.add((long) Integer.MAX_VALUE + 1); + assertThat(bitmaps.contains(Integer.MAX_VALUE)).isFalse(); + 
assertThat(bitmaps.contains((long) Integer.MAX_VALUE + 1)).isTrue(); + assertThat(bitmaps.contains((long) Integer.MAX_VALUE + 2)).isFalse(); + } + + @Test + void testAddRange() + { + RoaringBitmapArray bitmaps = new RoaringBitmapArray(); + bitmaps.addRange(3, 5); + + assertThat(bitmaps.contains(2)).isFalse(); + assertThat(bitmaps.contains(3)).isTrue(); + assertThat(bitmaps.contains(4)).isTrue(); + assertThat(bitmaps.contains(5)).isTrue(); + assertThat(bitmaps.contains(6)).isFalse(); + + bitmaps.addRange(7, 8); + + assertThat(bitmaps.contains(2)).isFalse(); + assertThat(bitmaps.contains(3)).isTrue(); + assertThat(bitmaps.contains(4)).isTrue(); + assertThat(bitmaps.contains(5)).isTrue(); + assertThat(bitmaps.contains(6)).isFalse(); + assertThat(bitmaps.contains(7)).isTrue(); + assertThat(bitmaps.contains(8)).isTrue(); + assertThat(bitmaps.contains(9)).isFalse(); + } + + @Test + void testOr() + { + RoaringBitmapArray bitmapsOr = new RoaringBitmapArray(); + bitmapsOr.add(2); + RoaringBitmapArray bitmaps = new RoaringBitmapArray(); + bitmaps.add(3); + bitmapsOr.or(bitmaps); + + assertThat(bitmapsOr.contains(1)).isFalse(); + assertThat(bitmapsOr.contains(2)).isTrue(); + assertThat(bitmapsOr.contains(3)).isTrue(); + assertThat(bitmapsOr.contains(4)).isFalse(); + } + + @Test + void testAndNot() + { + RoaringBitmapArray bitmapsAndNot = new RoaringBitmapArray(); + bitmapsAndNot.addRange(1, 5); + RoaringBitmapArray bitmaps = new RoaringBitmapArray(); + bitmaps.addRange(2, 4); + bitmapsAndNot.andNot(bitmaps); + + assertThat(bitmapsAndNot.contains(1)).isTrue(); + assertThat(bitmapsAndNot.contains(2)).isFalse(); + assertThat(bitmapsAndNot.contains(3)).isFalse(); + assertThat(bitmapsAndNot.contains(4)).isFalse(); + assertThat(bitmapsAndNot.contains(5)).isTrue(); + } +} diff --git a/plugin/trino-delta-lake/src/test/resources/databricks122/deletion_vectors_empty/README.md b/plugin/trino-delta-lake/src/test/resources/databricks122/deletion_vectors_empty/README.md new file mode 
100644 index 000000000000..3a070fea7ba7 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/databricks122/deletion_vectors_empty/README.md @@ -0,0 +1,10 @@ +Data generated using Databricks 12.2: + +```sql +CREATE TABLE default.test_deletion_vectors ( + a INT, + b INT) +USING delta +LOCATION 's3://trino-ci-test/test_deletion_vectors' +TBLPROPERTIES ('delta.enableDeletionVectors' = true); +``` diff --git a/plugin/trino-delta-lake/src/test/resources/databricks122/deletion_vectors_empty/_delta_log/00000000000000000000.json b/plugin/trino-delta-lake/src/test/resources/databricks122/deletion_vectors_empty/_delta_log/00000000000000000000.json new file mode 100644 index 000000000000..4a5d53407173 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/databricks122/deletion_vectors_empty/_delta_log/00000000000000000000.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1682326581374,"operation":"CREATE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{\"delta.enableDeletionVectors\":\"true\"}"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Databricks-Runtime/12.2.x-scala2.12","txnId":"2cbfa481-d2b0-4f59-83f9-1261492dfd46"}} +{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}} +{"metaData":{"id":"32f26f4b-95ba-4980-b209-0132e949b3e4","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"a\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"b\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true"},"createdTime":1682326580906}} diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java 
b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java index 156f68e6de60..38acf3a9c63e 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java @@ -233,16 +233,16 @@ public void testDeletionVectors(String mode) assertThat(onTrino().executeQuery("DESCRIBE delta.default." + tableName)) .contains(row("a", "integer", "", ""), row("b", "integer", "", "")); - // TODO https://github.com/trinodb/trino/issues/17063 Use Delta Deletion Vectors for row-level deletes - assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (3, 33)")) - .hasMessageContaining("Unsupported writer features: [deletionVectors]"); - assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM delta.default." + tableName)) - .hasMessageContaining("Unsupported writer features: [deletionVectors]"); - assertQueryFailure(() -> onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a = 3")) - .hasMessageContaining("Unsupported writer features: [deletionVectors]"); - assertQueryFailure(() -> onTrino().executeQuery("MERGE INTO delta.default." + tableName + " t USING delta.default." + tableName + " s " + - "ON (t.a = s.a) WHEN MATCHED THEN UPDATE SET b = -1")) - .hasMessageContaining("Unsupported writer features: [deletionVectors]"); + onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (3, 33)"); + onTrino().executeQuery("DELETE FROM delta.default." + tableName + " WHERE a = 1"); + onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a = 30 WHERE b = 33"); + onTrino().executeQuery("MERGE INTO delta.default." + tableName + " t USING delta.default." 
+ tableName + " s " + + "ON (t.a = s.a) WHEN MATCHED THEN UPDATE SET b = -1"); + + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)) + .containsOnly(row(2, -1), row(30, -1)); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .containsOnly(row(2, -1), row(30, -1)); } finally { dropDeltaTableWithRetry("default." + tableName);