diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java index e9108112bd49..4d83e64d6ea1 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java @@ -50,7 +50,7 @@ * A buffer used by {@link Codec} * for supporting RocksDB direct {@link ByteBuffer} APIs. */ -public class CodecBuffer implements UncheckedAutoCloseable { +public class CodecBuffer implements UncheckedAutoCloseable, Comparable { private static final Logger LOG = LoggerFactory.getLogger(CodecBuffer.class); private static final ByteBufAllocator POOL = PooledByteBufAllocator.DEFAULT; @@ -376,6 +376,19 @@ public boolean startsWith(CodecBuffer prefix) { return buf.slice(buf.readerIndex(), length).equals(prefix.buf); } + public int compareTo(byte[] other) { + Objects.requireNonNull(other, "other == null"); + final int size = Math.min(readableBytes(), other.length); + for (int i = 0; i < size; i++) { + final int b1 = buf.getByte(buf.readerIndex() + i) & 0xff; + final int b2 = other[i] & 0xff; + if (b1 != b2) { + return b1 - b2; + } + } + return readableBytes() - other.length; + } + /** @return an {@link InputStream} reading from this buffer. */ public InputStream getInputStream() { return new ByteBufInputStream(buf.duplicate()); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java index ec9359fb3725..a3c12349a899 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java @@ -89,6 +89,21 @@ public void deleteWithBatch(BatchOperation batch, KEY key) "version."); } + @Override + public KeyValueSpliterator spliterator(int maxParallelism, boolean closeOnException) { + throw new UnsupportedOperationException("Iterating tables directly is not" + + " supported for datanode containers due to differing schema " + + "version."); + } + + @Override + public KeyValueSpliterator spliterator(KEY startKey, KEY prefix, int maxParallelism, + boolean closeOnException) { + throw new UnsupportedOperationException("Iterating tables directly is not" + + " supported for datanode containers due to differing schema " + + "version."); + } + @Override public String getName() { return table.getName(); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BaseRDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BaseRDBTable.java new file mode 100644 index 000000000000..7ff912b3df66 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BaseRDBTable.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import java.io.IOException; +import java.util.List; +import org.rocksdb.LiveFileMetaData; + +/** + * Base table interface for Rocksdb. + */ +public interface BaseRDBTable extends Table { + List getTableSstFiles() throws IOException; +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/ByteArrayCodec.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/ByteArrayCodec.java index 7ae65652d18f..c40a6aeea29e 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/ByteArrayCodec.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/ByteArrayCodec.java @@ -17,17 +17,24 @@ package org.apache.hadoop.hdds.utils.db; +import java.util.Comparator; + /** * No-op codec for byte arrays. */ public final class ByteArrayCodec implements Codec { private static final Codec INSTANCE = new ByteArrayCodec(); + private static final Comparator COMPARATOR = new ByteWiseComparator(); public static Codec get() { return INSTANCE; } + public static Comparator getComparator() { + return COMPARATOR; + } + private ByteArrayCodec() { // singleton } @@ -51,4 +58,18 @@ public byte[] fromPersistedFormat(byte[] bytes) { public byte[] copyObject(byte[] bytes) { return bytes; } + + private static class ByteWiseComparator implements Comparator { + @Override + public int compare(byte[] o1, byte[] o2) { + int length = Math.min(o1.length, o2.length); + for (int i = 0; i < length; i++) { + int compareValue = Integer.compareUnsigned(Byte.toUnsignedInt(o1[i]), Byte.toUnsignedInt(o2[i])); + if (compareValue != 0) { + return compareValue; + } + } + return Integer.compare(o1.length, o2.length); + } + } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java index 75104a55ed72..ac2c28938791 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java @@ -21,6 +21,8 @@ import java.util.NoSuchElementException; import java.util.function.Consumer; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; +import org.apache.ratis.util.ReferenceCountedObject; +import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,30 +32,34 @@ * @param the raw type. */ abstract class RDBStoreAbstractIterator - implements TableIterator> { + implements TableIterator> { private static final Logger LOG = LoggerFactory.getLogger(RDBStoreAbstractIterator.class); private final ManagedRocksIterator rocksDBIterator; private final RDBTable rocksDBTable; - private Table.KeyValue currentEntry; + private ReferenceCountedObject> currentEntry; + private RawKeyValue previousKeyValue; // This is for schemas that use a fixed-length // prefix for each key. 
private final RAW prefix; + private Boolean hasNext; + private boolean closed; RDBStoreAbstractIterator(ManagedRocksIterator iterator, RDBTable table, RAW prefix) { this.rocksDBIterator = iterator; this.rocksDBTable = table; this.prefix = prefix; + this.currentEntry = null; + this.hasNext = false; + this.closed = false; + this.previousKeyValue = null; } - /** @return the key for the current entry. */ - abstract RAW key(); - /** @return the {@link Table.KeyValue} for the current entry. */ - abstract Table.KeyValue getKeyValue(); + abstract ReferenceCountedObject> getKeyValue(); /** Seek to the given key. */ abstract void seek0(RAW key); @@ -78,32 +84,52 @@ final RAW getPrefix() { @Override public final void forEachRemaining( - Consumer> action) { + Consumer> action) { while (hasNext()) { - action.accept(next()); + AutoCloseableRawKeyValue entry = next(); + action.accept(entry); + } + } + + private void releaseEntry() { + if (currentEntry != null) { + currentEntry.release(); } + currentEntry = null; + hasNext = null; } private void setCurrentEntry() { - if (rocksDBIterator.get().isValid()) { + boolean isValid = !closed && rocksDBIterator.get().isValid(); + if (isValid) { currentEntry = getKeyValue(); + currentEntry.retain(); } else { currentEntry = null; } + setHasNext(isValid, currentEntry); + } + + public void setHasNext(boolean isValid, ReferenceCountedObject> entry) { + this.hasNext = isValid && (prefix == null || startsWithPrefix(entry.get().getKey())); } @Override public final boolean hasNext() { - return rocksDBIterator.get().isValid() && - (prefix == null || startsWithPrefix(key())); + if (hasNext == null) { + setCurrentEntry(); + } + return hasNext; } @Override - public final Table.KeyValue next() { - setCurrentEntry(); - if (currentEntry != null) { + public final AutoCloseableRawKeyValue next() { + if (hasNext()) { + AutoCloseableRawKeyValue entry = new AutoCloseableRawKeyValue<>(currentEntry); + this.previousKeyValue = currentEntry.get(); rocksDBIterator.get().next(); - return currentEntry; + releaseEntry(); + return entry; } throw new NoSuchElementException("RocksDB Store has no more elements"); } @@ -115,7 +141,7 @@ public final void seekToFirst() { } else { seek0(prefix); } - setCurrentEntry(); + releaseEntry(); } @Override @@ -125,14 +151,19 @@ public final void seekToLast() { } else { throw new UnsupportedOperationException("seekToLast: prefix != null"); } - setCurrentEntry(); + releaseEntry(); } @Override - public final Table.KeyValue seek(RAW key) { + public final AutoCloseableRawKeyValue seek(RAW key) { seek0(key); + releaseEntry(); setCurrentEntry(); - return currentEntry; + // Current entry should be only closed when the next() and thus closing the returned entry should be a noop. 
+ if (hasNext()) { + return new AutoCloseableRawKeyValue<>(currentEntry); + } + return null; } @Override @@ -140,8 +171,8 @@ public final void removeFromDB() throws IOException { if (rocksDBTable == null) { throw new UnsupportedOperationException("remove"); } - if (currentEntry != null) { - delete(currentEntry.getKey()); + if (previousKeyValue != null) { + delete(previousKeyValue.getKey()); } else { LOG.info("Failed to removeFromDB: currentEntry == null"); } @@ -150,5 +181,21 @@ public final void removeFromDB() throws IOException { @Override public void close() { rocksDBIterator.close(); + closed = true; + releaseEntry(); + } + + public static final class AutoCloseableRawKeyValue extends RawKeyValue implements AutoCloseable { + private final UncheckedAutoCloseableSupplier> keyValue; + + private AutoCloseableRawKeyValue(ReferenceCountedObject> kv) { + super(kv.get().getKey(), kv.get().getValue()); + this.keyValue = kv.retainAndReleaseOnClose(); + } + + @Override + public void close() { + keyValue.close(); + } } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java index 62303c8a3bb7..6b653f7eadab 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Arrays; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; +import org.apache.ratis.util.ReferenceCountedObject; /** * RocksDB store iterator using the byte[] API. @@ -33,14 +34,10 @@ class RDBStoreByteArrayIterator extends RDBStoreAbstractIterator { } @Override - byte[] key() { - return getRocksDBIterator().get().key(); - } - - @Override - Table.KeyValue getKeyValue() { + ReferenceCountedObject> getKeyValue() { final ManagedRocksIterator i = getRocksDBIterator(); - return RawKeyValue.create(i.get().key(), i.get().value()); + RawKeyValue rawKV = RawKeyValue.create(i.get().key(), i.get().value()); + return ReferenceCountedObject.wrap(rawKV); } @Override diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreCodecBufferIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreCodecBufferIterator.java index 107b71ee5045..4621c18804a1 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreCodecBufferIterator.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreCodecBufferIterator.java @@ -18,31 +18,53 @@ package org.apache.hadoop.hdds.utils.db; import java.io.IOException; +import java.util.Deque; import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Set; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import org.apache.commons.lang3.exception.UncheckedInterruptedException; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; +import org.apache.hadoop.util.Sets; import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.ReferenceCountedObject; /** - * Implement {@link RDBStoreAbstractIterator} using {@link CodecBuffer}. 
- */ + * An implementation of {@link RDBStoreAbstractIterator} that uses {@link CodecBuffer} + * for efficient memory management when iterating over RocksDB entries. + * This iterator employs a buffer pooling strategy to minimize memory allocations + * during iteration. Key and value buffers are pre-allocated and reused through a + * reference-counting mechanism, which significantly reduces GC pressure when + * processing large datasets. + * Key features: + **/ class RDBStoreCodecBufferIterator extends RDBStoreAbstractIterator { - private final Buffer keyBuffer; - private final Buffer valueBuffer; - private final AtomicBoolean closed = new AtomicBoolean(); + private final BlockingDeque> availableBufferStack; + private final Set> inUseBuffers; + private final AtomicReference closed = new AtomicReference<>(false); RDBStoreCodecBufferIterator(ManagedRocksIterator iterator, RDBTable table, - CodecBuffer prefix) { + CodecBuffer prefix, int maxNumberOfBuffersInMemory) { super(iterator, table, prefix); - + // We need atleast 1 buffers one for setting next value and one for sending the current value. + maxNumberOfBuffersInMemory = Math.max(1, maxNumberOfBuffersInMemory); final String name = table != null ? table.getName() : null; - this.keyBuffer = new Buffer( - new CodecBuffer.Capacity(name + "-iterator-key", 1 << 10), - buffer -> getRocksDBIterator().get().key(buffer)); - this.valueBuffer = new Buffer( - new CodecBuffer.Capacity(name + "-iterator-value", 4 << 10), - buffer -> getRocksDBIterator().get().value(buffer)); + this.availableBufferStack = new LinkedBlockingDeque<>(maxNumberOfBuffersInMemory); + this.inUseBuffers = Sets.newConcurrentHashSet(); + for (int i = 0; i < maxNumberOfBuffersInMemory; i++) { + Buffer keyBuffer = new Buffer( + new CodecBuffer.Capacity(name + "-iterator-key-" + i, 1 << 10), + buffer -> getRocksDBIterator().get().key(buffer)); + Buffer valueBuffer = new Buffer( + new CodecBuffer.Capacity(name + "-iterator-value-" + i, 4 << 10), + buffer -> getRocksDBIterator().get().value(buffer)); + availableBufferStack.add(new RawKeyValue<>(keyBuffer, valueBuffer)); + } + seekToFirst(); } @@ -50,16 +72,51 @@ void assertOpen() { Preconditions.assertTrue(!closed.get(), "Already closed"); } - @Override - CodecBuffer key() { + private V getFromDeque(BlockingDeque deque, Set inUseSet) { + V popped; + do { + try { + popped = deque.poll(1000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new UncheckedInterruptedException(e); + } + } while (popped == null); assertOpen(); - return keyBuffer.getFromDb(); + inUseSet.add(popped); + return popped; + } + + private ReferenceCountedObject> getReferenceCountedBuffer( + RawKeyValue key, Deque> stack, Set> inUseSet, + Function, RawKeyValue> transformer) { + RawKeyValue value = transformer.apply(key); + return ReferenceCountedObject.wrap(value, () -> { + }, completelyReleased -> { + if (!completelyReleased) { + return; + } + closed.updateAndGet((prev) -> { + // If already closed the data structure should not be manipulated since the buffer would have already been + // closed. + if (!prev) { + //Entire block done inside this code block to avoid race condition with close() method. + //Remove from the set before adding it back to the stack. Otherwise there could be a race condition with + // #getFromDeque function. 
+ inUseSet.remove(key); + stack.push(key); + } + return prev; + }); + }); } @Override - Table.KeyValue getKeyValue() { + ReferenceCountedObject> getKeyValue() { assertOpen(); - return Table.newKeyValue(key(), valueBuffer.getFromDb()); + RawKeyValue kvBuffer = getFromDeque(availableBufferStack, inUseBuffers); + Function, RawKeyValue> transformer = + kv -> new RawKeyValue<>(kv.getKey().getFromDb(), kv.getValue().getFromDb()); + return getReferenceCountedBuffer(kvBuffer, availableBufferStack, inUseBuffers, transformer); } @Override @@ -87,13 +144,26 @@ boolean startsWithPrefix(CodecBuffer key) { return key.startsWith(prefix); } + private void release(Deque valueStack, Set inUseSet, Function releaser) { + while (!valueStack.isEmpty()) { + V popped = valueStack.pop(); + releaser.apply(popped); + } + for (V inUseValue : inUseSet) { + releaser.apply(inUseValue); + } + } + @Override public void close() { if (closed.compareAndSet(false, true)) { super.close(); Optional.ofNullable(getPrefix()).ifPresent(CodecBuffer::release); - keyBuffer.release(); - valueBuffer.release(); + release(availableBufferStack, inUseBuffers, kv -> { + kv.getKey().release(); + kv.getValue().release(); + return null; + }); } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java index f72b0084bdc5..f7d3e83b6bf7 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java @@ -17,17 +17,24 @@ package org.apache.hadoop.hdds.utils.db; +import static org.apache.hadoop.hdds.utils.db.RocksDatabase.bytes2String; + import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Spliterator; import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.hadoop.hdds.annotation.InterfaceAudience; import org.apache.hadoop.hdds.utils.MetadataKeyFilters; +import org.apache.hadoop.hdds.utils.db.RDBStoreAbstractIterator.AutoCloseableRawKeyValue; import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily; import org.apache.hadoop.util.Time; +import org.rocksdb.LiveFileMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +44,7 @@ * metadata store content. All other user's using Table should use TypedTable. 
*/ @InterfaceAudience.Private -class RDBTable implements Table { +class RDBTable implements BaseRDBTable { private static final Logger LOG = LoggerFactory.getLogger(RDBTable.class); @@ -94,7 +101,7 @@ public void putWithBatch(BatchOperation batch, byte[] key, byte[] value) @Override public boolean isEmpty() throws IOException { - try (TableIterator> keyIter = iterator()) { + try (TableIterator> keyIter = iterator()) { keyIter.seekToFirst(); return !keyIter.hasNext(); } @@ -210,22 +217,38 @@ public void deleteWithBatch(BatchOperation batch, byte[] key) } @Override - public TableIterator> iterator() + public TableIterator> iterator() throws IOException { return iterator((byte[])null); } @Override - public TableIterator> iterator(byte[] prefix) + public TableIterator> iterator(byte[] prefix) + throws IOException { + return new RDBStoreByteArrayIterator(db.newIterator(family, false), this, prefix); + } + + @Override + public KeyValueSpliterator spliterator(int maxParallelism, boolean closeOnException) throws IOException { - return new RDBStoreByteArrayIterator(db.newIterator(family, false), this, - prefix); + return spliterator(null, null, maxParallelism, closeOnException); } - TableIterator> iterator( + @Override + public KeyValueSpliterator spliterator(byte[] startKey, byte[] prefix, int maxParallelism, + boolean closeOnException) throws IOException { + return newByteArraySpliterator(prefix, startKey, maxParallelism, closeOnException); + } + + TableIterator> iterator( CodecBuffer prefix) throws IOException { + return iterator(prefix, 1); + } + + TableIterator> iterator( + CodecBuffer prefix, int maxNumberOfBuffers) throws IOException { return new RDBStoreCodecBufferIterator(db.newIterator(family, false), - this, prefix); + this, prefix, maxNumberOfBuffers); } @Override @@ -262,7 +285,7 @@ public List> getSequentialRangeKVs(byte[] startKey, @Override public void deleteBatchWithPrefix(BatchOperation batch, byte[] prefix) throws IOException { - try (TableIterator> iter + try (TableIterator> iter = iterator(prefix)) { while (iter.hasNext()) { deleteWithBatch(batch, iter.next().getKey()); @@ -273,7 +296,7 @@ public void deleteBatchWithPrefix(BatchOperation batch, byte[] prefix) @Override public void dumpToFileWithPrefix(File externalFile, byte[] prefix) throws IOException { - try (TableIterator> iter = iterator(prefix); + try (TableIterator> iter = iterator(prefix); RDBSstFileWriter fileWriter = new RDBSstFileWriter(externalFile)) { while (iter.hasNext()) { final KeyValue entry = iter.next(); @@ -298,7 +321,7 @@ private List> getRangeKVs(byte[] startKey, "Invalid count given " + count + ", count must be greater than 0"); } final List> result = new ArrayList<>(); - try (TableIterator> it + try (TableIterator> it = iterator(prefix)) { if (startKey == null) { it.seekToFirst(); @@ -357,4 +380,76 @@ && get(startKey) == null) { } return result; } + + private RawSpliterator newByteArraySpliterator(byte[] prefix, byte[] startKey, + int maxParallelism, boolean closeOnException) throws IOException { + return new ByteArrayRawSpliterator(prefix, startKey, maxParallelism, closeOnException); + } + + @Override + public List getTableSstFiles() throws IOException { + return this.db.getSstFileList().stream() + .filter(liveFileMetaData -> getName().equals(bytes2String(liveFileMetaData.columnFamilyName()))) + .collect(Collectors.toList()); + } + + private final class ByteArrayRawSpliterator extends RawSpliterator { + + private ByteArrayRawSpliterator(byte[] prefix, byte[] startKey, int maxParallelism, boolean 
closeOnException) + throws IOException { + super(prefix, startKey, maxParallelism, closeOnException); + initializeIterator(); + } + + private ByteArrayRawSpliterator(byte[] prefix, byte[] startKey, int maxParallelism, boolean closeOnException, + List boundKeys) throws IOException { + super(prefix, startKey, maxParallelism, closeOnException, boundKeys); + initializeIterator(); + } + + @Override + KeyValue convert(RawKeyValue kv) { + final int rawSize = kv.getValue().length; + return Table.newKeyValue(kv.getKey(), kv.getValue(), rawSize); + } + + @Override + List getBoundaryKeys(byte[] prefix, byte[] startKey) throws IOException { + return getTableSstFiles().stream() + .flatMap(liveFileMetaData -> Stream.of(liveFileMetaData.smallestKey(), liveFileMetaData.largestKey())) + .filter(value -> { + if (value.length < prefix.length) { + return false; + } + for (int i = 0; i < prefix.length; i++) { + if (value[i] != prefix[i]) { + return false; + } + } + return true; + }).filter(value -> ByteArrayCodec.getComparator().compare(value, startKey) >= 0) + .collect(Collectors.toList()); + } + + @Override + int compare(byte[] value1, byte[] value2) { + return ByteArrayCodec.getComparator().compare(value1, value2); + } + + @Override + TableIterator> getRawIterator( + byte[] prefix, byte[] startKey, int maxParallelism) throws IOException { + TableIterator> itr = iterator(prefix); + if (startKey != null) { + itr.seek(startKey); + } + return itr; + } + + @Override + Spliterator> createNewSpliterator(byte[] prefix, byte[] startKey, int maxParallelism, + boolean closeOnException, List boundaryKeys) throws IOException { + return new ByteArrayRawSpliterator(prefix, startKey, maxParallelism, closeOnException, boundaryKeys); + } + } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RawKeyValue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RawKeyValue.java index c7250617b6f9..ef8a0fe8be9a 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RawKeyValue.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RawKeyValue.java @@ -25,7 +25,7 @@ * * @param The raw type. */ -public abstract class RawKeyValue implements KeyValue { +public class RawKeyValue implements KeyValue { private final RAW key; private final RAW value; @@ -62,7 +62,7 @@ public byte[] getValue() { } } - private RawKeyValue(RAW key, RAW value) { + public RawKeyValue(RAW key, RAW value) { this.key = key; this.value = value; } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RawSpliterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RawSpliterator.java new file mode 100644 index 000000000000..01955a30ea8c --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RawSpliterator.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import java.io.IOException; +import java.util.List; +import java.util.Spliterator; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.hadoop.hdds.utils.db.RDBStoreAbstractIterator.AutoCloseableRawKeyValue; +import org.apache.ratis.util.ReferenceCountedObject; + +/** + * An abstract implementation of {@link Table.KeyValueSpliterator} designed + * for iterating and splitting over raw key-value pairs retrieved via a + * {@link TableIterator}. + * + *

This class manages asynchronous raw iterator resources and provides + * functionality for converting raw key-value pairs into a structured + * {@link Table.KeyValue} representation. It also allows controlled splitting + * of the underlying iterator for parallel processing, while ensuring proper + * resource handling and thread safety. + * + * @param The raw representation type of the key-value pair. + * @param The type of key in the key-value pair. + * @param The type of value in the key-value pair. + */ +abstract class RawSpliterator implements Table.KeyValueSpliterator { + + private ReferenceCountedObject>> rawIterator; + private final KEY keyPrefix; + private final KEY startKey; + private final AtomicInteger maxNumberOfAdditionalSplits; + private final Lock lock; + private final AtomicReference closeException = new AtomicReference<>(); + private boolean closed; + private final boolean closeOnException; + private boolean initialized; + private List boundaryKeys; + private int boundIndex; + + abstract Table.KeyValue convert(RawKeyValue kv) throws IOException; + + /** + * Retrieves a list of boundary keys based on the provided prefix and start key. + * These boundary keys can be used to split data into smaller ranges when processing. + * + * @param prefix the prefix key that logically groups the keys of interest + * @param start the key from which to start retrieving boundary keys + * @return a list of byte arrays representing the boundary keys. + * @throws IOException if an I/O error occurs while retrieving the boundary keys + */ + abstract List getBoundaryKeys(KEY prefix, KEY start) throws IOException; + + abstract int compare(RAW value1, byte[] value2); + + abstract TableIterator> getRawIterator( + KEY prefix, KEY start, int maxParallelism) throws IOException; + + abstract Spliterator> createNewSpliterator(KEY prefix, byte[] start, int maxParallelism, + boolean closeOnEx, List boundKeys) throws IOException; + + RawSpliterator(KEY prefix, KEY startKey, int maxParallelism, boolean closeOnException) throws IOException { + this(prefix, startKey, maxParallelism, closeOnException, null); + } + + RawSpliterator(KEY keyPrefix, KEY startKey, int maxParallelism, boolean closeOnException, + List boundaryKeys) throws IOException { + this.keyPrefix = keyPrefix; + this.startKey = startKey; + this.closeOnException = closeOnException; + this.lock = new ReentrantLock(); + this.maxNumberOfAdditionalSplits = new AtomicInteger(maxParallelism); + this.initialized = false; + this.closed = false; + if (boundaryKeys == null) { + this.boundaryKeys = getBoundaryKeys(keyPrefix, startKey).stream().sorted(ByteArrayCodec.getComparator()).collect( + Collectors.toList()); + this.boundaryKeys.add(null); + } else { + this.boundaryKeys = boundaryKeys; + } + this.boundIndex = 0; + } + + synchronized void initializeIterator() throws IOException { + if (initialized) { + return; + } + TableIterator> itr = getRawIterator(keyPrefix, + startKey, maxNumberOfAdditionalSplits.decrementAndGet()); + try { + this.rawIterator = ReferenceCountedObject.wrap(itr, () -> { }, + (completelyReleased) -> { + if (completelyReleased) { + closeRawIteratorWithLock(); + } + this.maxNumberOfAdditionalSplits.incrementAndGet(); + }); + this.rawIterator.retain(); + } catch (Throwable e) { + itr.close(); + throw e; + } + initialized = true; + } + + @Override + public boolean tryAdvance(Consumer> action) { + lock.lock(); + AutoCloseableRawKeyValue kv; + try { + if (!closed && this.rawIterator.get().hasNext()) { + kv = rawIterator.get().next(); + while 
(boundaryKeys.get(boundIndex) != null && compare(kv.getKey(), boundaryKeys.get(boundIndex)) >= 0) { + boundIndex++; + if (boundIndex >= boundaryKeys.size()) { + closeRawIterator(); + return false; + } + } + } else { + closeRawIterator(); + return false; + } + } finally { + lock.unlock(); + } + try (AutoCloseableRawKeyValue keyValue = kv) { + if (keyValue != null) { + action.accept(convert(keyValue)); + return true; + } + return false; + } catch (Throwable e) { + if (closeOnException) { + closeRawIteratorWithLock(); + } + throw new IllegalStateException("Failed next()", e); + } + } + + @Override + public int characteristics() { + return Spliterator.DISTINCT; + } + + @Override + public Spliterator> trySplit() { + if (boundIndex < boundaryKeys.size() - 1) { + lock.lock(); + try { + List boundaryKeysTmp = this.boundaryKeys; + int totalNumberOfElements = boundaryKeysTmp.size() - boundIndex; + if (totalNumberOfElements > 1) { + int splitIndex = boundIndex + totalNumberOfElements / 2; + this.boundaryKeys = boundaryKeysTmp.subList(0, splitIndex); + List nextSplitBoundaryKeys = boundaryKeysTmp.subList(splitIndex, boundaryKeysTmp.size()); + return createNewSpliterator(this.keyPrefix, this.boundaryKeys.get(this.boundaryKeys.size() - 1), + maxNumberOfAdditionalSplits.get() + 1, closeOnException, nextSplitBoundaryKeys); + } + } catch (IOException ignored) { + // In case of exception, we can fall back to delegated deserialization for the spliterator. + } finally { + lock.unlock(); + } + } + int val = maxNumberOfAdditionalSplits.decrementAndGet(); + if (val >= 0) { + try { + this.rawIterator.retain(); + } catch (Exception e) { + maxNumberOfAdditionalSplits.incrementAndGet(); + return null; + } + return this; + } else { + maxNumberOfAdditionalSplits.incrementAndGet(); + } + return null; + } + + private void closeRawIterator() { + if (!closed) { + try { + closed = true; + this.rawIterator.get().close(); + } catch (IOException e) { + closeException.set(e); + } + } + } + + private void closeRawIteratorWithLock() { + if (!closed) { + this.lock.lock(); + try { + closeRawIterator(); + } finally { + this.lock.unlock(); + } + } + } + + @Override + public void close() throws IOException { + this.rawIterator.release(); + if (closeException.get() != null) { + throw closeException.get(); + } + } + + @Override + public long estimateSize() { + return Long.MAX_VALUE; + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java index 9552f327e2e8..8ca7258041b7 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java @@ -17,12 +17,14 @@ package org.apache.hadoop.hdds.utils.db; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Spliterator; import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.hdds.annotation.InterfaceStability; import org.apache.hadoop.hdds.utils.MetadataKeyFilters; @@ -170,6 +172,34 @@ default VALUE getReadCopy(KEY key) throws IOException { TableIterator> iterator(KEY prefix) throws IOException; + /** + * Creates a spliterator over the key-value pairs in the table with the specified + * maximum parallelism and exception handling behavior. 
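+   * <p>Illustrative usage only (this sketch is not asserted by the patch; {@code table} is a
+   * placeholder for any {@code Table<String, String>} instance, and {@code StreamSupport} is
+   * {@code java.util.stream.StreamSupport}):
+   * <pre>{@code
+   * try (Table.KeyValueSpliterator<String, String> split = table.spliterator(4, true)) {
+   *   long count = StreamSupport.stream(split, true).count();
+   * }
+   * }</pre>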
+ * + * @param maxParallelism the maximum number of threads that can be used for parallel processing. + * @param closeOnException if true, the spliterator will automatically close resources + * if an exception occurs during processing. + * @return a {@code Table.KeyValueSpliterator} instance for iterating over the key-value pairs. + * @throws IOException if an I/O error occurs during spliterator creation. + */ + Table.KeyValueSpliterator spliterator(int maxParallelism, boolean closeOnException) + throws IOException; + + /** + * Returns a spliterator that iterates over the key-value pairs starting from a given key + * and within a specified key prefix. The spliterator can be parallelized up to a given level + * of parallelism and may optionally close resources in case of exceptions. + * + * @param startKey the starting key from which iteration begins + * @param prefix the key prefix used to limit the keys being iterated + * @param maxParallelism the maximum level of parallelism allowed for the iteration + * @param closeOnException if true, closes resources when an exception occurs during iteration + * @return a spliterator that supports parallel iteration over the key-value pairs + * @throws IOException if an I/O error occurs during the creation of the spliterator + */ + Table.KeyValueSpliterator spliterator(KEY startKey, KEY prefix, int maxParallelism, + boolean closeOnException) throws IOException; + /** * Returns the Name of this Table. * @return - Table Name. @@ -419,4 +449,19 @@ public int hashCode() { interface KeyValueIterator extends TableIterator> { } + + /** + * A generic interface extending {@link Spliterator} and {@link Closeable}, + * designed for iterating over and splitting key-value pairs. + * + * @param The type of keys in the key-value pairs. + * @param The type of values in the key-value pairs. + * + * This interface facilitates traversal and splitting of key-value pairs + * while also providing resource management capabilities via {@link Closeable}. + * Implementations must handle key-value pair splitting to enable parallel processing, + * and ensure proper resource management when the spliterator is closed. + */ + interface KeyValueSpliterator extends Spliterator>, Closeable { + } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java index dcf482aad2ad..b60d0cc43d07 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java @@ -30,9 +30,13 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Spliterator; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.hdds.utils.MetadataKeyFilters; import org.apache.hadoop.hdds.utils.TableCacheMetrics; +import org.apache.hadoop.hdds.utils.db.RDBStoreAbstractIterator.AutoCloseableRawKeyValue; import org.apache.hadoop.hdds.utils.db.cache.CacheKey; import org.apache.hadoop.hdds.utils.db.cache.CacheResult; import org.apache.hadoop.hdds.utils.db.cache.CacheValue; @@ -43,6 +47,7 @@ import org.apache.hadoop.hdds.utils.db.cache.TableNoCache; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.function.CheckedBiFunction; +import org.rocksdb.LiveFileMetaData; /** * Strongly typed table implementation. 
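The RawSpliterator added earlier in this patch partitions work by SST boundary keys (see getBoundaryKeys and trySplit, and TypedTable#getBoundaryKeys below). A simplified, standalone sketch of that splitting idea follows; it uses placeholder String keys instead of the patch's byte[]/CodecBuffer keys and is not part of the patch itself.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified illustration of RawSpliterator#trySplit: the sorted boundary keys
// (with a trailing null as the open upper bound) are cut at the midpoint; the current
// spliterator keeps the lower half, and the new spliterator starts at the last retained
// boundary and owns the upper half.
public final class BoundarySplitSketch {
  public static void main(String[] args) {
    List<String> boundaries = new ArrayList<>(Arrays.asList("a", "f", "m", "s"));
    boundaries.add(null); // open upper bound, analogous to the null appended by RawSpliterator
    int splitIndex = boundaries.size() / 2;
    List<String> kept = boundaries.subList(0, splitIndex);
    List<String> handedOff = boundaries.subList(splitIndex, boundaries.size());
    System.out.println("current spliterator keeps " + kept);
    System.out.println("new spliterator starts at " + kept.get(kept.size() - 1)
        + " and owns boundaries " + handedOff);
  }
}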
@@ -53,7 +58,7 @@ * @param type of the keys in the store. * @param type of the values in the store. */ -public class TypedTable implements Table { +public class TypedTable implements BaseRDBTable { private static final long EPOCH_DEFAULT = -1L; static final int BUFFER_SIZE_DEFAULT = 4 << 10; // 4 KB @@ -399,6 +404,21 @@ public Table.KeyValueIterator iterator() throws IOException { return iterator(null); } + @Override + public Table.KeyValueSpliterator spliterator(int maxParallelism, boolean closeOnException) + throws IOException { + return spliterator(null, null, maxParallelism, closeOnException); + } + + @Override + public Table.KeyValueSpliterator spliterator(KEY startKey, KEY prefix, int maxParallelism, + boolean closeOnException) throws IOException { + if (supportCodecBuffer) { + return newCodecBufferSpliterator(prefix, startKey, maxParallelism, closeOnException); + } + return newByteArraySpliterator(prefix, startKey, maxParallelism, closeOnException); + } + @Override public Table.KeyValueIterator iterator(KEY prefix) throws IOException { @@ -532,6 +552,38 @@ TableCache getCache() { return cache; } + @Override + public List getTableSstFiles() throws IOException { + return rawTable.getTableSstFiles(); + } + + private List getBoundaryKeys(KEY prefix, KEY startKey) throws IOException { + return getTableSstFiles().stream() + .flatMap(liveFileMetaData -> Stream.of(liveFileMetaData.smallestKey(), liveFileMetaData.largestKey())) + .filter(value -> { + try { + byte[] prefixByteArray = encodeKey(prefix); + if (value.length < prefixByteArray.length) { + return false; + } + for (int i = 0; i < prefixByteArray.length; i++) { + if (value[i] != prefixByteArray[i]) { + return false; + } + } + } catch (IOException e) { + return false; + } + return true; + }).filter(value -> { + try { + return ByteArrayCodec.getComparator().compare(value, encodeKey(startKey)) >= 0; + } catch (IOException e) { + return false; + } + }).collect(Collectors.toList()); + } + /** * Key value implementation for strongly typed tables. */ @@ -554,8 +606,8 @@ public VALUE getValue() throws IOException { } } - RawIterator newCodecBufferTableIterator( - TableIterator> i) { + private RawIterator newCodecBufferTableIterator( + TableIterator> i) { return new RawIterator(i) { @Override AutoCloseSupplier convert(KEY key) throws IOException { @@ -574,7 +626,7 @@ public CodecBuffer get() { } @Override - KeyValue convert(KeyValue raw) + KeyValue convert(RawKeyValue raw) throws IOException { final int rawSize = raw.getValue().readableBytes(); final KEY key = keyCodec.fromCodecBuffer(raw.getKey()); @@ -584,12 +636,22 @@ KeyValue convert(KeyValue raw) }; } + private RawSpliterator newCodecBufferSpliterator( + KEY prefix, KEY startKey, int maxParallelism, boolean closeOnException) throws IOException { + return new CodecBufferTypedRawSpliterator(prefix, startKey, maxParallelism, closeOnException); + } + + private RawSpliterator newByteArraySpliterator(KEY prefix, KEY startKey, int maxParallelism, + boolean closeOnException) throws IOException { + return new ByteArrayTypedRawSpliterator(prefix, startKey, maxParallelism, closeOnException); + } + /** * Table Iterator implementation for strongly typed tables. 
*/ public class TypedTableIterator extends RawIterator { TypedTableIterator( - TableIterator> rawIterator) { + TableIterator> rawIterator) { super(rawIterator); } @@ -600,7 +662,7 @@ AutoCloseSupplier convert(KEY key) throws IOException { } @Override - KeyValue convert(KeyValue raw) { + KeyValue convert(RawKeyValue raw) { return new TypedKeyValue(raw); } } @@ -612,9 +674,9 @@ KeyValue convert(KeyValue raw) { */ abstract class RawIterator implements Table.KeyValueIterator { - private final TableIterator> rawIterator; + private final TableIterator> rawIterator; - RawIterator(TableIterator> rawIterator) { + RawIterator(TableIterator> rawIterator) { this.rawIterator = rawIterator; } @@ -622,10 +684,10 @@ abstract class RawIterator abstract AutoCloseSupplier convert(KEY key) throws IOException; /** - * Covert the given {@link Table.KeyValue} + * Covert the given {@link RawKeyValue} * from ({@link RAW}, {@link RAW}) to ({@link KEY}, {@link VALUE}). */ - abstract KeyValue convert(KeyValue raw) + abstract KeyValue convert(RawKeyValue raw) throws IOException; @Override @@ -640,8 +702,8 @@ public void seekToLast() { @Override public KeyValue seek(KEY key) throws IOException { - try (AutoCloseSupplier rawKey = convert(key)) { - final KeyValue result = rawIterator.seek(rawKey.get()); + try (AutoCloseSupplier rawKey = convert(key); + AutoCloseableRawKeyValue result = rawIterator.seek(rawKey.get())) { return result == null ? null : convert(result); } } @@ -658,8 +720,8 @@ public boolean hasNext() { @Override public KeyValue next() { - try { - return convert(rawIterator.next()); + try (AutoCloseableRawKeyValue kv = rawIterator.next()) { + return convert(kv); } catch (IOException e) { throw new IllegalStateException("Failed next()", e); } @@ -670,4 +732,121 @@ public void removeFromDB() throws IOException { rawIterator.removeFromDB(); } } + + private final class CodecBufferTypedRawSpliterator extends RawSpliterator { + + private CodecBufferTypedRawSpliterator(KEY prefix, KEY startKey, int maxParallelism, boolean closeOnException) + throws IOException { + super(prefix, startKey, maxParallelism, closeOnException); + initializeIterator(); + } + + private CodecBufferTypedRawSpliterator(KEY prefix, KEY startKey, int maxParallelism, boolean closeOnException, + List boundKeys) throws IOException { + super(prefix, startKey, maxParallelism, closeOnException, boundKeys); + initializeIterator(); + } + + @Override + KeyValue convert(RawKeyValue kv) throws IOException { + final int rawSize = kv.getValue().readableBytes(); + final KEY key = keyCodec.fromCodecBuffer(kv.getKey()); + final VALUE value = valueCodec.fromCodecBuffer(kv.getValue()); + return Table.newKeyValue(key, value, rawSize); + } + + @Override + List getBoundaryKeys(KEY prefix, KEY startKey) throws IOException { + return TypedTable.this.getBoundaryKeys(prefix, startKey); + } + + @Override + int compare(CodecBuffer value1, byte[] value2) { + return value1.compareTo(value2); + } + + @Override + TableIterator> getRawIterator( + KEY prefix, KEY startKey, int maxParallelism) throws IOException { + + CodecBuffer prefixBuffer = encodeKeyCodecBuffer(prefix); + CodecBuffer startKeyBuffer = encodeKeyCodecBuffer(startKey); + try { + TableIterator> itr = + rawTable.iterator(prefixBuffer, maxParallelism); + if (startKeyBuffer != null) { + itr.seek(startKeyBuffer); + } + return itr; + } catch (Throwable t) { + if (prefixBuffer != null) { + prefixBuffer.release(); + } + throw t; + } finally { + if (startKeyBuffer != null) { + startKeyBuffer.release(); + } + } + } + + 
@Override + Spliterator> createNewSpliterator(KEY prefix, byte[] startKey, int maxParallelism, + boolean closeOnException, List boundaryKeys) throws IOException { + return new CodecBufferTypedRawSpliterator(prefix, decodeKey(startKey), maxParallelism, closeOnException, + boundaryKeys); + } + } + + private final class ByteArrayTypedRawSpliterator extends RawSpliterator { + + private ByteArrayTypedRawSpliterator(KEY prefix, KEY startKey, int maxParallelism, boolean closeOnException) + throws IOException { + super(prefix, startKey, maxParallelism, closeOnException); + initializeIterator(); + } + + private ByteArrayTypedRawSpliterator(KEY prefix, KEY startKey, int maxParallelism, boolean closeOnException, + List boundKeys) throws IOException { + super(prefix, startKey, maxParallelism, closeOnException, boundKeys); + initializeIterator(); + } + + @Override + KeyValue convert(RawKeyValue kv) throws IOException { + final int rawSize = kv.getValue().length; + final KEY key = keyCodec.fromPersistedFormat(kv.getKey()); + final VALUE value = valueCodec.fromPersistedFormat(kv.getValue()); + return Table.newKeyValue(key, value, rawSize); + } + + @Override + List getBoundaryKeys(KEY prefix, KEY startKey) throws IOException { + return TypedTable.this.getBoundaryKeys(prefix, startKey); + } + + @Override + int compare(byte[] value1, byte[] value2) { + return ByteArrayCodec.getComparator().compare(value1, value2); + } + + @Override + TableIterator> getRawIterator( + KEY prefix, KEY startKey, int maxParallelism) throws IOException { + byte[] prefixBytes = encodeKey(prefix); + byte[] startKeyBytes = encodeKey(startKey); + TableIterator> itr = rawTable.iterator(prefixBytes); + if (startKeyBytes != null) { + itr.seek(startKeyBytes); + } + return itr; + } + + @Override + Spliterator> createNewSpliterator(KEY prefix, byte[] startKey, int maxParallelism, + boolean closeOnException, List boundaryKeys) throws IOException { + return new ByteArrayTypedRawSpliterator(prefix, decodeKey(startKey), maxParallelism, closeOnException, + boundaryKeys); + } + } } diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java index 62e15a08662b..24ae9d9de091 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java @@ -99,6 +99,17 @@ public void deleteRange(KEY beginKey, KEY endKey) { throw new UnsupportedOperationException(); } + @Override + public KeyValueSpliterator spliterator(int maxParallelism, boolean closeOnException) { + throw new UnsupportedOperationException(); + } + + @Override + public KeyValueSpliterator spliterator(KEY startKey, KEY prefix, int maxParallelism, + boolean closeOnException) { + throw new UnsupportedOperationException(); + } + @Override public String getName() { return ""; diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/RawSpliteratorTest.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/RawSpliteratorTest.java new file mode 100644 index 000000000000..49241d2a0d52 --- /dev/null +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/RawSpliteratorTest.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.List; +import java.util.Spliterator; +import java.util.function.Consumer; +import org.junit.jupiter.api.Test; + +/** + * Unit test class for verifying the behavior of the {@code RawSpliterator}. + * This class contains tests to evaluate the functionality of its methods, + * such as {@code tryAdvance}, {@code trySplit}, and error handling during + * method execution. 
+ */ +class RawSpliteratorTest { + + @Test + void testTryAdvanceWithValidElement() throws IOException { + // Mock dependencies + TableIterator> rawIteratorMock = + mock(TableIterator.class); + RDBStoreAbstractIterator.AutoCloseableRawKeyValue rawKeyValueMock = + mock(RDBStoreAbstractIterator.AutoCloseableRawKeyValue.class); + + when(rawIteratorMock.hasNext()).thenReturn(true); + when(rawIteratorMock.next()).thenReturn(rawKeyValueMock); + + RawSpliterator rawSpliterator = new MockRawSpliterator(1) { + @Override + List getBoundaryKeys(String prefix, String startKey) { + return Collections.emptyList(); + } + + @Override + int compare(String value1, byte[] value2) { + return 0; + } + + @Override + TableIterator> getRawIterator(String prefix, + String startKey, int maxParallelism) { + return rawIteratorMock; + } + + @Override + Spliterator> createNewSpliterator(String prefix, byte[] startKey, + int maxParallelism, boolean closeOnException, List boundaryKeys) throws IOException { + return null; + } + }; + + when(rawKeyValueMock.getKey()).thenReturn("key"); + when(rawKeyValueMock.getValue()).thenReturn("value"); + + Consumer> action = keyValue -> { + try { + assertEquals("key", keyValue.getKey()); + assertEquals("value", keyValue.getValue()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }; + + boolean result = rawSpliterator.tryAdvance(action); + + assertTrue(result); + verify(rawIteratorMock).hasNext(); + verify(rawIteratorMock).next(); + verify(rawKeyValueMock).close(); + } + + @Test + void testTryAdvanceWithNoElement() throws IOException { + TableIterator> rawIteratorMock = + mock(TableIterator.class); + + when(rawIteratorMock.hasNext()).thenReturn(false); + + RawSpliterator rawSpliterator = new MockRawSpliterator(1) { + @Override + List getBoundaryKeys(String prefix, String startKey) { + return Collections.emptyList(); + } + + @Override + int compare(String value1, byte[] value2) { + return 0; + } + + @Override + TableIterator> getRawIterator(String prefix, + String startKey, int maxParallelism) { + return rawIteratorMock; + } + + @Override + Spliterator> createNewSpliterator(String prefix, byte[] startKey, + int maxParallelism, boolean closeOnException, List boundaryKeys) throws IOException { + return null; + } + }; + + Consumer> action = keyValue -> fail("Action should not be called"); + + boolean result = rawSpliterator.tryAdvance(action); + + assertFalse(result); + verify(rawIteratorMock).hasNext(); + verify(rawIteratorMock, never()).next(); + } + + @Test + void testTryAdvanceWhenConvertThrowsIOException() throws IOException { + TableIterator> rawIteratorMock = + mock(TableIterator.class); + RDBStoreAbstractIterator.AutoCloseableRawKeyValue rawKeyValueMock = + mock(RDBStoreAbstractIterator.AutoCloseableRawKeyValue.class); + + when(rawIteratorMock.hasNext()).thenReturn(true); + when(rawIteratorMock.next()).thenReturn(rawKeyValueMock); + + RawSpliterator rawSpliterator = new RawSpliterator(null, null, 1, + true) { + + @Override + Table.KeyValue convert(RawKeyValue kv) throws IOException { + throw new IOException("Mocked exception"); + } + + @Override + List getBoundaryKeys(String prefix, String startKey) { + return Collections.emptyList(); + } + + @Override + int compare(String value1, byte[] value2) { + return 0; + } + + @Override + TableIterator> getRawIterator(String prefix, + String startKey, int maxParallelism) { + return rawIteratorMock; + } + + @Override + Spliterator> createNewSpliterator(String prefix, byte[] startKey, + int maxParallelism, boolean 
closeOnException, List boundaryKeys) throws IOException { + return null; + } + }; + rawSpliterator.initializeIterator(); + + Consumer> action = keyValue -> { + }; + + Exception exception = assertThrows(IllegalStateException.class, () -> rawSpliterator.tryAdvance(action)); + + assertEquals("Failed next()", exception.getMessage()); + assertNotNull(exception.getCause()); + assertEquals(IOException.class, exception.getCause().getClass()); + assertEquals("Mocked exception", exception.getCause().getMessage()); + verify(rawIteratorMock).hasNext(); + verify(rawIteratorMock).next(); + } + + @Test + void testTrySplitsWithNoBoundaryKeys() throws IOException { + TableIterator> rawIteratorMock = + mock(TableIterator.class); + + RawSpliterator rawSpliterator = new MockRawSpliterator(2) { + @Override + List getBoundaryKeys(String prefix, String startKey) { + return Collections.emptyList(); + } + + @Override + int compare(String value1, byte[] value2) { + return 0; + } + + @Override + TableIterator> getRawIterator(String prefix, + String startKey, int maxParallelism) { + return rawIteratorMock; + } + + @Override + Spliterator> createNewSpliterator(String prefix, byte[] startKey, + int maxParallelism, boolean closeOnException, List boundaryKeys) throws IOException { + return null; + } + }; + Spliterator> split1 = rawSpliterator.trySplit(); + Spliterator> split2 = rawSpliterator.trySplit(); + assertNotNull(split1); + assertNull(split2); + rawSpliterator.close(); + Spliterator> split3 = split1.trySplit(); + assertNotNull(split3); + } + + private abstract static class MockRawSpliterator extends RawSpliterator { + + MockRawSpliterator(int maxParallelism) throws IOException { + super(null, null, maxParallelism, true); + super.initializeIterator(); + } + + @Override + Table.KeyValue convert(RawKeyValue kv) { + return Table.newKeyValue(kv.getKey(), kv.getValue()); + } + } +} diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreByteArrayIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreByteArrayIterator.java index 7a11f55720cb..2460dea79718 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreByteArrayIterator.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreByteArrayIterator.java @@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentCaptor.forClass; @@ -36,6 +37,7 @@ import java.nio.charset.StandardCharsets; import java.util.NoSuchElementException; import java.util.function.Consumer; +import org.apache.hadoop.hdds.utils.db.RDBStoreAbstractIterator.AutoCloseableRawKeyValue; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils; import org.apache.log4j.Level; @@ -77,54 +79,60 @@ RDBStoreByteArrayIterator newIterator(byte[] prefix) { } @Test - public void testForeachRemainingCallsConsumerWithAllElements() { + public void testForeachRemainingCallsConsumerWithAllElements() throws IOException { when(rocksDBIteratorMock.isValid()) - .thenReturn(true, true, true, true, true, true, true, false); + 
.thenReturn(true, true, true, false); when(rocksDBIteratorMock.key()) - .thenReturn(new byte[]{0x00}, new byte[]{0x00}, new byte[]{0x01}, + .thenReturn(new byte[]{0x00}, new byte[]{0x01}, new byte[]{0x02}) .thenThrow(new NoSuchElementException()); when(rocksDBIteratorMock.value()) - .thenReturn(new byte[]{0x7f}, new byte[]{0x7f}, new byte[]{0x7e}, - new byte[]{0x7d}) + .thenReturn(new byte[]{0x7f}, new byte[]{0x7e}, new byte[]{0x7d}) .thenThrow(new NoSuchElementException()); - final Consumer> consumerStub + final Consumer> consumerStub = mock(Consumer.class); - RDBStoreByteArrayIterator iter = newIterator(); - iter.forEachRemaining(consumerStub); - - ArgumentCaptor capture = - forClass(RawKeyValue.ByteArray.class); - verify(consumerStub, times(3)).accept(capture.capture()); - assertArrayEquals( - new byte[]{0x00}, capture.getAllValues().get(0).getKey()); - assertArrayEquals( - new byte[]{0x7f}, capture.getAllValues().get(0).getValue()); - assertArrayEquals( - new byte[]{0x01}, capture.getAllValues().get(1).getKey()); - assertArrayEquals( - new byte[]{0x7e}, capture.getAllValues().get(1).getValue()); - assertArrayEquals( - new byte[]{0x02}, capture.getAllValues().get(2).getKey()); - assertArrayEquals( - new byte[]{0x7d}, capture.getAllValues().get(2).getValue()); + try (RDBStoreByteArrayIterator iter = newIterator()) { + iter.forEachRemaining(consumerStub); + ArgumentCaptor> capture = forClass(AutoCloseableRawKeyValue.class); + verify(consumerStub, times(3)).accept(capture.capture()); + assertArrayEquals( + new byte[]{0x00}, capture.getAllValues().get(0).getKey()); + assertArrayEquals( + new byte[]{0x7f}, capture.getAllValues().get(0).getValue()); + assertArrayEquals( + new byte[]{0x01}, capture.getAllValues().get(1).getKey()); + assertArrayEquals( + new byte[]{0x7e}, capture.getAllValues().get(1).getValue()); + assertArrayEquals( + new byte[]{0x02}, capture.getAllValues().get(2).getKey()); + assertArrayEquals( + new byte[]{0x7d}, capture.getAllValues().get(2).getValue()); + } } @Test - public void testHasNextDependsOnIsvalid() { - when(rocksDBIteratorMock.isValid()).thenReturn(true, true, false); - - RDBStoreByteArrayIterator iter = newIterator(); - - assertTrue(iter.hasNext()); - assertFalse(iter.hasNext()); + public void testHasNextDoesNotDependsOnIsvalid() { + when(rocksDBIteratorMock.isValid()).thenReturn(true, false); + when(rocksDBIteratorMock.key()).thenReturn(new byte[]{0x00}); + when(rocksDBIteratorMock.value()).thenReturn(new byte[]{0x00f}); + try (RDBStoreByteArrayIterator iter = newIterator()) { + assertTrue(iter.hasNext()); + assertTrue(iter.hasNext()); + iter.next(); + assertFalse(iter.hasNext()); + assertThrows(NoSuchElementException.class, iter::next); + assertFalse(iter.hasNext()); + } } @Test public void testNextCallsIsValidThenGetsTheValueAndStepsToNext() { + byte[] testKey = new byte[]{0x00}; when(rocksDBIteratorMock.isValid()).thenReturn(true); + when(rocksDBIteratorMock.key()).thenReturn(testKey); + when(rocksDBIteratorMock.value()).thenReturn(testKey); RDBStoreByteArrayIterator iter = newIterator(); InOrder verifier = inOrder(rocksDBIteratorMock); @@ -162,6 +170,24 @@ public void testSeekToLastSeeks() { verify(rocksDBIteratorMock, times(1)).seekToLast(); } + @Test + public void testSeekWithInvalidValue() { + when(rocksDBIteratorMock.isValid()).thenReturn(false); + + try (RDBStoreByteArrayIterator iter = newIterator()) { + final AutoCloseableRawKeyValue val = iter.seek(new byte[] {0x55}); + assertFalse(iter.hasNext()); + InOrder verifier = 
inOrder(rocksDBIteratorMock); + verify(rocksDBIteratorMock, times(1)).seekToFirst(); //at construct time + verify(rocksDBIteratorMock, never()).seekToLast(); + verifier.verify(rocksDBIteratorMock, times(1)).seek(any(byte[].class)); + verifier.verify(rocksDBIteratorMock, times(1)).isValid(); + verifier.verify(rocksDBIteratorMock, never()).key(); + verifier.verify(rocksDBIteratorMock, never()).value(); + assertNull(val); + } + } + @Test public void testSeekReturnsTheActualKey() throws Exception { when(rocksDBIteratorMock.isValid()).thenReturn(true); @@ -169,8 +195,8 @@ public void testSeekReturnsTheActualKey() throws Exception { when(rocksDBIteratorMock.value()).thenReturn(new byte[]{0x7f}); RDBStoreByteArrayIterator iter = newIterator(); - final Table.KeyValue val = iter.seek(new byte[]{0x55}); - + final AutoCloseableRawKeyValue val = iter.seek(new byte[]{0x55}); + assertTrue(iter.hasNext()); InOrder verifier = inOrder(rocksDBIteratorMock); verify(rocksDBIteratorMock, times(1)).seekToFirst(); //at construct time @@ -187,12 +213,13 @@ public void testSeekReturnsTheActualKey() throws Exception { public void testGettingTheKeyIfIteratorIsValid() throws Exception { when(rocksDBIteratorMock.isValid()).thenReturn(true); when(rocksDBIteratorMock.key()).thenReturn(new byte[]{0x00}); - + when(rocksDBIteratorMock.value()).thenReturn(new byte[]{0x00}); RDBStoreByteArrayIterator iter = newIterator(); byte[] key = null; if (iter.hasNext()) { - final Table.KeyValue entry = iter.next(); - key = entry.getKey(); + try (AutoCloseableRawKeyValue entry = iter.next()) { + key = entry.getKey(); + } } InOrder verifier = inOrder(rocksDBIteratorMock); @@ -208,22 +235,22 @@ public void testGettingTheValueIfIteratorIsValid() throws Exception { when(rocksDBIteratorMock.key()).thenReturn(new byte[]{0x00}); when(rocksDBIteratorMock.value()).thenReturn(new byte[]{0x7f}); - RDBStoreByteArrayIterator iter = newIterator(); - Table.KeyValue entry; - byte[] key = null; - byte[] value = null; - if (iter.hasNext()) { - entry = iter.next(); - key = entry.getKey(); - value = entry.getValue(); + try (RDBStoreByteArrayIterator iter = newIterator()) { + AutoCloseableRawKeyValue entry; + byte[] key = null; + byte[] value = null; + if (iter.hasNext()) { + entry = iter.next(); + key = entry.getKey(); + value = entry.getValue(); + } + InOrder verifier = inOrder(rocksDBIteratorMock); + + verifier.verify(rocksDBIteratorMock, times(1)).isValid(); + verifier.verify(rocksDBIteratorMock, times(1)).key(); + assertArrayEquals(new byte[]{0x00}, key); + assertArrayEquals(new byte[]{0x7f}, value); } - - InOrder verifier = inOrder(rocksDBIteratorMock); - - verifier.verify(rocksDBIteratorMock, times(1)).isValid(); - verifier.verify(rocksDBIteratorMock, times(1)).key(); - assertArrayEquals(new byte[]{0x00}, key); - assertArrayEquals(new byte[]{0x7f}, value); } @Test @@ -231,8 +258,10 @@ public void testRemovingFromDBActuallyDeletesFromTable() throws Exception { byte[] testKey = new byte[]{0x00}; when(rocksDBIteratorMock.isValid()).thenReturn(true); when(rocksDBIteratorMock.key()).thenReturn(testKey); + when(rocksDBIteratorMock.value()).thenReturn(testKey); RDBStoreByteArrayIterator iter = newIterator(null); + iter.next(); iter.removeFromDB(); InOrder verifier = inOrder(rocksDBIteratorMock, rocksTableMock); @@ -261,14 +290,19 @@ public void testNullPrefixedIterator() throws IOException { RDBStoreByteArrayIterator iter = newIterator(null); verify(rocksDBIteratorMock, times(1)).seekToFirst(); clearInvocations(rocksDBIteratorMock); - + 
when(rocksDBIteratorMock.isValid()).thenReturn(true); iter.seekToFirst(); + verify(rocksDBIteratorMock, times(0)).isValid(); + verify(rocksDBIteratorMock, times(0)).key(); verify(rocksDBIteratorMock, times(1)).seekToFirst(); clearInvocations(rocksDBIteratorMock); - - when(rocksDBIteratorMock.isValid()).thenReturn(true); + verify(rocksDBIteratorMock, times(0)).isValid(); + verify(rocksDBIteratorMock, times(0)).key(); + verify(rocksDBIteratorMock, times(0)).seekToFirst(); + iter.hasNext(); + clearInvocations(rocksDBIteratorMock); assertTrue(iter.hasNext()); - verify(rocksDBIteratorMock, times(1)).isValid(); + verify(rocksDBIteratorMock, times(0)).isValid(); verify(rocksDBIteratorMock, times(0)).key(); iter.seekToLast(); @@ -283,16 +317,22 @@ public void testNormalPrefixedIterator() throws IOException { RDBStoreByteArrayIterator iter = newIterator(testPrefix); verify(rocksDBIteratorMock, times(1)).seek(testPrefix); clearInvocations(rocksDBIteratorMock); - + when(rocksDBIteratorMock.isValid()).thenReturn(true); + when(rocksDBIteratorMock.key()).thenReturn(testPrefix); iter.seekToFirst(); + verify(rocksDBIteratorMock, times(0)).isValid(); + verify(rocksDBIteratorMock, times(0)).key(); verify(rocksDBIteratorMock, times(1)).seek(testPrefix); clearInvocations(rocksDBIteratorMock); - - when(rocksDBIteratorMock.isValid()).thenReturn(true); - when(rocksDBIteratorMock.key()).thenReturn(testPrefix); - assertTrue(iter.hasNext()); + iter.hasNext(); verify(rocksDBIteratorMock, times(1)).isValid(); verify(rocksDBIteratorMock, times(1)).key(); + verify(rocksDBIteratorMock, times(0)).seek(testPrefix); + clearInvocations(rocksDBIteratorMock); + assertTrue(iter.hasNext()); + // hasNext shouldn't make isValid() redundant calls. + verify(rocksDBIteratorMock, times(0)).isValid(); + verify(rocksDBIteratorMock, times(0)).key(); Exception e = assertThrows(Exception.class, () -> iter.seekToLast(), "Prefixed iterator does not support seekToLast"); assertInstanceOf(UnsupportedOperationException.class, e); diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java index 05a4c946394e..ec6d2820c7eb 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java @@ -18,8 +18,10 @@ package org.apache.hadoop.hdds.utils.db; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.any; @@ -31,20 +33,29 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeoutException; 
+import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hdds.StringUtils; +import org.apache.hadoop.hdds.utils.db.RDBStoreAbstractIterator.AutoCloseableRawKeyValue; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; +import org.apache.ozone.test.GenericTestUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.InOrder; import org.mockito.stubbing.Answer; import org.rocksdb.RocksIterator; @@ -73,12 +84,16 @@ ManagedRocksIterator newManagedRocksIterator() { } RDBStoreCodecBufferIterator newIterator() { - return new RDBStoreCodecBufferIterator(managedRocksIterator, null, null); + return newIterator(1); + } + + RDBStoreCodecBufferIterator newIterator(int maxNumberOfBuffers) { + return new RDBStoreCodecBufferIterator(managedRocksIterator, null, null, maxNumberOfBuffers); } RDBStoreCodecBufferIterator newIterator(CodecBuffer prefix) { return new RDBStoreCodecBufferIterator( - managedRocksIterator, rdbTableMock, prefix); + managedRocksIterator, rdbTableMock, prefix, 1); } Answer newAnswerInt(String name, int b) { @@ -90,38 +105,98 @@ Answer newAnswer(String name, byte... b) { System.out.printf("answer %s: %s%n", name, StringUtils.bytes2Hex(b)); Object[] args = invocation.getArguments(); final ByteBuffer buffer = (ByteBuffer) args[0]; - buffer.clear(); - buffer.put(b); - buffer.flip(); - return b.length; + return writeToBuffer(buffer, b); }; } + private int writeToBuffer(ByteBuffer buffer, byte... bytesToWrite) { + buffer.clear(); + buffer.put(bytesToWrite); + buffer.flip(); + return bytesToWrite.length; + } + + @ParameterizedTest + @ValueSource(ints = {0, 1, 2, 3, 5, 10}) + public void testRDBStoreCodecBufferIterGetsFailBeyondMaxBuffers(int maxBuffers) + throws InterruptedException, TimeoutException { + List> vals = new ArrayList<>(); + when(rocksIteratorMock.isValid()).thenReturn(true); + AtomicInteger counter = new AtomicInteger(0); + + when(rocksIteratorMock.key(any())) + .thenAnswer(i -> writeToBuffer(i.getArgument(0), (byte)counter.getAndIncrement())); + when(rocksIteratorMock.value(any())) + .thenAnswer(i -> writeToBuffer(i.getArgument(0), (byte)counter.getAndIncrement())); + try (RDBStoreCodecBufferIterator iterator = newIterator(maxBuffers)) { + for (int i = 0; i < maxBuffers; i++) { + vals.add(iterator.next()); + } + assertEquals(Math.max(maxBuffers, 0), vals.size()); + ExecutorService executor = Executors.newSingleThreadExecutor(); + AtomicReference> nextThread = new AtomicReference<>(CompletableFuture.supplyAsync( + () -> { + AutoCloseableRawKeyValue v = iterator.next(); + vals.add(v); + return true; + }, + executor)); + + if (maxBuffers < 1) { + // Number of max buffers is always going to be at least 1. We need at least 1 buffers one for getting the next + // value and one for returning the current value. 
+ GenericTestUtils.waitFor(() -> nextThread.get().isDone() && nextThread.get().join(), 10, 100); + System.out.println(Thread.currentThread().getName()); + nextThread.set(CompletableFuture.supplyAsync( + () -> { + AutoCloseableRawKeyValue v = iterator.next(); + vals.add(v); + return true; + }, + executor)); + } + assertEquals(Math.max(1, maxBuffers), vals.size()); + for (int i = 0; i < vals.size(); i++) { + assertEquals(2 * i, vals.get(i).getKey().getArray()[0]); + assertEquals(2 * i + 1, vals.get(i).getValue().getArray()[0]); + } + assertFalse(nextThread.get().isDone()); + int size = vals.size(); + vals.get(0).close(); + GenericTestUtils.waitFor(() -> nextThread.get().isDone() && nextThread.get().join(), 10, 100); + assertEquals(size + 1, vals.size()); + assertEquals(2 * size, vals.get(size).getKey().getArray()[0]); + assertEquals(2 * size + 1, vals.get(size).getValue().getArray()[0]); + for (AutoCloseableRawKeyValue v : vals) { + v.close(); + } + executor.shutdown(); + } + } + @Test public void testForEachRemaining() throws Exception { when(rocksIteratorMock.isValid()) - .thenReturn(true, true, true, true, true, true, true, false); + .thenReturn(true, true, true, false); when(rocksIteratorMock.key(any())) .then(newAnswerInt("key1", 0x00)) - .then(newAnswerInt("key2", 0x00)) - .then(newAnswerInt("key3", 0x01)) - .then(newAnswerInt("key4", 0x02)) + .then(newAnswerInt("key2", 0x01)) + .then(newAnswerInt("key3", 0x02)) .thenThrow(new NoSuchElementException()); when(rocksIteratorMock.value(any())) .then(newAnswerInt("val1", 0x7f)) - .then(newAnswerInt("val2", 0x7f)) - .then(newAnswerInt("val3", 0x7e)) - .then(newAnswerInt("val4", 0x7d)) + .then(newAnswerInt("val2", 0x7e)) + .then(newAnswerInt("val3", 0x7d)) .thenThrow(new NoSuchElementException()); List> remaining = new ArrayList<>(); try (RDBStoreCodecBufferIterator i = newIterator()) { - i.forEachRemaining(kv -> { + i.forEachRemaining(kvSupplier -> { try { remaining.add(RawKeyValue.create( - kv.getKey().getArray(), kv.getValue().getArray())); - } catch (IOException e) { - throw new RuntimeException(e); + kvSupplier.getKey().getArray(), kvSupplier.getValue().getArray())); + } finally { + kvSupplier.close(); } }); @@ -138,11 +213,15 @@ public void testForEachRemaining() throws Exception { } @Test - public void testHasNextDependsOnIsvalid() throws Exception { - when(rocksIteratorMock.isValid()).thenReturn(true, true, false); + public void testHasNextDoesNotDependsOnIsvalid() throws Exception { + when(rocksIteratorMock.isValid()).thenReturn(true, false); try (RDBStoreCodecBufferIterator i = newIterator()) { assertTrue(i.hasNext()); + assertTrue(i.hasNext()); + i.next(); + assertFalse(i.hasNext()); + assertThrows(NoSuchElementException.class, i::next); assertFalse(i.hasNext()); } @@ -196,6 +275,26 @@ public void testSeekToLastSeeks() throws Exception { CodecTestUtil.gc(); } + @Test + public void testSeekWithInvalidValue() { + when(rocksIteratorMock.isValid()).thenReturn(false); + + try (RDBStoreCodecBufferIterator i = newIterator(); + CodecBuffer target = CodecBuffer.wrap(new byte[] {0x55}); + AutoCloseableRawKeyValue valSupplier = i.seek(target)) { + assertFalse(i.hasNext()); + InOrder verifier = inOrder(rocksIteratorMock); + verify(rocksIteratorMock, times(1)).seekToFirst(); //at construct time + verify(rocksIteratorMock, never()).seekToLast(); + verifier.verify(rocksIteratorMock, times(1)) + .seek(any(ByteBuffer.class)); + verifier.verify(rocksIteratorMock, times(1)).isValid(); + verifier.verify(rocksIteratorMock, never()).key(any()); + 
verifier.verify(rocksIteratorMock, never()).value(any()); + assertNull(valSupplier); + } + } + @Test public void testSeekReturnsTheActualKey() throws Exception { when(rocksIteratorMock.isValid()).thenReturn(true); @@ -205,11 +304,10 @@ public void testSeekReturnsTheActualKey() throws Exception { .then(newAnswerInt("val1", 0x7f)); try (RDBStoreCodecBufferIterator i = newIterator(); - CodecBuffer target = CodecBuffer.wrap(new byte[]{0x55})) { - final Table.KeyValue val = i.seek(target); - + CodecBuffer target = CodecBuffer.wrap(new byte[] {0x55}); + AutoCloseableRawKeyValue valSupplier = i.seek(target)) { + assertTrue(i.hasNext()); InOrder verifier = inOrder(rocksIteratorMock); - verify(rocksIteratorMock, times(1)).seekToFirst(); //at construct time verify(rocksIteratorMock, never()).seekToLast(); verifier.verify(rocksIteratorMock, times(1)) @@ -217,8 +315,8 @@ public void testSeekReturnsTheActualKey() throws Exception { verifier.verify(rocksIteratorMock, times(1)).isValid(); verifier.verify(rocksIteratorMock, times(1)).key(any()); verifier.verify(rocksIteratorMock, times(1)).value(any()); - assertArrayEquals(new byte[]{0x00}, val.getKey().getArray()); - assertArrayEquals(new byte[]{0x7f}, val.getValue().getArray()); + assertArrayEquals(new byte[] {0x00}, valSupplier.getKey().getArray()); + assertArrayEquals(new byte[] {0x7f}, valSupplier.getValue().getArray()); } CodecTestUtil.gc(); @@ -233,7 +331,9 @@ public void testGettingTheKeyIfIteratorIsValid() throws Exception { byte[] key = null; try (RDBStoreCodecBufferIterator i = newIterator()) { if (i.hasNext()) { - key = i.next().getKey().getArray(); + try (AutoCloseableRawKeyValue kv = i.next()) { + key = kv.getKey().getArray(); + } } } @@ -258,9 +358,10 @@ public void testGettingTheValueIfIteratorIsValid() throws Exception { byte[] value = null; try (RDBStoreCodecBufferIterator i = newIterator()) { if (i.hasNext()) { - Table.KeyValue entry = i.next(); - key = entry.getKey().getArray(); - value = entry.getValue().getArray(); + try (AutoCloseableRawKeyValue entry = i.next()) { + key = entry.getKey().getArray(); + value = entry.getValue().getArray(); + } } } @@ -283,6 +384,7 @@ public void testRemovingFromDBActuallyDeletesFromTable() throws Exception { .then(newAnswer("key1", testKey)); try (RDBStoreCodecBufferIterator i = newIterator(null)) { + i.next(); i.removeFromDB(); } @@ -291,7 +393,6 @@ public void testRemovingFromDBActuallyDeletesFromTable() throws Exception { verifier.verify(rocksIteratorMock, times(1)).isValid(); verifier.verify(rdbTableMock, times(1)) .delete(ByteBuffer.wrap(testKey)); - CodecTestUtil.gc(); } @@ -316,17 +417,24 @@ public void testCloseCloses() throws Exception { @Test public void testNullPrefixedIterator() throws Exception { + try (RDBStoreCodecBufferIterator i = newIterator()) { verify(rocksIteratorMock, times(1)).seekToFirst(); clearInvocations(rocksIteratorMock); - + when(rocksIteratorMock.isValid()).thenReturn(true); i.seekToFirst(); + verify(rocksIteratorMock, times(0)).isValid(); + verify(rocksIteratorMock, times(0)).key(any()); verify(rocksIteratorMock, times(1)).seekToFirst(); clearInvocations(rocksIteratorMock); + i.hasNext(); + verify(rocksIteratorMock, times(1)).isValid(); + verify(rocksIteratorMock, times(1)).key(any()); + clearInvocations(rocksIteratorMock); - when(rocksIteratorMock.isValid()).thenReturn(true); assertTrue(i.hasNext()); - verify(rocksIteratorMock, times(1)).isValid(); + // hasNext shouldn't make isValid() redundant calls. 
+ verify(rocksIteratorMock, times(0)).isValid(); verify(rocksIteratorMock, times(0)).key(any()); i.seekToLast(); @@ -344,17 +452,24 @@ public void testNormalPrefixedIterator() throws Exception { final ByteBuffer prefix = ByteBuffer.wrap(prefixBytes); verify(rocksIteratorMock, times(1)).seek(prefix); clearInvocations(rocksIteratorMock); - - i.seekToFirst(); - verify(rocksIteratorMock, times(1)).seek(prefix); - clearInvocations(rocksIteratorMock); - when(rocksIteratorMock.isValid()).thenReturn(true); when(rocksIteratorMock.key(any())) .then(newAnswer("key1", prefixBytes)); - assertTrue(i.hasNext()); + i.seekToFirst(); + verify(rocksIteratorMock, times(0)).isValid(); + verify(rocksIteratorMock, times(0)).key(any()); + verify(rocksIteratorMock, times(1)).seek(prefix); + clearInvocations(rocksIteratorMock); + i.hasNext(); verify(rocksIteratorMock, times(1)).isValid(); verify(rocksIteratorMock, times(1)).key(any()); + verify(rocksIteratorMock, times(0)).seek(prefix); + clearInvocations(rocksIteratorMock); + + assertTrue(i.hasNext()); + // Ensure no redundant native calls are made since the key and value have already been fetched by the preceding hasNext(). + verify(rocksIteratorMock, times(0)).isValid(); + verify(rocksIteratorMock, times(0)).key(any()); Exception e = assertThrows(Exception.class, () -> i.seekToLast(), "Prefixed iterator does not support seekToLast"); diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java index 065e8728e748..b9c6edcee185 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java @@ -493,6 +493,7 @@ public void testIteratorRemoveFromDB() throws Exception { writeToTable(testTable, 3); try (TableIterator> iterator = testTable.iterator()) { + iterator.next(); iterator.removeFromDB(); } assertNull(testTable.get(bytesOf[1])); @@ -506,6 +507,7 @@ public void testIteratorRemoveFromDB() throws Exception { try (TableIterator> iterator = testTable.iterator()) { iterator.seekToLast(); + iterator.next(); iterator.removeFromDB(); } assertNotNull(testTable.get(bytesOf[1])); @@ -519,6 +521,7 @@ public void testIteratorRemoveFromDB() throws Exception { try (TableIterator> iterator = testTable.iterator()) { iterator.seek(bytesOf[3]); + iterator.next(); iterator.removeFromDB(); } assertNotNull(testTable.get(bytesOf[1]));
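Note on the revised iteration contract exercised by the test changes above: hasNext() now prefetches the current entry, so repeated calls make no extra native isValid()/key() calls; next() hands out an AutoCloseableRawKeyValue backed by a pooled buffer, and the caller should close each entry so its buffer can be reused (with a bounded buffer pool, next() blocks until one is released); and removeFromDB() deletes the entry most recently returned by next(), which is why the TestRDBTableStore cases now call next() first. The following is a minimal usage sketch of that contract and is not part of the patch: the class name IteratorUsageSketch is hypothetical, the <byte[]> type argument is assumed (generic parameters are stripped in this excerpt), and the sketch is placed in the org.apache.hadoop.hdds.utils.db package only so the package-private iterator types are visible.

package org.apache.hadoop.hdds.utils.db;

import java.io.IOException;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.utils.db.RDBStoreAbstractIterator.AutoCloseableRawKeyValue;

/** Illustrative sketch only; not part of this patch. */
final class IteratorUsageSketch {
  private IteratorUsageSketch() { }

  // Scan the remaining entries and delete each one. Every AutoCloseableRawKeyValue
  // is closed (via try-with-resources) before the next fetch so the iterator can
  // recycle its pooled buffer, and removeFromDB() targets the entry most recently
  // returned by next().
  static void scanAndDelete(RDBStoreByteArrayIterator iter) throws IOException {
    while (iter.hasNext()) {
      try (AutoCloseableRawKeyValue<byte[]> kv = iter.next()) {
        System.out.println(StringUtils.bytes2Hex(kv.getKey()));
        iter.removeFromDB();
      }
    }
  }
}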