@swamirishi swamirishi commented Mar 31, 2025

What changes were proposed in this pull request?

Currently, when RDBStoreCodecBufferIterator returns a KeyValue to a caller, that KeyValue may not stay consistent: it can be modified when next() is invoked on the iterator again. The CodecBuffer returned from the first call may be overwritten, which also makes the entire iterator implementation not thread safe.

Moreover, RDBStoreAbstractIterator makes redundant native calls for the key and value multiple times, even though the entry is already in heap memory. This could have a significant impact on performance, since every native call implies a buffer copy from the native buffer either to the provided DirectByteBuffer or to a newly created Java heap byte array.

The proposal here is to have a pool of buffers and return a reference-counted KeyValue to the caller, which would be released only when all references to the codec buffers have been released. Even though the next() method of RDBStoreAbstractIterator has to be made synchronized, the proposed changes should give us a large performance improvement.
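
A minimal sketch of the reference-counting idea described above, under stated assumptions: the names `RefCountedBuffer`, `retain`, and `release` are hypothetical and are not the classes in this patch. The buffer only becomes reusable once the last holder has released it, so a caller's copy stays valid while the iterator moves on.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical pooled buffer; it goes back to the pool when its count reaches zero. */
final class RefCountedBuffer {
  private final ByteBuffer buffer;
  private final Deque<RefCountedBuffer> pool;
  private final AtomicInteger refCount = new AtomicInteger();

  RefCountedBuffer(int capacity, Deque<RefCountedBuffer> pool) {
    this.buffer = ByteBuffer.allocate(capacity);
    this.pool = pool;
  }

  /** Take one more reference, for example before handing the buffer to a caller. */
  RefCountedBuffer retain() {
    refCount.incrementAndGet();
    return this;
  }

  /** Drop one reference; the last release makes the buffer reusable. */
  void release() {
    final int remaining = refCount.decrementAndGet();
    if (remaining < 0) {
      throw new IllegalStateException("released more often than retained");
    }
    if (remaining == 0) {
      buffer.clear();
      synchronized (pool) {
        pool.push(this);
      }
    }
  }

  ByteBuffer get() {
    return buffer;
  }
}

final class RefCountedBufferDemo {
  public static void main(String[] args) {
    Deque<RefCountedBuffer> pool = new ArrayDeque<>();
    RefCountedBuffer b = new RefCountedBuffer(16, pool).retain(); // held by the iterator
    b.retain();  // held by a caller
    b.release(); // the iterator moves on; the caller's reference keeps the buffer alive
    System.out.println("buffers in pool after first release: " + pool.size());
    b.release(); // the caller is done
    System.out.println("buffers in pool after last release: " + pool.size());
  }
}
```

A production version would also have to guard against a retain() racing with the final release(); the sketch leaves that out.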

What is the link to the Apache JIRA

https://issues.apache.org/jira/browse/HDDS-12742

How was this patch tested?

Existing unit tests

@swamirishi swamirishi requested a review from szetszwo March 31, 2025 05:31
@szetszwo szetszwo left a comment

@swamirishi , Thanks for working on this!

... native calls multiple times, ...

I guess the native call is rocksDBIterator.get().isValid()? It is not clear from the description.

The usual way to implement an iterator is to add a boolean hasNext field and set it whenever the iterator moves; see below:

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java
index 75104a55ed..a28693c2b3 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java
@@ -38,6 +38,7 @@ abstract class RDBStoreAbstractIterator<RAW>
   private final ManagedRocksIterator rocksDBIterator;
   private final RDBTable rocksDBTable;
   private Table.KeyValue<RAW, RAW> currentEntry;
+  private boolean hasNext;
   // This is for schemas that use a fixed-length
   // prefix for each key.
   private final RAW prefix;
@@ -47,6 +48,7 @@ abstract class RDBStoreAbstractIterator<RAW>
     this.rocksDBIterator = iterator;
     this.rocksDBTable = table;
     this.prefix = prefix;
+    setHasNext(rocksDBIterator.get().isValid());
   }
 
   /** @return the key for the current entry. */
@@ -85,17 +87,22 @@ public final void forEachRemaining(
   }
 
   private void setCurrentEntry() {
-    if (rocksDBIterator.get().isValid()) {
+    final boolean isValid = rocksDBIterator.get().isValid();
+    if (isValid) {
       currentEntry = getKeyValue();
     } else {
       currentEntry = null;
     }
+    setHasNext(isValid);
+  }
+
+  private void setHasNext(boolean isValid) {
+    hasNext = isValid && (prefix == null || startsWithPrefix(key()));
   }
 
   @Override
   public final boolean hasNext() {
-    return rocksDBIterator.get().isValid() &&
-        (prefix == null || startsWithPrefix(key()));
+    return hasNext;
   }
 
   @Override
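
As a self-contained illustration of the cached-flag pattern in the diff above (not the Ozone code itself), the sketch below replaces the RocksDB iterator with a hypothetical `CountingCursor` that counts validity checks. All class names here are assumptions made for the example. Validity is checked once per move, so repeated hasNext() calls cost no extra cursor calls.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical stand-in for a native cursor; counts how often isValid() is called. */
final class CountingCursor {
  private final List<String> keys;
  private int pos;
  int isValidCalls;

  CountingCursor(List<String> keys) {
    this.keys = keys;
  }

  boolean isValid() {
    isValidCalls++;
    return pos < keys.size();
  }

  String key() {
    return keys.get(pos);
  }

  void next() {
    pos++;
  }
}

/** Iterator that caches validity in a field and refreshes it only when it moves. */
final class CachedHasNextIterator implements Iterator<String> {
  private final CountingCursor cursor;
  private boolean hasNext;

  CachedHasNextIterator(CountingCursor cursor) {
    this.cursor = cursor;
    this.hasNext = cursor.isValid();
  }

  @Override
  public boolean hasNext() {
    return hasNext; // no cursor call here
  }

  @Override
  public String next() {
    if (!hasNext) {
      throw new NoSuchElementException();
    }
    final String key = cursor.key();
    cursor.next();
    hasNext = cursor.isValid(); // one validity check per move
    return key;
  }
}

final class CachedHasNextDemo {
  public static void main(String[] args) {
    CountingCursor cursor = new CountingCursor(Arrays.asList("a", "b"));
    Iterator<String> it = new CachedHasNextIterator(cursor);
    while (it.hasNext() && it.hasNext()) { // deliberately redundant hasNext() calls
      System.out.println(it.next());
    }
    System.out.println("isValid calls: " + cursor.isValidCalls);
  }
}
```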

Comment on lines 101 to 121
try {
  if (!seekDoneAtleastOnce) {
    throw new NoSuchElementException("Cannot check iterator hasNext before seeking either to start of iter or " +
        "seeking to a specific key");
  }
  return currentEntry != null &&
      (prefix == null || startsWithPrefix(currentEntry.getKey()));
} catch (IOException e) {
  throw new UncheckedIOException(e);
}
@szetszwo szetszwo Mar 31, 2025

We should not introduce new exceptions. Why were the exceptions not needed previously?

Contributor Author

The key() call from the iterator is what is inefficient. In the implementation of this class it is called on the iterator over and over again, and each call involves a buffer copy. For a byte array it is a copy from a direct buffer to a heap buffer, and for a codec buffer it is a copy from a direct buffer to another direct buffer, which is still inefficient and unnecessary.

@adoroszlai adoroszlai changed the title HDDS-12742. RDBStoreAbstractIterator does redundant key and value native calls unnecessarily HDDS-12742. Avoid repeated native calls in RDBStoreAbstractIterator Apr 8, 2025
…KeyValue

Change-Id: Ifd24aadf501f3af1f1385ea2a43b036ddee2c5a1
@swamirishi swamirishi changed the title HDDS-12742. Avoid repeated native calls in RDBStoreAbstractIterator HDDS-12742. Make RDBStoreAbstractIterator return a reference counted KeyValue Apr 12, 2025
@swamirishi swamirishi changed the title HDDS-12742. Make RDBStoreAbstractIterator return a reference counted KeyValue HDDS-12742. Make RDBStoreAbstractIterator Return Reference-Counted KeyValues to Improve Consistency and Performance Apr 12, 2025
@swamirishi
Contributor Author

@szetszwo I have updated the description and changed the scope of the patch. Please check it out. I am benchmarking the proposed changes against our existing implementation and will publish the numbers here soon.

@swamirishi swamirishi requested a review from kerneltime April 12, 2025 17:37
@swamirishi
Contributor Author

@szetszwo @kerneltime @errose28 I iterated over a table with varying key lengths and compared the performance of each iteration. Without the patch, deserialization has to happen synchronously, whereas with this patch we can delegate deserialization to different threads. I am using 60 threads to test this out.
[Benchmark charts (images not preserved): 40Benchmark, 400Benchmark, 4000Benchmark, 40000Benchmark]

Change-Id: Ifdfafe0ec0a3467b012a0abb407bc8d5e75a88a3
@swamirishi swamirishi requested a review from errose28 April 15, 2025 18:11
@szetszwo szetszwo left a comment

@swamirishi , thanks for working on this! Please see the comment/question inlined.

public final Table.KeyValue<RAW, RAW> next() {
setCurrentEntry();
if (currentEntry != null) {
public final synchronized UncheckedAutoCloseableSupplier<RawKeyValue<RAW>> next() {
Contributor

Why does it need to be synchronized? Will an iterator object be invoked by multiple threads?

Contributor

Even for ConcurrentHashMap,

..., iterators are designed to be used by only one thread at a time. ...

See https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentHashMap.html

Contributor Author

Yeah, I removed synchronized from the method. I will implement a Spliterator in the next set of PRs.

Change-Id: I2b0ec8b91d0c9ac47b20b05665b5d31d8d486067
@swamirishi
Contributor Author

swamirishi commented Apr 16, 2025

@szetszwo @kerneltime I compared the changes with and without the buffer stack. Allocating direct byte buffers still accounts for a significant portion of the iteration time. For bigger keys the copy takes longer. For smaller keys we are seeing better performance.
[Benchmark charts (images not preserved): 40_10Benchmark, 400_10Benchmark, 4000_10Benchmark, 40000_10Benchmark, 40_1000Benchmark, 400_1000Benchmark, 4000_1000Benchmark, 40000_1000Benchmark]

package org.apache.hadoop.hdds.utils.db;

import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES;

import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.ratis.util.function.CheckedFunction;
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utility to benchmark Single threaded Iterator.
 */
public final class BenchmarkOmDBCodecBufferIter {
  private static final Logger LOG = LoggerFactory.getLogger(BenchmarkOmDBCodecBufferIter.class);
  private BenchmarkOmDBCodecBufferIter() {
  }

  public static void main(String[] args) throws IOException {
    // Arguments: <OM DB dir> <logging threshold> <time to live in ms> <number of buffers>
    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
    ozoneConfiguration.setInt(OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES, -1);
    ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS, args[0]);
    RocksDBConfiguration rocksDBConfiguration = ozoneConfiguration.getObject(RocksDBConfiguration.class);
    long loggingThreshold = Long.parseLong(args[1]);
    long timeToLive = Long.parseLong(args[2]);
    int numberOfBuffers = Integer.parseInt(args[3]);
    ozoneConfiguration.setFromObject(rocksDBConfiguration);
    OmMetadataManagerImpl omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration, null);
    AtomicLong counter = new AtomicLong();
    long startTime = System.currentTimeMillis();
    // True while the benchmark is still within its time budget.
    AtomicBoolean keepRunning = new AtomicBoolean(true);
    // Keeps the most recent entries referenced so that several buffers are in use at once.
    Queue<UncheckedAutoCloseableSupplier<RawKeyValue<CodecBuffer>>> q = new LinkedList<>();
    CheckedFunction<UncheckedAutoCloseableSupplier<RawKeyValue<CodecBuffer>>, Boolean, IOException> func = kv -> {
      long id = counter.incrementAndGet();
      q.add(kv);
      if (id % loggingThreshold == 0) {
        long time = System.currentTimeMillis();
        LOG.info("Benchmark: " + id + "\t" + (time - startTime) + "\t" + StringCodec.get().fromCodecBuffer(kv.get().getKey()));
        keepRunning.set(time - startTime <= timeToLive);
      }
      // Release the oldest entry once more than numberOfBuffers are held.
      if (q.size() > numberOfBuffers) {
        q.poll().close();
      }
      return keepRunning.get();
    };
    RDBStore rdbStore = (RDBStore) omMetadataManager.getStore();
    RDBTable rdbTable = rdbStore.getTable("fileTable");
    RocksDatabase rocksDatabase = rdbStore.getDb();
    try (TableIterator<CodecBuffer, UncheckedAutoCloseableSupplier<RawKeyValue<CodecBuffer>>> itr =
             new RDBStoreCodecBufferIterator(rocksDatabase.newIterator(rdbTable.getColumnFamily()), rdbTable,
                 null, numberOfBuffers + 2)) {
      while (itr.hasNext()) {
        if (!func.apply(itr.next())) {
          break;
        }
      }
      while (!q.isEmpty()) {
        q.poll().close();
      }
    }
    omMetadataManager.getStore().close();
  }
}


Change-Id: I5bd5babb70126b2a59ab082e7c0c7469398817b2
Change-Id: Id34d08479c4baedc28f177e82ed9707aff481da8
Change-Id: I54f7ebeb3967947131ae7acfefa17990965f5ca4
Change-Id: Ic0e600dfa46951e048231a4e56a20bd9ce6fa423
@szetszwo
Contributor

@swamirishi , you made a good point that we should have a buffer per thread. Then, we don't need the buffer stack. Just use a ThreadLocal buffer. No synchronization needed.
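
To make the suggestion concrete, here is a rough sketch of what a per-thread buffer could look like. The class name `ThreadLocalBuffers` and the 4096-byte capacity are assumptions for illustration only; whether this fits the iterator's algorithm is exactly what is debated later in this thread.

```java
import java.nio.ByteBuffer;

/** Hypothetical holder: each thread lazily gets its own direct buffer, so no locking is needed. */
final class ThreadLocalBuffers {
  private static final int CAPACITY = 4096; // assumed size, for illustration only

  private static final ThreadLocal<ByteBuffer> BUFFER =
      ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(CAPACITY));

  private ThreadLocalBuffers() {
  }

  /** Returns the calling thread's buffer, reset for a fresh read. */
  static ByteBuffer get() {
    final ByteBuffer buffer = BUFFER.get();
    buffer.clear();
    return buffer;
  }
}
```

A buffer obtained this way is only safe to use on the thread that asked for it, and only until that thread asks again.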

Change-Id: Ia271b1d6be0a4a526a4a65fda7d1ad6dc9cd2bd2
@szetszwo szetszwo left a comment

@swamirishi , thanks for the update! Please see the comments inlined

public final Table.KeyValue<RAW, RAW> next() {
setCurrentEntry();
if (currentEntry != null) {
public final UncheckedAutoCloseableSupplier<RawKeyValue<RAW>> next() {
Contributor

@swamirishi , The current single-threaded API is below

    TypedTable<ContainerID, String> table = ...;
    try (TableIterator<ContainerID, ? extends Table.KeyValue<ContainerID, String>> i = table.iterator()) {
      final KeyValue<ContainerID, String> keyValue = i.next();
      // process it
    }

What multi-threaded API do you have in mind? What would the above code look like?

Contributor Author

ThreadLocal will not work. If you look at the change in the algorithm, the value set here is for the next entry in the iterator, while the thread receives the previous value that is stored in memory.

Contributor Author

I am planning to implement java.util.Spliterator for the multi-threaded API.

Contributor Author

TableIterator<UncheckedAutoCloseableSupplier<RawKeyValue>> itr = ...;

boolean tryAdvance(Consumer<? super T> action) {
  final UncheckedAutoCloseableSupplier<RawKeyValue> keyValue;
  // Only fetching the next raw entry happens under the lock.
  lock.lock();
  try {
    if (!itr.hasNext()) {
      return false;
    }
    keyValue = itr.next();
  } finally {
    lock.unlock();
  }
  try {
    // deserializedKeyValue = deserialize(keyValue);
  } finally {
    // Release the buffers once the entry has been deserialized.
    keyValue.close();
  }
  // process deserializedKeyValue
  return true;
}

Above is the high level idea I have for the parallel iterator.
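
The idea can be tried out in isolation with plain JDK types. The following is a sketch under stated assumptions, not the planned Ozone class: `LockedAdvance` is a hypothetical name, a generic `Function` stands in for deserialization, and the buffer release step is left out. Only the hasNext()/next() pair runs under the lock; decoding and processing run outside it, so several threads can work concurrently.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical helper: many threads share one iterator; only fetching is serialized. */
final class LockedAdvance<R, T> {
  private final Iterator<R> source;
  private final Function<R, T> deserializer;
  private final ReentrantLock lock = new ReentrantLock();

  LockedAdvance(Iterator<R> source, Function<R, T> deserializer) {
    this.source = source;
    this.deserializer = deserializer;
  }

  boolean tryAdvance(Consumer<? super T> action) {
    final R raw;
    lock.lock();
    try {
      if (!source.hasNext()) {
        return false;
      }
      raw = source.next();
    } finally {
      lock.unlock();
    }
    // Deserialization and processing happen outside the lock.
    action.accept(deserializer.apply(raw));
    return true;
  }
}

final class LockedAdvanceDemo {
  public static void main(String[] args) {
    List<Integer> data = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
      data.add(i);
    }
    LockedAdvance<Integer, Long> advance = new LockedAdvance<>(data.iterator(), Integer::longValue);
    LongAdder sum = new LongAdder();
    CompletableFuture<?>[] workers = new CompletableFuture<?>[4];
    for (int w = 0; w < workers.length; w++) {
      workers[w] = CompletableFuture.runAsync(() -> {
        while (advance.tryAdvance(sum::add)) {
          // keep pulling until the shared iterator is exhausted
        }
      });
    }
    CompletableFuture.allOf(workers).join();
    System.out.println("sum = " + sum.sum());
  }
}
```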

Contributor

TableIterator<UncheckedAutoCloseableSupplier> itr = ...;

What is "..."? How is the iterator obtained? What are the parameters?

boolean tryAdvance(Consumer<? super T> action) { ....

I guess you are not saying that all OM and Recon code has to implement a Spliterator.

My question is about the API for using the new iterator: what will the OM and Recon code look like?

Contributor Author

The Table interface will have a spliterator function.


public class RDBTable {
  // This method would be part of the Table interface.
  public void performOperationInParallel(byte[] prefix, Consumer<RawKeyValue<byte[]>> op) {
    TableIterator<UncheckedAutoCloseableSupplier<RawKeyValue>> itr = ...;
    SplitTableIterator splitItr = new SplitTableIterator(itr);
    performOperationInParallel(splitItr, op);
  }

  public void performOperationInParallel(CodecBuffer prefix, Consumer<RawKeyValue<CodecBuffer>> op) {
    TableIterator<UncheckedAutoCloseableSupplier<RawKeyValue>> itr = ...;
    SplitTableIterator splitItr = new SplitTableIterator(itr);
    performOperationInParallel(splitItr, op);
  }

  private <RAW> void performOperationInParallel(SplitTableIterator splitItr, Consumer<RawKeyValue<RAW>> op) {
    ThreadPoolExecutor pool = ...;
    pool.submit(() -> splitItr.tryAdvance(op));
    .....
  }
}

This just sketches the high-level idea of the interface changes planned for the next set of PRs.

Contributor

How about simply passing an executor and a boolean ordered to table.iterator(..)?

    TypedTable<ContainerID, String> table = ...;
    Executor executor = ...;
    boolean ordered = true|false; // Do the returned elements preserve the original iteration order?
    try (TableIterator<ContainerID, ? extends Table.KeyValue<ContainerID, String>> i
        = table.iterator(executor, ordered)) {
      final KeyValue<ContainerID, String> keyValue = i.next();
      // process it as before
    }

The OM and Recon code should not care about implementation details such as how the buffers get released.
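
One way such an iterator could hide the parallelism is sketched below, for the ordered case only. Everything here is an assumption made for illustration: `PrefetchingIterator`, its `window` parameter, and the generic `decode` function are hypothetical and not part of the Table API. The raw iterator is still advanced on the caller's thread, decoding runs on the executor, and next() waits for the oldest pending element so that the original order is preserved.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

/** Hypothetical ordered iterator that decodes up to `window` elements ahead on an executor. */
final class PrefetchingIterator<R, T> implements Iterator<T> {
  private final Iterator<R> raw;
  private final Function<R, T> decode;
  private final Executor executor;
  private final int window;
  private final Deque<CompletableFuture<T>> pending = new ArrayDeque<>();

  PrefetchingIterator(Iterator<R> raw, Function<R, T> decode, Executor executor, int window) {
    this.raw = raw;
    this.decode = decode;
    this.executor = executor;
    this.window = window;
  }

  /** Submits decode tasks until the window is full or the raw iterator is exhausted. */
  private void fill() {
    while (pending.size() < window && raw.hasNext()) {
      final R element = raw.next(); // raw iteration stays on the caller's thread
      pending.add(CompletableFuture.supplyAsync(() -> decode.apply(element), executor));
    }
  }

  @Override
  public boolean hasNext() {
    fill();
    return !pending.isEmpty();
  }

  @Override
  public T next() {
    fill();
    final CompletableFuture<T> head = pending.poll();
    if (head == null) {
      throw new NoSuchElementException();
    }
    return head.join(); // waiting on the oldest task keeps the original order
  }
}
```

The caller's loop is unchanged: `while (it.hasNext()) { process(it.next()); }`. An unordered variant could instead hand back whichever decode task finishes first.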

Contributor Author

Yeah the typed table interface will translate the consumer operation


private final Buffer keyBuffer;
private final Buffer valueBuffer;
private final Stack<RawKeyValue<Buffer>> availableBufferStack;
Contributor

Stack is not the best data structure in this case. We may use a BlockingQueue.

It is even better to use ThreadLocal<Buffer>, as mentioned in my previous comment.

@swamirishi swamirishi Apr 16, 2025

I initially used a queue, but I realized we may not have to allocate all the buffers all the time. LIFO is better than FIFO in such cases. My buffer allocation would be minimal with a stack.

Contributor Author

Moreover ThreadLocal will end up creating too many buffers which IMO is very wasteful.

@swamirishi swamirishi Apr 16, 2025

Just because we have initialized a Buffer object, it doesn't mean we have allocated anything yet. It will still do a lazy allocation. Using a stack keeps the allocation minimal when fewer threads are concurrently inside the locked code block than the maximum number of buffers, which is why a stack is the better choice. Moreover, stack.pop() is a very cheap operation, and the thread would spend more time copying data from the RocksDB iterator into the key and value. Only one thread would be waiting for a non-empty stack, which should be fine, so ideally there should be no contention either. We need the synchronized block only for the publisher-subscriber model, to optimize on wait() and notify().

@swamirishi swamirishi Apr 16, 2025

Instead of having a custom blocking stack implementation, I have changed it to use a BlockingDeque, which provides this functionality. As part of this change I had to handle race conditions with the inuseBufferSet values, since we need to keep two data structures consistent.
Please take a look and check which approach is less complex; we can go ahead with whichever approach we all align on.

Contributor

... LIFO is better than FIFO in such cases. ...

Why is LIFO better than FIFO?

BTW, for LIFO, we have https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingDeque.html

... My buffer allocation would be minimal with a stack

For any data structure, the allocation can be minimal since we can have lazy allocation -- allocate only when it is used.

... We need the synchronized block only for the publisher-subscriber model, optimize on the wait() & notify()

It also makes the code complicated and prone to deadlock.
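
Both points in this exchange can be seen in one small sketch: allocation can be lazy whatever the reuse order is, and a `BlockingDeque` gives LIFO reuse plus blocking without hand-written wait()/notify(). The class `LifoBufferPool` and its methods are hypothetical names chosen for this example, not the implementation in the PR.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical buffer pool: LIFO reuse, lazy allocation, blocking once the cap is reached. */
final class LifoBufferPool {
  private final BlockingDeque<ByteBuffer> free = new LinkedBlockingDeque<>();
  private final AtomicInteger allocated = new AtomicInteger();
  private final int maxBuffers;
  private final int capacity;

  LifoBufferPool(int maxBuffers, int capacity) {
    this.maxBuffers = maxBuffers;
    this.capacity = capacity;
  }

  ByteBuffer acquire() {
    // Prefer the most recently released buffer.
    final ByteBuffer reused = free.pollFirst();
    if (reused != null) {
      return reused;
    }
    // Nothing free: allocate lazily while under the cap.
    for (int n = allocated.get(); n < maxBuffers; n = allocated.get()) {
      if (allocated.compareAndSet(n, n + 1)) {
        return ByteBuffer.allocate(capacity);
      }
    }
    // Cap reached: wait until some holder releases a buffer.
    try {
      return free.takeFirst();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for a buffer", e);
    }
  }

  void release(ByteBuffer buffer) {
    buffer.clear();
    free.addFirst(buffer);
  }

  int allocatedCount() {
    return allocated.get();
  }
}
```

With a single consumer that releases each buffer before asking for the next one, only one buffer is ever allocated, regardless of the configured maximum.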

Contributor Author

Yeah I have made it using the BlockingDeque.

…king stack

Change-Id: I78272536972d58b76a10092bfde418a29ba8adae
Change-Id: I95dcc0d6e370329846272441dc70f4be2b671a29
Change-Id: I4db792cbf85d60f371653e7b9119d5155661c228
@szetszwo
Contributor

ThreadLocal will not work.

It will work.

Moreover ThreadLocal will end up creating too many buffers which IMO is very wasteful.

No, it won't.

@swamirishi , Since this is a new iterator implementation, let's create a new class for it. It is quite invasive and risky to change the existing code. We should make it configurable. If we find some bugs later on, we could set the conf to use the old iterator.

Change-Id: I91605a0d5e6e9390c5a75a518d75ef8e5998c97d
@swamirishi
Contributor Author

swamirishi commented Apr 17, 2025

ThreadLocal will not work.

It will work.

Moreover ThreadLocal will end up creating too many buffers which IMO is very wasteful.

No, it won't.

@swamirishi , Since this is a new iterator implementation, let's create a new class for it. It is quite invasive and risky to change the existing code. We should make it configurable. If we find some bugs later on, we could set the conf to use the old iterator.

@szetszwo I can make this a separate implementation. But the downstream usages of these classes would be really messy, with if-else conditions scattered all over the place, which I don't like. I would still be OK with it if we eventually want to get rid of the old implementation.

@swamirishi swamirishi requested a review from sumitagrawl April 29, 2025 16:21
@jojochuang jojochuang added the snapshot https://issues.apache.org/jira/browse/HDDS-6517 label Apr 30, 2025
Change-Id: I87dcb615ed6f21e2527074af121f75c24717562d
Change-Id: I7803634370d779ba286969398d7fab926f8cfc39
Change-Id: I00bdba29fac867207b115f27d962591dfd2e98cf
Change-Id: Ie3fcfa626255e73b8634880cc01510d2243b0f75
Change-Id: I9c47a281caf33a87255085f6493c62be417d1e0e
@swamirishi
Contributor Author

I strongly oppose and -1 on changing the existing iterator for the new implementation since this is an invasive and risky change. We need the old iterator in case that we find bugs later.

@szetszwo I have added the change behind a flag. Please take a look and let me know if the changes look good.

@szetszwo
Contributor

szetszwo commented May 29, 2025

@swamirishi , thanks for the update!

Could you move all the new code to a new package? The idea is similar to java.util.Map: there are TreeMap, HashMap and many other different implementations. We should not mix the code together. In case there is a bug in the new implementation, it won't affect the existing implementation.

As mentioned previously, we should not change the existing iterator (except for code refactoring or extracting interfaces).

Change-Id: If200d38b9924b6556a13509e0aab233ef8e402c5
@swamirishi
Contributor Author

swamirishi commented May 30, 2025

@swamirishi , thanks for the update!

Could you move all the new code to a new package? The idea is similar to java.util.Map: there are TreeMap, HashMap and many other different implementations. We should not mix the code together. In case there is a bug in the new implementation, it won't affect the existing implementation.

Done

As mentioned previously, we should not change the existing iterator (except for code refactoring or extracting interfaces).

Yup I kept the previous implementation the same.

@sumitagrawl sumitagrawl left a comment

I have left a few comments. This still needs another review once those are fixed, including the other areas.

@Override
public KeyValueSpliterator<KEY, VALUE> spliterator(int maxParallelism, boolean closeOnException) {
throw new UnsupportedOperationException("Iterating tables directly is not" +
" supported for datanode containers due to differing schema " +
Contributor

The default implementation can throw the not-supported exception, so there is no need to add this in every implementation.

Preconditions.checkNotNull(families);
Preconditions.checkArgument(!families.isEmpty());
this.maxDbUpdatesSizeThreshold = maxDbUpdatesSizeThreshold;
this.initializeReferenceCountedIterator = initializeReferenceCountedIterator;
Contributor

This should be named initializeThreadSafeIterator, since that is the name of the config and it is used in this context.

return new RDBStoreByteArrayIterator(db.newIterator(family, false), this,
prefix);
if (initializeThreadSafeIterator) {
return new ReferenceCountedRDBStoreByteArrayIterator(db.newIterator(family, false), this, prefix);
Contributor

ReferenceCountedRDBStoreByteArrayIterator -- I don't understand whether this has any thread safety issue. Generally the data is read into a new byte[] rather than into a shared buffer as with CodecBuffer, so what kind of thread safety is it providing?


BaseDBTableIterator<CodecBuffer, ? extends RawKeyValue<CodecBuffer>> iterator(
CodecBuffer prefix, int maxNumberOfBuffers) throws IOException {
if (initializeThreadSafeIterator) {
Contributor

IMO, we do not need to support ReferenceCountedRDBStoreCodecBufferIterator: if parallel performance is required, the Spliterator provided separately can be used.


private KeyValueSpliterator<byte[], byte[]> newByteArraySpliterator(byte[] prefix, byte[] startKey,
int maxParallelism, boolean closeOnException) throws IOException {
if (initializeThreadSafeIterator) {
Contributor

initializeThreadSafeIterator has no effect here: the if and else branches are the same, so this condition is not needed.

openReadOnly, dbJmxBeanNameName, enableCompactionDag,
maxDbUpdatesSizeThreshold, createCheckpointDirs, configuration,
enableRocksDbMetrics);
enableRocksDbMetrics, rocksDBConfiguration.isThreadSafeIteratorEnabled());
Contributor

We should not define a configuration for thread-safe or not, as we are not exposing an SDK to be integrated into other solutions. Ozone should be thread safe in its usages, although some layers may not be thread safe.

UncheckedAutoCloseable closeable = null;
try (AutoCloseSupplier<RAW> rawKey = convert(key)) {
final KeyValue<RAW, RAW> result = rawIterator.seek(rawKey.get());
RawKeyValue<RAW> result = rawIterator.seek(rawKey.get());
Contributor

Instead of the iterator declaring the KeyValue as closeable, RawKeyValue itself, or a class derived from it, can declare that it is closeable.

@szetszwo
Contributor

szetszwo commented Jun 2, 2025

@sumitagrawl , @swamirishi ,

Below are the current API methods

//Table.java
  TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator() throws IOException;
  TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator(KEY prefix) throws IOException;

The new API method should look like:

//Table.java
  TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator(KEY prefix,
      Executor executor, boolean ordered) throws IOException;

See also this comment. When executor is non-null, the new implementation will use it for parallel execution. When it is null, it will call the existing method iterator(prefix).

SplitIterator and reference count are all implementation details in the new implementation. It should not be exposed to the API. (Just like that iterator can use CodecBuffer or byte[], which are not in the API.)

If there is a need, we could add a new nextAsync() method to TableIterator for asynchronous execution at the caller side, i.e. the caller can process the elements in parallel.

//TableIterator
  CompletableFuture<T> nextAsync();

@swamirishi
Contributor Author

I am working on breaking this PR into smaller components as suggested by @szetszwo. I have created https://issues.apache.org/jira/browse/HDDS-13151, which would contain just the refactoring changes for iterators in the code base.

@swamirishi
Contributor Author

swamirishi commented Jun 2, 2025

@sumitagrawl , @swamirishi ,

Below are the current API methods

//Table.java
  TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator() throws IOException;
  TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator(KEY prefix) throws IOException;

The new API method should look like:

//Table.java
  TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator(KEY prefix,
      Executor executor, boolean ordered) throws IOException;

See also this comment. The existing methods should just pass null for executor and true for ordered to the new method. When executor is non-null, the new implementation will use it for parallel execution. When it is null, it will use the existing (single-thread) implementation.

SplitIterator and reference count are all implementation detail in the new implementation. It should not be exposed to the API. (Just like that iterator can use CodecBuffer or byte[], which are not in the API.)

If there is a need, we could add a new nextAsync() method to TableIterator for asynchronous execution at the caller side, i.e. the caller can process the elements in parallel.

//TableIterator
  CompletableFuture<T> nextAsync();

@szetszwo I don't like the idea of having a parallel iterator implementation that doesn't follow the already defined interface java.util.Spliterator. Any implementation of Spliterator would give downstream API users the flexibility to use the object as they see fit, for example by submitting the spliterator to a fork-join pool to iterate in parallel using multiple threads. Take a look at the implementation of trySplit in the follow-up PR, where we split the iteration over multiple RocksDB iterators. The spliterator model fits our use case well, and an ad hoc method within TableIterator doesn't really make sense to me. I like the fact that you want to keep the interface portable behind the flag so that we can turn off the parallel iteration feature in case of a bug in the implementation. We can achieve the same by returning a single-threaded, thread-safe iterator wrapped in a spliterator, which should not be a big deal. I am open to other people's opinions on this thread as well, to help us finalize an approach.
cc: @kerneltime @sumitagrawl

@szetszwo
Contributor

szetszwo commented Jun 2, 2025

... Any implementation of Spliterator would give the downstream api users the flexibility to use the object in the way they deem fit may be wanting to submit the spliterator to a forkjoin pool to do a parallel iteration using multiple threads. ...

The point here is to minimize the change to the existing code. The new parallel iterator currently is an experimental feature. Once the parallel iterator has become stable in the future, we may add Spliterator to the API if there is a need.

@swamirishi
Contributor Author

swamirishi commented Jun 2, 2025

... Any implementation of Spliterator would give the downstream api users the flexibility to use the object in the way they deem fit may be wanting to submit the spliterator to a forkjoin pool to do a parallel iteration using multiple threads. ...

The point here is to minimize the change to the existing code. The new parallel iterator currently is an experimental feature. Once the parallel iterator has become stable in the future, we may add Spliterator to the API if there is a need.

Why can't we wrap the iterator inside a spliterator and then return it? Even if the API user tries to use it as a spliterator, it would still be an iterator inside a synchronized spliterator. Something like the following: when the flag is disabled, we would just return a SynchronizedSpliterator, which doesn't have much logic. Do you see any risk if we do this, @szetszwo?
This is just a rough implementation.

import java.util.Iterator;
import java.util.Spliterator;
import java.util.function.Consumer;

public class SynchronizedSpliterator<T> implements Spliterator<T> {

    private final Iterator<T> iterator;

    public SynchronizedSpliterator(Iterator<T> iterator) {
        this.iterator = iterator;
    }

    @Override
    public synchronized boolean tryAdvance(Consumer<? super T> action) {
        if (iterator.hasNext()) {
            action.accept(iterator.next());
            return true;
        }
        return false;
    }

    @Override
    public Spliterator<T> trySplit() {
        return null; // No splitting support
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE; // Unknown size
    }

    @Override
    public int characteristics() {
        return ORDERED;
    }
}

@szetszwo
Contributor

szetszwo commented Jun 3, 2025

... Do you see any risk if we do this ...

@swamirishi , yes.

     public synchronized boolean tryAdvance(Consumer<? super T> action) {

Are you sure that it is correct to have synchronized in tryAdvance? The Java subclass implementations do not have synchronized.

Moreover, the change looks simple today but it may require more changes tomorrow (Just like this PR, it keeps growing unexpectedly.) Let's have a solid implementation before making a bigger change to the existing code. Spliterator is just for convenience but not any real improvement. Of course, the new implementation may still use it internally.

Change-Id: Ifcb4c261b879c24752cf0e7535a1213d9790a2c0
@swamirishi swamirishi marked this pull request as draft June 9, 2025 14:04
Change-Id: I4e898813fd60b834ebc9e4e0a39a2a8f36f486ce
Change-Id: Ib7c82cd9989c1fc79e67b10a66d95d58a067e799
* used as part of TypedTable as it's underlying implementation to access the
* metadata store content. All other user's using Table should use TypedTable.
*/
@InterfaceAudience.Private


Is this still Private?

@github-actions

This PR has been marked as stale due to 21 days of inactivity. Please comment or remove the stale label to keep it open. Otherwise, it will be automatically closed in 7 days.

@github-actions github-actions bot added the stale label Nov 12, 2025
@github-actions

Thank you for your contribution. This PR is being closed due to inactivity. If needed, feel free to reopen it.

@github-actions github-actions bot closed this Nov 19, 2025
@szetszwo
Contributor

The SynchronizedSpliterator approach is inefficient -- Spliterator is designed to split a collection into multiple parts and then iterate the individual parts in parallel.

For example, suppose there is a list with 10,000,000 elements and it is going to be processed by 100 threads.

  • Spliterator is designed to do the split first, say splitting it into 1000 parts with 10,000 elements each. Then, the 100 threads only have to be synchronized 1000 times for picking up the individual parts.
  • For SynchronizedSpliterator, since it has synchronized at tryAdvance(..), the 100 threads have to be synchronized 10,000,000 times for picking up the individual elements.
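
In the example above, splitting first means 10,000,000 / 10,000 = 1,000 coordination points instead of 10,000,000. For comparison, the JDK already ships an iterator-backed spliterator whose trySplit() hands out batches of elements rather than single elements. The sketch below uses it through a parallel stream; `BatchedSplitDemo` and `parallelSum` are names made up for this example, and it illustrates the batching idea only, not the implementation proposed in this PR.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.LongStream;
import java.util.stream.StreamSupport;

/** Hypothetical demo: parallel processing of a plain Iterator via the JDK's batching spliterator. */
final class BatchedSplitDemo {
  private BatchedSplitDemo() {
  }

  static long parallelSum(Iterator<Long> iterator) {
    // spliteratorUnknownSize wraps the iterator; its trySplit() copies a batch of
    // elements into an array, so workers coordinate once per batch, not once per element.
    final Spliterator<Long> spliterator =
        Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
    return StreamSupport.stream(spliterator, true) // true = parallel stream
        .mapToLong(Long::longValue)
        .sum();
  }

  public static void main(String[] args) {
    final Iterator<Long> iterator = LongStream.range(0, 1_000_000).boxed().iterator();
    System.out.println(parallelSum(iterator));
  }
}
```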
