diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeResult.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeResult.java index f495c358c473..63068dfcdd54 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeResult.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeResult.java @@ -13,6 +13,8 @@ */ package io.trino.plugin.deltalake; +import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; + import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -24,6 +26,7 @@ public record DeltaLakeMergeResult( List partitionValues, Optional oldFile, + Optional oldDeletionVector, Optional newFile) { public DeltaLakeMergeResult @@ -32,7 +35,9 @@ public record DeltaLakeMergeResult( // noinspection Java9CollectionFactory partitionValues = unmodifiableList(new ArrayList<>(requireNonNull(partitionValues, "partitionValues is null"))); requireNonNull(oldFile, "oldFile is null"); + requireNonNull(oldDeletionVector, "oldDeletionVector is null"); requireNonNull(newFile, "newFile is null"); checkArgument(oldFile.isPresent() || newFile.isPresent(), "old or new must be present"); + checkArgument(oldDeletionVector.isEmpty() || oldFile.isPresent(), "oldDeletionVector is present only when oldFile is present"); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java index 2c7dbb82dd4a..5fe764a72756 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java @@ -327,7 +327,7 @@ public CompletableFuture> finish() insertPageSink.finish().join().stream() .map(Slice::getBytes) .map(dataFileInfoCodec::fromJson) - .map(info -> new 
DeltaLakeMergeResult(info.partitionValues(), Optional.empty(), Optional.of(info))) + .map(info -> new DeltaLakeMergeResult(info.partitionValues(), Optional.empty(), Optional.empty(), Optional.of(info))) .map(mergeResultJsonCodec::toJsonBytes) .map(Slices::wrappedBuffer) .forEach(fragments::add); @@ -345,7 +345,7 @@ public CompletableFuture> finish() MoreFutures.getDone(cdfPageSink.finish()).stream() .map(Slice::getBytes) .map(dataFileInfoCodec::fromJson) - .map(info -> new DeltaLakeMergeResult(info.partitionValues(), Optional.empty(), Optional.of(info))) + .map(info -> new DeltaLakeMergeResult(info.partitionValues(), Optional.empty(), Optional.empty(), Optional.of(info))) .map(mergeResultJsonCodec::toJsonBytes) .map(Slices::wrappedBuffer) .forEach(fragments::add); @@ -365,7 +365,7 @@ private Slice writeMergeResult(Slice path, FileDeletion deletion) ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); long rowCount = parquetMetadata.getBlocks().stream().map(BlockMetadata::rowCount).mapToLong(Long::longValue).sum(); RoaringBitmapArray rowsRetained = new RoaringBitmapArray(); - rowsRetained.addRange(0, rowCount); + rowsRetained.addRange(0, rowCount - 1); rowsRetained.andNot(deletedRows); if (rowsRetained.isEmpty()) { // No rows are retained in the file, so we don't need to write deletion vectors. 
@@ -407,7 +407,7 @@ private Slice writeDeletionVector( deletion.partitionValues, readStatistics(parquetMetadata, dataColumns, rowCount), Optional.of(deletionVectorEntry)); - DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues, Optional.of(sourceRelativePath), Optional.of(newFileInfo)); + DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues, Optional.of(sourceRelativePath), Optional.empty(), Optional.of(newFileInfo)); return utf8Slice(mergeResultJsonCodec.toJson(result)); } catch (Throwable e) { @@ -426,7 +426,8 @@ private Slice writeDeletionVector( private Slice onlySourceFile(String sourcePath, FileDeletion deletion) { String sourceRelativePath = relativePath(rootTableLocation.toString(), sourcePath); - DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), Optional.empty()); + DeletionVectorEntry deletionVector = deletionVectors.get(sourceRelativePath); + DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), Optional.ofNullable(deletionVector), Optional.empty()); return utf8Slice(mergeResultJsonCodec.toJson(result)); } @@ -453,7 +454,7 @@ private List rewriteFile(String sourcePath, FileDeletion deletion) Optional newFileInfo = rewriteParquetFile(sourceLocation, deletion, writer); - DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), newFileInfo); + DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), Optional.empty(), newFileInfo); return ImmutableList.of(utf8Slice(mergeResultJsonCodec.toJson(result))); } catch (IOException e) { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 868f893a5df5..ae5eb47d3bbe 100644 --- 
a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -1213,7 +1213,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe while (addFileEntryIterator.hasNext()) { long writeTimestamp = Instant.now().toEpochMilli(); AddFileEntry addFileEntry = addFileEntryIterator.next(); - transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), addFileEntry.getPartitionValues(), writeTimestamp, true)); + transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), addFileEntry.getPartitionValues(), writeTimestamp, true, Optional.empty())); } } protocolEntry = protocolEntryForTable(tableHandle.getProtocolEntry(), containsTimestampType, tableMetadata.getProperties()); @@ -1610,7 +1610,7 @@ public Optional finishCreateTable( Iterator addFileEntryIterator = activeFiles.iterator(); while (addFileEntryIterator.hasNext()) { AddFileEntry addFileEntry = addFileEntryIterator.next(); - transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), addFileEntry.getPartitionValues(), writeTimestamp, true)); + transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), addFileEntry.getPartitionValues(), writeTimestamp, true, Optional.empty())); } } } @@ -2564,7 +2564,7 @@ private long commitMergeOperation( if (mergeResult.oldFile().isEmpty()) { continue; } - transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(toUriFormat(mergeResult.oldFile().get()), createPartitionValuesMap(partitionColumns, mergeResult.partitionValues()), writeTimestamp, true)); + transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(toUriFormat(mergeResult.oldFile().get()), createPartitionValuesMap(partitionColumns, mergeResult.partitionValues()), writeTimestamp, true, mergeResult.oldDeletionVector())); } 
appendAddFileEntries(transactionLogWriter, newFiles, partitionColumns, getExactColumnNames(handle.getMetadataEntry()), true); @@ -2767,7 +2767,8 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandl toUriFormat(relativePath), createPartitionValuesMap(canonicalPartitionValues), writeTimestamp, - false)); + false, + Optional.empty())); } // Note: during writes we want to preserve original case of partition columns @@ -4160,7 +4161,7 @@ private CommitDeleteOperationResult commitDeleteOperation( Iterator addFileEntryIterator = activeFiles.iterator(); while (addFileEntryIterator.hasNext()) { AddFileEntry addFileEntry = addFileEntryIterator.next(); - transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), addFileEntry.getPartitionValues(), writeTimestamp, true)); + transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), addFileEntry.getPartitionValues(), writeTimestamp, true, Optional.empty())); Optional fileRecords = addFileEntry.getStats().flatMap(DeltaLakeFileStatistics::getNumRecords); allDeletedFilesStatsPresent &= fileRecords.isPresent(); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/RoaringBitmapArray.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/RoaringBitmapArray.java index 2029f1654bc9..295999153cbb 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/RoaringBitmapArray.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/RoaringBitmapArray.java @@ -94,6 +94,10 @@ public void add(long value) highBitmap.add(low); } + /** + * @param rangeStart inclusive beginning of range + * @param rangeEnd inclusive ending of range + */ public void addRange(long rangeStart, long rangeEnd) { checkArgument(rangeStart >= 0 && rangeStart <= rangeEnd, "Unsupported value: %s", rangeStart); diff --git 
a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/RemoveFileEntry.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/RemoveFileEntry.java index 0a690a1def82..057b8edd2641 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/RemoveFileEntry.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/RemoveFileEntry.java @@ -16,6 +16,7 @@ import jakarta.annotation.Nullable; import java.util.Map; +import java.util.Optional; import static java.util.Objects.requireNonNull; @@ -23,10 +24,12 @@ public record RemoveFileEntry( String path, @Nullable Map partitionValues, long deletionTimestamp, - boolean dataChange) + boolean dataChange, + Optional deletionVector) { public RemoveFileEntry { requireNonNull(path, "path is null"); + requireNonNull(deletionVector, "deletionVector is null"); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java index 8c067646630d..04673aeab8ea 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java @@ -153,6 +153,7 @@ public String getColumnName() private final Optional addDeletionVectorType; private final Optional addParsedStatsFieldType; private final Optional removeType; + private final Optional removeDeletionVectorType; private final Optional metadataType; private final Optional protocolType; private final Optional commitType; @@ -246,6 +247,7 @@ public CheckpointEntryIterator( addDeletionVectorType = addType.flatMap(type -> getOptionalFieldType(type, "deletionVector")); addParsedStatsFieldType = 
addType.flatMap(type -> getOptionalFieldType(type, "stats_parsed")); removeType = getParquetType(fields, REMOVE); + removeDeletionVectorType = removeType.flatMap(type -> getOptionalFieldType(type, "deletionVector")); metadataType = getParquetType(fields, METADATA); protocolType = getParquetType(fields, PROTOCOL); commitType = getParquetType(fields, COMMIT); @@ -537,11 +539,17 @@ private DeltaLakeTransactionLogEntry buildRemoveEntry(ConnectorSession session, format("Expected block %s to have %d children, but found %s", block, removeFields, removeEntryRow.getFieldCount())); } CheckpointFieldReader remove = new CheckpointFieldReader(session, removeEntryRow, type); + Optional deletionVector = Optional.empty(); + if (deletionVectorsEnabled) { + deletionVector = Optional.ofNullable(remove.getRow("deletionVector")) + .map(row -> parseDeletionVectorFromParquet(session, row, removeDeletionVectorType.orElseThrow())); + } RemoveFileEntry result = new RemoveFileEntry( remove.getString("path"), remove.getMap(stringMap, "partitionValues"), remove.getLong("deletionTimestamp"), - remove.getBoolean("dataChange")); + remove.getBoolean("dataChange"), + deletionVector); log.debug("Result: %s", result); return DeltaLakeTransactionLogEntry.removeFileEntry(result); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java index ba2e4bb280d5..70cdce9c5e4f 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java @@ -31,6 +31,7 @@ import io.trino.parquet.metadata.ParquetMetadata; import io.trino.parquet.reader.MetadataReader; import io.trino.plugin.deltalake.transactionlog.AddFileEntry; +import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; import 
io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; @@ -1163,8 +1164,15 @@ public void testDeletionVectorsAllRows() copyDirectoryContents(new File(Resources.getResource("databricks122/deletion_vectors").toURI()).toPath(), tableLocation); assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri())); + assertUpdate("DELETE FROM " + tableName + " WHERE a != 999", 1); + + // 'remove' entry should have the same deletion vector as the previous operation when deleting all rows + DeletionVectorEntry deletionVector = getEntriesFromJson(2, tableLocation + "/_delta_log", FILE_SYSTEM).orElseThrow().get(2).getAdd().getDeletionVector().orElseThrow(); + assertThat(getEntriesFromJson(3, tableLocation + "/_delta_log", FILE_SYSTEM).orElseThrow().get(1).getRemove().deletionVector().orElseThrow()) + .isEqualTo(deletionVector); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, 31), (3, 32)", 2); - assertUpdate("DELETE FROM " + tableName + " WHERE a != 999", 3); + assertUpdate("DELETE FROM " + tableName + " WHERE a != 999", 2); assertQueryReturnsEmptyResult("SELECT * FROM " + tableName); assertUpdate("INSERT INTO " + tableName + " VALUES (1, 10), (2, 20)", 2); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java index 8ccd0d904712..1163828ecb92 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java @@ -105,12 +105,12 @@ public class TestTransactionLogAccess "age=29/part-00000-3794c463-cb0c-4beb-8d07-7cc1e3b5920f.c000.snappy.parquet"); private static final Set 
EXPECTED_REMOVE_ENTRIES = ImmutableSet.of( - new RemoveFileEntry("age=30/part-00000-7e43a3c3-ea26-4ae7-8eac-8f60cbb4df03.c000.snappy.parquet", null, 1579190163932L, false), - new RemoveFileEntry("age=30/part-00000-72a56c23-01ba-483a-9062-dd0accc86599.c000.snappy.parquet", null, 1579190163932L, false), - new RemoveFileEntry("age=42/part-00000-951068bd-bcf4-4094-bb94-536f3c41d31f.c000.snappy.parquet", null, 1579190155406L, false), - new RemoveFileEntry("age=25/part-00000-609e34b1-5466-4dbc-a780-2708166e7adb.c000.snappy.parquet", null, 1579190163932L, false), - new RemoveFileEntry("age=42/part-00000-6aed618a-2beb-4edd-8466-653e67a9b380.c000.snappy.parquet", null, 1579190155406L, false), - new RemoveFileEntry("age=42/part-00000-b82d8859-84a0-4f05-872c-206b07dd54f0.c000.snappy.parquet", null, 1579190163932L, false)); + new RemoveFileEntry("age=30/part-00000-7e43a3c3-ea26-4ae7-8eac-8f60cbb4df03.c000.snappy.parquet", null, 1579190163932L, false, Optional.empty()), + new RemoveFileEntry("age=30/part-00000-72a56c23-01ba-483a-9062-dd0accc86599.c000.snappy.parquet", null, 1579190163932L, false, Optional.empty()), + new RemoveFileEntry("age=42/part-00000-951068bd-bcf4-4094-bb94-536f3c41d31f.c000.snappy.parquet", null, 1579190155406L, false, Optional.empty()), + new RemoveFileEntry("age=25/part-00000-609e34b1-5466-4dbc-a780-2708166e7adb.c000.snappy.parquet", null, 1579190163932L, false, Optional.empty()), + new RemoveFileEntry("age=42/part-00000-6aed618a-2beb-4edd-8466-653e67a9b380.c000.snappy.parquet", null, 1579190155406L, false, Optional.empty()), + new RemoveFileEntry("age=42/part-00000-b82d8859-84a0-4f05-872c-206b07dd54f0.c000.snappy.parquet", null, 1579190163932L, false, Optional.empty())); private final TestingTelemetry testingTelemetry = TestingTelemetry.create("transaction-log-access"); private final TracingFileSystemFactory tracingFileSystemFactory = new TracingFileSystemFactory(testingTelemetry.getTracer(), new HdfsFileSystemFactory(HDFS_ENVIRONMENT, 
HDFS_FILE_SYSTEM_STATS)); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java index 18fb06886e17..d3869da15ebc 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java @@ -59,11 +59,11 @@ public void testCheckpointBuilder() builder.addLogEntry(transactionEntry(app2TransactionV5)); AddFileEntry addA1 = new AddFileEntry("a", Map.of(), 1, 1, true, Optional.empty(), Optional.empty(), Map.of(), Optional.empty()); - RemoveFileEntry removeA1 = new RemoveFileEntry("a", Map.of(), 1, true); + RemoveFileEntry removeA1 = new RemoveFileEntry("a", Map.of(), 1, true, Optional.empty()); AddFileEntry addA2 = new AddFileEntry("a", Map.of(), 2, 1, true, Optional.empty(), Optional.empty(), Map.of(), Optional.empty()); AddFileEntry addB = new AddFileEntry("b", Map.of(), 1, 1, true, Optional.empty(), Optional.empty(), Map.of(), Optional.empty()); - RemoveFileEntry removeB = new RemoveFileEntry("b", Map.of(), 1, true); - RemoveFileEntry removeC = new RemoveFileEntry("c", Map.of(), 1, true); + RemoveFileEntry removeB = new RemoveFileEntry("b", Map.of(), 1, true, Optional.empty()); + RemoveFileEntry removeC = new RemoveFileEntry("c", Map.of(), 1, true, Optional.empty()); builder.addLogEntry(addFileEntry(addA1)); builder.addLogEntry(removeFileEntry(removeA1)); builder.addLogEntry(addFileEntry(addA2)); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java index 8acf7be68d32..390a10be3de9 100644 --- 
a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java @@ -612,7 +612,8 @@ public void testReadAllEntries() // partitionValues information is missing in the checkpoint null, 1579190155406L, - false)); + false, + Optional.empty())); // CommitInfoEntry // not found in the checkpoint, TODO add a test @@ -925,7 +926,8 @@ public void testSkipRemoveEntries() UUID.randomUUID().toString(), ImmutableMap.of("part_key", "2023-01-01 00:00:00"), 1000, - true)) + true, + Optional.empty())) .collect(toImmutableSet()); CheckpointEntries entries = new CheckpointEntries( diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java index 410e5686cad1..173303df8460 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java @@ -184,7 +184,8 @@ public void testCheckpointWriteReadJsonRoundtrip() "removeFilePath", ImmutableMap.of("part_key", "7.0"), 1000, - true); + true, + Optional.empty()); CheckpointEntries entries = new CheckpointEntries( metadataEntry, @@ -325,7 +326,8 @@ public void testCheckpointWriteReadParquetStatisticsRoundtrip() "removeFilePath", ImmutableMap.of("part_key", "7.0"), 1000, - true); + true, + Optional.empty()); CheckpointEntries entries = new CheckpointEntries( metadataEntry,