diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/PutToByteBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/PutToByteBuffer.java index b0f8110c914c..9d3b4e66f4b8 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/PutToByteBuffer.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/PutToByteBuffer.java @@ -32,6 +32,6 @@ * @param The exception type this function may throw. */ @FunctionalInterface -interface PutToByteBuffer +public interface PutToByteBuffer extends CheckedFunction { } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java index ec9359fb3725..a3c12349a899 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java @@ -89,6 +89,21 @@ public void deleteWithBatch(BatchOperation batch, KEY key) "version."); } + @Override + public KeyValueSpliterator spliterator(int maxParallelism, boolean closeOnException) { + throw new UnsupportedOperationException("Iterating tables directly is not" + + " supported for datanode containers due to differing schema " + + "version."); + } + + @Override + public KeyValueSpliterator spliterator(KEY startKey, KEY prefix, int maxParallelism, + boolean closeOnException) { + throw new UnsupportedOperationException("Iterating tables directly is not" + + " supported for datanode containers due to differing schema " + + "version."); + } + @Override public String getName() { return table.getName(); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Buffer.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Buffer.java new file mode 100644 index 000000000000..354171b7d960 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Buffer.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import org.apache.ratis.util.Preconditions; + +/** + * A utility class for managing an underlying {@link CodecBuffer} with dynamic capacity adjustment + * based on the requirements of a data source. This class encapsulates operations to allocate, + * prepare, and release the buffer, as well as retrieve data from a source. 
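+ *
+ * <p>A rough usage sketch (illustrative only; {@code rocksIterator} and the
+ * capacity name are assumptions, not part of this class):
+ * <pre>{@code
+ * Buffer keyBuffer = new Buffer(
+ *     new CodecBuffer.Capacity("example-iterator-key", 1 << 10),
+ *     byteBuffer -> rocksIterator.get().key(byteBuffer));
+ * CodecBuffer key = keyBuffer.getFromDb(); // retries with a larger capacity when needed
+ * // ... read the key ...
+ * keyBuffer.release();                     // release the underlying CodecBuffer
+ * }</pre>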
+ */ +public class Buffer { + private final CodecBuffer.Capacity initialCapacity; + private final PutToByteBuffer source; + private CodecBuffer buffer; + + public Buffer(CodecBuffer.Capacity initialCapacity, + PutToByteBuffer source) { + this.initialCapacity = initialCapacity; + this.source = source; + } + + public void release() { + if (buffer != null) { + buffer.release(); + } + } + + public void prepare() { + if (buffer == null) { + allocate(); + } else { + buffer.clear(); + } + } + + public void allocate() { + if (buffer != null) { + buffer.release(); + } + buffer = CodecBuffer.allocateDirect(-initialCapacity.get()); + } + + public CodecBuffer getFromDb() { + for (prepare(); ; allocate()) { + final Integer required = buffer.putFromSource(source); + if (required == null) { + return null; // the source is unavailable + } else if (required == buffer.readableBytes()) { + return buffer; // buffer size is big enough + } + // buffer size too small, try increasing the capacity. + if (buffer.setCapacity(required)) { + buffer.clear(); + // retry with the new capacity + final int retried = buffer.putFromSource(source); + Preconditions.assertSame(required.intValue(), retried, "required"); + return buffer; + } + + // failed to increase the capacity + // increase initial capacity and reallocate it + initialCapacity.increase(required); + } + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java index 076b6bc933d0..26d8f83aa46d 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java @@ -226,7 +226,7 @@ public RDBStore build() throws IOException { return new RDBStore(dbFile, rocksDBOption, statistics, writeOptions, tableConfigs, openReadOnly, dbJmxBeanNameName, enableCompactionDag, maxDbUpdatesSizeThreshold, createCheckpointDirs, configuration, - enableRocksDbMetrics); + enableRocksDbMetrics, rocksDBConfiguration.isThreadSafeIteratorEnabled()); } finally { tableConfigs.forEach(TableConfig::close); } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java index 3539a906117d..560cbdc7e0fd 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java @@ -73,6 +73,7 @@ public class RDBStore implements DBStore { private final long maxDbUpdatesSizeThreshold; private final ManagedDBOptions dbOptions; private final ManagedStatistics statistics; + private final boolean initializeReferenceCountedIterator; @SuppressWarnings("parameternumber") RDBStore(File dbFile, ManagedDBOptions dbOptions, ManagedStatistics statistics, @@ -82,13 +83,14 @@ public class RDBStore implements DBStore { long maxDbUpdatesSizeThreshold, boolean createCheckpointDirs, ConfigurationSource configuration, - boolean enableRocksDBMetrics) + boolean enableRocksDBMetrics, boolean initializeReferenceCountedIterator) throws IOException { Preconditions.checkNotNull(dbFile, "DB file location cannot be null"); Preconditions.checkNotNull(families); Preconditions.checkArgument(!families.isEmpty()); this.maxDbUpdatesSizeThreshold = maxDbUpdatesSizeThreshold; + this.initializeReferenceCountedIterator = initializeReferenceCountedIterator; 
dbLocation = dbFile; this.dbOptions = dbOptions; this.statistics = statistics; @@ -308,7 +310,7 @@ public RDBTable getTable(String name) throws IOException { if (handle == null) { throw new IOException("No such table in this DB. TableName : " + name); } - return new RDBTable(this.db, handle, rdbMetrics); + return new RDBTable(this.db, handle, rdbMetrics, initializeReferenceCountedIterator); } @Override @@ -321,7 +323,7 @@ public TypedTable getTable( public ArrayList listTables() { ArrayList
returnList = new ArrayList<>(); for (ColumnFamily family : getColumnFamilies()) { - returnList.add(new RDBTable(db, family, rdbMetrics)); + returnList.add(new RDBTable(db, family, rdbMetrics, initializeReferenceCountedIterator)); } return returnList; } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java index 75104a55ed72..54d3f36fb87a 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.NoSuchElementException; import java.util.function.Consumer; +import org.apache.hadoop.hdds.utils.db.iterator.BaseDBTableIterator; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,14 +31,14 @@ * @param the raw type. */ abstract class RDBStoreAbstractIterator - implements TableIterator> { + implements BaseDBTableIterator> { private static final Logger LOG = LoggerFactory.getLogger(RDBStoreAbstractIterator.class); private final ManagedRocksIterator rocksDBIterator; private final RDBTable rocksDBTable; - private Table.KeyValue currentEntry; + private RawKeyValue currentEntry; // This is for schemas that use a fixed-length // prefix for each key. private final RAW prefix; @@ -53,7 +54,7 @@ abstract class RDBStoreAbstractIterator abstract RAW key(); /** @return the {@link Table.KeyValue} for the current entry. */ - abstract Table.KeyValue getKeyValue(); + abstract RawKeyValue getKeyValue(); /** Seek to the given key. 
*/ abstract void seek0(RAW key); @@ -78,7 +79,7 @@ final RAW getPrefix() { @Override public final void forEachRemaining( - Consumer> action) { + Consumer> action) { while (hasNext()) { action.accept(next()); } @@ -99,7 +100,7 @@ public final boolean hasNext() { } @Override - public final Table.KeyValue next() { + public final RawKeyValue next() { setCurrentEntry(); if (currentEntry != null) { rocksDBIterator.get().next(); @@ -129,7 +130,7 @@ public final void seekToLast() { } @Override - public final Table.KeyValue seek(RAW key) { + public final RawKeyValue seek(RAW key) { seek0(key); setCurrentEntry(); return currentEntry; @@ -151,4 +152,9 @@ public final void removeFromDB() throws IOException { public void close() { rocksDBIterator.close(); } + + @Override + public boolean isKVCloseable() { + return false; + } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java index 62303c8a3bb7..02648d53b78c 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java @@ -38,7 +38,7 @@ byte[] key() { } @Override - Table.KeyValue getKeyValue() { + RawKeyValue getKeyValue() { final ManagedRocksIterator i = getRocksDBIterator(); return RawKeyValue.create(i.get().key(), i.get().value()); } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreCodecBufferIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreCodecBufferIterator.java index 107b71ee5045..eaac8501bfde 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreCodecBufferIterator.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreCodecBufferIterator.java @@ -57,9 +57,9 @@ CodecBuffer key() { } @Override - Table.KeyValue getKeyValue() { + RawKeyValue getKeyValue() { assertOpen(); - return Table.newKeyValue(key(), valueBuffer.getFromDb()); + return new RawKeyValue(key(), valueBuffer.getFromDb()); } @Override @@ -96,60 +96,4 @@ public void close() { valueBuffer.release(); } } - - static class Buffer { - private final CodecBuffer.Capacity initialCapacity; - private final PutToByteBuffer source; - private CodecBuffer buffer; - - Buffer(CodecBuffer.Capacity initialCapacity, - PutToByteBuffer source) { - this.initialCapacity = initialCapacity; - this.source = source; - } - - void release() { - if (buffer != null) { - buffer.release(); - } - } - - private void prepare() { - if (buffer == null) { - allocate(); - } else { - buffer.clear(); - } - } - - private void allocate() { - if (buffer != null) { - buffer.release(); - } - buffer = CodecBuffer.allocateDirect(-initialCapacity.get()); - } - - CodecBuffer getFromDb() { - for (prepare(); ; allocate()) { - final Integer required = buffer.putFromSource(source); - if (required == null) { - return null; // the source is unavailable - } else if (required == buffer.readableBytes()) { - return buffer; // buffer size is big enough - } - // buffer size too small, try increasing the capacity. 
- if (buffer.setCapacity(required)) { - buffer.clear(); - // retry with the new capacity - final int retried = buffer.putFromSource(source); - Preconditions.assertSame(required.intValue(), retried, "required"); - return buffer; - } - - // failed to increase the capacity - // increase initial capacity and reallocate it - initialCapacity.increase(required); - } - } - } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java index f72b0084bdc5..787b7c41279c 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java @@ -27,6 +27,9 @@ import org.apache.hadoop.hdds.annotation.InterfaceAudience; import org.apache.hadoop.hdds.utils.MetadataKeyFilters; import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily; +import org.apache.hadoop.hdds.utils.db.iterator.BaseDBTableIterator; +import org.apache.hadoop.hdds.utils.db.iterator.ReferenceCountedRDBStoreByteArrayIterator; +import org.apache.hadoop.hdds.utils.db.iterator.ReferenceCountedRDBStoreCodecBufferIterator; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +40,7 @@ * metadata store content. All other user's using Table should use TypedTable. */ @InterfaceAudience.Private -class RDBTable implements Table { +public class RDBTable implements Table { private static final Logger LOG = LoggerFactory.getLogger(RDBTable.class); @@ -45,6 +48,7 @@ class RDBTable implements Table { private final RocksDatabase db; private final ColumnFamily family; private final RDBMetrics rdbMetrics; + private final boolean initializeThreadSafeIterator; /** * Constructs a TableStore. @@ -53,10 +57,11 @@ class RDBTable implements Table { * @param family - ColumnFamily Handle. 
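+   * @param rdbMetrics - RDB metrics for this store.
+   * @param initializeThreadSafeIterator - when true, iterators created by this
+   *     table are the reference-counted, thread-safe variants.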
*/ RDBTable(RocksDatabase db, ColumnFamily family, - RDBMetrics rdbMetrics) { + RDBMetrics rdbMetrics, boolean initializeThreadSafeIterator) { this.db = db; this.family = family; this.rdbMetrics = rdbMetrics; + this.initializeThreadSafeIterator = initializeThreadSafeIterator; } public ColumnFamily getColumnFamily() { @@ -94,7 +99,7 @@ public void putWithBatch(BatchOperation batch, byte[] key, byte[] value) @Override public boolean isEmpty() throws IOException { - try (TableIterator> keyIter = iterator()) { + try (TableIterator> keyIter = iterator()) { keyIter.seekToFirst(); return !keyIter.hasNext(); } @@ -210,20 +215,43 @@ public void deleteWithBatch(BatchOperation batch, byte[] key) } @Override - public TableIterator> iterator() + public BaseDBTableIterator> iterator() throws IOException { return iterator((byte[])null); } @Override - public TableIterator> iterator(byte[] prefix) + public BaseDBTableIterator> iterator(byte[] prefix) throws IOException { - return new RDBStoreByteArrayIterator(db.newIterator(family, false), this, - prefix); + if (initializeThreadSafeIterator) { + return new ReferenceCountedRDBStoreByteArrayIterator(db.newIterator(family, false), this, prefix); + } + return new RDBStoreByteArrayIterator(db.newIterator(family, false), this, prefix); + } + + @Override + public KeyValueSpliterator spliterator(int maxParallelism, boolean closeOnException) + throws IOException { + return spliterator(null, null, maxParallelism, closeOnException); + } + + @Override + public KeyValueSpliterator spliterator(byte[] startKey, byte[] prefix, int maxParallelism, + boolean closeOnException) throws IOException { + return newByteArraySpliterator(prefix, startKey, maxParallelism, closeOnException); } - TableIterator> iterator( + BaseDBTableIterator> iterator( CodecBuffer prefix) throws IOException { + return iterator(prefix, 1); + } + + BaseDBTableIterator> iterator( + CodecBuffer prefix, int maxNumberOfBuffers) throws IOException { + if (initializeThreadSafeIterator) { + return new ReferenceCountedRDBStoreCodecBufferIterator(db.newIterator(family, false), + this, prefix, maxNumberOfBuffers); + } return new RDBStoreCodecBufferIterator(db.newIterator(family, false), this, prefix); } @@ -262,7 +290,7 @@ public List> getSequentialRangeKVs(byte[] startKey, @Override public void deleteBatchWithPrefix(BatchOperation batch, byte[] prefix) throws IOException { - try (TableIterator> iter + try (TableIterator> iter = iterator(prefix)) { while (iter.hasNext()) { deleteWithBatch(batch, iter.next().getKey()); @@ -273,7 +301,7 @@ public void deleteBatchWithPrefix(BatchOperation batch, byte[] prefix) @Override public void dumpToFileWithPrefix(File externalFile, byte[] prefix) throws IOException { - try (TableIterator> iter = iterator(prefix); + try (TableIterator> iter = iterator(prefix); RDBSstFileWriter fileWriter = new RDBSstFileWriter(externalFile)) { while (iter.hasNext()) { final KeyValue entry = iter.next(); @@ -298,7 +326,7 @@ private List> getRangeKVs(byte[] startKey, "Invalid count given " + count + ", count must be greater than 0"); } final List> result = new ArrayList<>(); - try (TableIterator> it + try (TableIterator> it = iterator(prefix)) { if (startKey == null) { it.seekToFirst(); @@ -357,4 +385,37 @@ && get(startKey) == null) { } return result; } + + private KeyValueSpliterator newByteArraySpliterator(byte[] prefix, byte[] startKey, + int maxParallelism, boolean closeOnException) throws IOException { + if (initializeThreadSafeIterator) { + return new ByteArrayRawSpliterator(prefix, 
startKey, maxParallelism, closeOnException); + } + return new ByteArrayRawSpliterator(prefix, startKey, maxParallelism, closeOnException); + } + + private final class ByteArrayRawSpliterator extends RawSpliterator { + + private ByteArrayRawSpliterator(byte[] prefix, byte[] startKey, int maxParallelism, boolean closeOnException) + throws IOException { + super(prefix, startKey, maxParallelism, closeOnException); + initializeIterator(); + } + + @Override + KeyValue convert(RawKeyValue kv) { + final int rawSize = kv.getValue().length; + return Table.newKeyValue(kv.getKey(), kv.getValue(), rawSize); + } + + @Override + BaseDBTableIterator> getRawIterator( + byte[] prefix, byte[] startKey, int maxParallelism) throws IOException { + BaseDBTableIterator> itr = iterator(prefix); + if (startKey != null) { + itr.seek(startKey); + } + return itr; + } + } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RawKeyValue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RawKeyValue.java index c7250617b6f9..ef8a0fe8be9a 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RawKeyValue.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RawKeyValue.java @@ -25,7 +25,7 @@ * * @param The raw type. */ -public abstract class RawKeyValue implements KeyValue { +public class RawKeyValue implements KeyValue { private final RAW key; private final RAW value; @@ -62,7 +62,7 @@ public byte[] getValue() { } } - private RawKeyValue(RAW key, RAW value) { + public RawKeyValue(RAW key, RAW value) { this.key = key; this.value = value; } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RawSpliterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RawSpliterator.java new file mode 100644 index 000000000000..4da6b03d571a --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RawSpliterator.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import java.io.IOException; +import java.util.Spliterator; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import org.apache.hadoop.hdds.utils.db.iterator.BaseDBTableIterator; +import org.apache.ratis.util.ReferenceCountedObject; + +/** + * An abstract implementation of {@link Table.KeyValueSpliterator} designed + * for iterating and splitting over raw key-value pairs retrieved via a + * {@link TableIterator}. + * + *

This class manages asynchronous raw iterator resources and provides + * functionality for converting raw key-value pairs into a structured + * {@link Table.KeyValue} representation. It also allows controlled splitting + * of the underlying iterator for parallel processing, while ensuring proper + * resource handling and thread safety. + * + * @param The raw representation type of the key-value pair. + * @param The type of key in the key-value pair. + * @param The type of value in the key-value pair. + */ +abstract class RawSpliterator + implements Table.KeyValueSpliterator { + + private ReferenceCountedObject>> rawIterator; + private final KEY keyPrefix; + private final KEY startKey; + private final AtomicInteger maxNumberOfAdditionalSplits; + private final AtomicReference closeException = new AtomicReference<>(); + private boolean initialized; + private final Lock lock; + private boolean closed; + private final boolean closeOnException; + + abstract Table.KeyValue convert(RawKeyValue kv) throws IOException; + + abstract BaseDBTableIterator> getRawIterator( + KEY prefix, KEY start, int maxParallelism) throws IOException; + + RawSpliterator(KEY keyPrefix, KEY startKey, int maxParallelism, boolean closeOnException) { + this.keyPrefix = keyPrefix; + this.startKey = startKey; + this.closeOnException = closeOnException; + this.lock = new ReentrantLock(); + this.maxNumberOfAdditionalSplits = new AtomicInteger(maxParallelism); + this.initialized = false; + this.closed = false; + } + + synchronized void initializeIterator() throws IOException { + if (initialized) { + return; + } + BaseDBTableIterator> itr = getRawIterator(keyPrefix, + startKey, maxNumberOfAdditionalSplits.decrementAndGet()); + try { + this.rawIterator = ReferenceCountedObject.wrap(itr, () -> { }, + (completelyReleased) -> { + if (completelyReleased) { + closeRawIteratorWithLock(); + } + this.maxNumberOfAdditionalSplits.incrementAndGet(); + }); + this.rawIterator.retain(); + } catch (Throwable e) { + itr.close(); + throw e; + } + initialized = true; + } + + private RawKeyValue getNextKV() { + try { + if (!closed && rawIterator.get().hasNext()) { + return rawIterator.get().next(); + } else { + closeRawIterator(); + return null; + } + } catch (Throwable e) { + if (closeOnException) { + closeRawIterator(); + } + throw new IllegalStateException("Failed next()", e); + } + } + + private boolean tryAdvanceForCloseableKV(Consumer> action) { + lock.lock(); + RawKeyValue rawKV; + try { + rawKV = getNextKV(); + } finally { + lock.unlock(); + } + try (AutoCloseable ignored = (AutoCloseable) rawKV) { + if (rawKV != null) { + action.accept(convert(rawKV)); + return true; + } + return false; + } catch (Throwable e) { + if (closeOnException) { + closeRawIteratorWithLock(); + } + throw new IllegalStateException("Failed while running action(kv)", e); + } + } + + private boolean tryAdvanceForKV(Consumer> action) { + lock.lock(); + Table.KeyValue kv = null; + try { + RawKeyValue rawKV = getNextKV(); + if (rawKV != null) { + kv = convert(rawKV); + } + } catch (Throwable e) { + if (closeOnException) { + closeRawIterator(); + } + throw new IllegalStateException("Failed next()", e); + } finally { + lock.unlock(); + } + try { + if (kv != null) { + action.accept(kv); + return true; + } + return false; + } catch (Throwable e) { + if (closeOnException) { + closeRawIteratorWithLock(); + } + throw new IllegalStateException("Failed next()", e); + } + } + + @Override + public boolean tryAdvance(Consumer> action) { + if (rawIterator.get().isKVCloseable()) { + 
return tryAdvanceForCloseableKV(action); + } + return tryAdvanceForKV(action); + } + + @Override + public int characteristics() { + return Spliterator.DISTINCT; + } + + @Override + public Spliterator> trySplit() { + int val = maxNumberOfAdditionalSplits.decrementAndGet(); + if (val >= 0) { + try { + this.rawIterator.retain(); + } catch (Exception e) { + maxNumberOfAdditionalSplits.incrementAndGet(); + return null; + } + return this; + } else { + maxNumberOfAdditionalSplits.incrementAndGet(); + } + return null; + } + + void closeRawIterator() { + if (!closed) { + try { + closed = true; + this.rawIterator.get().close(); + } catch (IOException e) { + closeException.set(e); + } + } + } + + void closeRawIteratorWithLock() { + if (!closed) { + this.lock.lock(); + try { + closeRawIterator(); + } finally { + this.lock.unlock(); + } + } + } + + @Override + public void close() throws IOException { + this.rawIterator.release(); + if (closeException.get() != null) { + throw closeException.get(); + } + } + + @Override + public long estimateSize() { + return Long.MAX_VALUE; + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDBConfiguration.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDBConfiguration.java index 74c8d95e670e..7c384297a740 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDBConfiguration.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDBConfiguration.java @@ -84,6 +84,13 @@ public class RocksDBConfiguration { + "Default 0 means no limit.") private long walSizeLimit = 0; + @Config(key = "rocksdb.threadsafe.iterator.enabled", + type = ConfigType.BOOLEAN, + defaultValue = "true", + tags = {OM, SCM, DATANODE}, + description = "Flag value for allowing initialization of thread safe iterator") + private boolean threadSafeIteratorEnabled = true; + public void setRocksdbLoggingEnabled(boolean enabled) { this.rocksdbLogEnabled = enabled; } @@ -139,4 +146,8 @@ public void setKeepLogFileNum(int fileNum) { public int getKeepLogFileNum() { return rocksdbKeepLogFileNum; } + + public boolean isThreadSafeIteratorEnabled() { + return threadSafeIteratorEnabled; + } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java index 9552f327e2e8..8ca7258041b7 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java @@ -17,12 +17,14 @@ package org.apache.hadoop.hdds.utils.db; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Spliterator; import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.hdds.annotation.InterfaceStability; import org.apache.hadoop.hdds.utils.MetadataKeyFilters; @@ -170,6 +172,34 @@ default VALUE getReadCopy(KEY key) throws IOException { TableIterator> iterator(KEY prefix) throws IOException; + /** + * Creates a spliterator over the key-value pairs in the table with the specified + * maximum parallelism and exception handling behavior. + * + * @param maxParallelism the maximum number of threads that can be used for parallel processing. 
+ * @param closeOnException if true, the spliterator will automatically close resources + * if an exception occurs during processing. + * @return a {@code Table.KeyValueSpliterator} instance for iterating over the key-value pairs. + * @throws IOException if an I/O error occurs during spliterator creation. + */ + Table.KeyValueSpliterator spliterator(int maxParallelism, boolean closeOnException) + throws IOException; + + /** + * Returns a spliterator that iterates over the key-value pairs starting from a given key + * and within a specified key prefix. The spliterator can be parallelized up to a given level + * of parallelism and may optionally close resources in case of exceptions. + * + * @param startKey the starting key from which iteration begins + * @param prefix the key prefix used to limit the keys being iterated + * @param maxParallelism the maximum level of parallelism allowed for the iteration + * @param closeOnException if true, closes resources when an exception occurs during iteration + * @return a spliterator that supports parallel iteration over the key-value pairs + * @throws IOException if an I/O error occurs during the creation of the spliterator + */ + Table.KeyValueSpliterator spliterator(KEY startKey, KEY prefix, int maxParallelism, + boolean closeOnException) throws IOException; + /** * Returns the Name of this Table. * @return - Table Name. @@ -419,4 +449,19 @@ public int hashCode() { interface KeyValueIterator extends TableIterator> { } + + /** + * A generic interface extending {@link Spliterator} and {@link Closeable}, + * designed for iterating over and splitting key-value pairs. + * + * @param The type of keys in the key-value pairs. + * @param The type of values in the key-value pairs. + * + * This interface facilitates traversal and splitting of key-value pairs + * while also providing resource management capabilities via {@link Closeable}. + * Implementations must handle key-value pair splitting to enable parallel processing, + * and ensure proper resource management when the spliterator is closed. 
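+ *
+ * <p>A hedged usage sketch (the table, its {@code String} key/value types and
+ * {@code consume(...)} are assumptions for illustration):
+ * <pre>{@code
+ * try (Table.KeyValueSpliterator<String, String> split = table.spliterator(4, true)) {
+ *   StreamSupport.stream(split, true)   // parallel traversal; fan-out is capped by maxParallelism
+ *       .forEach(kv -> consume(kv));
+ * }
+ * }</pre>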
+ */ + interface KeyValueSpliterator extends Spliterator>, Closeable { + } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java index dcf482aad2ad..c81a402550c9 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java @@ -41,7 +41,9 @@ import org.apache.hadoop.hdds.utils.db.cache.TableCache; import org.apache.hadoop.hdds.utils.db.cache.TableCache.CacheType; import org.apache.hadoop.hdds.utils.db.cache.TableNoCache; +import org.apache.hadoop.hdds.utils.db.iterator.BaseDBTableIterator; import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.UncheckedAutoCloseable; import org.apache.ratis.util.function.CheckedBiFunction; /** @@ -399,6 +401,21 @@ public Table.KeyValueIterator iterator() throws IOException { return iterator(null); } + @Override + public Table.KeyValueSpliterator spliterator(int maxParallelism, boolean closeOnException) + throws IOException { + return spliterator(null, null, maxParallelism, closeOnException); + } + + @Override + public Table.KeyValueSpliterator spliterator(KEY startKey, KEY prefix, int maxParallelism, + boolean closeOnException) throws IOException { + if (supportCodecBuffer) { + return newCodecBufferSpliterator(prefix, startKey, maxParallelism, closeOnException); + } + return newByteArraySpliterator(prefix, startKey, maxParallelism, closeOnException); + } + @Override public Table.KeyValueIterator iterator(KEY prefix) throws IOException { @@ -554,8 +571,8 @@ public VALUE getValue() throws IOException { } } - RawIterator newCodecBufferTableIterator( - TableIterator> i) { + private RawIterator newCodecBufferTableIterator( + BaseDBTableIterator> i) { return new RawIterator(i) { @Override AutoCloseSupplier convert(KEY key) throws IOException { @@ -574,7 +591,7 @@ public CodecBuffer get() { } @Override - KeyValue convert(KeyValue raw) + KeyValue convert(RawKeyValue raw) throws IOException { final int rawSize = raw.getValue().readableBytes(); final KEY key = keyCodec.fromCodecBuffer(raw.getKey()); @@ -584,12 +601,22 @@ KeyValue convert(KeyValue raw) }; } + private RawSpliterator newCodecBufferSpliterator( + KEY prefix, KEY startKey, int maxParallelism, boolean closeOnException) throws IOException { + return new CodecBufferTypedRawSpliterator(prefix, startKey, maxParallelism, closeOnException); + } + + private RawSpliterator newByteArraySpliterator(KEY prefix, KEY startKey, int maxParallelism, + boolean closeOnException) throws IOException { + return new ByteArrayTypedRawSpliterator(prefix, startKey, maxParallelism, closeOnException); + } + /** * Table Iterator implementation for strongly typed tables. 
*/ public class TypedTableIterator extends RawIterator { TypedTableIterator( - TableIterator> rawIterator) { + BaseDBTableIterator> rawIterator) { super(rawIterator); } @@ -600,7 +627,7 @@ AutoCloseSupplier convert(KEY key) throws IOException { } @Override - KeyValue convert(KeyValue raw) { + KeyValue convert(RawKeyValue raw) { return new TypedKeyValue(raw); } } @@ -612,9 +639,9 @@ KeyValue convert(KeyValue raw) { */ abstract class RawIterator implements Table.KeyValueIterator { - private final TableIterator> rawIterator; + private final BaseDBTableIterator> rawIterator; - RawIterator(TableIterator> rawIterator) { + RawIterator(BaseDBTableIterator> rawIterator) { this.rawIterator = rawIterator; } @@ -622,10 +649,10 @@ abstract class RawIterator abstract AutoCloseSupplier convert(KEY key) throws IOException; /** - * Covert the given {@link Table.KeyValue} + * Covert the given {@link RawKeyValue} * from ({@link RAW}, {@link RAW}) to ({@link KEY}, {@link VALUE}). */ - abstract KeyValue convert(KeyValue raw) + abstract KeyValue convert(RawKeyValue raw) throws IOException; @Override @@ -640,9 +667,17 @@ public void seekToLast() { @Override public KeyValue seek(KEY key) throws IOException { + UncheckedAutoCloseable closeable = null; try (AutoCloseSupplier rawKey = convert(key)) { - final KeyValue result = rawIterator.seek(rawKey.get()); + RawKeyValue result = rawIterator.seek(rawKey.get()); + if (rawIterator.isKVCloseable()) { + closeable = (UncheckedAutoCloseable) result; + } return result == null ? null : convert(result); + } finally { + if (closeable != null) { + closeable.close(); + } } } @@ -658,10 +693,19 @@ public boolean hasNext() { @Override public KeyValue next() { + UncheckedAutoCloseable closeable = null; try { - return convert(rawIterator.next()); + RawKeyValue kv = rawIterator.next(); + if (rawIterator.isKVCloseable()) { + closeable = (UncheckedAutoCloseable) kv; + } + return convert(kv); } catch (IOException e) { throw new IllegalStateException("Failed next()", e); + } finally { + if (closeable != null) { + closeable.close(); + } } } @@ -670,4 +714,75 @@ public void removeFromDB() throws IOException { rawIterator.removeFromDB(); } } + + private final class CodecBufferTypedRawSpliterator extends RawSpliterator { + + private CodecBufferTypedRawSpliterator(KEY prefix, KEY startKey, int maxParallelism, boolean closeOnException) + throws IOException { + super(prefix, startKey, maxParallelism, closeOnException); + initializeIterator(); + } + + @Override + KeyValue convert(RawKeyValue kv) throws IOException { + final int rawSize = kv.getValue().readableBytes(); + final KEY key = keyCodec.fromCodecBuffer(kv.getKey()); + final VALUE value = valueCodec.fromCodecBuffer(kv.getValue()); + return Table.newKeyValue(key, value, rawSize); + } + + @Override + BaseDBTableIterator> getRawIterator( + KEY prefix, KEY startKey, int maxParallelism) throws IOException { + + CodecBuffer prefixBuffer = encodeKeyCodecBuffer(prefix); + CodecBuffer startKeyBuffer = encodeKeyCodecBuffer(startKey); + try { + BaseDBTableIterator> itr = + rawTable.iterator(prefixBuffer, maxParallelism); + if (startKeyBuffer != null) { + itr.seek(startKeyBuffer); + } + return itr; + } catch (Throwable t) { + if (prefixBuffer != null) { + prefixBuffer.release(); + } + throw t; + } finally { + if (startKeyBuffer != null) { + startKeyBuffer.release(); + } + } + } + } + + private final class ByteArrayTypedRawSpliterator extends RawSpliterator { + + private ByteArrayTypedRawSpliterator(KEY prefix, KEY startKey, int maxParallelism, 
boolean closeOnException) + throws IOException { + super(prefix, startKey, maxParallelism, closeOnException); + initializeIterator(); + } + + @Override + KeyValue convert(RawKeyValue kv) throws IOException { + final int rawSize = kv.getValue().length; + final KEY key = keyCodec.fromPersistedFormat(kv.getKey()); + final VALUE value = valueCodec.fromPersistedFormat(kv.getValue()); + return Table.newKeyValue(key, value, rawSize); + } + + @Override + BaseDBTableIterator> getRawIterator( + KEY prefix, KEY startKey, int maxParallelism) throws IOException { + byte[] prefixBytes = encodeKey(prefix); + byte[] startKeyBytes = encodeKey(startKey); + BaseDBTableIterator> itr = rawTable.iterator(prefixBytes); + if (startKeyBytes != null) { + itr.seek(startKeyBytes); + } + return itr; + } + } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/iterator/BaseDBTableIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/iterator/BaseDBTableIterator.java new file mode 100644 index 000000000000..9fcbc4368433 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/iterator/BaseDBTableIterator.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db.iterator; + +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; + +/** + * Base class for Raw DB TableIterator. + */ +public interface BaseDBTableIterator> extends TableIterator { + boolean isKVCloseable(); +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/iterator/CloseableRawKeyValue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/iterator/CloseableRawKeyValue.java new file mode 100644 index 000000000000..5f66c6f11640 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/iterator/CloseableRawKeyValue.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.utils.db.iterator; + +import org.apache.hadoop.hdds.utils.db.RawKeyValue; +import org.apache.ratis.util.ReferenceCountedObject; +import org.apache.ratis.util.UncheckedAutoCloseable; +import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; + +/** + * Closeable implementation of {@link RawKeyValue} that ensures proper resource management + * using an {@link UncheckedAutoCloseableSupplier}. This class provides a mechanism to + * automatically release resources when no longer needed. + */ +public final class CloseableRawKeyValue extends RawKeyValue implements UncheckedAutoCloseable { + private final UncheckedAutoCloseableSupplier> keyValue; + + public CloseableRawKeyValue(ReferenceCountedObject> kv) { + super(kv.get().getKey(), kv.get().getValue()); + this.keyValue = kv.retainAndReleaseOnClose(); + } + + @Override + public void close() { + keyValue.close(); + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/iterator/ReferenceCountedRDBStoreAbstractIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/iterator/ReferenceCountedRDBStoreAbstractIterator.java new file mode 100644 index 000000000000..7b2d2c8eea7f --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/iterator/ReferenceCountedRDBStoreAbstractIterator.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db.iterator; + +import java.io.IOException; +import java.util.NoSuchElementException; +import java.util.function.Consumer; +import org.apache.hadoop.hdds.utils.db.RDBTable; +import org.apache.hadoop.hdds.utils.db.RawKeyValue; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; +import org.apache.ratis.util.ReferenceCountedObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An abstract {@link TableIterator} to iterate raw {@link Table.KeyValue}s. + * + * @param the raw type. + */ +public abstract class ReferenceCountedRDBStoreAbstractIterator + implements BaseDBTableIterator> { + + private static final Logger LOG = + LoggerFactory.getLogger(ReferenceCountedRDBStoreAbstractIterator.class); + + private final ManagedRocksIterator rocksDBIterator; + private final RDBTable rocksDBTable; + private ReferenceCountedObject> currentEntry; + private RawKeyValue previousKeyValue; + // This is for schemas that use a fixed-length + // prefix for each key. 
+ private final RAW prefix; + private Boolean hasNext; + private boolean closed; + + ReferenceCountedRDBStoreAbstractIterator(ManagedRocksIterator iterator, RDBTable table, + RAW prefix) { + this.rocksDBIterator = iterator; + this.rocksDBTable = table; + this.prefix = prefix; + this.currentEntry = null; + this.hasNext = false; + this.closed = false; + this.previousKeyValue = null; + } + + /** @return the {@link Table.KeyValue} for the current entry. */ + abstract ReferenceCountedObject> getKeyValue(); + + /** Seek to the given key. */ + abstract void seek0(RAW key); + + /** Delete the given key. */ + abstract void delete(RAW key) throws IOException; + + /** Does the given key start with the prefix? */ + abstract boolean startsWithPrefix(RAW key); + + final ManagedRocksIterator getRocksDBIterator() { + return rocksDBIterator; + } + + final RDBTable getRocksDBTable() { + return rocksDBTable; + } + + final RAW getPrefix() { + return prefix; + } + + @Override + public final void forEachRemaining( + Consumer> action) { + while (hasNext()) { + CloseableRawKeyValue entry = next(); + action.accept(entry); + } + } + + private void releaseEntry() { + if (currentEntry != null) { + currentEntry.release(); + } + currentEntry = null; + hasNext = null; + } + + private void setCurrentEntry() { + boolean isValid = !closed && rocksDBIterator.get().isValid(); + if (isValid) { + currentEntry = getKeyValue(); + currentEntry.retain(); + } else { + currentEntry = null; + } + setHasNext(isValid, currentEntry); + } + + public void setHasNext(boolean isValid, ReferenceCountedObject> entry) { + this.hasNext = isValid && (prefix == null || startsWithPrefix(entry.get().getKey())); + } + + @Override + public final boolean hasNext() { + if (hasNext == null) { + setCurrentEntry(); + } + return hasNext; + } + + @Override + public final CloseableRawKeyValue next() { + if (hasNext()) { + CloseableRawKeyValue entry = new CloseableRawKeyValue<>(currentEntry); + this.previousKeyValue = currentEntry.get(); + rocksDBIterator.get().next(); + releaseEntry(); + return entry; + } + throw new NoSuchElementException("RocksDB Store has no more elements"); + } + + @Override + public final void seekToFirst() { + if (prefix == null) { + rocksDBIterator.get().seekToFirst(); + } else { + seek0(prefix); + } + releaseEntry(); + } + + @Override + public final void seekToLast() { + if (prefix == null) { + rocksDBIterator.get().seekToLast(); + } else { + throw new UnsupportedOperationException("seekToLast: prefix != null"); + } + releaseEntry(); + } + + @Override + public final CloseableRawKeyValue seek(RAW key) { + seek0(key); + releaseEntry(); + setCurrentEntry(); + // Current entry should be only closed when the next() and thus closing the returned entry should be a noop. 
+ if (hasNext()) { + return new CloseableRawKeyValue<>(currentEntry); + } + return null; + } + + @Override + public final void removeFromDB() throws IOException { + if (rocksDBTable == null) { + throw new UnsupportedOperationException("remove"); + } + if (previousKeyValue != null) { + delete(previousKeyValue.getKey()); + } else { + LOG.info("Failed to removeFromDB: currentEntry == null"); + } + } + + @Override + public void close() { + rocksDBIterator.close(); + closed = true; + releaseEntry(); + } + + @Override + public boolean isKVCloseable() { + return true; + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/iterator/ReferenceCountedRDBStoreByteArrayIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/iterator/ReferenceCountedRDBStoreByteArrayIterator.java new file mode 100644 index 000000000000..f7165974d34b --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/iterator/ReferenceCountedRDBStoreByteArrayIterator.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db.iterator; + +import java.io.IOException; +import java.util.Arrays; +import org.apache.hadoop.hdds.utils.db.RDBTable; +import org.apache.hadoop.hdds.utils.db.RawKeyValue; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; +import org.apache.ratis.util.ReferenceCountedObject; + +/** + * RocksDB store iterator using the byte[] API. + */ +public class ReferenceCountedRDBStoreByteArrayIterator extends ReferenceCountedRDBStoreAbstractIterator { + public ReferenceCountedRDBStoreByteArrayIterator(ManagedRocksIterator iterator, + RDBTable table, byte[] prefix) { + super(iterator, table, + prefix == null ? 
null : Arrays.copyOf(prefix, prefix.length)); + seekToFirst(); + } + + @Override + ReferenceCountedObject> getKeyValue() { + final ManagedRocksIterator i = getRocksDBIterator(); + RawKeyValue rawKV = RawKeyValue.create(i.get().key(), i.get().value()); + return ReferenceCountedObject.wrap(rawKV); + } + + @Override + void seek0(byte[] key) { + getRocksDBIterator().get().seek(key); + } + + @Override + void delete(byte[] key) throws IOException { + getRocksDBTable().delete(key); + } + + @Override + boolean startsWithPrefix(byte[] value) { + final byte[] prefix = getPrefix(); + if (prefix == null) { + return true; + } + if (value == null) { + return false; + } + + int length = prefix.length; + if (value.length < length) { + return false; + } + + for (int i = 0; i < length; i++) { + if (value[i] != prefix[i]) { + return false; + } + } + return true; + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/iterator/ReferenceCountedRDBStoreCodecBufferIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/iterator/ReferenceCountedRDBStoreCodecBufferIterator.java new file mode 100644 index 000000000000..661531e16daa --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/iterator/ReferenceCountedRDBStoreCodecBufferIterator.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db.iterator; + +import java.io.IOException; +import java.util.Deque; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import org.apache.commons.lang3.exception.UncheckedInterruptedException; +import org.apache.hadoop.hdds.utils.db.Buffer; +import org.apache.hadoop.hdds.utils.db.CodecBuffer; +import org.apache.hadoop.hdds.utils.db.RDBTable; +import org.apache.hadoop.hdds.utils.db.RawKeyValue; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; +import org.apache.hadoop.util.Sets; +import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.ReferenceCountedObject; + +/** + * An implementation of {@link ReferenceCountedRDBStoreAbstractIterator} that uses {@link CodecBuffer} + * for efficient memory management when iterating over RocksDB entries. + * This iterator employs a buffer pooling strategy to minimize memory allocations + * during iteration. Key and value buffers are pre-allocated and reused through a + * reference-counting mechanism, which significantly reduces GC pressure when + * processing large datasets. 
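+ *
+ * <p>A usage note (sketch, not a normative contract; {@code iter} stands for an
+ * instance of this iterator): each entry returned by {@code next()} is a
+ * {@link CloseableRawKeyValue}; closing it hands the pooled key/value buffers
+ * back for reuse, e.g.
+ * <pre>{@code
+ * while (iter.hasNext()) {
+ *   try (CloseableRawKeyValue<CodecBuffer> kv = iter.next()) {
+ *     // consume kv.getKey() / kv.getValue() before close()
+ *   }
+ * }
+ * }</pre>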
+ * Key features: + **/ +public class ReferenceCountedRDBStoreCodecBufferIterator extends ReferenceCountedRDBStoreAbstractIterator { + + private final BlockingDeque> availableBufferStack; + private final Set> inUseBuffers; + private final AtomicReference closed = new AtomicReference<>(false); + + public ReferenceCountedRDBStoreCodecBufferIterator(ManagedRocksIterator iterator, RDBTable table, + CodecBuffer prefix, int maxNumberOfBuffersInMemory) { + super(iterator, table, prefix); + // We need atleast 1 buffers one for setting next value and one for sending the current value. + maxNumberOfBuffersInMemory = Math.max(1, maxNumberOfBuffersInMemory); + final String name = table != null ? table.getName() : null; + this.availableBufferStack = new LinkedBlockingDeque<>(maxNumberOfBuffersInMemory); + this.inUseBuffers = Sets.newConcurrentHashSet(); + for (int i = 0; i < maxNumberOfBuffersInMemory; i++) { + Buffer keyBuffer = new Buffer( + new CodecBuffer.Capacity(name + "-iterator-key-" + i, 1 << 10), + buffer -> getRocksDBIterator().get().key(buffer)); + Buffer valueBuffer = new Buffer( + new CodecBuffer.Capacity(name + "-iterator-value-" + i, 4 << 10), + buffer -> getRocksDBIterator().get().value(buffer)); + availableBufferStack.add(new RawKeyValue<>(keyBuffer, valueBuffer)); + } + + seekToFirst(); + } + + void assertOpen() { + Preconditions.assertTrue(!closed.get(), "Already closed"); + } + + private V getFromDeque(BlockingDeque deque, Set inUseSet) { + V popped; + do { + try { + popped = deque.poll(1000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new UncheckedInterruptedException(e); + } + } while (popped == null); + assertOpen(); + inUseSet.add(popped); + return popped; + } + + private ReferenceCountedObject> getReferenceCountedBuffer( + RawKeyValue key, Deque> stack, Set> inUseSet, + Function, RawKeyValue> transformer) { + RawKeyValue value = transformer.apply(key); + return ReferenceCountedObject.wrap(value, () -> { + }, completelyReleased -> { + if (!completelyReleased) { + return; + } + closed.updateAndGet((prev) -> { + // If already closed the data structure should not be manipulated since the buffer would have already been + // closed. + if (!prev) { + //Entire block done inside this code block to avoid race condition with close() method. + //Remove from the set before adding it back to the stack. Otherwise there could be a race condition with + // #getFromDeque function. 
+ inUseSet.remove(key); + stack.push(key); + } + return prev; + }); + }); + } + + @Override + ReferenceCountedObject> getKeyValue() { + assertOpen(); + RawKeyValue kvBuffer = getFromDeque(availableBufferStack, inUseBuffers); + Function, RawKeyValue> transformer = + kv -> new RawKeyValue<>(kv.getKey().getFromDb(), kv.getValue().getFromDb()); + return getReferenceCountedBuffer(kvBuffer, availableBufferStack, inUseBuffers, transformer); + } + + @Override + void seek0(CodecBuffer key) { + assertOpen(); + getRocksDBIterator().get().seek(key.asReadOnlyByteBuffer()); + } + + @Override + void delete(CodecBuffer key) throws IOException { + assertOpen(); + getRocksDBTable().delete(key.asReadOnlyByteBuffer()); + } + + @Override + boolean startsWithPrefix(CodecBuffer key) { + assertOpen(); + final CodecBuffer prefix = getPrefix(); + if (prefix == null) { + return true; + } + if (key == null) { + return false; + } + return key.startsWith(prefix); + } + + private void release(Deque valueStack, Set inUseSet, Function releaser) { + while (!valueStack.isEmpty()) { + V popped = valueStack.pop(); + releaser.apply(popped); + } + for (V inUseValue : inUseSet) { + releaser.apply(inUseValue); + } + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + super.close(); + Optional.ofNullable(getPrefix()).ifPresent(CodecBuffer::release); + release(availableBufferStack, inUseBuffers, kv -> { + kv.getKey().release(); + kv.getValue().release(); + return null; + }); + } + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/iterator/package-info.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/iterator/package-info.java new file mode 100644 index 000000000000..6df6af53462d --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/iterator/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Provides an interface for iterating over the contents of a DB table. 
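+ *
+ * Includes {@link BaseDBTableIterator}, the reference-counted RocksDB iterator
+ * implementations and {@link CloseableRawKeyValue}.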
+ */ +package org.apache.hadoop.hdds.utils.db.iterator; diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java index 62e15a08662b..24ae9d9de091 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java @@ -99,6 +99,17 @@ public void deleteRange(KEY beginKey, KEY endKey) { throw new UnsupportedOperationException(); } + @Override + public KeyValueSpliterator spliterator(int maxParallelism, boolean closeOnException) { + throw new UnsupportedOperationException(); + } + + @Override + public KeyValueSpliterator spliterator(KEY startKey, KEY prefix, int maxParallelism, + boolean closeOnException) { + throw new UnsupportedOperationException(); + } + @Override public String getName() { return ""; diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/RawSpliteratorTest.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/RawSpliteratorTest.java new file mode 100644 index 000000000000..f1977e34c60d --- /dev/null +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/RawSpliteratorTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Spliterator; +import java.util.function.Consumer; +import org.apache.hadoop.hdds.utils.db.iterator.BaseDBTableIterator; +import org.apache.hadoop.hdds.utils.db.iterator.CloseableRawKeyValue; +import org.junit.jupiter.api.Test; + +/** + * Unit test class for verifying the behavior of the {@code RawSpliterator}. + * This class contains tests to evaluate the functionality of its methods, + * such as {@code tryAdvance}, {@code trySplit}, and error handling during + * method execution. 
+ */ +class RawSpliteratorTest { + + @Test + void testTryAdvanceWithValidElement() throws IOException { + // Mock dependencies + BaseDBTableIterator> rawIteratorMock = + mock(BaseDBTableIterator.class); + when(rawIteratorMock.isKVCloseable()).thenReturn(true); + CloseableRawKeyValue rawKeyValueMock = + mock(CloseableRawKeyValue.class); + + when(rawIteratorMock.hasNext()).thenReturn(true); + when(rawIteratorMock.next()).thenReturn(rawKeyValueMock); + + RawSpliterator rawSpliterator = new MockRawSpliterator(1) { + @Override + BaseDBTableIterator> getRawIterator(String prefix, + String startKey, int maxParallelism) { + return rawIteratorMock; + } + }; + + when(rawKeyValueMock.getKey()).thenReturn("key"); + when(rawKeyValueMock.getValue()).thenReturn("value"); + + Consumer> action = keyValue -> { + try { + assertEquals("key", keyValue.getKey()); + assertEquals("value", keyValue.getValue()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }; + + boolean result = rawSpliterator.tryAdvance(action); + + assertTrue(result); + verify(rawIteratorMock).hasNext(); + verify(rawIteratorMock).next(); + verify(rawKeyValueMock).close(); + } + + @Test + void testTryAdvanceWithNoElement() throws IOException { + BaseDBTableIterator> rawIteratorMock = + mock(BaseDBTableIterator.class); + + when(rawIteratorMock.hasNext()).thenReturn(false); + + RawSpliterator rawSpliterator = new MockRawSpliterator(1) { + @Override + BaseDBTableIterator> getRawIterator(String prefix, + String startKey, int maxParallelism) { + return rawIteratorMock; + } + }; + + Consumer> action = keyValue -> fail("Action should not be called"); + + boolean result = rawSpliterator.tryAdvance(action); + + assertFalse(result); + verify(rawIteratorMock).hasNext(); + verify(rawIteratorMock, never()).next(); + } + + @Test + void testTryAdvanceWhenConvertThrowsIOException() throws IOException { + BaseDBTableIterator> rawIteratorMock = mock(BaseDBTableIterator.class); + CloseableRawKeyValue rawKeyValueMock = mock(CloseableRawKeyValue.class); + + when(rawIteratorMock.hasNext()).thenReturn(true); + when(rawIteratorMock.next()).thenReturn(rawKeyValueMock); + + RawSpliterator rawSpliterator = new RawSpliterator(null, null, 1, + true) { + + @Override + Table.KeyValue convert(RawKeyValue kv) throws IOException { + throw new IOException("Mocked exception"); + } + + @Override + BaseDBTableIterator> getRawIterator(String prefix, + String startKey, int maxParallelism) { + return rawIteratorMock; + } + }; + rawSpliterator.initializeIterator(); + + Consumer> action = keyValue -> { + }; + + Exception exception = assertThrows(IllegalStateException.class, () -> rawSpliterator.tryAdvance(action)); + + assertEquals("Failed next()", exception.getMessage()); + assertNotNull(exception.getCause()); + assertEquals(IOException.class, exception.getCause().getClass()); + assertEquals("Mocked exception", exception.getCause().getMessage()); + verify(rawIteratorMock).hasNext(); + verify(rawIteratorMock).next(); + } + + @Test + void testTrySplits() throws IOException { + BaseDBTableIterator> rawIteratorMock = + mock(BaseDBTableIterator.class); + + RawSpliterator rawSpliterator = new MockRawSpliterator(2) { + @Override + BaseDBTableIterator> getRawIterator(String prefix, + String startKey, int maxParallelism) { + return rawIteratorMock; + } + }; + Spliterator> split1 = rawSpliterator.trySplit(); + Spliterator> split2 = rawSpliterator.trySplit(); + assertNotNull(split1); + assertNull(split2); + rawSpliterator.close(); + Spliterator> split3 = split1.trySplit(); 
+ assertNotNull(split3); + } + + private abstract static class MockRawSpliterator extends RawSpliterator { + + MockRawSpliterator(int maxParallelism) throws IOException { + super(null, null, maxParallelism, true); + super.initializeIterator(); + } + + @Override + Table.KeyValue convert(RawKeyValue kv) { + return Table.newKeyValue(kv.getKey(), kv.getValue()); + } + } +} diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java index 01be3a73f8e2..57ba67a85845 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java @@ -89,7 +89,7 @@ public static RDBStore newRDBStore(File dbFile, ManagedDBOptions options, throws IOException { return new RDBStore(dbFile, options, null, new ManagedWriteOptions(), families, false, null, false, - maxDbUpdatesSizeThreshold, true, null, true); + maxDbUpdatesSizeThreshold, true, null, true, true); } @BeforeEach diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java index 05a4c946394e..eb7cb235647f 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java @@ -31,7 +31,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -117,12 +116,8 @@ public void testForEachRemaining() throws Exception { List> remaining = new ArrayList<>(); try (RDBStoreCodecBufferIterator i = newIterator()) { i.forEachRemaining(kv -> { - try { - remaining.add(RawKeyValue.create( - kv.getKey().getArray(), kv.getValue().getArray())); - } catch (IOException e) { - throw new RuntimeException(e); - } + remaining.add(RawKeyValue.create( + kv.getKey().getArray(), kv.getValue().getArray())); }); System.out.println("remaining: " + remaining); diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java index 065e8728e748..b9c6edcee185 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java @@ -493,6 +493,7 @@ public void testIteratorRemoveFromDB() throws Exception { writeToTable(testTable, 3); try (TableIterator> iterator = testTable.iterator()) { + iterator.next(); iterator.removeFromDB(); } assertNull(testTable.get(bytesOf[1])); @@ -506,6 +507,7 @@ public void testIteratorRemoveFromDB() throws Exception { try (TableIterator> iterator = testTable.iterator()) { iterator.seekToLast(); + iterator.next(); iterator.removeFromDB(); } assertNotNull(testTable.get(bytesOf[1])); @@ -519,6 +521,7 @@ public void testIteratorRemoveFromDB() throws Exception { try (TableIterator> iterator = testTable.iterator()) { iterator.seek(bytesOf[3]); + iterator.next(); iterator.removeFromDB(); } assertNotNull(testTable.get(bytesOf[1])); diff --git 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestReferenceCountedRDBStoreByteArrayIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestReferenceCountedRDBStoreByteArrayIterator.java
new file mode 100644
index 000000000000..7e0d733b6640
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestReferenceCountedRDBStoreByteArrayIterator.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentCaptor.forClass;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.NoSuchElementException;
+import java.util.function.Consumer;
+import org.apache.hadoop.hdds.utils.db.iterator.CloseableRawKeyValue;
+import org.apache.hadoop.hdds.utils.db.iterator.ReferenceCountedRDBStoreByteArrayIterator;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+import org.rocksdb.RocksIterator;
+
+/**
+ * These tests prescribe the expected behaviour of
+ * {@link ReferenceCountedRDBStoreByteArrayIterator}, which wraps
+ * RocksDB's own iterator. Ozone internally uses the RDBStoreIterator in
+ * TypedTableIterator to provide iteration over table elements in a typed manner.
+ * The tests ensure that RocksDB is accessed correctly through the iterator.
+ */ +public class TestReferenceCountedRDBStoreByteArrayIterator { + + private RocksIterator rocksDBIteratorMock; + private ManagedRocksIterator managedRocksIterator; + private RDBTable rocksTableMock; + + @BeforeEach + public void setup() { + rocksDBIteratorMock = mock(RocksIterator.class); + managedRocksIterator = new ManagedRocksIterator(rocksDBIteratorMock); + rocksTableMock = mock(RDBTable.class); + Logger.getLogger(ManagedRocksObjectUtils.class).setLevel(Level.DEBUG); + } + + ReferenceCountedRDBStoreByteArrayIterator newIterator() { + return new ReferenceCountedRDBStoreByteArrayIterator(managedRocksIterator, null, null); + } + + ReferenceCountedRDBStoreByteArrayIterator newIterator(byte[] prefix) { + return new ReferenceCountedRDBStoreByteArrayIterator( + managedRocksIterator, rocksTableMock, prefix); + } + + @Test + public void testForeachRemainingCallsConsumerWithAllElements() throws IOException { + when(rocksDBIteratorMock.isValid()) + .thenReturn(true, true, true, false); + when(rocksDBIteratorMock.key()) + .thenReturn(new byte[]{0x00}, new byte[]{0x01}, + new byte[]{0x02}) + .thenThrow(new NoSuchElementException()); + when(rocksDBIteratorMock.value()) + .thenReturn(new byte[]{0x7f}, new byte[]{0x7e}, new byte[]{0x7d}) + .thenThrow(new NoSuchElementException()); + + final Consumer> consumerStub + = mock(Consumer.class); + + try (ReferenceCountedRDBStoreByteArrayIterator iter = newIterator()) { + iter.forEachRemaining(consumerStub); + ArgumentCaptor> capture = forClass(CloseableRawKeyValue.class); + verify(consumerStub, times(3)).accept(capture.capture()); + assertArrayEquals( + new byte[]{0x00}, capture.getAllValues().get(0).getKey()); + assertArrayEquals( + new byte[]{0x7f}, capture.getAllValues().get(0).getValue()); + assertArrayEquals( + new byte[]{0x01}, capture.getAllValues().get(1).getKey()); + assertArrayEquals( + new byte[]{0x7e}, capture.getAllValues().get(1).getValue()); + assertArrayEquals( + new byte[]{0x02}, capture.getAllValues().get(2).getKey()); + assertArrayEquals( + new byte[]{0x7d}, capture.getAllValues().get(2).getValue()); + } + } + + @Test + public void testHasNextDoesNotDependsOnIsvalid() { + when(rocksDBIteratorMock.isValid()).thenReturn(true, false); + when(rocksDBIteratorMock.key()).thenReturn(new byte[]{0x00}); + when(rocksDBIteratorMock.value()).thenReturn(new byte[]{0x00f}); + try (ReferenceCountedRDBStoreByteArrayIterator iter = newIterator()) { + assertTrue(iter.hasNext()); + assertTrue(iter.hasNext()); + iter.next(); + assertFalse(iter.hasNext()); + assertThrows(NoSuchElementException.class, iter::next); + assertFalse(iter.hasNext()); + } + } + + @Test + public void testNextCallsIsValidThenGetsTheValueAndStepsToNext() { + byte[] testKey = new byte[]{0x00}; + when(rocksDBIteratorMock.isValid()).thenReturn(true); + when(rocksDBIteratorMock.key()).thenReturn(testKey); + when(rocksDBIteratorMock.value()).thenReturn(testKey); + ReferenceCountedRDBStoreByteArrayIterator iter = newIterator(); + + InOrder verifier = inOrder(rocksDBIteratorMock); + + iter.next(); + + verifier.verify(rocksDBIteratorMock).isValid(); + verifier.verify(rocksDBIteratorMock).key(); + verifier.verify(rocksDBIteratorMock).value(); + verifier.verify(rocksDBIteratorMock).next(); + } + + @Test + public void testConstructorSeeksToFirstElement() { + newIterator(); + + verify(rocksDBIteratorMock, times(1)).seekToFirst(); + } + + @Test + public void testSeekToFirstSeeks() { + ReferenceCountedRDBStoreByteArrayIterator iter = newIterator(); + + iter.seekToFirst(); + + 
verify(rocksDBIteratorMock, times(2)).seekToFirst(); + } + + @Test + public void testSeekToLastSeeks() { + ReferenceCountedRDBStoreByteArrayIterator iter = newIterator(); + + iter.seekToLast(); + + verify(rocksDBIteratorMock, times(1)).seekToLast(); + } + + @Test + public void testSeekWithInvalidValue() { + when(rocksDBIteratorMock.isValid()).thenReturn(false); + + try (ReferenceCountedRDBStoreByteArrayIterator iter = newIterator()) { + final CloseableRawKeyValue val = iter.seek(new byte[] {0x55}); + assertFalse(iter.hasNext()); + InOrder verifier = inOrder(rocksDBIteratorMock); + verify(rocksDBIteratorMock, times(1)).seekToFirst(); //at construct time + verify(rocksDBIteratorMock, never()).seekToLast(); + verifier.verify(rocksDBIteratorMock, times(1)).seek(any(byte[].class)); + verifier.verify(rocksDBIteratorMock, times(1)).isValid(); + verifier.verify(rocksDBIteratorMock, never()).key(); + verifier.verify(rocksDBIteratorMock, never()).value(); + assertNull(val); + } + } + + @Test + public void testSeekReturnsTheActualKey() throws Exception { + when(rocksDBIteratorMock.isValid()).thenReturn(true); + when(rocksDBIteratorMock.key()).thenReturn(new byte[]{0x00}); + when(rocksDBIteratorMock.value()).thenReturn(new byte[]{0x7f}); + + ReferenceCountedRDBStoreByteArrayIterator iter = newIterator(); + final CloseableRawKeyValue val = iter.seek(new byte[]{0x55}); + assertTrue(iter.hasNext()); + InOrder verifier = inOrder(rocksDBIteratorMock); + + verify(rocksDBIteratorMock, times(1)).seekToFirst(); //at construct time + verify(rocksDBIteratorMock, never()).seekToLast(); + verifier.verify(rocksDBIteratorMock, times(1)).seek(any(byte[].class)); + verifier.verify(rocksDBIteratorMock, times(1)).isValid(); + verifier.verify(rocksDBIteratorMock, times(1)).key(); + verifier.verify(rocksDBIteratorMock, times(1)).value(); + assertArrayEquals(new byte[]{0x00}, val.getKey()); + assertArrayEquals(new byte[]{0x7f}, val.getValue()); + } + + @Test + public void testGettingTheKeyIfIteratorIsValid() throws Exception { + when(rocksDBIteratorMock.isValid()).thenReturn(true); + when(rocksDBIteratorMock.key()).thenReturn(new byte[]{0x00}); + when(rocksDBIteratorMock.value()).thenReturn(new byte[]{0x00}); + ReferenceCountedRDBStoreByteArrayIterator iter = newIterator(); + byte[] key = null; + if (iter.hasNext()) { + try (CloseableRawKeyValue entry = iter.next()) { + key = entry.getKey(); + } + } + + InOrder verifier = inOrder(rocksDBIteratorMock); + + verifier.verify(rocksDBIteratorMock, times(1)).isValid(); + verifier.verify(rocksDBIteratorMock, times(1)).key(); + assertArrayEquals(new byte[]{0x00}, key); + } + + @Test + public void testGettingTheValueIfIteratorIsValid() throws Exception { + when(rocksDBIteratorMock.isValid()).thenReturn(true); + when(rocksDBIteratorMock.key()).thenReturn(new byte[]{0x00}); + when(rocksDBIteratorMock.value()).thenReturn(new byte[]{0x7f}); + + try (ReferenceCountedRDBStoreByteArrayIterator iter = newIterator()) { + CloseableRawKeyValue entry; + byte[] key = null; + byte[] value = null; + if (iter.hasNext()) { + entry = iter.next(); + key = entry.getKey(); + value = entry.getValue(); + } + InOrder verifier = inOrder(rocksDBIteratorMock); + + verifier.verify(rocksDBIteratorMock, times(1)).isValid(); + verifier.verify(rocksDBIteratorMock, times(1)).key(); + assertArrayEquals(new byte[]{0x00}, key); + assertArrayEquals(new byte[]{0x7f}, value); + } + } + + @Test + public void testRemovingFromDBActuallyDeletesFromTable() throws Exception { + byte[] testKey = new byte[]{0x00}; + 
when(rocksDBIteratorMock.isValid()).thenReturn(true); + when(rocksDBIteratorMock.key()).thenReturn(testKey); + when(rocksDBIteratorMock.value()).thenReturn(testKey); + + ReferenceCountedRDBStoreByteArrayIterator iter = newIterator(null); + iter.next(); + iter.removeFromDB(); + + InOrder verifier = inOrder(rocksDBIteratorMock, rocksTableMock); + + verifier.verify(rocksDBIteratorMock, times(1)).isValid(); + verifier.verify(rocksTableMock, times(1)).delete(testKey); + } + + @Test + public void testRemoveFromDBWithoutDBTableSet() { + ReferenceCountedRDBStoreByteArrayIterator iter = newIterator(); + assertThrows(UnsupportedOperationException.class, + iter::removeFromDB); + } + + @Test + public void testCloseCloses() throws Exception { + ReferenceCountedRDBStoreByteArrayIterator iter = newIterator(); + iter.close(); + + verify(rocksDBIteratorMock, times(1)).close(); + } + + @Test + public void testNullPrefixedIterator() throws IOException { + ReferenceCountedRDBStoreByteArrayIterator iter = newIterator(null); + verify(rocksDBIteratorMock, times(1)).seekToFirst(); + clearInvocations(rocksDBIteratorMock); + when(rocksDBIteratorMock.isValid()).thenReturn(true); + iter.seekToFirst(); + verify(rocksDBIteratorMock, times(0)).isValid(); + verify(rocksDBIteratorMock, times(0)).key(); + verify(rocksDBIteratorMock, times(1)).seekToFirst(); + clearInvocations(rocksDBIteratorMock); + verify(rocksDBIteratorMock, times(0)).isValid(); + verify(rocksDBIteratorMock, times(0)).key(); + verify(rocksDBIteratorMock, times(0)).seekToFirst(); + iter.hasNext(); + clearInvocations(rocksDBIteratorMock); + assertTrue(iter.hasNext()); + verify(rocksDBIteratorMock, times(0)).isValid(); + verify(rocksDBIteratorMock, times(0)).key(); + + iter.seekToLast(); + verify(rocksDBIteratorMock, times(1)).seekToLast(); + + iter.close(); + } + + @Test + public void testNormalPrefixedIterator() throws IOException { + byte[] testPrefix = "sample".getBytes(StandardCharsets.UTF_8); + ReferenceCountedRDBStoreByteArrayIterator iter = newIterator(testPrefix); + verify(rocksDBIteratorMock, times(1)).seek(testPrefix); + clearInvocations(rocksDBIteratorMock); + when(rocksDBIteratorMock.isValid()).thenReturn(true); + when(rocksDBIteratorMock.key()).thenReturn(testPrefix); + iter.seekToFirst(); + verify(rocksDBIteratorMock, times(0)).isValid(); + verify(rocksDBIteratorMock, times(0)).key(); + verify(rocksDBIteratorMock, times(1)).seek(testPrefix); + clearInvocations(rocksDBIteratorMock); + iter.hasNext(); + verify(rocksDBIteratorMock, times(1)).isValid(); + verify(rocksDBIteratorMock, times(1)).key(); + verify(rocksDBIteratorMock, times(0)).seek(testPrefix); + clearInvocations(rocksDBIteratorMock); + assertTrue(iter.hasNext()); + // hasNext shouldn't make isValid() redundant calls. 
+ verify(rocksDBIteratorMock, times(0)).isValid(); + verify(rocksDBIteratorMock, times(0)).key(); + Exception e = + assertThrows(Exception.class, () -> iter.seekToLast(), "Prefixed iterator does not support seekToLast"); + assertInstanceOf(UnsupportedOperationException.class, e); + + iter.close(); + } +} diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestReferenceCountedRDBStoreCodecBufferIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestReferenceCountedRDBStoreCodecBufferIterator.java new file mode 100644 index 000000000000..06b07caae415 --- /dev/null +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestReferenceCountedRDBStoreCodecBufferIterator.java @@ -0,0 +1,482 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.hdds.StringUtils; +import org.apache.hadoop.hdds.utils.db.iterator.CloseableRawKeyValue; +import org.apache.hadoop.hdds.utils.db.iterator.ReferenceCountedRDBStoreCodecBufferIterator; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import 
org.junit.jupiter.params.provider.ValueSource; +import org.mockito.InOrder; +import org.mockito.stubbing.Answer; +import org.rocksdb.RocksIterator; + +/** + * This test is similar to {@link TestReferenceCountedRDBStoreByteArrayIterator} + * except that this test is for {@link ReferenceCountedRDBStoreCodecBufferIterator}. + */ +public class TestReferenceCountedRDBStoreCodecBufferIterator { + + private RocksIterator rocksIteratorMock; + private ManagedRocksIterator managedRocksIterator; + private RDBTable rdbTableMock; + + @BeforeEach + public void setup() { + CodecBuffer.enableLeakDetection(); + rocksIteratorMock = mock(RocksIterator.class); + managedRocksIterator = newManagedRocksIterator(); + rdbTableMock = mock(RDBTable.class); + Logger.getLogger(ManagedRocksObjectUtils.class).setLevel(Level.DEBUG); + } + + ManagedRocksIterator newManagedRocksIterator() { + return new ManagedRocksIterator(rocksIteratorMock); + } + + ReferenceCountedRDBStoreCodecBufferIterator newIterator() { + return newIterator(1); + } + + ReferenceCountedRDBStoreCodecBufferIterator newIterator(int maxNumberOfBuffers) { + return new ReferenceCountedRDBStoreCodecBufferIterator(managedRocksIterator, null, null, maxNumberOfBuffers); + } + + ReferenceCountedRDBStoreCodecBufferIterator newIterator(CodecBuffer prefix) { + return new ReferenceCountedRDBStoreCodecBufferIterator( + managedRocksIterator, rdbTableMock, prefix, 1); + } + + Answer newAnswerInt(String name, int b) { + return newAnswer(name, (byte) b); + } + + Answer newAnswer(String name, byte... b) { + return invocation -> { + System.out.printf("answer %s: %s%n", name, StringUtils.bytes2Hex(b)); + Object[] args = invocation.getArguments(); + final ByteBuffer buffer = (ByteBuffer) args[0]; + return writeToBuffer(buffer, b); + }; + } + + private int writeToBuffer(ByteBuffer buffer, byte... bytesToWrite) { + buffer.clear(); + buffer.put(bytesToWrite); + buffer.flip(); + return bytesToWrite.length; + } + + @ParameterizedTest + @ValueSource(ints = {0, 1, 2, 3, 5, 10}) + public void testRDBStoreCodecBufferIterGetsFailBeyondMaxBuffers(int maxBuffers) + throws InterruptedException, TimeoutException { + List> vals = new ArrayList<>(); + when(rocksIteratorMock.isValid()).thenReturn(true); + AtomicInteger counter = new AtomicInteger(0); + + when(rocksIteratorMock.key(any())) + .thenAnswer(i -> writeToBuffer(i.getArgument(0), (byte)counter.getAndIncrement())); + when(rocksIteratorMock.value(any())) + .thenAnswer(i -> writeToBuffer(i.getArgument(0), (byte)counter.getAndIncrement())); + try (ReferenceCountedRDBStoreCodecBufferIterator iterator = newIterator(maxBuffers)) { + for (int i = 0; i < maxBuffers; i++) { + vals.add(iterator.next()); + } + assertEquals(Math.max(maxBuffers, 0), vals.size()); + ExecutorService executor = Executors.newSingleThreadExecutor(); + AtomicReference> nextThread = new AtomicReference<>(CompletableFuture.supplyAsync( + () -> { + CloseableRawKeyValue v = iterator.next(); + vals.add(v); + return true; + }, + executor)); + + if (maxBuffers < 1) { + // Number of max buffers is always going to be at least 1. We need at least 1 buffers one for getting the next + // value and one for returning the current value. 
+ GenericTestUtils.waitFor(() -> nextThread.get().isDone() && nextThread.get().join(), 10, 100); + System.out.println(Thread.currentThread().getName()); + nextThread.set(CompletableFuture.supplyAsync( + () -> { + CloseableRawKeyValue v = iterator.next(); + vals.add(v); + return true; + }, + executor)); + } + assertEquals(Math.max(1, maxBuffers), vals.size()); + for (int i = 0; i < vals.size(); i++) { + assertEquals(2 * i, vals.get(i).getKey().getArray()[0]); + assertEquals(2 * i + 1, vals.get(i).getValue().getArray()[0]); + } + assertFalse(nextThread.get().isDone()); + int size = vals.size(); + vals.get(0).close(); + GenericTestUtils.waitFor(() -> nextThread.get().isDone() && nextThread.get().join(), 10, 100); + assertEquals(size + 1, vals.size()); + assertEquals(2 * size, vals.get(size).getKey().getArray()[0]); + assertEquals(2 * size + 1, vals.get(size).getValue().getArray()[0]); + for (CloseableRawKeyValue v : vals) { + v.close(); + } + executor.shutdown(); + } + } + + @Test + public void testForEachRemaining() throws Exception { + when(rocksIteratorMock.isValid()) + .thenReturn(true, true, true, false); + when(rocksIteratorMock.key(any())) + .then(newAnswerInt("key1", 0x00)) + .then(newAnswerInt("key2", 0x01)) + .then(newAnswerInt("key3", 0x02)) + .thenThrow(new NoSuchElementException()); + when(rocksIteratorMock.value(any())) + .then(newAnswerInt("val1", 0x7f)) + .then(newAnswerInt("val2", 0x7e)) + .then(newAnswerInt("val3", 0x7d)) + .thenThrow(new NoSuchElementException()); + + List> remaining = new ArrayList<>(); + try (ReferenceCountedRDBStoreCodecBufferIterator i = newIterator()) { + i.forEachRemaining(kvSupplier -> { + try { + remaining.add(RawKeyValue.create( + kvSupplier.getKey().getArray(), kvSupplier.getValue().getArray())); + } finally { + kvSupplier.close(); + } + }); + + System.out.println("remaining: " + remaining); + assertArrayEquals(new byte[]{0x00}, remaining.get(0).getKey()); + assertArrayEquals(new byte[]{0x7f}, remaining.get(0).getValue()); + assertArrayEquals(new byte[]{0x01}, remaining.get(1).getKey()); + assertArrayEquals(new byte[]{0x7e}, remaining.get(1).getValue()); + assertArrayEquals(new byte[]{0x02}, remaining.get(2).getKey()); + assertArrayEquals(new byte[]{0x7d}, remaining.get(2).getValue()); + } + + CodecTestUtil.gc(); + } + + @Test + public void testHasNextDoesNotDependsOnIsvalid() throws Exception { + when(rocksIteratorMock.isValid()).thenReturn(true, false); + + try (ReferenceCountedRDBStoreCodecBufferIterator i = newIterator()) { + assertTrue(i.hasNext()); + assertTrue(i.hasNext()); + i.next(); + assertFalse(i.hasNext()); + assertThrows(NoSuchElementException.class, i::next); + assertFalse(i.hasNext()); + } + + CodecTestUtil.gc(); + } + + @Test + public void testNextCallsIsValidThenGetsTheValueAndStepsToNext() + throws Exception { + when(rocksIteratorMock.isValid()).thenReturn(true); + InOrder verifier = inOrder(rocksIteratorMock); + try (ReferenceCountedRDBStoreCodecBufferIterator i = newIterator()) { + i.next(); + } + + verifier.verify(rocksIteratorMock).isValid(); + verifier.verify(rocksIteratorMock).key(any()); + verifier.verify(rocksIteratorMock).value(any()); + verifier.verify(rocksIteratorMock).next(); + + CodecTestUtil.gc(); + } + + @Test + public void testConstructorSeeksToFirstElement() throws Exception { + newIterator().close(); + + verify(rocksIteratorMock, times(1)).seekToFirst(); + + CodecTestUtil.gc(); + } + + @Test + public void testSeekToFirstSeeks() throws Exception { + try (ReferenceCountedRDBStoreCodecBufferIterator i = 
newIterator()) { + i.seekToFirst(); + } + verify(rocksIteratorMock, times(2)).seekToFirst(); + + CodecTestUtil.gc(); + } + + @Test + public void testSeekToLastSeeks() throws Exception { + try (ReferenceCountedRDBStoreCodecBufferIterator i = newIterator()) { + i.seekToLast(); + } + + verify(rocksIteratorMock, times(1)).seekToLast(); + + CodecTestUtil.gc(); + } + + @Test + public void testSeekWithInvalidValue() { + when(rocksIteratorMock.isValid()).thenReturn(false); + + try (ReferenceCountedRDBStoreCodecBufferIterator i = newIterator(); + CodecBuffer target = CodecBuffer.wrap(new byte[] {0x55}); + CloseableRawKeyValue valSupplier = i.seek(target)) { + assertFalse(i.hasNext()); + InOrder verifier = inOrder(rocksIteratorMock); + verify(rocksIteratorMock, times(1)).seekToFirst(); //at construct time + verify(rocksIteratorMock, never()).seekToLast(); + verifier.verify(rocksIteratorMock, times(1)) + .seek(any(ByteBuffer.class)); + verifier.verify(rocksIteratorMock, times(1)).isValid(); + verifier.verify(rocksIteratorMock, never()).key(any()); + verifier.verify(rocksIteratorMock, never()).value(any()); + assertNull(valSupplier); + } + } + + @Test + public void testSeekReturnsTheActualKey() throws Exception { + when(rocksIteratorMock.isValid()).thenReturn(true); + when(rocksIteratorMock.key(any())) + .then(newAnswerInt("key1", 0x00)); + when(rocksIteratorMock.value(any())) + .then(newAnswerInt("val1", 0x7f)); + + try (ReferenceCountedRDBStoreCodecBufferIterator i = newIterator(); + CodecBuffer target = CodecBuffer.wrap(new byte[] {0x55}); + CloseableRawKeyValue valSupplier = i.seek(target)) { + assertTrue(i.hasNext()); + InOrder verifier = inOrder(rocksIteratorMock); + verify(rocksIteratorMock, times(1)).seekToFirst(); //at construct time + verify(rocksIteratorMock, never()).seekToLast(); + verifier.verify(rocksIteratorMock, times(1)) + .seek(any(ByteBuffer.class)); + verifier.verify(rocksIteratorMock, times(1)).isValid(); + verifier.verify(rocksIteratorMock, times(1)).key(any()); + verifier.verify(rocksIteratorMock, times(1)).value(any()); + assertArrayEquals(new byte[] {0x00}, valSupplier.getKey().getArray()); + assertArrayEquals(new byte[] {0x7f}, valSupplier.getValue().getArray()); + } + + CodecTestUtil.gc(); + } + + @Test + public void testGettingTheKeyIfIteratorIsValid() throws Exception { + when(rocksIteratorMock.isValid()).thenReturn(true); + when(rocksIteratorMock.key(any())) + .then(newAnswerInt("key1", 0x00)); + + byte[] key = null; + try (ReferenceCountedRDBStoreCodecBufferIterator i = newIterator()) { + if (i.hasNext()) { + try (CloseableRawKeyValue kv = i.next()) { + key = kv.getKey().getArray(); + } + } + } + + InOrder verifier = inOrder(rocksIteratorMock); + + verifier.verify(rocksIteratorMock, times(1)).isValid(); + verifier.verify(rocksIteratorMock, times(1)).key(any()); + assertArrayEquals(new byte[]{0x00}, key); + + CodecTestUtil.gc(); + } + + @Test + public void testGettingTheValueIfIteratorIsValid() throws Exception { + when(rocksIteratorMock.isValid()).thenReturn(true); + when(rocksIteratorMock.key(any())) + .then(newAnswerInt("key1", 0x00)); + when(rocksIteratorMock.value(any())) + .then(newAnswerInt("val1", 0x7f)); + + byte[] key = null; + byte[] value = null; + try (ReferenceCountedRDBStoreCodecBufferIterator i = newIterator()) { + if (i.hasNext()) { + try (CloseableRawKeyValue entry = i.next()) { + key = entry.getKey().getArray(); + value = entry.getValue().getArray(); + } + } + } + + InOrder verifier = inOrder(rocksIteratorMock); + + verifier.verify(rocksIteratorMock, 
times(1)).isValid(); + verifier.verify(rocksIteratorMock, times(1)).key(any()); + assertArrayEquals(new byte[]{0x00}, key); + assertArrayEquals(new byte[]{0x7f}, value); + + CodecTestUtil.gc(); + } + + @Test + public void testRemovingFromDBActuallyDeletesFromTable() throws Exception { + final byte[] testKey = new byte[10]; + ThreadLocalRandom.current().nextBytes(testKey); + when(rocksIteratorMock.isValid()).thenReturn(true); + when(rocksIteratorMock.key(any())) + .then(newAnswer("key1", testKey)); + + try (ReferenceCountedRDBStoreCodecBufferIterator i = newIterator(null)) { + i.next(); + i.removeFromDB(); + } + + InOrder verifier = inOrder(rocksIteratorMock, rdbTableMock); + + verifier.verify(rocksIteratorMock, times(1)).isValid(); + verifier.verify(rdbTableMock, times(1)) + .delete(ByteBuffer.wrap(testKey)); + CodecTestUtil.gc(); + } + + @Test + public void testRemoveFromDBWithoutDBTableSet() throws Exception { + try (ReferenceCountedRDBStoreCodecBufferIterator i = newIterator()) { + assertThrows(UnsupportedOperationException.class, + i::removeFromDB); + } + + CodecTestUtil.gc(); + } + + @Test + public void testCloseCloses() throws Exception { + newIterator().close(); + + verify(rocksIteratorMock, times(1)).close(); + + CodecTestUtil.gc(); + } + + @Test + public void testNullPrefixedIterator() throws Exception { + + try (ReferenceCountedRDBStoreCodecBufferIterator i = newIterator()) { + verify(rocksIteratorMock, times(1)).seekToFirst(); + clearInvocations(rocksIteratorMock); + when(rocksIteratorMock.isValid()).thenReturn(true); + i.seekToFirst(); + verify(rocksIteratorMock, times(0)).isValid(); + verify(rocksIteratorMock, times(0)).key(any()); + verify(rocksIteratorMock, times(1)).seekToFirst(); + clearInvocations(rocksIteratorMock); + i.hasNext(); + verify(rocksIteratorMock, times(1)).isValid(); + verify(rocksIteratorMock, times(1)).key(any()); + clearInvocations(rocksIteratorMock); + + assertTrue(i.hasNext()); + // hasNext shouldn't make isValid() redundant calls. 
+      verify(rocksIteratorMock, times(0)).isValid();
+      verify(rocksIteratorMock, times(0)).key(any());
+
+      i.seekToLast();
+      verify(rocksIteratorMock, times(1)).seekToLast();
+    }
+
+    CodecTestUtil.gc();
+  }
+
+  @Test
+  public void testNormalPrefixedIterator() throws Exception {
+    final byte[] prefixBytes = "sample".getBytes(StandardCharsets.UTF_8);
+    try (ReferenceCountedRDBStoreCodecBufferIterator i = newIterator(
+        CodecBuffer.wrap(prefixBytes))) {
+      final ByteBuffer prefix = ByteBuffer.wrap(prefixBytes);
+      verify(rocksIteratorMock, times(1)).seek(prefix);
+      clearInvocations(rocksIteratorMock);
+      when(rocksIteratorMock.isValid()).thenReturn(true);
+      when(rocksIteratorMock.key(any()))
+          .then(newAnswer("key1", prefixBytes));
+      i.seekToFirst();
+      verify(rocksIteratorMock, times(0)).isValid();
+      verify(rocksIteratorMock, times(0)).key(any());
+      verify(rocksIteratorMock, times(1)).seek(prefix);
+      clearInvocations(rocksIteratorMock);
+      i.hasNext();
+      verify(rocksIteratorMock, times(1)).isValid();
+      verify(rocksIteratorMock, times(1)).key(any());
+      verify(rocksIteratorMock, times(0)).seek(prefix);
+      clearInvocations(rocksIteratorMock);
+
+      assertTrue(i.hasNext());
+      // Ensure no redundant native calls are made, since the key and value have already been fetched
+      // by the earlier hasNext().
+      verify(rocksIteratorMock, times(0)).isValid();
+      verify(rocksIteratorMock, times(0)).key(any());
+
+      Exception e =
+          assertThrows(Exception.class, () -> i.seekToLast(), "Prefixed iterator does not support seekToLast");
+      assertInstanceOf(UnsupportedOperationException.class, e);
+    }
+
+    CodecTestUtil.gc();
+  }
+}
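
Usage note (illustrative sketch, not part of the patch): the code below shows one way a caller might consume the new Table#spliterator(int maxParallelism, boolean closeOnException) API as a java.util.stream.Stream. The typed Table<String, String> setup, the generic parameters and package of KeyValueSpliterator, and its close() method are assumptions inferred from the table overrides and the RawSpliterator tests above.

import java.io.IOException;
import java.util.stream.StreamSupport;
import org.apache.hadoop.hdds.utils.db.KeyValueSpliterator; // package assumed
import org.apache.hadoop.hdds.utils.db.Table;

public final class SpliteratorUsageSketch {
  /**
   * Counts the entries of a typed table by streaming over its spliterator.
   * maxParallelism = 1 keeps a single underlying RocksDB iterator;
   * closeOnException = true is expected to release native resources if the stream fails.
   */
  public static long countEntries(Table<String, String> table) throws IOException {
    KeyValueSpliterator<String, String> split = table.spliterator(1, true); // generics assumed
    try {
      // Assumes KeyValueSpliterator extends java.util.Spliterator<Table.KeyValue<String, String>>.
      return StreamSupport.stream(split, /* parallel= */ false).count();
    } finally {
      split.close(); // assumed to mirror RawSpliterator#close() exercised in the tests above
    }
  }
}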