Closed

38 commits
30facb9  HDDS-12742. Make RDBStoreAbstractIterator return a reference counted … (swamirishi, Apr 12, 2025)
aab6ebc  HDDS-12742. Remove log (swamirishi, Apr 14, 2025)
5dd7460  HDDS-12742. Remove synchronized (swamirishi, Apr 15, 2025)
e4bf8f7  HDDS-12742. Fix seek (swamirishi, Apr 16, 2025)
9ab91c9  HDDS-12742. Add test case (swamirishi, Apr 16, 2025)
abe06ab  HDDS-12742. Fix iter (swamirishi, Apr 16, 2025)
d215d4b  HDDS-12742. Fix NPE (swamirishi, Apr 16, 2025)
85c74b4  HDDS-12742. Fix NPE (swamirishi, Apr 16, 2025)
7d82a9e  HDDS-12742. Add Blocking Deque instead custom implementation of a blc… (swamirishi, Apr 16, 2025)
ce6ab81  HDDS-12742. Fix checkstyle (swamirishi, Apr 16, 2025)
091e8ca  HDDS-12742. Fix test cases (swamirishi, Apr 17, 2025)
6bc5c86  Merge remote-tracking branch 'apache/master' into HEAD (swamirishi, Apr 17, 2025)
1b394c2  Merge remote-tracking branch 'apache/master' into HEAD (swamirishi, May 2, 2025)
7fdced2  HDDS-12742. Add Spliterator (swamirishi, May 4, 2025)
f5b633b  HDDS-12742. Fix Spliterator (swamirishi, May 5, 2025)
1971a96  Update hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/uti… (swamirishi, May 5, 2025)
a33f265  HDDS-12742. Make concurrent hash set (swamirishi, May 6, 2025)
0c3ae4f  Merge remote-tracking branch 'origin/HDDS-12742' into HEAD (swamirishi, May 6, 2025)
fbe213b  Merge remote-tracking branch 'apache/master' into HEAD (swamirishi, May 6, 2025)
71f9c28  HDDS-12742. Fix checkstyle (swamirishi, May 6, 2025)
5734027  HDDS-12742. Fix pmd (swamirishi, May 6, 2025)
99c508e  HDDS-12742. Fix checkstyle (swamirishi, May 6, 2025)
f16e1b9  HDDS-12742. Fix max buffer definition (swamirishi, May 13, 2025)
86fe101  HDDS-12742. Add tests (swamirishi, May 18, 2025)
cfde81f  Merge remote-tracking branch 'apache/master' into HEAD (swamirishi, May 18, 2025)
f825754  HDDS-12742. Fix exception handling for memory leaks (swamirishi, May 21, 2025)
e4155a6  HDDS-12742. Make spliterator an interface parameter (swamirishi, May 23, 2025)
985d612  HDDS-12742. Fix build (swamirishi, May 27, 2025)
b98968b  HDDS-12742. Fix findbugs (swamirishi, May 27, 2025)
3abac4f  Merge remote-tracking branch 'apache/master' into HEAD (swamirishi, May 28, 2025)
af6d30d  HDDS-12779. Parallelize table Spliterator with multiple multiple iter… (swamirishi, May 28, 2025)
231ad28  HDDS-12779. Revert creating an init function (swamirishi, May 28, 2025)
78d33af  HDDS-12742. Convert to Named Class from anonymous class of RawSpliter… (swamirishi, May 28, 2025)
ed632a8  HDDS-12742. Fix checkstyle (swamirishi, May 28, 2025)
764033d  Merge remote-tracking branch 'origin/HDDS-12742' into HEAD (swamirishi, May 28, 2025)
18deb3c  HDDS-12779. Fix checkstyle (swamirishi, May 28, 2025)
82c3f01  HDDS-12779. Fix bug (swamirishi, May 28, 2025)
a6c25e7  HDDS-12779. Fix Boundary iteration logic (swamirishi, May 28, 2025)
CodecBuffer.java
@@ -50,7 +50,7 @@
* A buffer used by {@link Codec}
* for supporting RocksDB direct {@link ByteBuffer} APIs.
*/
public class CodecBuffer implements UncheckedAutoCloseable {
public class CodecBuffer implements UncheckedAutoCloseable, Comparable<byte[]> {
private static final Logger LOG = LoggerFactory.getLogger(CodecBuffer.class);

private static final ByteBufAllocator POOL = PooledByteBufAllocator.DEFAULT;
@@ -376,6 +376,19 @@ public boolean startsWith(CodecBuffer prefix) {
return buf.slice(buf.readerIndex(), length).equals(prefix.buf);
}

public int compareTo(byte[] other) {
Objects.requireNonNull(other, "other == null");
final int size = Math.min(readableBytes(), other.length);
for (int i = 0; i < size; i++) {
final int b1 = buf.getByte(buf.readerIndex() + i) & 0xff;
final int b2 = other[i] & 0xff;
if (b1 != b2) {
return b1 - b2;
}
}
return readableBytes() - other.length;
}

/** @return an {@link InputStream} reading from this buffer. */
public InputStream getInputStream() {
return new ByteBufInputStream(buf.duplicate());
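The new compareTo(byte[]) gives CodecBuffer an unsigned, bytewise ordering: bytes compare as 0–255 values, and on a common prefix the shorter operand sorts first, consistent with RocksDB's default bytewise comparator. A minimal sketch of that contract; the CodecBuffer.wrap(byte[]) factory used here is an assumption for illustration:

// Sketch only: CodecBuffer.wrap(byte[]) is assumed to exist.
import org.apache.hadoop.hdds.utils.db.CodecBuffer;

public class CodecBufferCompareSketch {
  public static void main(String[] args) {
    try (CodecBuffer buf = CodecBuffer.wrap(new byte[] {(byte) 0x80})) {
      // Unsigned comparison: 0x80 (128) sorts after 0x01, even though
      // (byte) 0x80 is negative in Java.
      assert buf.compareTo(new byte[] {0x01}) > 0;
    }
    try (CodecBuffer buf = CodecBuffer.wrap(new byte[] {1, 2})) {
      // On a common prefix, the shorter operand sorts first.
      assert buf.compareTo(new byte[] {1, 2, 3}) < 0;
      assert buf.compareTo(new byte[] {1, 2}) == 0;
    }
  }
}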
DatanodeTable.java
@@ -89,6 +89,21 @@ public void deleteWithBatch(BatchOperation batch, KEY key)
"version.");
}

@Override
public KeyValueSpliterator<KEY, VALUE> spliterator(int maxParallelism, boolean closeOnException) {
throw new UnsupportedOperationException("Iterating tables directly is not" +
" supported for datanode containers due to differing schema " +
"version.");
}

@Override
public KeyValueSpliterator<KEY, VALUE> spliterator(KEY startKey, KEY prefix, int maxParallelism,
boolean closeOnException) {
throw new UnsupportedOperationException("Iterating tables directly is not" +
" supported for datanode containers due to differing schema " +
"version.");
}

@Override
public String getName() {
return table.getName();
BaseRDBTable.java (new file)
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.utils.db;

import java.io.IOException;
import java.util.List;
import org.rocksdb.LiveFileMetaData;

/**
* Base table interface for RocksDB.
*/
public interface BaseRDBTable<KEY, VALUE> extends Table<KEY, VALUE> {
List<LiveFileMetaData> getTableSstFiles() throws IOException;
}
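getTableSstFiles() exposes the SST files backing a single table. A hedged sketch of one way an implementation could satisfy it, by filtering the DB's live-file metadata down to the table's column family; the helper class and its parameters are illustrative, not the PR's actual implementation:

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import org.rocksdb.LiveFileMetaData;
import org.rocksdb.RocksDB;

final class SstFileLookupSketch {
  private SstFileLookupSketch() {
  }

  // Returns the live SST files whose column family matches this table.
  static List<LiveFileMetaData> tableSstFiles(RocksDB db, String familyName) {
    return db.getLiveFilesMetaData().stream()
        // LiveFileMetaData#columnFamilyName() returns the family name as bytes.
        .filter(f -> familyName.equals(
            new String(f.columnFamilyName(), StandardCharsets.UTF_8)))
        .collect(Collectors.toList());
  }
}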
ByteArrayCodec.java
@@ -17,17 +17,24 @@

package org.apache.hadoop.hdds.utils.db;

import java.util.Comparator;

/**
* No-op codec for byte arrays.
*/
public final class ByteArrayCodec implements Codec<byte[]> {

private static final Codec<byte[]> INSTANCE = new ByteArrayCodec();
private static final Comparator<byte[]> COMPARATOR = new ByteWiseComparator();

public static Codec<byte[]> get() {
return INSTANCE;
}

public static Comparator<byte[]> getComparator() {
return COMPARATOR;
}

private ByteArrayCodec() {
// singleton
}
@@ -51,4 +58,18 @@ public byte[] fromPersistedFormat(byte[] bytes) {
public byte[] copyObject(byte[] bytes) {
return bytes;
}

private static class ByteWiseComparator implements Comparator<byte[]> {
@Override
public int compare(byte[] o1, byte[] o2) {
int length = Math.min(o1.length, o2.length);
for (int i = 0; i < length; i++) {
int compareValue = Integer.compareUnsigned(Byte.toUnsignedInt(o1[i]), Byte.toUnsignedInt(o2[i]));
if (compareValue != 0) {
return compareValue;
}
}
return Integer.compare(o1.length, o2.length);
}
}
}
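ByteWiseComparator applies the same unsigned, bytewise ordering to plain byte[] keys. A small usage sketch; it assumes it compiles in (or imports from) the same package as ByteArrayCodec:

import java.util.Arrays;
import java.util.Comparator;

public class ByteWiseComparatorSketch {
  public static void main(String[] args) {
    Comparator<byte[]> cmp = ByteArrayCodec.getComparator();
    byte[][] keys = {{1, 2, 3}, {(byte) 0xFF}, {1, 2}};
    Arrays.sort(keys, cmp);
    // Sorted order: [1, 2], [1, 2, 3], [0xFF] (0xFF compares as 255, not -1).
    for (byte[] key : keys) {
      System.out.println(Arrays.toString(key));
    }
  }
}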
RDBStoreAbstractIterator.java
@@ -21,6 +21,8 @@
import java.util.NoSuchElementException;
import java.util.function.Consumer;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -30,30 +32,34 @@
* @param <RAW> the raw type.
*/
abstract class RDBStoreAbstractIterator<RAW>
implements TableIterator<RAW, Table.KeyValue<RAW, RAW>> {
implements TableIterator<RAW, RDBStoreAbstractIterator.AutoCloseableRawKeyValue<RAW>> {

private static final Logger LOG =
LoggerFactory.getLogger(RDBStoreAbstractIterator.class);

private final ManagedRocksIterator rocksDBIterator;
private final RDBTable rocksDBTable;
private Table.KeyValue<RAW, RAW> currentEntry;
private ReferenceCountedObject<RawKeyValue<RAW>> currentEntry;
private RawKeyValue<RAW> previousKeyValue;
// This is for schemas that use a fixed-length
// prefix for each key.
private final RAW prefix;
private Boolean hasNext;
private boolean closed;

RDBStoreAbstractIterator(ManagedRocksIterator iterator, RDBTable table,
RAW prefix) {
this.rocksDBIterator = iterator;
this.rocksDBTable = table;
this.prefix = prefix;
this.currentEntry = null;
this.hasNext = false;
this.closed = false;
this.previousKeyValue = null;
}

/** @return the key for the current entry. */
abstract RAW key();

/** @return the {@link Table.KeyValue} for the current entry. */
abstract Table.KeyValue<RAW, RAW> getKeyValue();
abstract ReferenceCountedObject<RawKeyValue<RAW>> getKeyValue();

/** Seek to the given key. */
abstract void seek0(RAW key);
@@ -78,32 +84,52 @@ final RAW getPrefix() {

@Override
public final void forEachRemaining(
Consumer<? super Table.KeyValue<RAW, RAW>> action) {
Consumer<? super AutoCloseableRawKeyValue<RAW>> action) {
while (hasNext()) {
action.accept(next());
AutoCloseableRawKeyValue<RAW> entry = next();
action.accept(entry);
}
}

private void releaseEntry() {
if (currentEntry != null) {
currentEntry.release();
}
currentEntry = null;
hasNext = null;
}

private void setCurrentEntry() {
if (rocksDBIterator.get().isValid()) {
boolean isValid = !closed && rocksDBIterator.get().isValid();
if (isValid) {
currentEntry = getKeyValue();
currentEntry.retain();
} else {
currentEntry = null;
}
setHasNext(isValid, currentEntry);
}

public void setHasNext(boolean isValid, ReferenceCountedObject<RawKeyValue<RAW>> entry) {
this.hasNext = isValid && (prefix == null || startsWithPrefix(entry.get().getKey()));
}

@Override
public final boolean hasNext() {
return rocksDBIterator.get().isValid() &&
(prefix == null || startsWithPrefix(key()));
if (hasNext == null) {
setCurrentEntry();
}
return hasNext;
}

@Override
public final Table.KeyValue<RAW, RAW> next() {
setCurrentEntry();
if (currentEntry != null) {
public final AutoCloseableRawKeyValue<RAW> next() {
if (hasNext()) {
AutoCloseableRawKeyValue<RAW> entry = new AutoCloseableRawKeyValue<>(currentEntry);
this.previousKeyValue = currentEntry.get();
rocksDBIterator.get().next();
return currentEntry;
releaseEntry();
return entry;
}
throw new NoSuchElementException("RocksDB Store has no more elements");
}
@@ -115,7 +141,7 @@ public final void seekToFirst() {
} else {
seek0(prefix);
}
setCurrentEntry();
releaseEntry();
}

@Override
@@ -125,23 +151,28 @@ public final void seekToLast() {
} else {
throw new UnsupportedOperationException("seekToLast: prefix != null");
}
setCurrentEntry();
releaseEntry();
}

@Override
public final Table.KeyValue<RAW, RAW> seek(RAW key) {
public final AutoCloseableRawKeyValue<RAW> seek(RAW key) {
seek0(key);
releaseEntry();
setCurrentEntry();
return currentEntry;
// The current entry is only released by next(), so closing the entry returned here is effectively a no-op.
if (hasNext()) {
return new AutoCloseableRawKeyValue<>(currentEntry);
}
return null;
}

@Override
public final void removeFromDB() throws IOException {
if (rocksDBTable == null) {
throw new UnsupportedOperationException("remove");
}
if (currentEntry != null) {
delete(currentEntry.getKey());
if (previousKeyValue != null) {
delete(previousKeyValue.getKey());
} else {
LOG.info("Failed to removeFromDB: currentEntry == null");
}
@@ -150,5 +181,21 @@ public final void removeFromDB() throws IOException {
@Override
public void close() {
rocksDBIterator.close();
closed = true;
releaseEntry();
}

public static final class AutoCloseableRawKeyValue<RAW> extends RawKeyValue<RAW> implements AutoCloseable {
private final UncheckedAutoCloseableSupplier<RawKeyValue<RAW>> keyValue;

private AutoCloseableRawKeyValue(ReferenceCountedObject<RawKeyValue<RAW>> kv) {
super(kv.get().getKey(), kv.get().getValue());
this.keyValue = kv.retainAndReleaseOnClose();
}

@Override
public void close() {
keyValue.close();
}
}
}
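Because entries are now reference counted, next() returns an AutoCloseableRawKeyValue that the caller must close after consuming the key and value, or the underlying buffer reference is never released. A hedged sketch of the consumption pattern, placed in the same package since RDBStoreAbstractIterator is package-private; the drain helper itself is illustrative, not part of the PR:

package org.apache.hadoop.hdds.utils.db;

import java.util.function.BiConsumer;
import org.apache.hadoop.hdds.utils.db.RDBStoreAbstractIterator.AutoCloseableRawKeyValue;

final class IteratorDrainSketch {
  private IteratorDrainSketch() {
  }

  // Drains the iterator, closing each entry so its reference-counted
  // buffer is released even if the consumer throws.
  static void drain(TableIterator<byte[], AutoCloseableRawKeyValue<byte[]>> iter,
      BiConsumer<byte[], byte[]> consumer) throws Exception {
    try {
      while (iter.hasNext()) {
        try (AutoCloseableRawKeyValue<byte[]> kv = iter.next()) {
          consumer.accept(kv.getKey(), kv.getValue());
        }
      }
    } finally {
      iter.close();
    }
  }
}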
RDBStoreByteArrayIterator.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
import org.apache.ratis.util.ReferenceCountedObject;

/**
* RocksDB store iterator using the byte[] API.
@@ -33,14 +34,10 @@ class RDBStoreByteArrayIterator extends RDBStoreAbstractIterator<byte[]> {
}

@Override
byte[] key() {
return getRocksDBIterator().get().key();
}

@Override
Table.KeyValue<byte[], byte[]> getKeyValue() {
ReferenceCountedObject<RawKeyValue<byte[]>> getKeyValue() {
final ManagedRocksIterator i = getRocksDBIterator();
return RawKeyValue.create(i.get().key(), i.get().value());
RawKeyValue<byte[]> rawKV = RawKeyValue.create(i.get().key(), i.get().value());
return ReferenceCountedObject.wrap(rawKV);
}

@Override