diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/StringUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/StringUtils.java
index 3cf747723057..e7746d9d5cad 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/StringUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/StringUtils.java
@@ -85,6 +85,10 @@ public static String bytes2Hex(ByteBuffer buffer) {
     return bytes2Hex(buffer, buffer.remaining());
   }
 
+  public static String bytes2Hex(byte[] array) {
+    return bytes2Hex(ByteBuffer.wrap(array));
+  }
+
   /**
    * Decode a specific range of bytes of the given byte array to a string
    * using UTF8.
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
index c6784a83571c..8254022a307f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
@@ -17,12 +17,14 @@
  */
 package org.apache.hadoop.hdds.utils.db;
 
+import org.apache.hadoop.hdds.StringUtils;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufAllocator;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufInputStream;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufOutputStream;
 import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
 import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
+import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.function.CheckedFunction;
 import org.slf4j.Logger;
@@ -32,6 +34,7 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.IntFunction;
@@ -47,6 +50,38 @@ public final class CodecBuffer implements AutoCloseable {
   public static final Logger LOG =
       LoggerFactory.getLogger(CodecBuffer.class);
 
+  /** The size of a buffer. */
+  public static class Capacity {
+    private final Object name;
+    private final AtomicInteger value;
+
+    public Capacity(Object name, int initialCapacity) {
+      this.name = name;
+      this.value = new AtomicInteger(initialCapacity);
+    }
+
+    public int get() {
+      return value.get();
+    }
+
+    private static int nextValue(int n) {
+      // round up to the next power of 2.
+      final long roundUp = Long.highestOneBit(n) << 1;
+      return roundUp > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) roundUp;
+    }
+
+    /** Increase this size to accommodate the given required size. */
+    public void increase(int required) {
+      final MemoizedSupplier<Integer> newBufferSize = MemoizedSupplier.valueOf(
+          () -> nextValue(required));
+      final int previous = value.getAndUpdate(
+          current -> required <= current ? current : newBufferSize.get());
+      if (newBufferSize.isInitialized()) {
+        LOG.info("{}: increase {} -> {}", name, previous, newBufferSize.get());
+      }
+    }
+  }
+
   private static final ByteBufAllocator POOL = PooledByteBufAllocator.DEFAULT;
 
   private static final IntFunction<ByteBuf> POOL_DIRECT = c -> c >= 0
@@ -206,6 +241,16 @@ public byte[] getArray() {
     return array;
   }
 
+  /** Does the content of this buffer start with the given prefix? */
+  public boolean startsWith(CodecBuffer prefix) {
+    Objects.requireNonNull(prefix, "prefix == null");
+    final int length = prefix.readableBytes();
+    if (this.readableBytes() < length) {
+      return false;
+    }
+    return buf.slice(buf.readerIndex(), length).equals(prefix.buf);
+  }
+
   /** @return an {@link InputStream} reading from this buffer. */
   public InputStream getInputStream() {
     return new ByteBufInputStream(buf.duplicate());
@@ -316,7 +361,7 @@ public CodecBuffer put(
    * @throws E in case the source throws it.
    */
   <E extends Exception> Integer putFromSource(
-      CheckedFunction<ByteBuffer, Integer, E> source) throws E {
+      PutToByteBuffer<E> source) throws E {
     assertRefCnt(1);
     final int i = buf.writerIndex();
     final int writable = buf.writableBytes();
@@ -330,4 +375,15 @@ Integer putFromSource(
     }
     return size;
   }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName()
+        + "[" + buf.readerIndex()
+        + "<=" + buf.writerIndex()
+        + "<=" + buf.capacity()
+        + ": "
+        + StringUtils.bytes2Hex(asReadOnlyByteBuffer(), 10)
+        + "]";
+  }
 }
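The new CodecBuffer.Capacity helper above grows monotonically and rounds a required size up to the next power of two, logging only when the stored value actually changes. A minimal sketch of the intended usage (the variable names are illustrative, not part of the patch):

    CodecBuffer.Capacity capacity = new CodecBuffer.Capacity("example", 4 << 10);
    capacity.increase(5000);      // 5000 > 4096, so the value becomes 8192
    capacity.increase(3000);      // already large enough; the value stays 8192
    int current = capacity.get(); // 8192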
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/PutToByteBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/PutToByteBuffer.java
new file mode 100644
index 000000000000..a81d1b17aa2b
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/PutToByteBuffer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils.db;
+
+import org.apache.ratis.util.function.CheckedFunction;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A function that puts data from a source into the {@link ByteBuffer}
+ * specified in the parameter.
+ * The source may or may not be available.
+ * This function must return the required size (possibly 0)
+ * if the source is available; otherwise, it must return null.
+ * When the {@link ByteBuffer}'s capacity is smaller than the required size,
+ * partial data may be put into the {@link ByteBuffer}.
+ *
+ * @param <E> The exception type this function may throw.
+ */
+@FunctionalInterface
+interface PutToByteBuffer<E extends Exception>
+    extends CheckedFunction<ByteBuffer, Integer, E> {
+}
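PutToByteBuffer is the callback that CodecBuffer#putFromSource uses to pull bytes from sources such as RocksIterator#key(ByteBuffer). A hedged sketch of an implementation honouring the contract, assuming a simple byte[]-backed source (this helper is illustrative and does not exist in the patch):

    // Reports the full required size even when only part of the data fits,
    // and returns null when the source is unavailable.
    static PutToByteBuffer<RuntimeException> fromArray(byte[] src) {
      return buffer -> {
        if (src == null) {
          return null;                         // source unavailable
        }
        int n = Math.min(src.length, buffer.remaining());
        buffer.put(src, 0, n);                 // possibly a partial write
        return src.length;                     // always the required size
      };
    }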
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/StringCodecBase.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/StringCodecBase.java
index f9db36d889d4..f2c516e20105 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/StringCodecBase.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/StringCodecBase.java
@@ -20,7 +20,6 @@
 import org.apache.hadoop.hdds.StringUtils;
 import org.apache.ratis.util.Preconditions;
-import org.apache.ratis.util.function.CheckedFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,7 +94,7 @@ public int getSerializedSizeUpperBound(String s) {
     return maxBytesPerChar * s.length();
   }
 
-  private <E extends Exception> CheckedFunction<ByteBuffer, Integer, E> encode(
+  private <E extends Exception> PutToByteBuffer<E> encode(
       String string, Integer serializedSize, Function<String, E> newE) {
     return buffer -> {
       final CoderResult result = newEncoder().encode(
@@ -140,8 +139,7 @@ byte[] string2Bytes(String string, Function<String, E> newE) throws E {
     final int upperBound = getSerializedSizeUpperBound(string);
     final Integer serializedSize = isFixedLength() ? upperBound : null;
 
-    final CheckedFunction<ByteBuffer, Integer, E> encoder
-        = encode(string, serializedSize, newE);
+    final PutToByteBuffer<E> encoder = encode(string, serializedSize, newE);
 
     if (serializedSize != null) {
       // When the serialized size is known, create an array
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
index 09984fef8009..4c9442eec4c6 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
@@ -1059,12 +1059,13 @@ private void assertBlockDataTableRecordCount(int expectedCount,
       DBHandle handle, KeyPrefixFilter filter, long containerID)
       throws IOException {
     long count = 0L;
-    BlockIterator<BlockData> iterator = handle.getStore().
-        getBlockIterator(containerID, filter);
-    iterator.seekToFirst();
-    while (iterator.hasNext()) {
-      iterator.nextBlock();
-      count += 1;
+    try (BlockIterator<BlockData> iterator = handle.getStore().
+        getBlockIterator(containerID, filter)) {
+      iterator.seekToFirst();
+      while (iterator.hasNext()) {
+        iterator.nextBlock();
+        count += 1;
+      }
     }
     Assert.assertEquals("Excepted: " + expectedCount + ", but actual: "
         + count + " in the blockData table of container: "
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/AutoCloseSupplier.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/AutoCloseSupplier.java
new file mode 100644
index 000000000000..43be5ec82538
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/AutoCloseSupplier.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils.db;
+
+import java.util.function.Supplier;
+
+/** An {@link AutoCloseable} {@link Supplier}. */
+@FunctionalInterface
+interface AutoCloseSupplier<T> extends AutoCloseable, Supplier<T> {
+  @Override
+  default void close() {
+    // no-op
+  }
+}
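AutoCloseSupplier lets a caller treat a value that needs cleanup and a plain value uniformly in try-with-resources; later in this patch, TypedTable returns "() -> keyArray" for byte[] keys and an overriding implementation that releases a CodecBuffer. A minimal sketch (illustrative; the interface is package-private, so this assumes code in the same package):

    AutoCloseSupplier<String> plain = () -> "nothing to clean up";
    try (AutoCloseSupplier<String> s = plain) {
      System.out.println(s.get());   // close() is the default no-op
    }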
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java
index e7b8fb4c6386..17fcf5fc0c1e 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java
@@ -30,7 +30,7 @@
  *
  * @param <RAW> the raw type.
  */
-public abstract class RDBStoreAbstractIterator<RAW>
+abstract class RDBStoreAbstractIterator<RAW>
     implements TableIterator<RAW, Table.KeyValue<RAW, RAW>> {
 
   private static final Logger LOG =
@@ -48,7 +48,6 @@ public abstract class RDBStoreAbstractIterator
     this.rocksDBIterator = iterator;
     this.rocksDBTable = table;
     this.prefix = prefix;
-    seekToFirst();
   }
 
   /** @return the key for the current entry. */
@@ -150,7 +149,7 @@ public final void removeFromDB() throws IOException {
   }
 
   @Override
-  public void close() throws IOException {
+  public void close() {
     rocksDBIterator.close();
   }
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java
index 2fac7038c879..e2fcdda7a3f8 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java
@@ -25,17 +25,12 @@
 /**
  * RocksDB store iterator using the byte[] API.
  */
-public class RDBStoreByteArrayIterator
-    extends RDBStoreAbstractIterator<byte[]> {
-  public RDBStoreByteArrayIterator(ManagedRocksIterator iterator,
-      RDBTable table) {
-    this(iterator, table, null);
-  }
-
-  public RDBStoreByteArrayIterator(ManagedRocksIterator iterator,
+class RDBStoreByteArrayIterator extends RDBStoreAbstractIterator<byte[]> {
+  RDBStoreByteArrayIterator(ManagedRocksIterator iterator,
       RDBTable table, byte[] prefix) {
     super(iterator, table,
         prefix == null ? null : Arrays.copyOf(prefix, prefix.length));
+    seekToFirst();
   }
 
   @Override
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreCodecBufferIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreCodecBufferIterator.java
new file mode 100644
index 000000000000..d4b00c42c759
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreCodecBufferIterator.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils.db;
+
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+import org.apache.ratis.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Implement {@link RDBStoreAbstractIterator} using {@link CodecBuffer}.
+ */
+class RDBStoreCodecBufferIterator
+    extends RDBStoreAbstractIterator<CodecBuffer> {
+  static class Buffer {
+    private final CodecBuffer.Capacity initialCapacity;
+    private final PutToByteBuffer<RuntimeException> source;
+    private CodecBuffer buffer;
+
+    Buffer(CodecBuffer.Capacity initialCapacity,
+        PutToByteBuffer<RuntimeException> source) {
+      this.initialCapacity = initialCapacity;
+      this.source = source;
+    }
+
+    void release() {
+      if (buffer != null) {
+        buffer.release();
+      }
+    }
+
+    private void prepare() {
+      if (buffer == null) {
+        allocate();
+      } else {
+        buffer.clear();
+      }
+    }
+
+    private void allocate() {
+      if (buffer != null) {
+        buffer.release();
+      }
+      buffer = CodecBuffer.allocateDirect(-initialCapacity.get());
+    }
+
+    CodecBuffer getFromDb() {
+      for (prepare(); ; allocate()) {
+        final Integer required = buffer.putFromSource(source);
+        if (required == null) {
+          return null; // the source is unavailable
+        } else if (required == buffer.readableBytes()) {
+          return buffer; // buffer size is big enough
+        }
+        // buffer size too small, try increasing the capacity.
+        if (buffer.setCapacity(required)) {
+          buffer.clear();
+          // retry with the new capacity
+          final int retried = buffer.putFromSource(source);
+          Preconditions.assertSame(required.intValue(), retried, "required");
+          return buffer;
+        }
+
+        // failed to increase the capacity
+        // increase initial capacity and reallocate it
+        initialCapacity.increase(required);
+      }
+    }
+  }
+
+  private final Buffer keyBuffer;
+  private final Buffer valueBuffer;
+  private final AtomicBoolean closed = new AtomicBoolean();
+
+  RDBStoreCodecBufferIterator(ManagedRocksIterator iterator, RDBTable table,
+      CodecBuffer prefix) {
+    super(iterator, table, prefix);
+
+    final String name = table != null ? table.getName() : null;
+    this.keyBuffer = new Buffer(
+        new CodecBuffer.Capacity(name + "-iterator-key", 1 << 10),
+        buffer -> getRocksDBIterator().get().key(buffer));
+    this.valueBuffer = new Buffer(
+        new CodecBuffer.Capacity(name + "-iterator-value", 4 << 10),
+        buffer -> getRocksDBIterator().get().value(buffer));
+    seekToFirst();
+  }
+
+  void assertOpen() {
+    Preconditions.assertTrue(!closed.get(), "Already closed");
+  }
+
+  @Override
+  CodecBuffer key() {
+    assertOpen();
+    return keyBuffer.getFromDb();
+  }
+
+  @Override
+  Table.KeyValue<CodecBuffer, CodecBuffer> getKeyValue() {
+    assertOpen();
+    return Table.newKeyValue(key(), valueBuffer.getFromDb());
+  }
+
+  @Override
+  void seek0(CodecBuffer key) {
+    assertOpen();
+    getRocksDBIterator().get().seek(key.asReadOnlyByteBuffer());
+  }
+
+  @Override
+  void delete(CodecBuffer key) throws IOException {
+    assertOpen();
+    getRocksDBTable().delete(key.asReadOnlyByteBuffer());
+  }
+
+  @Override
+  boolean startsWithPrefix(CodecBuffer key) {
+    assertOpen();
+    final CodecBuffer prefix = getPrefix();
+    if (prefix == null) {
+      return true;
+    }
+    if (key == null) {
+      return false;
+    }
+    return key.startsWith(prefix);
+  }
+
+  @Override
+  public void close() {
+    if (closed.compareAndSet(false, true)) {
+      super.close();
+      Optional.ofNullable(getPrefix()).ifPresent(CodecBuffer::release);
+      keyBuffer.release();
+      valueBuffer.release();
+    }
+  }
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
index f57a6bcfe5fe..d80d05f27b96 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
@@ -217,7 +217,7 @@ public void deleteWithBatch(BatchOperation batch, byte[] key)
   @Override
   public TableIterator<byte[], KeyValue<byte[], byte[]>> iterator()
       throws IOException {
-    return new RDBStoreByteArrayIterator(db.newIterator(family, false), this);
+    return iterator((byte[])null);
   }
 
   @Override
@@ -227,6 +227,12 @@ public TableIterator<byte[], KeyValue<byte[], byte[]>> iterator(byte[] prefix)
         prefix);
   }
 
+  TableIterator<CodecBuffer, KeyValue<CodecBuffer, CodecBuffer>> iterator(
+      CodecBuffer prefix) throws IOException {
+    return new RDBStoreCodecBufferIterator(db.newIterator(family, false),
+        this, prefix);
+  }
+
   @Override
   public String getName() {
     return family.getName();
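The new RDBTable#iterator(CodecBuffer) pairs with RDBStoreCodecBufferIterator above; the key and value CodecBuffers are pooled and reused between next() calls, and close() releases them together with the prefix. A hedged usage sketch (assumes an already-open RDBTable named table and a prefix CodecBuffer; checked IOExceptions are left to the caller):

    try (TableIterator<CodecBuffer, Table.KeyValue<CodecBuffer, CodecBuffer>> i =
             table.iterator(prefixBuffer)) {
      while (i.hasNext()) {
        Table.KeyValue<CodecBuffer, CodecBuffer> kv = i.next();
        byte[] key = kv.getKey().getArray();    // copy out before the next call
        byte[] value = kv.getValue().getArray();
        // ... use key and value ...
      }
    }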
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
index 6399718ce20c..c818c07b1acc 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
@@ -338,6 +338,25 @@ interface KeyValue {
     VALUE getValue() throws IOException;
   }
 
+  static <K, V> KeyValue<K, V> newKeyValue(K key, V value) {
+    return new KeyValue<K, V>() {
+      @Override
+      public K getKey() {
+        return key;
+      }
+
+      @Override
+      public V getValue() {
+        return value;
+      }
+
+      @Override
+      public String toString() {
+        return "(key=" + key + ", value=" + value + ")";
+      }
+    };
+  }
+
   /** A {@link TableIterator} to iterate {@link KeyValue}s. */
   interface KeyValueIterator<KEY, VALUE>
       extends TableIterator<KEY, KeyValue<KEY, VALUE>> {
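Table.newKeyValue simply captures an already-decoded pair, so iterators can hand back a KeyValue without touching RocksDB again. A minimal usage sketch (the key/value types are illustrative; both accessors declare IOException):

    Table.KeyValue<String, Long> kv = Table.newKeyValue("volume1", 42L);
    System.out.println(kv);       // prints: (key=volume1, value=42)
    String k = kv.getKey();       // getKey() declares IOException
    Long v = kv.getValue();       // getValue() declares IOException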
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index 7a825f9df92d..070f1cf10fb3 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -26,7 +26,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
@@ -38,11 +37,8 @@
 import org.apache.hadoop.hdds.utils.db.cache.PartialTableCache;
 import org.apache.hadoop.hdds.utils.db.cache.TableCache.CacheType;
 import org.apache.hadoop.hdds.utils.db.cache.TableCache;
-import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.function.CheckedBiFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.hdds.utils.db.cache.CacheResult.CacheStatus.EXISTS;
 import static org.apache.hadoop.hdds.utils.db.cache.CacheResult.CacheStatus.NOT_EXIST;
@@ -57,8 +53,6 @@
  * @param <VALUE> type of the values in the store.
  */
 public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
-  static final Logger LOG = LoggerFactory.getLogger(TypedTable.class);
-
   private static final long EPOCH_DEFAULT = -1L;
   static final int BUFFER_SIZE_DEFAULT = 4 << 10; // 4 KB
 
@@ -70,8 +64,8 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
   private final Codec<VALUE> valueCodec;
 
   private final boolean supportCodecBuffer;
-  private final AtomicInteger bufferSize
-      = new AtomicInteger(BUFFER_SIZE_DEFAULT);
+  private final CodecBuffer.Capacity bufferCapacity
+      = new CodecBuffer.Capacity(this, BUFFER_SIZE_DEFAULT);
   private final TableCache<CacheKey<KEY>, CacheValue<VALUE>> cache;
 
   /**
@@ -134,6 +128,10 @@ public TypedTable(RDBTable rawTable,
     }
   }
 
+  private CodecBuffer encodeKeyCodecBuffer(KEY key) throws IOException {
+    return key == null ? null : keyCodec.toDirectCodecBuffer(key);
+  }
+
   private byte[] encodeKey(KEY key) throws IOException {
     return key == null ? null : keyCodec.toPersistedFormat(key);
   }
@@ -297,23 +295,6 @@ public VALUE getIfExist(KEY key) throws IOException {
     }
   }
 
-  private static int nextBufferSize(int n) {
-    // round up to the next power of 2.
-    final long roundUp = Long.highestOneBit(n) << 1;
-    return roundUp > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) roundUp;
-  }
-
-  private void increaseBufferSize(int required) {
-    final MemoizedSupplier<Integer> newBufferSize = MemoizedSupplier.valueOf(
-        () -> nextBufferSize(required));
-    final int previous = bufferSize.getAndUpdate(
-        current -> required <= current ? current : newBufferSize.get());
-    if (newBufferSize.isInitialized()) {
-      LOG.info("{}: increaseBufferSize {} -> {}",
-          this, previous, newBufferSize.get());
-    }
-  }
-
   /**
    * Use {@link RDBTable#get(ByteBuffer, ByteBuffer)}
    * to get a value mapped to the given key.
@@ -358,7 +339,7 @@ private VALUE getFromTable(KEY key,
     try (CodecBuffer inKey = keyCodec.toDirectCodecBuffer(key)) {
       for (; ;) {
         final Integer required;
-        final int initial = -bufferSize.get(); // allocate a resizable buffer
+        final int initial = -bufferCapacity.get(); // resizable
         try (CodecBuffer outValue = CodecBuffer.allocateDirect(initial)) {
           required = get.apply(inKey, outValue);
           if (required == null) {
@@ -386,7 +367,7 @@ private VALUE getFromTable(KEY key,
         }
 
         // buffer size too small, reallocate a new buffer.
-        increaseBufferSize(required);
+        bufferCapacity.increase(required);
       }
     }
   }
@@ -425,14 +406,19 @@ public void deleteRange(KEY beginKey, KEY endKey) throws IOException {
 
   @Override
   public Table.KeyValueIterator<KEY, VALUE> iterator() throws IOException {
-    return new TypedTableIterator(rawTable.iterator());
+    return iterator(null);
   }
 
   @Override
   public Table.KeyValueIterator<KEY, VALUE> iterator(KEY prefix)
       throws IOException {
-    final byte[] prefixBytes = encodeKey(prefix);
-    return new TypedTableIterator(rawTable.iterator(prefixBytes));
+    if (supportCodecBuffer) {
+      final CodecBuffer prefixBuffer = encodeKeyCodecBuffer(prefix);
+      return newCodecBufferTableIterator(rawTable.iterator(prefixBuffer));
+    } else {
+      final byte[] prefixBytes = encodeKey(prefix);
+      return new TypedTableIterator(rawTable.iterator(prefixBytes));
+    }
   }
 
   @Override
@@ -570,18 +556,48 @@ public VALUE getValue() throws IOException {
     }
   }
 
+  RawIterator<CodecBuffer> newCodecBufferTableIterator(
+      TableIterator<CodecBuffer, Table.KeyValue<CodecBuffer, CodecBuffer>> i) {
+    return new RawIterator<CodecBuffer>(i) {
+      @Override
+      AutoCloseSupplier<CodecBuffer> convert(KEY key) throws IOException {
+        final CodecBuffer buffer = encodeKeyCodecBuffer(key);
+        return new AutoCloseSupplier<CodecBuffer>() {
+          @Override
+          public void close() {
+            buffer.release();
+          }
+
+          @Override
+          public CodecBuffer get() {
+            return buffer;
+          }
+        };
+      }
+
+      @Override
+      KeyValue<KEY, VALUE> convert(KeyValue<CodecBuffer, CodecBuffer> raw)
+          throws IOException {
+        final KEY key = keyCodec.fromCodecBuffer(raw.getKey());
+        final VALUE value = valueCodec.fromCodecBuffer(raw.getValue());
+        return Table.newKeyValue(key, value);
+      }
+    };
+  }
+
   /**
    * Table Iterator implementation for strongly typed tables.
    */
-  public class TypedTableIterator extends AbstractIterator<byte[]> {
-    public TypedTableIterator(
+  public class TypedTableIterator extends RawIterator<byte[]> {
+    TypedTableIterator(
         TableIterator<byte[], KeyValue<byte[], byte[]>> rawIterator) {
       super(rawIterator);
     }
 
     @Override
-    byte[] convert(KEY key) throws IOException {
-      return encodeKey(key);
+    AutoCloseSupplier<byte[]> convert(KEY key) throws IOException {
+      final byte[] keyArray = encodeKey(key);
+      return () -> keyArray;
     }
 
     @Override
@@ -591,26 +607,27 @@ KeyValue<KEY, VALUE> convert(KeyValue<byte[], byte[]> raw) {
   }
 
   /**
-   * An abstract {@link Table.KeyValueIterator} backed by a raw iterator.
+   * A {@link Table.KeyValueIterator} backed by a raw iterator.
    *
    * @param <RAW> The raw type.
    */
-  abstract class AbstractIterator<RAW>
+  abstract class RawIterator<RAW>
       implements Table.KeyValueIterator<KEY, VALUE> {
     private final TableIterator<RAW, KeyValue<RAW, RAW>> rawIterator;
 
-    AbstractIterator(TableIterator<RAW, KeyValue<RAW, RAW>> rawIterator) {
+    RawIterator(TableIterator<RAW, KeyValue<RAW, RAW>> rawIterator) {
       this.rawIterator = rawIterator;
     }
 
     /** Covert the given key to the {@link RAW} type. */
-    abstract RAW convert(KEY key) throws IOException;
+    abstract AutoCloseSupplier<RAW> convert(KEY key) throws IOException;
 
     /**
     * Covert the given {@link Table.KeyValue}
     * from ({@link RAW}, {@link RAW}) to ({@link KEY}, {@link VALUE}).
      */
-    abstract KeyValue<KEY, VALUE> convert(KeyValue<RAW, RAW> raw);
+    abstract KeyValue<KEY, VALUE> convert(KeyValue<RAW, RAW> raw)
+        throws IOException;
 
     @Override
     public void seekToFirst() {
@@ -624,8 +641,10 @@ public void seekToLast() {
 
     @Override
     public KeyValue<KEY, VALUE> seek(KEY key) throws IOException {
-      final KeyValue<RAW, RAW> result = rawIterator.seek(convert(key));
-      return result == null ? null : convert(result);
+      try (AutoCloseSupplier<RAW> rawKey = convert(key)) {
+        final KeyValue<RAW, RAW> result = rawIterator.seek(rawKey.get());
+        return result == null ? null : convert(result);
+      }
     }
 
     @Override
@@ -640,7 +659,11 @@ public boolean hasNext() {
 
     @Override
     public KeyValue<KEY, VALUE> next() {
-      return convert(rawIterator.next());
+      try {
+        return convert(rawIterator.next());
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed next()", e);
+      }
     }
 
     @Override
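With these changes TypedTable#iterator(prefix) picks the CodecBuffer-backed path when the key/value codecs support it (the supportCodecBuffer flag) and falls back to byte[] otherwise, while the caller-visible API stays the same. A hedged sketch of typical call-site usage (table, prefix and the String types are illustrative; closing the iterator is what releases the pooled buffers on the new path):

    try (Table.KeyValueIterator<String, String> i = table.iterator(prefix)) {
      while (i.hasNext()) {
        Table.KeyValue<String, String> kv = i.next();
        System.out.println(kv.getKey() + " = " + kv.getValue());
      }
    }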
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreByteArrayIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreByteArrayIterator.java
index 23c8dee14204..223b82c353c4 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreByteArrayIterator.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreByteArrayIterator.java
@@ -23,7 +23,6 @@
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
-import org.junit.Assert;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
@@ -39,7 +38,7 @@
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.inOrder;
@@ -50,7 +49,7 @@
 import static org.mockito.Mockito.when;
 
 /**
- * This test prescribe expected behaviour from the RDBStoreIterator which wraps
+ * This test prescribes expected behaviour
+ * from {@link RDBStoreByteArrayIterator} which wraps
  * RocksDB's own iterator. Ozone internally in TypedTableIterator uses, the
  * RDBStoreIterator to provide iteration over table elements in a typed manner.
  * The tests are to ensure we access RocksDB via the iterator properly.
@@ -70,7 +70,7 @@ public void setup() {
   }
 
   RDBStoreByteArrayIterator newIterator() {
-    return new RDBStoreByteArrayIterator(managedRocksIterator, null);
+    return new RDBStoreByteArrayIterator(managedRocksIterator, null, null);
   }
 
   RDBStoreByteArrayIterator newIterator(byte[] prefix) {
@@ -234,8 +234,7 @@ public void testRemovingFromDBActuallyDeletesFromTable() throws Exception {
     when(rocksDBIteratorMock.isValid()).thenReturn(true);
     when(rocksDBIteratorMock.key()).thenReturn(testKey);
 
-    RDBStoreByteArrayIterator iter =
-        new RDBStoreByteArrayIterator(managedRocksIterator, rocksTableMock);
+    RDBStoreByteArrayIterator iter = newIterator(null);
     iter.removeFromDB();
 
     InOrder verifier = inOrder(rocksDBIteratorMock, rocksTableMock);
@@ -329,6 +328,6 @@ public void testGetStackTrace() {
     }
     String expectedTrace = sb.toString();
     String fromObjectInit = iterator.getStackTrace();
-    Assert.assertTrue(fromObjectInit.contains(expectedTrace));
+    assertTrue(fromObjectInit.contains(expectedTrace));
   }
 }
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java
new file mode 100644
index 000000000000..f9806b47d210
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils.db;
+
+import org.apache.hadoop.hdds.StringUtils;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InOrder;
+import org.mockito.stubbing.Answer;
+import org.rocksdb.RocksIterator;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * This test is similar to {@link TestRDBStoreByteArrayIterator}
+ * except that this test is for {@link RDBStoreCodecBufferIterator}.
+ */
+public class TestRDBStoreCodecBufferIterator {
+
+  private RocksIterator rocksIteratorMock;
+  private ManagedRocksIterator managedRocksIterator;
+  private RDBTable rdbTableMock;
+
+  @BeforeEach
+  public void setup() {
+    rocksIteratorMock = mock(RocksIterator.class);
+    managedRocksIterator = newManagedRocksIterator();
+    rdbTableMock = mock(RDBTable.class);
+    Logger.getLogger(ManagedRocksObjectUtils.class).setLevel(Level.DEBUG);
+  }
+
+  ManagedRocksIterator newManagedRocksIterator() {
+    return new ManagedRocksIterator(rocksIteratorMock);
+  }
+
+  RDBStoreCodecBufferIterator newIterator() {
+    return new RDBStoreCodecBufferIterator(managedRocksIterator, null, null);
+  }
+
+  RDBStoreCodecBufferIterator newIterator(CodecBuffer prefix) {
+    return new RDBStoreCodecBufferIterator(
+        managedRocksIterator, rdbTableMock, prefix);
+  }
+
+  Answer<Integer> newAnswerInt(String name, int b) {
+    return newAnswer(name, (byte) b);
+  }
+
+  Answer<Integer> newAnswer(String name, byte... b) {
+    return invocation -> {
+      System.out.printf("answer %s: %s%n", name, StringUtils.bytes2Hex(b));
+      Object[] args = invocation.getArguments();
+      final ByteBuffer buffer = (ByteBuffer) args[0];
+      buffer.clear();
+      buffer.put(b);
+      buffer.flip();
+      return b.length;
+    };
+  }
+
+  @Test
+  public void testForEachRemaining() throws Exception {
+    when(rocksIteratorMock.isValid())
+        .thenReturn(true, true, true, true, true, true, true, false);
+    when(rocksIteratorMock.key(any()))
+        .then(newAnswerInt("key1", 0x00))
+        .then(newAnswerInt("key2", 0x00))
+        .then(newAnswerInt("key3", 0x01))
+        .then(newAnswerInt("key4", 0x02))
+        .thenThrow(new NoSuchElementException());
+    when(rocksIteratorMock.value(any()))
+        .then(newAnswerInt("val1", 0x7f))
+        .then(newAnswerInt("val2", 0x7f))
+        .then(newAnswerInt("val3", 0x7e))
+        .then(newAnswerInt("val4", 0x7d))
+        .thenThrow(new NoSuchElementException());
+
+    List<RawKeyValue<byte[]>> remaining = new ArrayList<>();
+    try (RDBStoreCodecBufferIterator i = newIterator()) {
+      i.forEachRemaining(kv -> {
+        try {
+          remaining.add(RawKeyValue.create(
+              kv.getKey().getArray(), kv.getValue().getArray()));
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      });
+
+      System.out.println("remaining: " + remaining);
+      assertArrayEquals(new byte[]{0x00}, remaining.get(0).getKey());
+      assertArrayEquals(new byte[]{0x7f}, remaining.get(0).getValue());
+      assertArrayEquals(new byte[]{0x01}, remaining.get(1).getKey());
+      assertArrayEquals(new byte[]{0x7e}, remaining.get(1).getValue());
+      assertArrayEquals(new byte[]{0x02}, remaining.get(2).getKey());
+      assertArrayEquals(new byte[]{0x7d}, remaining.get(2).getValue());
+    }
+
+    CodecTestUtil.gc();
+  }
+
+  @Test
+  public void testHasNextDependsOnIsvalid() throws Exception {
+    when(rocksIteratorMock.isValid()).thenReturn(true, true, false);
+
+    try (RDBStoreCodecBufferIterator i = newIterator()) {
+      assertTrue(i.hasNext());
+      assertFalse(i.hasNext());
+    }
+
+    CodecTestUtil.gc();
+  }
+
+  @Test
+  public void testNextCallsIsValidThenGetsTheValueAndStepsToNext()
+      throws Exception {
+    when(rocksIteratorMock.isValid()).thenReturn(true);
+    InOrder verifier = inOrder(rocksIteratorMock);
+    try (RDBStoreCodecBufferIterator i = newIterator()) {
+      i.next();
+    }
+
+    verifier.verify(rocksIteratorMock).isValid();
+    verifier.verify(rocksIteratorMock).key(any());
+    verifier.verify(rocksIteratorMock).value(any());
+    verifier.verify(rocksIteratorMock).next();
+
+    CodecTestUtil.gc();
+  }
+
+  @Test
+  public void testConstructorSeeksToFirstElement() throws Exception {
+    newIterator().close();
+
+    verify(rocksIteratorMock, times(1)).seekToFirst();
+
+    CodecTestUtil.gc();
+  }
+
+  @Test
+  public void testSeekToFirstSeeks() throws Exception {
+    try (RDBStoreCodecBufferIterator i = newIterator()) {
+      i.seekToFirst();
+    }
+    verify(rocksIteratorMock, times(2)).seekToFirst();
+
+    CodecTestUtil.gc();
+  }
+
+  @Test
+  public void testSeekToLastSeeks() throws Exception {
+    try (RDBStoreCodecBufferIterator i = newIterator()) {
+      i.seekToLast();
+    }
+
+    verify(rocksIteratorMock, times(1)).seekToLast();
+
+    CodecTestUtil.gc();
+  }
+
+  @Test
+  public void testSeekReturnsTheActualKey() throws Exception {
+    when(rocksIteratorMock.isValid()).thenReturn(true);
+    when(rocksIteratorMock.key(any()))
+        .then(newAnswerInt("key1", 0x00));
+    when(rocksIteratorMock.value(any()))
+        .then(newAnswerInt("val1", 0x7f));
+
+    try (RDBStoreCodecBufferIterator i = newIterator();
+        CodecBuffer target = CodecBuffer.wrap(new byte[]{0x55})) {
+      final Table.KeyValue<CodecBuffer, CodecBuffer> val = i.seek(target);
+
+      InOrder verifier = inOrder(rocksIteratorMock);
+
+      verify(rocksIteratorMock, times(1)).seekToFirst(); //at construct time
+      verify(rocksIteratorMock, never()).seekToLast();
+      verifier.verify(rocksIteratorMock, times(1))
+          .seek(any(ByteBuffer.class));
+      verifier.verify(rocksIteratorMock, times(1)).isValid();
+      verifier.verify(rocksIteratorMock, times(1)).key(any());
+      verifier.verify(rocksIteratorMock, times(1)).value(any());
+      assertArrayEquals(new byte[]{0x00}, val.getKey().getArray());
+      assertArrayEquals(new byte[]{0x7f}, val.getValue().getArray());
+    }
+
+    CodecTestUtil.gc();
+  }
+
+  @Test
+  public void testGettingTheKeyIfIteratorIsValid() throws Exception {
+    when(rocksIteratorMock.isValid()).thenReturn(true);
+    when(rocksIteratorMock.key(any()))
+        .then(newAnswerInt("key1", 0x00));
+
+    byte[] key = null;
+    try (RDBStoreCodecBufferIterator i = newIterator()) {
+      if (i.hasNext()) {
+        key = i.next().getKey().getArray();
+      }
+    }
+
+    InOrder verifier = inOrder(rocksIteratorMock);
+
+    verifier.verify(rocksIteratorMock, times(1)).isValid();
+    verifier.verify(rocksIteratorMock, times(1)).key(any());
+    assertArrayEquals(new byte[]{0x00}, key);
+
+    CodecTestUtil.gc();
+  }
+
+  @Test
+  public void testGettingTheValueIfIteratorIsValid() throws Exception {
+    when(rocksIteratorMock.isValid()).thenReturn(true);
+    when(rocksIteratorMock.key(any()))
+        .then(newAnswerInt("key1", 0x00));
+    when(rocksIteratorMock.value(any()))
+        .then(newAnswerInt("val1", 0x7f));
+
+    byte[] key = null;
+    byte[] value = null;
+    try (RDBStoreCodecBufferIterator i = newIterator()) {
+      if (i.hasNext()) {
+        Table.KeyValue<CodecBuffer, CodecBuffer> entry = i.next();
+        key = entry.getKey().getArray();
+        value = entry.getValue().getArray();
+      }
+    }
+
+    InOrder verifier = inOrder(rocksIteratorMock);
+
+    verifier.verify(rocksIteratorMock, times(1)).isValid();
+    verifier.verify(rocksIteratorMock, times(1)).key(any());
+    assertArrayEquals(new byte[]{0x00}, key);
+    assertArrayEquals(new byte[]{0x7f}, value);
+
+    CodecTestUtil.gc();
+  }
+
+  @Test
+  public void testRemovingFromDBActuallyDeletesFromTable() throws Exception {
+    final byte[] testKey = new byte[10];
+    ThreadLocalRandom.current().nextBytes(testKey);
+    when(rocksIteratorMock.isValid()).thenReturn(true);
+    when(rocksIteratorMock.key(any()))
+        .then(newAnswer("key1", testKey));
+
+    try (RDBStoreCodecBufferIterator i = newIterator(null)) {
+      i.removeFromDB();
+    }
+
+    InOrder verifier = inOrder(rocksIteratorMock, rdbTableMock);
+
+    verifier.verify(rocksIteratorMock, times(1)).isValid();
+    verifier.verify(rdbTableMock, times(1))
+        .delete(ByteBuffer.wrap(testKey));
+
+    CodecTestUtil.gc();
+  }
+
+  @Test
+  public void testRemoveFromDBWithoutDBTableSet() throws Exception {
+    try (RDBStoreCodecBufferIterator i = newIterator()) {
+      assertThrows(UnsupportedOperationException.class,
+          i::removeFromDB);
+    }
+
+    CodecTestUtil.gc();
+  }
+
+  @Test
+  public void testCloseCloses() throws Exception {
+    newIterator().close();
+
+    verify(rocksIteratorMock, times(1)).close();
+
+    CodecTestUtil.gc();
+  }
+
+  @Test
+  public void testNullPrefixedIterator() throws Exception {
+    try (RDBStoreCodecBufferIterator i = newIterator()) {
+      verify(rocksIteratorMock, times(1)).seekToFirst();
+      clearInvocations(rocksIteratorMock);
+
+      i.seekToFirst();
+      verify(rocksIteratorMock, times(1)).seekToFirst();
+      clearInvocations(rocksIteratorMock);
+
+      when(rocksIteratorMock.isValid()).thenReturn(true);
+      assertTrue(i.hasNext());
+      verify(rocksIteratorMock, times(1)).isValid();
+      verify(rocksIteratorMock, times(0)).key(any());
+
+      i.seekToLast();
+      verify(rocksIteratorMock, times(1)).seekToLast();
+    }
+
+    CodecTestUtil.gc();
+  }
+
+  @Test
+  public void testNormalPrefixedIterator() throws Exception {
+    final byte[] prefixBytes = "sample".getBytes(StandardCharsets.UTF_8);
+    try (RDBStoreCodecBufferIterator i = newIterator(
+        CodecBuffer.wrap(prefixBytes))) {
+      final ByteBuffer prefix = ByteBuffer.wrap(prefixBytes);
+      verify(rocksIteratorMock, times(1)).seek(prefix);
+      clearInvocations(rocksIteratorMock);
+
+      i.seekToFirst();
+      verify(rocksIteratorMock, times(1)).seek(prefix);
+      clearInvocations(rocksIteratorMock);
+
+      when(rocksIteratorMock.isValid()).thenReturn(true);
+      when(rocksIteratorMock.key(any()))
+          .then(newAnswer("key1", prefixBytes));
+      assertTrue(i.hasNext());
+      verify(rocksIteratorMock, times(1)).isValid();
+      verify(rocksIteratorMock, times(1)).key(any());
+
+      try {
+        i.seekToLast();
+        fail("Prefixed iterator does not support seekToLast");
+      } catch (Exception e) {
+        assertTrue(e instanceof UnsupportedOperationException);
+      }
+    }
+
+    CodecTestUtil.gc();
+  }
+}