Closed

Commits (40)
30facb9  HDDS-12742. Make RDBStoreAbstractIterator return a reference counted …  (swamirishi, Apr 12, 2025)
aab6ebc  HDDS-12742. Remove log  (swamirishi, Apr 14, 2025)
5dd7460  HDDS-12742. Remove synchronized  (swamirishi, Apr 15, 2025)
e4bf8f7  HDDS-12742. Fix seek  (swamirishi, Apr 16, 2025)
9ab91c9  HDDS-12742. Add test case  (swamirishi, Apr 16, 2025)
abe06ab  HDDS-12742. Fix iter  (swamirishi, Apr 16, 2025)
d215d4b  HDDS-12742. Fix NPE  (swamirishi, Apr 16, 2025)
85c74b4  HDDS-12742. Fix NPE  (swamirishi, Apr 16, 2025)
7d82a9e  HDDS-12742. Add Blocking Deque instead custom implementation of a blc…  (swamirishi, Apr 16, 2025)
ce6ab81  HDDS-12742. Fix checkstyle  (swamirishi, Apr 16, 2025)
091e8ca  HDDS-12742. Fix test cases  (swamirishi, Apr 17, 2025)
6bc5c86  Merge remote-tracking branch 'apache/master' into HEAD  (swamirishi, Apr 17, 2025)
1b394c2  Merge remote-tracking branch 'apache/master' into HEAD  (swamirishi, May 2, 2025)
7fdced2  HDDS-12742. Add Spliterator  (swamirishi, May 4, 2025)
f5b633b  HDDS-12742. Fix Spliterator  (swamirishi, May 5, 2025)
1971a96  Update hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/uti…  (swamirishi, May 5, 2025)
a33f265  HDDS-12742. Make concurrent hash set  (swamirishi, May 6, 2025)
0c3ae4f  Merge remote-tracking branch 'origin/HDDS-12742' into HEAD  (swamirishi, May 6, 2025)
fbe213b  Merge remote-tracking branch 'apache/master' into HEAD  (swamirishi, May 6, 2025)
71f9c28  HDDS-12742. Fix checkstyle  (swamirishi, May 6, 2025)
5734027  HDDS-12742. Fix pmd  (swamirishi, May 6, 2025)
99c508e  HDDS-12742. Fix checkstyle  (swamirishi, May 6, 2025)
f16e1b9  HDDS-12742. Fix max buffer definition  (swamirishi, May 13, 2025)
86fe101  HDDS-12742. Add tests  (swamirishi, May 18, 2025)
cfde81f  Merge remote-tracking branch 'apache/master' into HEAD  (swamirishi, May 18, 2025)
f825754  HDDS-12742. Fix exception handling for memory leaks  (swamirishi, May 21, 2025)
e4155a6  HDDS-12742. Make spliterator an interface parameter  (swamirishi, May 23, 2025)
985d612  HDDS-12742. Fix build  (swamirishi, May 27, 2025)
b98968b  HDDS-12742. Fix findbugs  (swamirishi, May 27, 2025)
78d33af  HDDS-12742. Convert to Named Class from anonymous class of RawSpliter…  (swamirishi, May 28, 2025)
ed632a8  HDDS-12742. Fix checkstyle  (swamirishi, May 28, 2025)
58f81cc  HDDS-12742. Move Buffer class out to upperlevel  (swamirishi, May 28, 2025)
3f77cbe  HDDS-12742. Remove reference counted iterators  (swamirishi, May 28, 2025)
1fecc85  HDDS-12742. Implement base class for RawIterator  (swamirishi, May 29, 2025)
deb0011  HDDS-12742. Fix test class  (swamirishi, May 29, 2025)
346a685  HDDS-12742. Fix test class  (swamirishi, May 29, 2025)
f6b651c  HDDS-12742. Refactor class  (swamirishi, May 29, 2025)
93c7f91  Merge remote-tracking branch 'apache/master' into HEAD  (swamirishi, Jun 9, 2025)
f81c116  Merge remote-tracking branch 'apache/master' into HEAD  (swamirishi, Jun 12, 2025)
7b37a29  Merge remote-tracking branch 'apache/master' into HEAD  (swamirishi, Jun 12, 2025)
@@ -89,6 +89,21 @@ public void deleteWithBatch(BatchOperation batch, KEY key)
"version.");
}

@Override
public KeyValueSpliterator<KEY, VALUE> spliterator(int maxParallelism, boolean closeOnException) {
  throw new UnsupportedOperationException("Iterating tables directly is not" +
      " supported for datanode containers due to differing schema " +
      "version.");
}

Contributor
A default implementation can throw the exception as not supported; there is no need to add it for every case.

@Override
public KeyValueSpliterator<KEY, VALUE> spliterator(KEY startKey, KEY prefix, int maxParallelism,
boolean closeOnException) {
throw new UnsupportedOperationException("Iterating tables directly is not" +
" supported for datanode containers due to differing schema " +
"version.");
}

@Override
public String getName() {
return table.getName();
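Following the reviewer's suggestion above, a default method on the table interface could centralize the unsupported case. A minimal sketch under that assumption — the interface shown here is a simplified stand-in, not the full Ozone Table interface:

```java
// Sketch only: a default implementation that throws, so concrete tables
// override spliterator() only when they actually support it.
public interface Table<KEY, VALUE> {

  String getName();

  default KeyValueSpliterator<KEY, VALUE> spliterator(int maxParallelism, boolean closeOnException) {
    throw new UnsupportedOperationException(
        "spliterator is not supported by table " + getName());
  }
}
```

With this in place, classes like the datanode table wrapper above would simply not override the method instead of repeating the throw in every implementation.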
@@ -21,6 +21,8 @@
import java.util.NoSuchElementException;
import java.util.function.Consumer;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -30,30 +32,34 @@
* @param <RAW> the raw type.
*/
abstract class RDBStoreAbstractIterator<RAW>
implements TableIterator<RAW, Table.KeyValue<RAW, RAW>> {
implements TableIterator<RAW, RDBStoreAbstractIterator.AutoCloseableRawKeyValue<RAW>> {

private static final Logger LOG =
LoggerFactory.getLogger(RDBStoreAbstractIterator.class);

private final ManagedRocksIterator rocksDBIterator;
private final RDBTable rocksDBTable;
private Table.KeyValue<RAW, RAW> currentEntry;
private ReferenceCountedObject<RawKeyValue<RAW>> currentEntry;
private RawKeyValue<RAW> previousKeyValue;
// This is for schemas that use a fixed-length
// prefix for each key.
private final RAW prefix;
private Boolean hasNext;
Contributor
In the multi-threaded case, we should use volatile or atomic variables.

Contributor Author
Iterators are not thread-safe in general, so this is not supposed to be used by multiple threads.
private boolean closed;

RDBStoreAbstractIterator(ManagedRocksIterator iterator, RDBTable table,
RAW prefix) {
this.rocksDBIterator = iterator;
this.rocksDBTable = table;
this.prefix = prefix;
this.currentEntry = null;
this.hasNext = false;
this.closed = false;
this.previousKeyValue = null;
}

/** @return the key for the current entry. */
abstract RAW key();

/** @return the {@link Table.KeyValue} for the current entry. */
abstract Table.KeyValue<RAW, RAW> getKeyValue();
abstract ReferenceCountedObject<RawKeyValue<RAW>> getKeyValue();

/** Seek to the given key. */
abstract void seek0(RAW key);
@@ -78,32 +84,52 @@ final RAW getPrefix() {

@Override
public final void forEachRemaining(
Consumer<? super Table.KeyValue<RAW, RAW>> action) {
Consumer<? super AutoCloseableRawKeyValue<RAW>> action) {
while (hasNext()) {
action.accept(next());
AutoCloseableRawKeyValue<RAW> entry = next();
action.accept(entry);
}
}

private void releaseEntry() {
if (currentEntry != null) {
currentEntry.release();
}
currentEntry = null;
hasNext = null;
}

private void setCurrentEntry() {
if (rocksDBIterator.get().isValid()) {
boolean isValid = !closed && rocksDBIterator.get().isValid();
if (isValid) {
currentEntry = getKeyValue();
currentEntry.retain();
} else {
currentEntry = null;
}
setHasNext(isValid, currentEntry);
}

public void setHasNext(boolean isValid, ReferenceCountedObject<RawKeyValue<RAW>> entry) {
this.hasNext = isValid && (prefix == null || startsWithPrefix(entry.get().getKey()));
}

@Override
public final boolean hasNext() {
return rocksDBIterator.get().isValid() &&
(prefix == null || startsWithPrefix(key()));
if (hasNext == null) {
setCurrentEntry();
}
return hasNext;
}

@Override
public final Table.KeyValue<RAW, RAW> next() {
setCurrentEntry();
if (currentEntry != null) {
public final AutoCloseableRawKeyValue<RAW> next() {
if (hasNext()) {
AutoCloseableRawKeyValue<RAW> entry = new AutoCloseableRawKeyValue<>(currentEntry);
this.previousKeyValue = currentEntry.get();
rocksDBIterator.get().next();
Contributor
rocksDBIterator.get().next() can also be moved into setCurrentEntry(), since we are fetching the next entry and updating the hasNext flag together.

Contributor Author
No, we cannot do that. What about the first entry? You should not call next() on the RocksDB iterator before the first entry has been returned.
return currentEntry;
releaseEntry();
return entry;
}
throw new NoSuchElementException("RocksDB Store has no more elements");
}
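Because next() now hands out an AutoCloseableRawKeyValue backed by a reference-counted buffer, callers are expected to close each entry once they are done with it. A usage sketch — the table setup and process() method are placeholders, not APIs confirmed by this diff:

```java
// Hypothetical usage: close every entry returned by next() so its underlying
// reference-counted buffer is released (and, in the CodecBuffer variant,
// returned to the buffer pool). Per the discussion below, removeFromDB() must
// be called before the entry is closed; getKey()/getValue() are invalid after.
try (TableIterator<byte[], RDBStoreAbstractIterator.AutoCloseableRawKeyValue<byte[]>> iter =
         table.iterator()) { // table and iterator() are placeholders for this sketch
  while (iter.hasNext()) {
    try (RDBStoreAbstractIterator.AutoCloseableRawKeyValue<byte[]> kv = iter.next()) {
      process(kv.getKey(), kv.getValue()); // process(...) is a placeholder
    }
  }
}
```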
@@ -115,7 +141,7 @@ public final void seekToFirst() {
} else {
seek0(prefix);
}
setCurrentEntry();
releaseEntry();
}

@Override
@@ -125,23 +151,28 @@ public final void seekToLast() {
} else {
throw new UnsupportedOperationException("seekToLast: prefix != null");
}
setCurrentEntry();
releaseEntry();
}

@Override
public final Table.KeyValue<RAW, RAW> seek(RAW key) {
public final AutoCloseableRawKeyValue<RAW> seek(RAW key) {
seek0(key);
releaseEntry();
setCurrentEntry();
return currentEntry;
// The current entry should only be closed on next(), and thus closing the returned entry should be a no-op.
if (hasNext()) {
return new AutoCloseableRawKeyValue<>(currentEntry);
}
return null;
}

@Override
public final void removeFromDB() throws IOException {
if (rocksDBTable == null) {
throw new UnsupportedOperationException("remove");
}
if (currentEntry != null) {
delete(currentEntry.getKey());
if (previousKeyValue != null) {
delete(previousKeyValue.getKey());
Contributor
Since this is reference counted, calling this after release can cause issues. We should keep it reference counted only, and throw an exception if it is closed.

Contributor Author
removeFromDB() should not be called after release; that is the whole point of returning a closeable value. Even getKey() and getValue() won't be valid.
} else {
LOG.info("Failed to removeFromDB: currentEntry == null");
}
@@ -150,5 +181,21 @@ public final void removeFromDB() throws IOException {
@Override
public void close() {
rocksDBIterator.close();
closed = true;
releaseEntry();
}

public static final class AutoCloseableRawKeyValue<RAW> extends RawKeyValue<RAW> implements AutoCloseable {
Contributor
Can we rename the class to CloseableRawKeyValue and drop the "Auto" keyword?

Contributor Author
Closeable is specifically for throwing IOExceptions; AutoCloseable is the base interface. Since we are not throwing any exceptions, we should ideally call it AutoCloseableRawKeyValue.
private final UncheckedAutoCloseableSupplier<RawKeyValue<RAW>> keyValue;

private AutoCloseableRawKeyValue(ReferenceCountedObject<RawKeyValue<RAW>> kv) {
Contributor
This could itself be a reference-counted AutoCloseable class, instead of wrapping at the caller:

  • retain() adds a reference count
  • release()/close() releases a reference count and, when it reaches "0", closes the object

Contributor Author
No, we should only return the auto-closeable value, not the reference-counted object itself, if that is what you mean.
super(kv.get().getKey(), kv.get().getValue());
this.keyValue = kv.retainAndReleaseOnClose();
}

@Override
public void close() {
keyValue.close();
}
}
}
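For readers unfamiliar with the Ratis utility used here, the sketch below illustrates the retain/release contract the discussion above assumes. It is a simplified illustration of the same calls this diff uses (wrap, retain, retainAndReleaseOnClose, release), not the production code; use(...) is a placeholder:

```java
// Illustration of the assumed ReferenceCountedObject contract:
// retainAndReleaseOnClose() takes a reference and returns a supplier whose
// close() releases exactly that reference, once.
static void lifecycleSketch(RawKeyValue<byte[]> rawKeyValue) {
  ReferenceCountedObject<RawKeyValue<byte[]>> ref = ReferenceCountedObject.wrap(rawKeyValue);
  ref.retain();                          // held by the iterator as the current entry
  try (UncheckedAutoCloseableSupplier<RawKeyValue<byte[]>> handle =
           ref.retainAndReleaseOnClose()) { // held by the caller via AutoCloseableRawKeyValue
    use(handle.get());                   // safe while the handle is open
  }                                      // caller's reference released here
  ref.release();                         // iterator's reference released in releaseEntry()
}
```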
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
import org.apache.ratis.util.ReferenceCountedObject;

/**
* RocksDB store iterator using the byte[] API.
@@ -33,14 +34,10 @@ class RDBStoreByteArrayIterator extends RDBStoreAbstractIterator<byte[]> {
}

@Override
byte[] key() {
return getRocksDBIterator().get().key();
}

@Override
Table.KeyValue<byte[], byte[]> getKeyValue() {
ReferenceCountedObject<RawKeyValue<byte[]>> getKeyValue() {
final ManagedRocksIterator i = getRocksDBIterator();
return RawKeyValue.create(i.get().key(), i.get().value());
RawKeyValue<byte[]> rawKV = RawKeyValue.create(i.get().key(), i.get().value());
return ReferenceCountedObject.wrap(rawKV);
}

@Override
@@ -18,48 +18,105 @@
package org.apache.hadoop.hdds.utils.db;

import java.io.IOException;
import java.util.Deque;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.commons.lang3.exception.UncheckedInterruptedException;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
import org.apache.hadoop.util.Sets;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ReferenceCountedObject;

/**
* Implement {@link RDBStoreAbstractIterator} using {@link CodecBuffer}.
*/
* An implementation of {@link RDBStoreAbstractIterator} that uses {@link CodecBuffer}
* for efficient memory management when iterating over RocksDB entries.
* <p>
* This iterator employs a buffer pooling strategy to minimize memory allocations
* during iteration. Key and value buffers are pre-allocated and reused through a
* reference-counting mechanism, which significantly reduces GC pressure when
* processing large datasets.
*/
class RDBStoreCodecBufferIterator extends RDBStoreAbstractIterator<CodecBuffer> {

private final Buffer keyBuffer;
private final Buffer valueBuffer;
private final AtomicBoolean closed = new AtomicBoolean();
private final BlockingDeque<RawKeyValue<Buffer>> availableBufferStack;
private final Set<RawKeyValue<Buffer>> inUseBuffers;
private final AtomicReference<Boolean> closed = new AtomicReference<>(false);

RDBStoreCodecBufferIterator(ManagedRocksIterator iterator, RDBTable table,
CodecBuffer prefix) {
CodecBuffer prefix, int maxNumberOfBuffersInMemory) {
super(iterator, table, prefix);

// We need at least 1 buffer: one for setting the next value and one for sending the current value.
maxNumberOfBuffersInMemory = Math.max(1, maxNumberOfBuffersInMemory);
final String name = table != null ? table.getName() : null;
this.keyBuffer = new Buffer(
new CodecBuffer.Capacity(name + "-iterator-key", 1 << 10),
buffer -> getRocksDBIterator().get().key(buffer));
this.valueBuffer = new Buffer(
new CodecBuffer.Capacity(name + "-iterator-value", 4 << 10),
buffer -> getRocksDBIterator().get().value(buffer));
this.availableBufferStack = new LinkedBlockingDeque<>(maxNumberOfBuffersInMemory);
this.inUseBuffers = Sets.newConcurrentHashSet();
for (int i = 0; i < maxNumberOfBuffersInMemory; i++) {
Buffer keyBuffer = new Buffer(
new CodecBuffer.Capacity(name + "-iterator-key-" + i, 1 << 10),
buffer -> getRocksDBIterator().get().key(buffer));
Buffer valueBuffer = new Buffer(
new CodecBuffer.Capacity(name + "-iterator-value-" + i, 4 << 10),
buffer -> getRocksDBIterator().get().value(buffer));
availableBufferStack.add(new RawKeyValue<>(keyBuffer, valueBuffer));
}

seekToFirst();
}

void assertOpen() {
Preconditions.assertTrue(!closed.get(), "Already closed");
}

@Override
CodecBuffer key() {
  assertOpen();
  return keyBuffer.getFromDb();
}
private <V> V getFromDeque(BlockingDeque<V> deque, Set<V> inUseSet) {
  V popped;
  do {
    try {
      popped = deque.poll(1000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      throw new UncheckedInterruptedException(e);
    }
  } while (popped == null);
  assertOpen();
  inUseSet.add(popped);
  return popped;
}

private ReferenceCountedObject<RawKeyValue<CodecBuffer>> getReferenceCountedBuffer(
RawKeyValue<Buffer> key, Deque<RawKeyValue<Buffer>> stack, Set<RawKeyValue<Buffer>> inUseSet,
Function<RawKeyValue<Buffer>, RawKeyValue<CodecBuffer>> transformer) {
RawKeyValue<CodecBuffer> value = transformer.apply(key);
return ReferenceCountedObject.wrap(value, () -> {
}, completelyReleased -> {
if (!completelyReleased) {
return;
}
closed.updateAndGet((prev) -> {
// If already closed the data structure should not be manipulated since the buffer would have already been
// closed.
if (!prev) {
// The entire block runs inside updateAndGet to avoid a race condition with the close() method.
// Remove from the set before adding it back to the stack; otherwise there could be a race condition
// with the #getFromDeque function.
inUseSet.remove(key);
stack.push(key);
}
return prev;
});
});
}

@Override
Table.KeyValue<CodecBuffer, CodecBuffer> getKeyValue() {
ReferenceCountedObject<RawKeyValue<CodecBuffer>> getKeyValue() {
assertOpen();
return Table.newKeyValue(key(), valueBuffer.getFromDb());
RawKeyValue<Buffer> kvBuffer = getFromDeque(availableBufferStack, inUseBuffers);
Function<RawKeyValue<Buffer>, RawKeyValue<CodecBuffer>> transformer =
kv -> new RawKeyValue<>(kv.getKey().getFromDb(), kv.getValue().getFromDb());
return getReferenceCountedBuffer(kvBuffer, availableBufferStack, inUseBuffers, transformer);
}

@Override
@@ -87,13 +144,26 @@ boolean startsWithPrefix(CodecBuffer key) {
return key.startsWith(prefix);
}

private <V> void release(Deque<V> valueStack, Set<V> inUseSet, Function<V, Void> releaser) {
while (!valueStack.isEmpty()) {
V popped = valueStack.pop();
releaser.apply(popped);
}
for (V inUseValue : inUseSet) {
releaser.apply(inUseValue);
}
}

@Override
public void close() {
if (closed.compareAndSet(false, true)) {
super.close();
Optional.ofNullable(getPrefix()).ifPresent(CodecBuffer::release);
keyBuffer.release();
valueBuffer.release();
release(availableBufferStack, inUseBuffers, kv -> {
kv.getKey().release();
kv.getValue().release();
return null;
});
}
}

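To summarize the pooling behavior the class javadoc describes, here is a simplified, self-contained sketch of the borrow/return cycle. The names are hypothetical; the production class wires this cycle into getKeyValue() and the reference-count release callback instead:

```java
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Simplified model of the buffer pool: a fixed set of buffers circulates
// between an "available" deque and an "in use" set. Illustration only.
final class BufferPoolSketch {
  private final BlockingDeque<byte[]> available = new LinkedBlockingDeque<>();
  private final Set<byte[]> inUse = ConcurrentHashMap.newKeySet();

  BufferPoolSketch(int poolSize, int bufferSize) {
    for (int i = 0; i < poolSize; i++) {
      available.add(new byte[bufferSize]); // pre-allocate, as the constructor does
    }
  }

  byte[] borrow() throws InterruptedException {
    byte[] buf;
    do { // poll with a timeout and retry, mirroring getFromDeque()
      buf = available.poll(1000, TimeUnit.MILLISECONDS);
    } while (buf == null);
    inUse.add(buf);
    return buf;
  }

  void giveBack(byte[] buf) {
    // remove from the in-use set before pushing back, mirroring the release callback
    inUse.remove(buf);
    available.push(buf);
  }
}
```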