diff --git a/build.gradle b/build.gradle
index 6df806db59e4..d25aac32594f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -363,6 +363,7 @@ project(':iceberg-core') {
       exclude group: 'org.apache.avro', module: 'avro'
       exclude group: 'org.slf4j', module: 'slf4j-log4j12'
     }
+    implementation("org.rocksdb:rocksdbjni:${libs.versions.rocksdb.get()}")
 
     testImplementation libs.jetty.servlet
     testImplementation libs.jetty.server
diff --git a/core/src/main/java/org/apache/iceberg/util/RocksDBStructLikeSet.java b/core/src/main/java/org/apache/iceberg/util/RocksDBStructLikeSet.java
new file mode 100644
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/util/RocksDBStructLikeSet.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+/**
+ * A {@link StructLikeSet} that stores its elements in a temporary on-disk RocksDB database.
+ *
+ * <p>Each struct is serialized field by field and written as a key with an empty value. Only
+ * {@link #add(StructLike)}, {@link #contains(Object)}, {@link #size()} and {@link #isEmpty()} are
+ * supported; {@link #iterator()} and {@link #remove(Object)} throw {@link
+ * UnsupportedOperationException}.
+ *
+ * <p>Every instance owns a RocksDB handle and a temporary directory, both released by {@link
+ * #close()}. Instances are not thread-safe.
+ *
+ * <p>NOTE(review): other collection methods inherited from {@link StructLikeSet} are not
+ * overridden here; confirm they are never called on this implementation.
+ */
+public class RocksDBStructLikeSet extends StructLikeSet implements Closeable {
+
+  private static final byte[] EMPTY_VALUE = new byte[0];
+
+  static {
+    RocksDB.loadLibrary();
+  }
+
+  private final StructLikeSerializer serializer;
+  private final Path dir;
+  private final Options options;
+  private final RocksDB db;
+  private int size = 0;
+  private boolean closed = false;
+
+  private RocksDBStructLikeSet(Types.StructType type) {
+    super(type);
+    this.serializer = new StructLikeSerializer(type);
+    this.dir = createTempDir();
+    // kept as a field so that it can be closed together with the database
+    this.options = new Options().setCreateIfMissing(true);
+    try {
+      this.db = RocksDB.open(options, dir.toString());
+    } catch (RocksDBException e) {
+      RuntimeException failure = new RuntimeException("Failed to open RocksDB at " + dir, e);
+      options.close();
+      try {
+        deleteRecursively(dir);
+      } catch (IOException cleanupFailure) {
+        failure.addSuppressed(cleanupFailure);
+      }
+      throw failure;
+    }
+  }
+
+  /**
+   * Creates a new disk-backed set for the given struct type and adds all of the given deletes.
+   *
+   * <p>A separate set is built on every call, so the result holds exactly the structs passed to
+   * that call. The returned set is a {@link RocksDBStructLikeSet} and should be closed by its
+   * owner once it is no longer needed.
+   *
+   * @param type the struct type of the elements
+   * @param deletes groups of structs to add; equal structs are stored once
+   * @return a set containing every struct from {@code deletes}
+   * @throws RuntimeException if the database cannot be opened or written
+   */
+  public static StructLikeSet getOrCreate(
+      Types.StructType type, Iterable<Iterable<StructLike>> deletes) {
+    RocksDBStructLikeSet set = new RocksDBStructLikeSet(type);
+    try {
+      for (StructLike struct : Iterables.concat(deletes)) {
+        set.add(struct);
+      }
+    } catch (RuntimeException e) {
+      // release the database and temp directory instead of leaking them when loading fails
+      try {
+        set.close();
+      } catch (RuntimeException closeFailure) {
+        e.addSuppressed(closeFailure);
+      }
+      throw e;
+    }
+
+    return set;
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return size == 0;
+  }
+
+  /**
+   * Checks whether an equal struct was added before.
+   *
+   * @param obj the object to look up
+   * @return true if {@code obj} is a {@link StructLike} whose serialized form is stored in this
+   *     set; false otherwise, including for null
+   */
+  @Override
+  public boolean contains(Object obj) {
+    if (!(obj instanceof StructLike)) {
+      // also covers null, which is never stored because add rejects it
+      return false;
+    }
+
+    Preconditions.checkState(!closed, "Cannot read from a closed set");
+    byte[] key = serializer.serialize((StructLike) obj);
+    try {
+      return db.get(key) != null;
+    } catch (RocksDBException e) {
+      throw new RuntimeException("Failed to look up struct in RocksDB", e);
+    }
+  }
+
+  @Override
+  public Iterator<StructLike> iterator() {
+    throw new UnsupportedOperationException("iterator is not supported");
+  }
+
+  /**
+   * Adds a struct to this set.
+   *
+   * @param struct a non-null struct
+   * @return true if no equal struct was present before the call
+   * @throws NullPointerException if {@code struct} is null
+   */
+  @Override
+  public boolean add(StructLike struct) {
+    Preconditions.checkNotNull(struct, "Invalid struct: null");
+    Preconditions.checkState(!closed, "Cannot add to a closed set");
+    byte[] key = serializer.serialize(struct);
+    try {
+      if (db.get(key) != null) {
+        // already present: report false and keep the size unchanged, as Set.add requires
+        return false;
+      }
+
+      db.put(key, EMPTY_VALUE);
+      size += 1;
+      return true;
+    } catch (RocksDBException e) {
+      throw new RuntimeException("Failed to add struct to RocksDB", e);
+    }
+  }
+
+  @Override
+  public boolean remove(Object obj) {
+    throw new UnsupportedOperationException("remove is not supported");
+  }
+
+  /** Closes the database and deletes its temporary directory. Calling it again has no effect. */
+  @Override
+  public void close() {
+    if (closed) {
+      return;
+    }
+
+    closed = true;
+    db.close();
+    options.close();
+    try {
+      deleteRecursively(dir);
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to delete RocksDB directory " + dir, e);
+    }
+  }
+
+  private static Path createTempDir() {
+    try {
+      return Files.createTempDirectory("iceberg-rocksdb");
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to create a temp directory for RocksDB", e);
+    }
+  }
+
+  private static void deleteRecursively(Path root) throws IOException {
+    List<Path> paths;
+    try (Stream<Path> walk = Files.walk(root)) {
+      // reverse order lists children before the directory that contains them
+      paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
+    }
+
+    for (Path path : paths) {
+      Files.delete(path);
+    }
+  }
+
+  /** Converts structs to and from a length-prefixed binary encoding of their fields. */
+  private static class StructLikeSerializer {
+
+    private static final int NULL_MARK = -1;
+    private final Types.StructType struct;
+    private final List<Type> types;
+
+    private StructLikeSerializer(Types.StructType struct) {
+      this.struct = struct;
+      this.types =
+          struct.fields().stream().map(Types.NestedField::type).collect(Collectors.toList());
+    }
+
+    /**
+     * Encodes each field as a 4-byte length followed by the bytes from {@link
+     * Conversions#toByteBuffer(Type, Object)}; a null field is written as {@link #NULL_MARK} only.
+     *
+     * @param object the struct to encode
+     * @return the encoded bytes, or null if {@code object} is null
+     */
+    public byte[] serialize(StructLike object) {
+      if (object == null) {
+        return null;
+      }
+
+      try (ByteArrayOutputStream out = new ByteArrayOutputStream();
+          DataOutputStream dos = new DataOutputStream(out)) {
+
+        for (int i = 0; i < types.size(); i += 1) {
+          Type type = types.get(i);
+          Class<?> valueClass = type.typeId().javaClass();
+
+          byte[] fieldData =
+              ByteBuffers.toByteArray(Conversions.toByteBuffer(type, object.get(i, valueClass)));
+          if (fieldData == null) {
+            dos.writeInt(NULL_MARK);
+          } else {
+            dos.writeInt(fieldData.length);
+            dos.write(fieldData);
+          }
+        }
+        return out.toByteArray();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+
+    /**
+     * Decodes bytes produced by {@link #serialize(StructLike)} into a record of the struct type.
+     *
+     * @param data the encoded bytes
+     * @return the decoded record, or null if {@code data} is null
+     */
+    public StructLike deserialize(byte[] data) {
+      if (data == null) {
+        return null;
+      }
+
+      try (ByteArrayInputStream in = new ByteArrayInputStream(data);
+          DataInputStream dis = new DataInputStream(in)) {
+
+        GenericRecord record = GenericRecord.create(struct);
+        for (int i = 0; i < types.size(); i += 1) {
+          int length = dis.readInt();
+
+          if (length == NULL_MARK) {
+            record.set(i, null);
+          } else {
+            Preconditions.checkState(length >= 0, "Invalid field length: %s", length);
+            byte[] fieldData = new byte[length];
+            // readFully fills the whole array or fails, whereas read may return fewer bytes
+            dis.readFully(fieldData);
+            record.set(i, Conversions.fromByteBuffer(types.get(i), ByteBuffer.wrap(fieldData)));
+          }
+        }
+
+        return record;
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/util/StructLikeSet.java b/core/src/main/java/org/apache/iceberg/util/StructLikeSet.java
index f1f32cb92435..e555f77e0d22 100644
--- a/core/src/main/java/org/apache/iceberg/util/StructLikeSet.java
+++ b/core/src/main/java/org/apache/iceberg/util/StructLikeSet.java
@@ -38,7 +38,7 @@ public static StructLikeSet create(Types.StructType type) {
   private final Set<StructLikeWrapper> wrapperSet;
   private final ThreadLocal<StructLikeWrapper> wrappers;
 
-  private StructLikeSet(Types.StructType type) {
+  protected StructLikeSet(Types.StructType type) {
     this.type = type;
     this.wrapperSet = Sets.newHashSet();
     this.wrappers = ThreadLocal.withInitial(() -> StructLikeWrapper.forType(type));
diff --git a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
index 91b7fd1c1dc1..ec5c40ef43ee 100644
--- a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
+++ b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
@@ -47,10 +47,10 @@
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.math.LongMath;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.CharSequenceMap;
+import org.apache.iceberg.util.RocksDBStructLikeSet;
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
@@ -105,10 +105,9 @@ protected <V> V getOrLoad(String key, Supplier<V> valueSupplier, long valueSize)
   @Override
   public StructLikeSet loadEqualityDeletes(Iterable<DeleteFile> deleteFiles, Schema projection) {
     Iterable<Iterable<StructLike>> deletes =
-        execute(deleteFiles, deleteFile -> getOrReadEqDeletes(deleteFile, projection));
-    StructLikeSet deleteSet = StructLikeSet.create(projection.asStruct());
-    Iterables.addAll(deleteSet, Iterables.concat(deletes));
-    return deleteSet;
+        execute(
+            deleteFiles, deleteFile -> toStructs(openDeletes(deleteFile, projection), projection));
+    return RocksDBStructLikeSet.getOrCreate(projection.asStruct(), deletes);
   }
 
   private Iterable<StructLike> getOrReadEqDeletes(DeleteFile deleteFile, Schema projection) {
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 5f07fb949278..87f1057943b1 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -74,6 +74,7 @@ orc = "1.9.3"
 parquet = "1.13.1"
 pig = "0.17.0"
 roaringbitmap = "1.2.0"
+rocksdb = "7.9.2"
 s3mock-junit5 = "2.11.0"
 scala-collection-compat = "2.12.0"
 slf4j = "1.7.36"