Changes from all commits (40 commits)
30facb9  HDDS-12742. Make RDBStoreAbstractIterator return a reference counted … (swamirishi, Apr 12, 2025)
aab6ebc  HDDS-12742. Remove log (swamirishi, Apr 14, 2025)
5dd7460  HDDS-12742. Remove synchronized (swamirishi, Apr 15, 2025)
e4bf8f7  HDDS-12742. Fix seek (swamirishi, Apr 16, 2025)
9ab91c9  HDDS-12742. Add test case (swamirishi, Apr 16, 2025)
abe06ab  HDDS-12742. Fix iter (swamirishi, Apr 16, 2025)
d215d4b  HDDS-12742. Fix NPE (swamirishi, Apr 16, 2025)
85c74b4  HDDS-12742. Fix NPE (swamirishi, Apr 16, 2025)
7d82a9e  HDDS-12742. Add Blocking Deque instead custom implementation of a blc… (swamirishi, Apr 16, 2025)
ce6ab81  HDDS-12742. Fix checkstyle (swamirishi, Apr 16, 2025)
091e8ca  HDDS-12742. Fix test cases (swamirishi, Apr 17, 2025)
6bc5c86  Merge remote-tracking branch 'apache/master' into HEAD (swamirishi, Apr 17, 2025)
1b394c2  Merge remote-tracking branch 'apache/master' into HEAD (swamirishi, May 2, 2025)
7fdced2  HDDS-12742. Add Spliterator (swamirishi, May 4, 2025)
f5b633b  HDDS-12742. Fix Spliterator (swamirishi, May 5, 2025)
1971a96  Update hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/uti… (swamirishi, May 5, 2025)
a33f265  HDDS-12742. Make concurrent hash set (swamirishi, May 6, 2025)
0c3ae4f  Merge remote-tracking branch 'origin/HDDS-12742' into HEAD (swamirishi, May 6, 2025)
fbe213b  Merge remote-tracking branch 'apache/master' into HEAD (swamirishi, May 6, 2025)
71f9c28  HDDS-12742. Fix checkstyle (swamirishi, May 6, 2025)
5734027  HDDS-12742. Fix pmd (swamirishi, May 6, 2025)
99c508e  HDDS-12742. Fix checkstyle (swamirishi, May 6, 2025)
f16e1b9  HDDS-12742. Fix max buffer definition (swamirishi, May 13, 2025)
86fe101  HDDS-12742. Add tests (swamirishi, May 18, 2025)
cfde81f  Merge remote-tracking branch 'apache/master' into HEAD (swamirishi, May 18, 2025)
f825754  HDDS-12742. Fix exception handling for memory leaks (swamirishi, May 21, 2025)
e4155a6  HDDS-12742. Make spliterator an interface parameter (swamirishi, May 23, 2025)
985d612  HDDS-12742. Fix build (swamirishi, May 27, 2025)
b98968b  HDDS-12742. Fix findbugs (swamirishi, May 27, 2025)
78d33af  HDDS-12742. Convert to Named Class from anonymous class of RawSpliter… (swamirishi, May 28, 2025)
ed632a8  HDDS-12742. Fix checkstyle (swamirishi, May 28, 2025)
58f81cc  HDDS-12742. Move Buffer class out to upperlevel (swamirishi, May 28, 2025)
3f77cbe  HDDS-12742. Remove reference counted iterators (swamirishi, May 28, 2025)
1fecc85  HDDS-12742. Implement base class for RawIterator (swamirishi, May 29, 2025)
deb0011  HDDS-12742. Fix test class (swamirishi, May 29, 2025)
346a685  HDDS-12742. Fix test class (swamirishi, May 29, 2025)
f6b651c  HDDS-12742. Refactor class (swamirishi, May 29, 2025)
93c7f91  Merge remote-tracking branch 'apache/master' into HEAD (swamirishi, Jun 9, 2025)
f81c116  Merge remote-tracking branch 'apache/master' into HEAD (swamirishi, Jun 12, 2025)
7b37a29  Merge remote-tracking branch 'apache/master' into HEAD (swamirishi, Jun 12, 2025)
@@ -32,6 +32,6 @@
* @param <E> The exception type this function may throw.
*/
@FunctionalInterface
-interface PutToByteBuffer<E extends Exception>
+public interface PutToByteBuffer<E extends Exception>
extends CheckedFunction<ByteBuffer, Integer, E> {
}
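Since the interface is now public, callers outside the package can supply the value source. The sketch below is hypothetical: it assumes, based on how Buffer.getFromDb() consumes the result further down, that the function fills the given ByteBuffer and returns the total number of bytes the value requires, or null when the value is absent. ByteArraySource and its factory method are illustrative names, not part of this patch.

import java.nio.ByteBuffer;

// Illustrative only: builds a PutToByteBuffer from an in-memory byte array.
// The return contract (bytes required, or null when the value is absent) is
// inferred from Buffer.getFromDb(), not stated by the interface itself.
final class ByteArraySource {
  static PutToByteBuffer<RuntimeException> of(byte[] value) {
    return buffer -> {
      if (value == null) {
        return null;                                    // no value to copy
      }
      buffer.put(value, 0, Math.min(value.length, buffer.remaining()));
      return value.length;                              // bytes the value needs
    };
  }
}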
@@ -89,6 +89,21 @@ public void deleteWithBatch(BatchOperation batch, KEY key)
"version.");
}

@Override
public KeyValueSpliterator<KEY, VALUE> spliterator(int maxParallelism, boolean closeOnException) {
throw new UnsupportedOperationException("Iterating tables directly is not" +
" supported for datanode containers due to differing schema " +
"version.");
}

Review comment (Contributor): A default implementation can throw the exception as not supported; there is no need to add this override in every implementation.
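One way to read that suggestion, sketched below on the assumption that the method is declared on the Table interface; the interface body is abbreviated and only mirrors the names used in this patch, it is not the actual Ozone declaration.

// Sketch only: a default body on the interface lets implementations that cannot
// support parallel iteration simply inherit the "not supported" behaviour.
public interface Table<KEY, VALUE> extends AutoCloseable {

  default KeyValueSpliterator<KEY, VALUE> spliterator(int maxParallelism, boolean closeOnException) {
    throw new UnsupportedOperationException("Iterating this table directly is not supported.");
  }
}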

@Override
public KeyValueSpliterator<KEY, VALUE> spliterator(KEY startKey, KEY prefix, int maxParallelism,
boolean closeOnException) {
throw new UnsupportedOperationException("Iterating tables directly is not" +
" supported for datanode containers due to differing schema " +
"version.");
}

@Override
public String getName() {
return table.getName();
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.utils.db;

import org.apache.ratis.util.Preconditions;

/**
* A utility class for managing an underlying {@link CodecBuffer} with dynamic capacity adjustment
* based on the requirements of a data source. This class encapsulates operations to allocate,
* prepare, and release the buffer, as well as retrieve data from a source.
*/
public class Buffer {
private final CodecBuffer.Capacity initialCapacity;
private final PutToByteBuffer<RuntimeException> source;
private CodecBuffer buffer;

public Buffer(CodecBuffer.Capacity initialCapacity,
PutToByteBuffer<RuntimeException> source) {
this.initialCapacity = initialCapacity;
this.source = source;
}

public void release() {
if (buffer != null) {
buffer.release();
}
}

public void prepare() {
if (buffer == null) {
allocate();
} else {
buffer.clear();
}
}

public void allocate() {
if (buffer != null) {
buffer.release();
}
buffer = CodecBuffer.allocateDirect(-initialCapacity.get());
}

public CodecBuffer getFromDb() {
for (prepare(); ; allocate()) {
final Integer required = buffer.putFromSource(source);
if (required == null) {
return null; // the source is unavailable
} else if (required == buffer.readableBytes()) {
return buffer; // buffer size is big enough
}
// buffer size too small, try increasing the capacity.
if (buffer.setCapacity(required)) {
buffer.clear();
// retry with the new capacity
final int retried = buffer.putFromSource(source);
Preconditions.assertSame(required.intValue(), retried, "required");
return buffer;
}

// failed to increase the capacity
// increase initial capacity and reallocate it
initialCapacity.increase(required);
}
}
}
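For reference, here is a simplified, self-contained sketch of the grow-and-retry pattern that getFromDb() implements, using a plain ByteBuffer and java.util.function.Function in place of CodecBuffer and PutToByteBuffer; the class and method names are illustrative only.

import java.nio.ByteBuffer;
import java.util.function.Function;

// Simplified illustration of the pattern in Buffer.getFromDb(): ask the source
// to fill the buffer, and if the value needs more room than the current
// capacity, grow to the reported size and try again.
final class GrowingReadBuffer {
  private int capacity;

  GrowingReadBuffer(int initialCapacity) {
    this.capacity = initialCapacity;
  }

  ByteBuffer read(Function<ByteBuffer, Integer> source) {
    while (true) {
      final ByteBuffer buffer = ByteBuffer.allocateDirect(capacity);
      final Integer required = source.apply(buffer);    // bytes the value needs
      if (required == null) {
        return null;                                    // the value is absent
      }
      if (required <= capacity) {
        buffer.flip();                                  // it fit; hand it back
        return buffer;
      }
      capacity = required;                              // too small: grow, retry
    }
  }
}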
@@ -226,7 +226,7 @@ public RDBStore build() throws IOException {
return new RDBStore(dbFile, rocksDBOption, statistics, writeOptions, tableConfigs,
openReadOnly, dbJmxBeanNameName, enableCompactionDag,
maxDbUpdatesSizeThreshold, createCheckpointDirs, configuration,
-enableRocksDbMetrics);
+enableRocksDbMetrics, rocksDBConfiguration.isThreadSafeIteratorEnabled());
Review comment (Contributor): We should not define the configuration as thread safe or not, since we are not exposing an SDK to be integrated into other solutions. Ozone should work as thread safe in its usages; maybe some layer is not thread safe.

} finally {
tableConfigs.forEach(TableConfig::close);
}
@@ -73,6 +73,7 @@ public class RDBStore implements DBStore {
private final long maxDbUpdatesSizeThreshold;
private final ManagedDBOptions dbOptions;
private final ManagedStatistics statistics;
private final boolean initializeReferenceCountedIterator;

@SuppressWarnings("parameternumber")
RDBStore(File dbFile, ManagedDBOptions dbOptions, ManagedStatistics statistics,
@@ -82,13 +83,14 @@ public class RDBStore implements DBStore {
long maxDbUpdatesSizeThreshold,
boolean createCheckpointDirs,
ConfigurationSource configuration,
-boolean enableRocksDBMetrics)
+boolean enableRocksDBMetrics, boolean initializeReferenceCountedIterator)

throws IOException {
Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
Preconditions.checkNotNull(families);
Preconditions.checkArgument(!families.isEmpty());
this.maxDbUpdatesSizeThreshold = maxDbUpdatesSizeThreshold;
this.initializeReferenceCountedIterator = initializeReferenceCountedIterator;
Review comment (Contributor): This name should be initializeThreadSafeIterator, as that is the config it reflects and the context in which it is used.

dbLocation = dbFile;
this.dbOptions = dbOptions;
this.statistics = statistics;
@@ -308,7 +310,7 @@ public RDBTable getTable(String name) throws IOException {
if (handle == null) {
throw new IOException("No such table in this DB. TableName : " + name);
}
-return new RDBTable(this.db, handle, rdbMetrics);
+return new RDBTable(this.db, handle, rdbMetrics, initializeReferenceCountedIterator);
}

@Override
@@ -321,7 +323,7 @@ public <K, V> TypedTable<K, V> getTable(
public ArrayList<Table> listTables() {
ArrayList<Table> returnList = new ArrayList<>();
for (ColumnFamily family : getColumnFamilies()) {
-returnList.add(new RDBTable(db, family, rdbMetrics));
+returnList.add(new RDBTable(db, family, rdbMetrics, initializeReferenceCountedIterator));
}
return returnList;
}
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
import org.apache.hadoop.hdds.utils.db.iterator.BaseDBTableIterator;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,14 +31,14 @@
* @param <RAW> the raw type.
*/
abstract class RDBStoreAbstractIterator<RAW>
-implements TableIterator<RAW, Table.KeyValue<RAW, RAW>> {
+implements BaseDBTableIterator<RAW, RawKeyValue<RAW>> {

private static final Logger LOG =
LoggerFactory.getLogger(RDBStoreAbstractIterator.class);

private final ManagedRocksIterator rocksDBIterator;
private final RDBTable rocksDBTable;
-private Table.KeyValue<RAW, RAW> currentEntry;
+private RawKeyValue<RAW> currentEntry;
// This is for schemas that use a fixed-length
// prefix for each key.
private final RAW prefix;
@@ -53,7 +54,7 @@ abstract class RDBStoreAbstractIterator<RAW>
abstract RAW key();

/** @return the {@link Table.KeyValue} for the current entry. */
-abstract Table.KeyValue<RAW, RAW> getKeyValue();
+abstract RawKeyValue<RAW> getKeyValue();

/** Seek to the given key. */
abstract void seek0(RAW key);
@@ -78,7 +79,7 @@ final RAW getPrefix() {

@Override
public final void forEachRemaining(
-Consumer<? super Table.KeyValue<RAW, RAW>> action) {
+Consumer<? super RawKeyValue<RAW>> action) {
while (hasNext()) {
action.accept(next());
}
@@ -99,7 +100,7 @@ public final boolean hasNext() {
}

@Override
-public final Table.KeyValue<RAW, RAW> next() {
+public final RawKeyValue<RAW> next() {
setCurrentEntry();
if (currentEntry != null) {
rocksDBIterator.get().next();
Review comment (Contributor): rocksDBIterator.get().next() could also be moved into setCurrentEntry(), since fetching the next entry and updating the hasNext state belong together.

Reply (Author): No, we cannot do that. What about the first entry? next() must not be called on the underlying iterator before the first entry is returned.
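A tiny illustration (not the Ozone classes) of the ordering being defended here: after a seek the underlying cursor already sits on the first entry, so next() has to read the current position before advancing; advancing inside the entry-fetch step would skip that first entry.

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Illustration only: a cursor that, like a RocksDB iterator after seekToFirst(),
// already points at the first element. next() must read first, then advance.
final class ReadThenAdvance<T> implements Iterator<T> {
  private final List<T> entries;
  private int cursor;                       // index of the "current" entry

  ReadThenAdvance(List<T> entries) {
    this.entries = entries;
    this.cursor = 0;                        // seekToFirst: already on entry 0
  }

  @Override
  public boolean hasNext() {
    return cursor < entries.size();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    final T current = entries.get(cursor);  // read the entry the cursor is on
    cursor++;                               // advance only after reading it
    return current;
  }
}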

@@ -129,7 +130,7 @@ public final void seekToLast() {
}

@Override
-public final Table.KeyValue<RAW, RAW> seek(RAW key) {
+public final RawKeyValue<RAW> seek(RAW key) {
seek0(key);
setCurrentEntry();
return currentEntry;
@@ -151,4 +152,9 @@ public final void removeFromDB() throws IOException {
public void close() {
rocksDBIterator.close();
}

@Override
public boolean isKVCloseable() {
return false;
}
}
@@ -38,7 +38,7 @@ byte[] key() {
}

@Override
-Table.KeyValue<byte[], byte[]> getKeyValue() {
+RawKeyValue<byte[]> getKeyValue() {
final ManagedRocksIterator i = getRocksDBIterator();
return RawKeyValue.create(i.get().key(), i.get().value());
}
@@ -57,9 +57,9 @@ CodecBuffer key() {
}

@Override
-Table.KeyValue<CodecBuffer, CodecBuffer> getKeyValue() {
+RawKeyValue<CodecBuffer> getKeyValue() {
assertOpen();
-return Table.newKeyValue(key(), valueBuffer.getFromDb());
+return new RawKeyValue(key(), valueBuffer.getFromDb());
}

@Override
@@ -96,60 +96,4 @@ public void close() {
valueBuffer.release();
}
}

static class Buffer {
private final CodecBuffer.Capacity initialCapacity;
private final PutToByteBuffer<RuntimeException> source;
private CodecBuffer buffer;

Buffer(CodecBuffer.Capacity initialCapacity,
PutToByteBuffer<RuntimeException> source) {
this.initialCapacity = initialCapacity;
this.source = source;
}

void release() {
if (buffer != null) {
buffer.release();
}
}

private void prepare() {
if (buffer == null) {
allocate();
} else {
buffer.clear();
}
}

private void allocate() {
if (buffer != null) {
buffer.release();
}
buffer = CodecBuffer.allocateDirect(-initialCapacity.get());
}

CodecBuffer getFromDb() {
for (prepare(); ; allocate()) {
final Integer required = buffer.putFromSource(source);
if (required == null) {
return null; // the source is unavailable
} else if (required == buffer.readableBytes()) {
return buffer; // buffer size is big enough
}
// buffer size too small, try increasing the capacity.
if (buffer.setCapacity(required)) {
buffer.clear();
// retry with the new capacity
final int retried = buffer.putFromSource(source);
Preconditions.assertSame(required.intValue(), retried, "required");
return buffer;
}

// failed to increase the capacity
// increase initial capacity and reallocate it
initialCapacity.increase(required);
}
}
}
}