diff --git a/plugin/trino-delta-lake/src/main/java/io/delta/kernel/internal/deletionvectors/RoaringBitmapArrays.java b/plugin/trino-delta-lake/src/main/java/io/delta/kernel/internal/deletionvectors/RoaringBitmapArrays.java new file mode 100644 index 000000000000..1aca923c9833 --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/delta/kernel/internal/deletionvectors/RoaringBitmapArrays.java @@ -0,0 +1,27 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.deletionvectors; + +import java.io.IOException; + +public final class RoaringBitmapArrays +{ + private RoaringBitmapArrays() {} + + public static RoaringBitmapArray readFrom(byte[] bytes) + throws IOException + { + return RoaringBitmapArray.readFrom(bytes); + } +} diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java index 04b55f6c7b80..d43e3dd806f2 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; +import io.delta.kernel.internal.deletionvectors.RoaringBitmapArray; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; @@ -62,7 +63,6 @@ import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; import org.joda.time.DateTimeZone; -import org.roaringbitmap.longlong.Roaring64NavigableMap; import java.io.IOException; import java.io.UncheckedIOException; @@ -295,7 +295,7 @@ private PositionDeleteFilter readDeletes( DeletionVectorEntry deletionVector) { try { - Roaring64NavigableMap deletedRows = readDeletionVectors(fileSystem, tableLocation, deletionVector); + RoaringBitmapArray deletedRows = readDeletionVectors(fileSystem, tableLocation, deletionVector); return new PositionDeleteFilter(deletedRows); } catch (IOException e) { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java index 5eeed88c6237..83ec6783ba8c 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java @@ -15,17 +15,16 @@ import com.google.common.base.CharMatcher; import io.delta.kernel.internal.deletionvectors.Base85Codec; +import io.delta.kernel.internal.deletionvectors.RoaringBitmapArray; +import io.delta.kernel.internal.deletionvectors.RoaringBitmapArrays; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoInputFile; import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; import io.trino.spi.TrinoException; -import org.roaringbitmap.RoaringBitmap; -import org.roaringbitmap.longlong.Roaring64NavigableMap; import java.io.DataInputStream; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.UUID; import java.util.zip.CRC32; import java.util.zip.Checksum; @@ -35,14 +34,10 @@ import static io.delta.kernel.internal.deletionvectors.Base85Codec.decodeUUID; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; -import static java.lang.Math.toIntExact; -import static java.nio.ByteOrder.LITTLE_ENDIAN; // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vector-format public final class DeletionVectors { - private static final int PORTABLE_ROARING_BITMAP_MAGIC_NUMBER = 1681511377; - private static final String UUID_MARKER = "u"; // relative path with random prefix on disk private static final String PATH_MARKER = "p"; // absolute path on disk private static final String INLINE_MARKER = "i"; // inline @@ -51,17 +46,13 @@ public final class DeletionVectors private DeletionVectors() {} - public static Roaring64NavigableMap readDeletionVectors(TrinoFileSystem fileSystem, Location location, DeletionVectorEntry deletionVector) + public static RoaringBitmapArray readDeletionVectors(TrinoFileSystem fileSystem, Location location, DeletionVectorEntry deletionVector) throws IOException { if (deletionVector.storageType().equals(UUID_MARKER)) { TrinoInputFile inputFile = fileSystem.newInputFile(location.appendPath(toFileName(deletionVector.pathOrInlineDv()))); byte[] buffer = readDeletionVector(inputFile, deletionVector.offset().orElseThrow(), deletionVector.sizeInBytes()); - Roaring64NavigableMap bitmaps = deserializeDeletionVectors(buffer); - if (bitmaps.getLongCardinality() != deletionVector.cardinality()) { - throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "The number of deleted rows expects %s but got %s".formatted(deletionVector.cardinality(), bitmaps.getLongCardinality())); - } - return bitmaps; + return RoaringBitmapArrays.readFrom(buffer); } if (deletionVector.storageType().equals(INLINE_MARKER) || deletionVector.storageType().equals(PATH_MARKER)) { throw new TrinoException(NOT_SUPPORTED, "Unsupported storage type for deletion vector: " + deletionVector.storageType()); @@ -107,30 +98,4 @@ private static int calculateChecksum(byte[] data) crc.update(data); return (int) crc.getValue(); } - - private static Roaring64NavigableMap deserializeDeletionVectors(byte[] bytes) - throws IOException - { - ByteBuffer buffer = ByteBuffer.wrap(bytes).order(LITTLE_ENDIAN); - checkArgument(buffer.order() == LITTLE_ENDIAN, "Byte order must be little endian: %s", buffer.order()); - int magicNumber = buffer.getInt(); - if (magicNumber == PORTABLE_ROARING_BITMAP_MAGIC_NUMBER) { - int size = toIntExact(buffer.getLong()); - Roaring64NavigableMap bitmaps = new Roaring64NavigableMap(); - for (int i = 0; i < size; i++) { - int key = buffer.getInt(); - checkArgument(key >= 0, "key must not be negative: %s", key); - - RoaringBitmap bitmap = new RoaringBitmap(); - bitmap.deserialize(buffer); - bitmap.stream().forEach(bitmaps::add); - - // there seems to be no better way to ask how many bytes bitmap.deserialize has read - int consumedBytes = bitmap.serializedSizeInBytes(); - buffer.position(buffer.position() + consumedBytes); - } - return bitmaps; - } - throw new IllegalArgumentException("Unsupported magic number: " + magicNumber); - } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PositionDeleteFilter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PositionDeleteFilter.java index 42600b740c6f..542dc8eea199 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PositionDeleteFilter.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PositionDeleteFilter.java @@ -13,26 +13,23 @@ */ package io.trino.plugin.deltalake.delete; +import io.delta.kernel.internal.deletionvectors.RoaringBitmapArray; import io.trino.plugin.deltalake.DeltaLakeColumnHandle; import io.trino.spi.block.Block; -import org.roaringbitmap.longlong.Roaring64NavigableMap; import java.util.List; -import static com.google.common.base.Preconditions.checkArgument; import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.ROW_POSITION_COLUMN_NAME; import static io.trino.spi.type.BigintType.BIGINT; import static java.util.Objects.requireNonNull; public final class PositionDeleteFilter { - private final Roaring64NavigableMap deletedRows; + private final RoaringBitmapArray deletedRows; - public PositionDeleteFilter(Roaring64NavigableMap deletedRows) + public PositionDeleteFilter(RoaringBitmapArray deletedRows) { - requireNonNull(deletedRows, "deletedRows is null"); - checkArgument(!deletedRows.isEmpty(), "deletedRows is empty"); - this.deletedRows = deletedRows; + this.deletedRows = requireNonNull(deletedRows, "deletedRows is null"); } public PageFilter createPredicate(List columns) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java index de7c53c29091..0008eb41360b 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java @@ -14,11 +14,11 @@ package io.trino.plugin.deltalake.delete; import com.google.common.io.Resources; +import io.delta.kernel.internal.deletionvectors.RoaringBitmapArray; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; import org.junit.jupiter.api.Test; -import org.roaringbitmap.longlong.Roaring64NavigableMap; import java.io.File; import java.nio.file.Path; @@ -42,8 +42,7 @@ public void testUuidStorageType() TrinoFileSystem fileSystem = HDFS_FILE_SYSTEM_FACTORY.create(SESSION); DeletionVectorEntry deletionVector = new DeletionVectorEntry("u", "R7QFX3rGXPFLhHGq&7g<", OptionalInt.of(1), 34, 1); - Roaring64NavigableMap bitmaps = readDeletionVectors(fileSystem, Location.of(path.toString()), deletionVector); - assertThat(bitmaps.getLongCardinality()).isEqualTo(1); + RoaringBitmapArray bitmaps = readDeletionVectors(fileSystem, Location.of(path.toString()), deletionVector); assertThat(bitmaps.contains(0)).isFalse(); assertThat(bitmaps.contains(1)).isTrue(); assertThat(bitmaps.contains(2)).isFalse();