Commits (40)
30facb9 HDDS-12742. Make RDBStoreAbstractIterator return a reference counted … (swamirishi, Apr 12, 2025)
aab6ebc HDDS-12742. Remove log (swamirishi, Apr 14, 2025)
5dd7460 HDDS-12742. Remove synchronized (swamirishi, Apr 15, 2025)
e4bf8f7 HDDS-12742. Fix seek (swamirishi, Apr 16, 2025)
9ab91c9 HDDS-12742. Add test case (swamirishi, Apr 16, 2025)
abe06ab HDDS-12742. Fix iter (swamirishi, Apr 16, 2025)
d215d4b HDDS-12742. Fix NPE (swamirishi, Apr 16, 2025)
85c74b4 HDDS-12742. Fix NPE (swamirishi, Apr 16, 2025)
7d82a9e HDDS-12742. Add Blocking Deque instead custom implementation of a blc… (swamirishi, Apr 16, 2025)
ce6ab81 HDDS-12742. Fix checkstyle (swamirishi, Apr 16, 2025)
091e8ca HDDS-12742. Fix test cases (swamirishi, Apr 17, 2025)
6bc5c86 Merge remote-tracking branch 'apache/master' into HEAD (swamirishi, Apr 17, 2025)
1b394c2 Merge remote-tracking branch 'apache/master' into HEAD (swamirishi, May 2, 2025)
7fdced2 HDDS-12742. Add Spliterator (swamirishi, May 4, 2025)
f5b633b HDDS-12742. Fix Spliterator (swamirishi, May 5, 2025)
1971a96 Update hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/uti… (swamirishi, May 5, 2025)
a33f265 HDDS-12742. Make concurrent hash set (swamirishi, May 6, 2025)
0c3ae4f Merge remote-tracking branch 'origin/HDDS-12742' into HEAD (swamirishi, May 6, 2025)
fbe213b Merge remote-tracking branch 'apache/master' into HEAD (swamirishi, May 6, 2025)
71f9c28 HDDS-12742. Fix checkstyle (swamirishi, May 6, 2025)
5734027 HDDS-12742. Fix pmd (swamirishi, May 6, 2025)
99c508e HDDS-12742. Fix checkstyle (swamirishi, May 6, 2025)
f16e1b9 HDDS-12742. Fix max buffer definition (swamirishi, May 13, 2025)
86fe101 HDDS-12742. Add tests (swamirishi, May 18, 2025)
cfde81f Merge remote-tracking branch 'apache/master' into HEAD (swamirishi, May 18, 2025)
f825754 HDDS-12742. Fix exception handling for memory leaks (swamirishi, May 21, 2025)
e4155a6 HDDS-12742. Make spliterator an interface parameter (swamirishi, May 23, 2025)
985d612 HDDS-12742. Fix build (swamirishi, May 27, 2025)
b98968b HDDS-12742. Fix findbugs (swamirishi, May 27, 2025)
78d33af HDDS-12742. Convert to Named Class from anonymous class of RawSpliter… (swamirishi, May 28, 2025)
ed632a8 HDDS-12742. Fix checkstyle (swamirishi, May 28, 2025)
58f81cc HDDS-12742. Move Buffer class out to upperlevel (swamirishi, May 28, 2025)
3f77cbe HDDS-12742. Remove reference counted iterators (swamirishi, May 28, 2025)
1fecc85 HDDS-12742. Implement base class for RawIterator (swamirishi, May 29, 2025)
deb0011 HDDS-12742. Fix test class (swamirishi, May 29, 2025)
346a685 HDDS-12742. Fix test class (swamirishi, May 29, 2025)
f6b651c HDDS-12742. Refactor class (swamirishi, May 29, 2025)
93c7f91 Merge remote-tracking branch 'apache/master' into HEAD (swamirishi, Jun 9, 2025)
f81c116 Merge remote-tracking branch 'apache/master' into HEAD (swamirishi, Jun 12, 2025)
7b37a29 Merge remote-tracking branch 'apache/master' into HEAD (swamirishi, Jun 12, 2025)
RDBStoreAbstractIterator.java
@@ -21,6 +21,8 @@
import java.util.NoSuchElementException;
import java.util.function.Consumer;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+import org.apache.ratis.util.ReferenceCountedObject;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -30,30 +32,32 @@
* @param <RAW> the raw type.
*/
abstract class RDBStoreAbstractIterator<RAW>
-    implements TableIterator<RAW, Table.KeyValue<RAW, RAW>> {
+    implements TableIterator<RAW, UncheckedAutoCloseableSupplier<RawKeyValue<RAW>>> {

private static final Logger LOG =
LoggerFactory.getLogger(RDBStoreAbstractIterator.class);

private final ManagedRocksIterator rocksDBIterator;
private final RDBTable rocksDBTable;
-  private Table.KeyValue<RAW, RAW> currentEntry;
+  private ReferenceCountedObject<RawKeyValue<RAW>> currentEntry;
// This is for schemas that use a fixed-length
// prefix for each key.
private final RAW prefix;
+  private boolean hasNext;
+  private boolean closed;

RDBStoreAbstractIterator(ManagedRocksIterator iterator, RDBTable table,
RAW prefix) {
this.rocksDBIterator = iterator;
this.rocksDBTable = table;
this.prefix = prefix;
+    this.currentEntry = null;
+    this.hasNext = false;
+    this.closed = false;
}

/** @return the key for the current entry. */
abstract RAW key();

/** @return the {@link Table.KeyValue} for the current entry. */
-  abstract Table.KeyValue<RAW, RAW> getKeyValue();
+  abstract ReferenceCountedObject<RawKeyValue<RAW>> getKeyValue();

/** Seek to the given key. */
abstract void seek0(RAW key);
@@ -78,38 +82,50 @@ final RAW getPrefix() {

@Override
public final void forEachRemaining(
-      Consumer<? super Table.KeyValue<RAW, RAW>> action) {
+      Consumer<? super UncheckedAutoCloseableSupplier<RawKeyValue<RAW>>> action) {
     while (hasNext()) {
-      action.accept(next());
+      UncheckedAutoCloseableSupplier<RawKeyValue<RAW>> entry = next();
+      action.accept(entry);
}
}

   private void setCurrentEntry() {
-    if (rocksDBIterator.get().isValid()) {
+    if (currentEntry != null) {
+      currentEntry.release();
+    }
+
+    boolean isValid = !closed && rocksDBIterator.get().isValid();
+    if (isValid) {
       currentEntry = getKeyValue();
+      currentEntry.retain();
     } else {
       currentEntry = null;
     }
+    setHasNext(isValid, currentEntry);
   }

+  public void setHasNext(boolean isValid, ReferenceCountedObject<RawKeyValue<RAW>> entry) {
+    this.hasNext = isValid && (prefix == null || startsWithPrefix(entry.get().getKey()));
+  }

@Override
public final boolean hasNext() {
-    return rocksDBIterator.get().isValid() &&
-        (prefix == null || startsWithPrefix(key()));
+    return hasNext;
}

@Override
-  public final Table.KeyValue<RAW, RAW> next() {
-    setCurrentEntry();
-    if (currentEntry != null) {
+  public final synchronized UncheckedAutoCloseableSupplier<RawKeyValue<RAW>> next() {
Contributor: Why does it need synchronized? Will an iterator object be invoked by multiple threads?

Contributor: Even for ConcurrentHashMap, "..., iterators are designed to be used by only one thread at a time. ..." See https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentHashMap.html

Contributor (author): Yeah, I removed the synchronized method. I will turn this into a Spliterator in the next set of PRs.
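A minimal consumption sketch of the contract discussed here (illustrative only, not code from this PR; drain() is built solely on the API shown in this diff): entries are consumed by one thread at a time, and each entry must be closed so its reference can be dropped.

static void drain(TableIterator<byte[], UncheckedAutoCloseableSupplier<RawKeyValue<byte[]>>> iter) {
  // Single-threaded use: hasNext()/next() are not designed for concurrent callers.
  while (iter.hasNext()) {
    try (UncheckedAutoCloseableSupplier<RawKeyValue<byte[]>> entry = iter.next()) {
      byte[] key = entry.get().getKey();      // valid only while the entry is open
      byte[] value = entry.get().getValue();
      System.out.printf("%d key bytes, %d value bytes%n", key.length, value.length);
    }
  }
}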

+    if (hasNext()) {
+      UncheckedAutoCloseableSupplier<RawKeyValue<RAW>> entry = currentEntry.retainAndReleaseOnClose();
       rocksDBIterator.get().next();
Contributor: rocksDBIterator.get().next() can also be moved into setCurrentEntry(), since we are fetching the next entry and updating the hasNext flag together.

Contributor (author): No, we cannot do that. What about the first entry? next() should not be called for the first entry in the iterator.
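A small sketch of the positioning contract behind this exchange, using only the plain org.rocksdb.RocksIterator API (the method below is illustrative, not from the PR): a seek leaves the iterator already standing on the first entry, so the advance belongs at the end of each pass.

static void scan(org.rocksdb.RocksIterator it) {
  // seekToFirst() positions the iterator ON entry 0; calling next() before
  // reading it would silently skip the first entry.
  for (it.seekToFirst(); it.isValid(); it.next()) {
    System.out.printf("%d key bytes, %d value bytes%n", it.key().length, it.value().length);
  }
}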

-      return currentEntry;
+      setCurrentEntry();
+      return entry;
}
throw new NoSuchElementException("RocksDB Store has no more elements");
}

@Override
-  public final void seekToFirst() {
+  public final synchronized void seekToFirst() {
if (prefix == null) {
rocksDBIterator.get().seekToFirst();
} else {
@@ -119,7 +135,7 @@ public final void seekToFirst() {
}

@Override
-  public final void seekToLast() {
+  public final synchronized void seekToLast() {
if (prefix == null) {
rocksDBIterator.get().seekToLast();
} else {
@@ -129,26 +145,29 @@
}

@Override
-  public final Table.KeyValue<RAW, RAW> seek(RAW key) {
+  public final synchronized UncheckedAutoCloseableSupplier<RawKeyValue<RAW>> seek(RAW key) {
seek0(key);
setCurrentEntry();
-    return currentEntry;
+    // The current entry is only released by the next call to next(), so closing the entry returned here is a no-op.
+    return currentEntry.retainAndReleaseOnClose();
}

@Override
-  public final void removeFromDB() throws IOException {
+  public final synchronized void removeFromDB() throws IOException {
if (rocksDBTable == null) {
throw new UnsupportedOperationException("remove");
}
-    if (currentEntry != null) {
-      delete(currentEntry.getKey());
+    if (currentEntry.get() != null) {
+      delete(currentEntry.get().getKey());
} else {
LOG.info("Failed to removeFromDB: currentEntry == null");
}
}

@Override
-  public void close() {
+  public synchronized void close() {
     rocksDBIterator.close();
+    closed = true;
+    setCurrentEntry();
}
}
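For reference, a minimal sketch of the Ratis reference-counting primitives the class above leans on (a toy String payload, not the PR's types): an entry stays alive until both the iterator's own reference and the caller's close()-managed reference have been dropped.

ReferenceCountedObject<String> ref = ReferenceCountedObject.wrap("payload");
ref.retain();                          // the iterator's own reference (setCurrentEntry)
try (UncheckedAutoCloseableSupplier<String> handle = ref.retainAndReleaseOnClose()) {
  System.out.println(handle.get());    // the caller's reference (next()/seek())
}                                      // the caller's reference is dropped here
ref.release();                         // the iterator's reference is dropped by the
                                       // following setCurrentEntry() or close()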
RDBStoreByteArrayIterator.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+import org.apache.ratis.util.ReferenceCountedObject;

/**
* RocksDB store iterator using the byte[] API.
@@ -33,14 +34,10 @@ class RDBStoreByteArrayIterator extends RDBStoreAbstractIterator<byte[]> {
}

   @Override
-  byte[] key() {
-    return getRocksDBIterator().get().key();
-  }
-
-  @Override
-  Table.KeyValue<byte[], byte[]> getKeyValue() {
+  ReferenceCountedObject<RawKeyValue<byte[]>> getKeyValue() {
     final ManagedRocksIterator i = getRocksDBIterator();
-    return RawKeyValue.create(i.get().key(), i.get().value());
+    RawKeyValue<byte[]> rawKV = RawKeyValue.create(i.get().key(), i.get().value());
+    return ReferenceCountedObject.wrap(rawKV);
   }
}

@Override
RDBStoreCodecBufferIterator.java
@@ -18,13 +18,21 @@
package org.apache.hadoop.hdds.utils.db;

import java.io.IOException;
+import java.util.HashSet;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
+import java.util.Stack;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import org.apache.commons.lang3.exception.UncheckedInterruptedException;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ReferenceCountedObject;

/**
* Implement {@link RDBStoreAbstractIterator} using {@link CodecBuffer}.
+ * Any key or value returned is only valid within the lifecycle of this iterator.
*/
class RDBStoreCodecBufferIterator
extends RDBStoreAbstractIterator<CodecBuffer> {
@@ -84,38 +92,77 @@ CodecBuffer getFromDb() {
}
}

-  private final Buffer keyBuffer;
-  private final Buffer valueBuffer;
+  private final Stack<RawKeyValue<Buffer>> availableBufferStack;
Contributor: Stack is not the best data structure in this case. We may use a BlockingQueue. It would be even better to use a ThreadLocal<Buffer>, as mentioned in my previous comment.

swamirishi (author), Apr 16, 2025: I initially used a queue, but I realized we may not have to allocate all the buffers all the time. LIFO is better than FIFO in such cases; my buffer allocation would be minimal with a stack.

swamirishi (author): Moreover, ThreadLocal would end up creating too many buffers, which IMO is very wasteful.

swamirishi (author), Apr 16, 2025: Just because we have initialized a Buffer object doesn't mean we have allocated anything yet; allocation is still lazy. A stack minimizes allocation when the number of concurrent threads inside the locked block stays below the maximum number of buffers, which is why a stack is better here. Moreover, stack.pop() is a very cheap operation, and a thread spends far more time copying data from the RocksDB iterator into the key and value buffers. There would be at most one thread waiting for a non-empty stack, which should be fine; ideally there is no contention at all. We need the synchronized block only for the publisher/subscriber handshake, i.e. the wait() and notify() pair.

swamirishi (author), Apr 16, 2025: Instead of keeping a custom blocking-stack implementation, I have changed it to use a BlockingDeque, which provides these facilities. As part of this change I had to handle race conditions on the inUseBufferSet values, since the two data structures must be updated consistently. Please take a look and check which version is less complex; we can go ahead with whichever approach we all align on.

Contributor: On "LIFO is better than FIFO in such cases": why is LIFO better than FIFO? By the way, for LIFO there is https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingDeque.html. On "my buffer allocation would be minimal with a stack": the allocation can be minimal with any data structure, since we can allocate lazily, only when a buffer is actually used. On "we need the synchronized block only for the publisher/subscriber handshake": it also makes the code complicated and prone to deadlock.

swamirishi (author): Yeah, I have made it use the BlockingDeque.

+  private final Set<RawKeyValue<Buffer>> inUseBuffers;
+  private final Object bufferLock;
private final AtomicBoolean closed = new AtomicBoolean();

RDBStoreCodecBufferIterator(ManagedRocksIterator iterator, RDBTable table,
-      CodecBuffer prefix) {
+      CodecBuffer prefix, int maxNumberOfBuffersInMemory) {
     super(iterator, table, prefix);

+    // We need at least 2 buffers: one for staging the next value and one for serving the current value.
+    maxNumberOfBuffersInMemory = Math.max(2, maxNumberOfBuffersInMemory);
     final String name = table != null ? table.getName() : null;
-    this.keyBuffer = new Buffer(
-        new CodecBuffer.Capacity(name + "-iterator-key", 1 << 10),
-        buffer -> getRocksDBIterator().get().key(buffer));
-    this.valueBuffer = new Buffer(
-        new CodecBuffer.Capacity(name + "-iterator-value", 4 << 10),
-        buffer -> getRocksDBIterator().get().value(buffer));
+    this.availableBufferStack = new Stack<>();
+    this.inUseBuffers = new HashSet<>();
+    for (int i = 0; i < maxNumberOfBuffersInMemory; i++) {
+      Buffer keyBuffer = new Buffer(
+          new CodecBuffer.Capacity(name + "-iterator-key-" + i, 1 << 10),
+          buffer -> getRocksDBIterator().get().key(buffer));
+      Buffer valueBuffer = new Buffer(
+          new CodecBuffer.Capacity(name + "-iterator-value-" + i, 4 << 10),
+          buffer -> getRocksDBIterator().get().value(buffer));
+      availableBufferStack.push(new RawKeyValue<>(keyBuffer, valueBuffer));
+    }
+
+    this.bufferLock = new Object();
+    seekToFirst();
}

void assertOpen() {
Preconditions.assertTrue(!closed.get(), "Already closed");
}

-  @Override
-  CodecBuffer key() {
-    assertOpen();
-    return keyBuffer.getFromDb();
-  }
-
+  private <V> V getFromStack(Object lock, Stack<V> stack, Set<V> inUseSet) {
+    synchronized (Objects.requireNonNull(lock)) {
+      while (stack.isEmpty()) {
+        try {
+          assertOpen();
+          lock.wait(1000);
+        } catch (InterruptedException e) {
+          throw new UncheckedInterruptedException(e);
+        }
+      }
+      V popped = stack.pop();
+      inUseSet.add(popped);
+      return popped;
+    }
+  }

+  private ReferenceCountedObject<RawKeyValue<CodecBuffer>> getReferenceCountedBuffer(
+      RawKeyValue<Buffer> key, Stack<RawKeyValue<Buffer>> stack, Set<RawKeyValue<Buffer>> inUseSet,
+      Object lock, Function<RawKeyValue<Buffer>, RawKeyValue<CodecBuffer>> transformer) {
+    RawKeyValue<CodecBuffer> value = transformer.apply(key);
+    return ReferenceCountedObject.wrap(value, () -> {
+    }, completelyReleased -> {
+      if (!completelyReleased) {
+        return;
+      }
+      synchronized (lock) {
+        stack.push(key);
+        inUseSet.remove(key);
+        lock.notify();
+      }
+    });
+  }

@Override
-  Table.KeyValue<CodecBuffer, CodecBuffer> getKeyValue() {
+  ReferenceCountedObject<RawKeyValue<CodecBuffer>> getKeyValue() {
     assertOpen();
-    return Table.newKeyValue(key(), valueBuffer.getFromDb());
+    RawKeyValue<Buffer> kvBuffer = getFromStack(bufferLock, availableBufferStack, inUseBuffers);
+    Function<RawKeyValue<Buffer>, RawKeyValue<CodecBuffer>> transformer =
+        kv -> new RawKeyValue<>(kv.getKey().getFromDb(), kv.getValue().getFromDb());
+    return getReferenceCountedBuffer(kvBuffer, availableBufferStack, inUseBuffers, bufferLock, transformer);
   }

@Override
@@ -143,13 +190,29 @@ boolean startsWithPrefix(CodecBuffer key) {
return key.startsWith(prefix);
}

+  private <V> void release(Stack<V> valueStack, Set<V> inUseSet, Object lock, Function<V, Void> releaser) {
+    synchronized (Objects.requireNonNull(lock)) {
+      while (!valueStack.isEmpty()) {
+        V popped = valueStack.pop();
+        releaser.apply(popped);
+      }
+
+      for (V inUseValue : inUseSet) {
+        releaser.apply(inUseValue);
+      }
+    }
+  }

@Override
-  public void close() {
+  public synchronized void close() {
     if (closed.compareAndSet(false, true)) {
       super.close();
       Optional.ofNullable(getPrefix()).ifPresent(CodecBuffer::release);
-      keyBuffer.release();
-      valueBuffer.release();
+      release(availableBufferStack, inUseBuffers, bufferLock, kv -> {
+        kv.getKey().release();
+        kv.getValue().release();
+        return null;
+      });
     }
   }
}
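An end-to-end sketch of why callers must close every entry under this pooling scheme (newCodecBufferIterator() and consume() are hypothetical placeholders, not from the PR): each open entry pins one pooled key/value pair, and getFromStack() blocks once all maxNumberOfBuffersInMemory pairs are pinned.

try (RDBStoreCodecBufferIterator iter = newCodecBufferIterator()) {
  while (iter.hasNext()) {
    try (UncheckedAutoCloseableSupplier<RawKeyValue<CodecBuffer>> entry = iter.next()) {
      consume(entry.get().getKey(), entry.get().getValue());
    } // closing pushes the buffer pair back onto availableBufferStack and notifies waiters
  }
}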