@@ -85,6 +85,10 @@ public static String bytes2Hex(ByteBuffer buffer) {
     return bytes2Hex(buffer, buffer.remaining());
   }
 
+  public static String bytes2Hex(byte[] array) {
+    return bytes2Hex(ByteBuffer.wrap(array));
+  }
+
   /**
    * Decode a specific range of bytes of the given byte array to a string
    * using UTF8.
@@ -17,12 +17,14 @@
  */
 package org.apache.hadoop.hdds.utils.db;
 
+import org.apache.hadoop.hdds.StringUtils;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufAllocator;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufInputStream;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufOutputStream;
 import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
 import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
+import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.function.CheckedFunction;
 import org.slf4j.Logger;
@@ -32,6 +34,7 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.IntFunction;
@@ -47,6 +50,38 @@
 public final class CodecBuffer implements AutoCloseable {
   public static final Logger LOG = LoggerFactory.getLogger(CodecBuffer.class);
 
+  /** The size of a buffer. */
+  public static class Capacity {
+    private final Object name;
+    private final AtomicInteger value;
+
+    public Capacity(Object name, int initialCapacity) {
+      this.name = name;
+      this.value = new AtomicInteger(initialCapacity);
+    }
+
+    public int get() {
+      return value.get();
+    }
+
+    private static int nextValue(int n) {
+      // round up to the next power of 2.
+      final long roundUp = Long.highestOneBit(n) << 1;
+      return roundUp > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) roundUp;
+    }
+
+    /** Increase this size to accommodate the given required size. */
+    public void increase(int required) {
+      final MemoizedSupplier<Integer> newBufferSize = MemoizedSupplier.valueOf(
+          () -> nextValue(required));
+      final int previous = value.getAndUpdate(
+          current -> required <= current ? current : newBufferSize.get());
+      if (newBufferSize.isInitialized()) {
+        LOG.info("{}: increase {} -> {}", name, previous, newBufferSize.get());
+      }
+    }
+  }
+
   private static final ByteBufAllocator POOL
       = PooledByteBufAllocator.DEFAULT;
   private static final IntFunction<ByteBuf> POOL_DIRECT = c -> c >= 0
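The Capacity helper added above is a grow-only, thread-safe size: nextValue rounds the requirement up to the next power of two, getAndUpdate applies it atomically, and the MemoizedSupplier ensures the new value is computed and logged only when the capacity actually grows. A minimal usage sketch, assuming a shared per-purpose capacity; the field name, the 1 KB starting size, and the allocateDirect(int) call are the editor's assumptions, not part of this change:

  // Hypothetical caller: one shared, named capacity per kind of buffer.
  private static final CodecBuffer.Capacity KEY_BUFFER_CAPACITY =
      new CodecBuffer.Capacity("keyBuffer", 1 << 10);

  static CodecBuffer allocateKeyBuffer(int requiredSize) {
    // Grow the shared capacity if this request does not fit; concurrent
    // callers race safely and only an actual increase is logged.
    KEY_BUFFER_CAPACITY.increase(requiredSize);
    // Allocate with the (possibly enlarged) capacity; allocateDirect(int)
    // is assumed to be available on CodecBuffer.
    return CodecBuffer.allocateDirect(KEY_BUFFER_CAPACITY.get());
  }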
@@ -206,6 +241,16 @@ public byte[] getArray() {
     return array;
   }
 
+  /** Does the content of this buffer start with the given prefix? */
+  public boolean startsWith(CodecBuffer prefix) {
+    Objects.requireNonNull(prefix, "prefix == null");
+    final int length = prefix.readableBytes();
+    if (this.readableBytes() < length) {
+      return false;
+    }
+    return buf.slice(buf.readerIndex(), length).equals(prefix.buf);
+  }
+
   /** @return an {@link InputStream} reading from this buffer. */
   public InputStream getInputStream() {
     return new ByteBufInputStream(buf.duplicate());
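A brief sketch of the intended behaviour of startsWith, assuming CodecBuffer.wrap(byte[]) is available for building the buffers (editor's illustration, not part of the change):

  // No bytes are copied: only the first prefix.readableBytes() bytes
  // of key are compared against the prefix.
  CodecBuffer key = CodecBuffer.wrap(new byte[] {1, 2, 3});
  CodecBuffer prefix = CodecBuffer.wrap(new byte[] {1, 2});
  assert key.startsWith(prefix);    // the first two bytes match
  assert !prefix.startsWith(key);   // a "prefix" longer than the buffer cannot match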
@@ -316,7 +361,7 @@ public CodecBuffer put(
    * @throws E in case the source throws it.
    */
   <E extends Exception> Integer putFromSource(
-      CheckedFunction<ByteBuffer, Integer, E> source) throws E {
+      PutToByteBuffer<E> source) throws E {
     assertRefCnt(1);
     final int i = buf.writerIndex();
     final int writable = buf.writableBytes();
@@ -330,4 +375,15 @@ <E extends Exception> Integer putFromSource(
     }
     return size;
   }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName()
+        + "[" + buf.readerIndex()
+        + "<=" + buf.writerIndex()
+        + "<=" + buf.capacity()
+        + ": "
+        + StringUtils.bytes2Hex(asReadOnlyByteBuffer(), 10)
+        + "]";
+  }
 }
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.utils.db;

import org.apache.ratis.util.function.CheckedFunction;

import java.nio.ByteBuffer;

/**
 * A function that puts data from a source into the {@link ByteBuffer}
 * given as its parameter.
 * The source may or may not be available.
 * If the source is available, this function must return the required size
 * (possibly 0); otherwise, it must return null.
 * When the {@link ByteBuffer}'s capacity is smaller than the required size,
 * only partial data may be written to the {@link ByteBuffer}.
 *
 * @param <E> The exception type this function may throw.
 */
@FunctionalInterface
interface PutToByteBuffer<E extends Exception>
    extends CheckedFunction<ByteBuffer, Integer, E> {
}
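To make the contract above concrete, here is a minimal, hypothetical implementation: a source backed by a byte array that reports its required size and writes only as much as fits. It is a sketch by the editor, not code from this change:

  // A PutToByteBuffer source over an in-memory byte array.
  static PutToByteBuffer<RuntimeException> fromArray(byte[] bytes) {
    return buffer -> {
      if (bytes == null) {
        return null;                      // the source is not available
      }
      final int required = bytes.length;  // the required size, possibly 0
      // If the buffer is too small, write only the part that fits.
      buffer.put(bytes, 0, Math.min(required, buffer.remaining()));
      return required;
    };
  }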
@@ -20,7 +20,6 @@
 
 import org.apache.hadoop.hdds.StringUtils;
 import org.apache.ratis.util.Preconditions;
-import org.apache.ratis.util.function.CheckedFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,7 +94,7 @@ public int getSerializedSizeUpperBound(String s) {
     return maxBytesPerChar * s.length();
   }
 
-  private <E extends Exception> CheckedFunction<ByteBuffer, Integer, E> encode(
+  private <E extends Exception> PutToByteBuffer<E> encode(
       String string, Integer serializedSize, Function<String, E> newE) {
     return buffer -> {
       final CoderResult result = newEncoder().encode(
@@ -140,8 +139,7 @@ <E extends Exception> byte[] string2Bytes(String string,
       Function<String, E> newE) throws E {
     final int upperBound = getSerializedSizeUpperBound(string);
     final Integer serializedSize = isFixedLength() ? upperBound : null;
-    final CheckedFunction<ByteBuffer, Integer, E> encoder
-        = encode(string, serializedSize, newE);
+    final PutToByteBuffer<E> encoder = encode(string, serializedSize, newE);
 
     if (serializedSize != null) {
       // When the serialized size is known, create an array
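For context on why the encoder returned here may only partially fill a buffer, this is the JDK behaviour the codec builds on: CharsetEncoder reports OVERFLOW rather than failing when the target buffer is too small. A standalone illustration by the editor, not part of the change:

  import java.nio.ByteBuffer;
  import java.nio.CharBuffer;
  import java.nio.charset.CharsetEncoder;
  import java.nio.charset.CoderResult;
  import java.nio.charset.StandardCharsets;

  static void overflowDemo() {
    // "abc" needs 3 bytes in UTF-8 but only 2 fit; the encoder writes what
    // it can and signals OVERFLOW so the caller can retry with a larger buffer.
    final CharsetEncoder utf8 = StandardCharsets.UTF_8.newEncoder();
    final ByteBuffer small = ByteBuffer.allocate(2);
    final CoderResult result = utf8.encode(CharBuffer.wrap("abc"), small, true);
    assert result.isOverflow();
  }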
@@ -1059,12 +1059,13 @@ private void assertBlockDataTableRecordCount(int expectedCount,
       DBHandle handle, KeyPrefixFilter filter, long containerID)
       throws IOException {
     long count = 0L;
-    BlockIterator<BlockData> iterator = handle.getStore().
-        getBlockIterator(containerID, filter);
-    iterator.seekToFirst();
-    while (iterator.hasNext()) {
-      iterator.nextBlock();
-      count += 1;
+    try (BlockIterator<BlockData> iterator = handle.getStore().
+        getBlockIterator(containerID, filter)) {
+      iterator.seekToFirst();
+      while (iterator.hasNext()) {
+        iterator.nextBlock();
+        count += 1;
+      }
     }
     Assert.assertEquals("Excepted: " + expectedCount
         + ", but actual: " + count + " in the blockData table of container: "
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.utils.db;

import java.util.function.Supplier;

/** An {@link AutoCloseable} {@link Supplier}. */
@FunctionalInterface
interface AutoCloseSupplier<RAW> extends AutoCloseable, Supplier<RAW> {
  @Override
  default void close() {
    // no-op
  }
}
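A short, hypothetical sketch of what the no-op close() default buys: values that need no cleanup and values that do can be handed to callers behind the same try-with-resources shape. The names below are the editor's, not part of the change:

  // A cached value: the no-op close() default applies.
  static AutoCloseSupplier<byte[]> ofCached(byte[] value) {
    return () -> value;               // the single abstract method is get()
  }

  // Caller code stays the same whether or not close() does real work.
  static int firstByte(AutoCloseSupplier<byte[]> supplier) {
    try (AutoCloseSupplier<byte[]> s = supplier) {
      return s.get()[0];
    }
  }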
@@ -30,7 +30,7 @@
  *
  * @param <RAW> the raw type.
  */
-public abstract class RDBStoreAbstractIterator<RAW>
+abstract class RDBStoreAbstractIterator<RAW>
     implements TableIterator<RAW, Table.KeyValue<RAW, RAW>> {
 
   private static final Logger LOG =
@@ -48,7 +48,6 @@ public abstract class RDBStoreAbstractIterator<RAW>
     this.rocksDBIterator = iterator;
     this.rocksDBTable = table;
     this.prefix = prefix;
-    seekToFirst();
   }
 
   /** @return the key for the current entry. */
@@ -150,7 +149,7 @@ public final void removeFromDB() throws IOException {
   }
 
   @Override
-  public void close() throws IOException {
+  public void close() {
     rocksDBIterator.close();
   }
 }
@@ -25,17 +25,12 @@
 /**
  * RocksDB store iterator using the byte[] API.
  */
-public class RDBStoreByteArrayIterator
-    extends RDBStoreAbstractIterator<byte[]> {
-  public RDBStoreByteArrayIterator(ManagedRocksIterator iterator,
-      RDBTable table) {
-    this(iterator, table, null);
-  }
-
-  public RDBStoreByteArrayIterator(ManagedRocksIterator iterator,
+class RDBStoreByteArrayIterator extends RDBStoreAbstractIterator<byte[]> {
+  RDBStoreByteArrayIterator(ManagedRocksIterator iterator,
       RDBTable table, byte[] prefix) {
     super(iterator, table,
         prefix == null ? null : Arrays.copyOf(prefix, prefix.length));
+    seekToFirst();
   }
 
   @Override
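A note on the two iterator hunks above: the seekToFirst() call moves from the abstract base-class constructor into the concrete subclass constructor. One plausible reason, sketched below with hypothetical class names, is that calling an overridable method from a base-class constructor runs it before the subclass has initialized its own fields:

  abstract class BaseIterator {
    BaseIterator() {
      seekToFirst();          // runs before any subclass field is assigned
    }
    abstract void seekToFirst();
  }

  class PrefixIterator extends BaseIterator {
    private final byte[] prefix;

    PrefixIterator(byte[] prefix) {
      super();                // seekToFirst() has already run here ...
      this.prefix = prefix;   // ... so it observed prefix == null
    }

    @Override
    void seekToFirst() {
      // Any use of this.prefix during construction would see null.
    }
  }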