Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;

/**
* Wrapper class to represent a table in a datanode RocksDB instance.
Expand Down Expand Up @@ -75,15 +74,7 @@ public void deleteWithBatch(BatchOperation batch, KEY key)
}

@Override
public final TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator() {
throw new UnsupportedOperationException("Iterating tables directly is not" +
" supported for datanode containers due to differing schema " +
"version.");
}

@Override
public final TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator(
KEY prefix) {
public final KeyValueIterator<KEY, VALUE> iterator(KEY prefix, KeyValueIterator.Type type) {
throw new UnsupportedOperationException("Iterating tables directly is not" +
" supported for datanode containers due to differing schema " +
"version.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import org.slf4j.LoggerFactory;

/**
* An abstract {@link TableIterator} to iterate raw {@link Table.KeyValue}s.
* An abstract {@link Table.KeyValueIterator} to iterate raw {@link Table.KeyValue}s.
*
* @param <RAW> the raw type.
*/
abstract class RDBStoreAbstractIterator<RAW>
implements TableIterator<RAW, Table.KeyValue<RAW, RAW>> {
implements Table.KeyValueIterator<RAW, RAW> {

private static final Logger LOG =
LoggerFactory.getLogger(RDBStoreAbstractIterator.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,15 @@
* RocksDB store iterator using the byte[] API.
*/
class RDBStoreByteArrayIterator extends RDBStoreAbstractIterator<byte[]> {
private static final byte[] EMPTY = {};

private final Type type;

RDBStoreByteArrayIterator(ManagedRocksIterator iterator,
RDBTable table, byte[] prefix) {
RDBTable table, byte[] prefix, Type type) {
super(iterator, table,
prefix == null ? null : Arrays.copyOf(prefix, prefix.length));
this.type = type;
seekToFirst();
}

Expand All @@ -40,7 +45,9 @@ byte[] key() {
@Override
Table.KeyValue<byte[], byte[]> getKeyValue() {
final ManagedRocksIterator i = getRocksDBIterator();
return Table.newKeyValue(i.get().key(), i.get().value());
final byte[] key = type.readKey() ? i.get().key() : EMPTY;
final byte[] value = type.readValue() ? i.get().value() : EMPTY;
return Table.newKeyValue(key, value);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@ class RDBStoreCodecBufferIterator extends RDBStoreAbstractIterator<CodecBuffer>
private final AtomicBoolean closed = new AtomicBoolean();

RDBStoreCodecBufferIterator(ManagedRocksIterator iterator, RDBTable table,
CodecBuffer prefix) {
CodecBuffer prefix, Type type) {
super(iterator, table, prefix);

final String name = table != null ? table.getName() : null;
this.keyBuffer = new Buffer(
new CodecBuffer.Capacity(name + "-iterator-key", 1 << 10),
buffer -> getRocksDBIterator().get().key(buffer));
type.readKey() ? buffer -> getRocksDBIterator().get().key(buffer) : null);
this.valueBuffer = new Buffer(
new CodecBuffer.Capacity(name + "-iterator-value", 4 << 10),
buffer -> getRocksDBIterator().get().value(buffer));
type.readValue() ? buffer -> getRocksDBIterator().get().value(buffer) : null);
seekToFirst();
}

Expand Down Expand Up @@ -130,6 +130,10 @@ private void allocate() {
}

CodecBuffer getFromDb() {
if (source == null) {
return CodecBuffer.getEmptyBuffer();
}

for (prepare(); ; allocate()) {
final Integer required = buffer.putFromSource(source);
if (required == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void putWithBatch(BatchOperation batch, byte[] key, byte[] value)

@Override
public boolean isEmpty() throws IOException {
try (TableIterator<byte[], KeyValue<byte[], byte[]>> keyIter = iterator()) {
try (KeyValueIterator<byte[], byte[]> keyIter = iterator((byte[]) null, KeyValueIterator.Type.NEITHER)) {
keyIter.seekToFirst();
return !keyIter.hasNext();
}
Expand Down Expand Up @@ -210,22 +210,16 @@ public void deleteWithBatch(BatchOperation batch, byte[] key)
}

@Override
public TableIterator<byte[], KeyValue<byte[], byte[]>> iterator()
throws IOException {
return iterator((byte[])null);
}

@Override
public TableIterator<byte[], KeyValue<byte[], byte[]>> iterator(byte[] prefix)
throws IOException {
public KeyValueIterator<byte[], byte[]> iterator(byte[] prefix, KeyValueIterator.Type type)
throws RocksDatabaseException {
return new RDBStoreByteArrayIterator(db.newIterator(family, false), this,
prefix);
prefix, type);
}

TableIterator<CodecBuffer, KeyValue<CodecBuffer, CodecBuffer>> iterator(
CodecBuffer prefix) throws IOException {
KeyValueIterator<CodecBuffer, CodecBuffer> iterator(
CodecBuffer prefix, KeyValueIterator.Type type) throws IOException {
return new RDBStoreCodecBufferIterator(db.newIterator(family, false),
this, prefix);
this, prefix, type);
}

@Override
Expand Down Expand Up @@ -262,8 +256,7 @@ public List<KeyValue<byte[], byte[]>> getSequentialRangeKVs(byte[] startKey,
@Override
public void deleteBatchWithPrefix(BatchOperation batch, byte[] prefix)
throws IOException {
try (TableIterator<byte[], KeyValue<byte[], byte[]>> iter
= iterator(prefix)) {
try (KeyValueIterator<byte[], byte[]> iter = iterator(prefix)) {
while (iter.hasNext()) {
deleteWithBatch(batch, iter.next().getKey());
}
Expand All @@ -273,7 +266,7 @@ public void deleteBatchWithPrefix(BatchOperation batch, byte[] prefix)
@Override
public void dumpToFileWithPrefix(File externalFile, byte[] prefix)
throws IOException {
try (TableIterator<byte[], KeyValue<byte[], byte[]>> iter = iterator(prefix);
try (KeyValueIterator<byte[], byte[]> iter = iterator(prefix);
RDBSstFileWriter fileWriter = new RDBSstFileWriter(externalFile)) {
while (iter.hasNext()) {
final KeyValue<byte[], byte[]> entry = iter.next();
Expand All @@ -298,8 +291,7 @@ private List<KeyValue<byte[], byte[]>> getRangeKVs(byte[] startKey,
"Invalid count given " + count + ", count must be greater than 0");
}
final List<KeyValue<byte[], byte[]>> result = new ArrayList<>();
try (TableIterator<byte[], KeyValue<byte[], byte[]>> it
= iterator(prefix)) {
try (KeyValueIterator<byte[], byte[]> it = iterator(prefix)) {
if (startKey == null) {
it.seekToFirst();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,21 +153,24 @@ default VALUE getReadCopy(KEY key) throws IOException {
*/
void deleteRange(KEY beginKey, KEY endKey) throws IOException;

/**
* Returns the iterator for this metadata store.
*
* @return MetaStoreIterator
* @throws IOException on failure.
*/
TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator()
throws IOException;
/**
 * Iterate this table by delegating to {@code iterator(prefix)} with a null prefix.
 *
 * @return the iterator returned by the delegated call.
 * @throws IOException if the delegated call fails.
 */
default KeyValueIterator<KEY, VALUE> iterator() throws IOException {
  final KEY noPrefix = null;
  return iterator(noPrefix);
}

/**
 * Iterate the elements with the given prefix, reading both key and value;
 * the same as {@code iterator(prefix, KeyValueIterator.Type.KEY_AND_VALUE)}.
 *
 * @param prefix The prefix of the elements to be iterated.
 * @return the iterator returned by the delegated call.
 * @throws IOException if the delegated call fails.
 */
default KeyValueIterator<KEY, VALUE> iterator(KEY prefix) throws IOException {
  final KeyValueIterator.Type keyAndValue = KeyValueIterator.Type.KEY_AND_VALUE;
  return iterator(prefix, keyAndValue);
}

/**
* Returns a prefixed iterator for this metadata store.
* @param prefix
* @return MetaStoreIterator
* Iterate the elements in this table.
*
* @param prefix The prefix of the elements to be iterated.
* @param type Specify whether key and/or value are required.
* @return an iterator.
*/
TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator(KEY prefix)
KeyValueIterator<KEY, VALUE> iterator(KEY prefix, KeyValueIterator.Type type)
throws IOException;

/**
Expand Down Expand Up @@ -328,12 +331,12 @@ void deleteBatchWithPrefix(BatchOperation batch, KEY prefix)
final class KeyValue<K, V> {
private final K key;
private final V value;
private final int rawSize;
private final int valueByteSize;

private KeyValue(K key, V value, int rawSize) {
private KeyValue(K key, V value, int valueByteSize) {
this.key = key;
this.value = value;
this.rawSize = rawSize;
this.valueByteSize = valueByteSize;
}

public K getKey() {
Expand All @@ -344,8 +347,8 @@ public V getValue() {
return value;
}

public int getRawSize() {
return rawSize;
public int getValueByteSize() {
return valueByteSize;
}

@Override
Expand Down Expand Up @@ -375,12 +378,32 @@ static <K, V> KeyValue<K, V> newKeyValue(K key, V value) {
return newKeyValue(key, value, 0);
}

static <K, V> KeyValue<K, V> newKeyValue(K key, V value, int rawSize) {
return new KeyValue<>(key, value, rawSize);
static <K, V> KeyValue<K, V> newKeyValue(K key, V value, int valueByteSize) {
return new KeyValue<>(key, value, valueByteSize);
}

/** A {@link TableIterator} to iterate {@link KeyValue}s. */
interface KeyValueIterator<KEY, VALUE>
extends TableIterator<KEY, KeyValue<KEY, VALUE>> {

/**
 * The iterator type, specifying which parts of an entry an iterator reads.
 *
 * Each constant carries explicit flags instead of deriving them from
 * {@link #ordinal()}, so reordering or adding constants cannot silently
 * change the result of {@link #readKey()} or {@link #readValue()}.
 */
enum Type {
  /** Neither read key nor value. */
  NEITHER(false, false),
  /** Read key only. */
  KEY_ONLY(true, false),
  /** Read value only. */
  VALUE_ONLY(false, true),
  /** Read both key and value. */
  KEY_AND_VALUE(true, true);

  private final boolean readKey;
  private final boolean readValue;

  Type(boolean readKey, boolean readValue) {
    this.readKey = readKey;
    this.readValue = readValue;
  }

  /** @return true if this type reads the key. */
  boolean readKey() {
    return readKey;
  }

  /** @return true if this type reads the value. */
  boolean readValue() {
    return readValue;
  }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
if (cacheType == CacheType.FULL_CACHE) {
cache = new FullTableCache<>(threadNamePrefix);
//fill cache
try (TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> tableIterator = iterator()) {
try (KeyValueIterator<KEY, VALUE> tableIterator = iterator()) {

while (tableIterator.hasNext()) {
KeyValue< KEY, VALUE > kv = tableIterator.next();
Expand Down Expand Up @@ -124,11 +124,11 @@ private byte[] encodeValue(VALUE value) throws IOException {
}

private KEY decodeKey(byte[] key) throws CodecException {
return key == null ? null : keyCodec.fromPersistedFormat(key);
return key != null && key.length > 0 ? keyCodec.fromPersistedFormat(key) : null;
}

private VALUE decodeValue(byte[] value) throws CodecException {
return value == null ? null : valueCodec.fromPersistedFormat(value);
return value != null && value.length > 0 ? valueCodec.fromPersistedFormat(value) : null;
Copy link
Contributor

@chungen0126 chungen0126 Jun 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @szetszwo @adoroszlai, is the value.length > 0 check necessary here?
It seems to cause a problem in HDDS-13302.
FSORepairTool writes zero-length byte arrays into the table when there are reachable files/dirs.
With this check, the method returns null, making it indistinguishable from a missing key. This means we can no longer tell whether the key exists with a zero-length byte array or doesn't exist at all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chungen0126 , that's a good point! We should properly support empty array/String. Filed HDDS-13317.

}

@Override
Expand Down Expand Up @@ -395,17 +395,11 @@ public void deleteRange(KEY beginKey, KEY endKey) throws IOException {
}

@Override
public Table.KeyValueIterator<KEY, VALUE> iterator() throws IOException {
return iterator(null);
}

@Override
public Table.KeyValueIterator<KEY, VALUE> iterator(KEY prefix)
throws IOException {
public Table.KeyValueIterator<KEY, VALUE> iterator(KEY prefix, KeyValueIterator.Type type) throws IOException {
if (supportCodecBuffer) {
final CodecBuffer prefixBuffer = encodeKeyCodecBuffer(prefix);
try {
return newCodecBufferTableIterator(rawTable.iterator(prefixBuffer));
return newCodecBufferTableIterator(rawTable.iterator(prefixBuffer, type));
} catch (Throwable t) {
if (prefixBuffer != null) {
prefixBuffer.release();
Expand All @@ -414,7 +408,7 @@ public Table.KeyValueIterator<KEY, VALUE> iterator(KEY prefix)
}
} else {
final byte[] prefixBytes = encodeKey(prefix);
return new TypedTableIterator(rawTable.iterator(prefixBytes));
return new TypedTableIterator(rawTable.iterator(prefixBytes, type));
}
}

Expand Down Expand Up @@ -534,7 +528,7 @@ TableCache<KEY, VALUE> getCache() {
}

RawIterator<CodecBuffer> newCodecBufferTableIterator(
TableIterator<CodecBuffer, KeyValue<CodecBuffer, CodecBuffer>> i) {
KeyValueIterator<CodecBuffer, CodecBuffer> i) {
return new RawIterator<CodecBuffer>(i) {
@Override
AutoCloseSupplier<CodecBuffer> convert(KEY key) throws IOException {
Expand All @@ -554,10 +548,14 @@ public CodecBuffer get() {

@Override
KeyValue<KEY, VALUE> convert(KeyValue<CodecBuffer, CodecBuffer> raw) throws CodecException {
final int rawSize = raw.getValue().readableBytes();
final KEY key = keyCodec.fromCodecBuffer(raw.getKey());
final VALUE value = valueCodec.fromCodecBuffer(raw.getValue());
return Table.newKeyValue(key, value, rawSize);
final CodecBuffer keyBuffer = raw.getKey();
final KEY key = keyBuffer.readableBytes() > 0 ? keyCodec.fromCodecBuffer(keyBuffer) : null;

final CodecBuffer valueBuffer = raw.getValue();
final int valueByteSize = valueBuffer.readableBytes();
final VALUE value = valueByteSize > 0 ? valueCodec.fromCodecBuffer(valueBuffer) : null;

return Table.newKeyValue(key, value, valueByteSize);
}
};
}
Expand All @@ -566,8 +564,7 @@ KeyValue<KEY, VALUE> convert(KeyValue<CodecBuffer, CodecBuffer> raw) throws Code
* Table Iterator implementation for strongly typed tables.
*/
public class TypedTableIterator extends RawIterator<byte[]> {
TypedTableIterator(
TableIterator<byte[], KeyValue<byte[], byte[]>> rawIterator) {
TypedTableIterator(KeyValueIterator<byte[], byte[]> rawIterator) {
super(rawIterator);
}

Expand All @@ -579,7 +576,8 @@ AutoCloseSupplier<byte[]> convert(KEY key) throws IOException {

@Override
KeyValue<KEY, VALUE> convert(KeyValue<byte[], byte[]> raw) throws CodecException {
return Table.newKeyValue(decodeKey(raw.getKey()), decodeValue(raw.getValue()));
final byte[] valueBytes = raw.getValue();
return Table.newKeyValue(decodeKey(raw.getKey()), decodeValue(valueBytes), valueBytes.length);
}
}

Expand All @@ -590,9 +588,9 @@ KeyValue<KEY, VALUE> convert(KeyValue<byte[], byte[]> raw) throws CodecException
*/
abstract class RawIterator<RAW>
implements Table.KeyValueIterator<KEY, VALUE> {
private final TableIterator<RAW, KeyValue<RAW, RAW>> rawIterator;
private final KeyValueIterator<RAW, RAW> rawIterator;

RawIterator(TableIterator<RAW, KeyValue<RAW, RAW>> rawIterator) {
RawIterator(KeyValueIterator<RAW, RAW> rawIterator) {
this.rawIterator = rawIterator;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,7 @@ public void deleteRange(KEY beginKey, KEY endKey) {
}

@Override
public TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator() {
throw new UnsupportedOperationException();
}

@Override
public TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator(KEY prefix) {
public KeyValueIterator<KEY, VALUE> iterator(KEY prefix, KeyValueIterator.Type type) {
throw new UnsupportedOperationException();
}

Expand Down
Loading
Loading