BufferPool.java
@@ -111,6 +111,7 @@ void releaseBuffer(ChunkBuffer chunkBuffer) {
   }
 
   public void clearBufferPool() {
+    bufferList.forEach(ChunkBuffer::close);
     bufferList.clear();
     currentBufferIndex = -1;
  }
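Pooled buffers are direct after this change (see the ChunkBuffer.allocate hunk below), so the pool must close each one to free its native memory before dropping the references. A minimal sketch of the idea, not the real BufferPool (the class name here is illustrative):

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.ozone.common.ChunkBuffer;

class PoolSketch {
  public static void main(String[] args) {
    List<ChunkBuffer> pool = new ArrayList<>();
    pool.add(ChunkBuffer.allocate(4096)); // now a direct, CodecBuffer-backed buffer
    pool.forEach(ChunkBuffer::close);     // free the native memory first
    pool.clear();                         // then drop the references
  }
}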
CodecBuffer.java
@@ -28,6 +28,7 @@
 import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.UncheckedAutoCloseable;
 import org.apache.ratis.util.function.CheckedFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +51,7 @@
  * A buffer used by {@link Codec}
  * for supporting RocksDB direct {@link ByteBuffer} APIs.
  */
-public class CodecBuffer implements AutoCloseable {
+public class CodecBuffer implements UncheckedAutoCloseable {
   public static final Logger LOG = LoggerFactory.getLogger(CodecBuffer.class);
 
   /** To create {@link CodecBuffer} instances. */
@@ -340,6 +341,12 @@ public int readableBytes() {
     return buf.readableBytes();
   }
 
+  /** @return a writable {@link ByteBuffer}. */
+  public ByteBuffer asWritableByteBuffer() {
+    assertRefCnt(1);
+    return buf.nioBuffer(0, buf.maxCapacity());
+  }
+
   /** @return a readonly {@link ByteBuffer} view of this buffer. */
   public ByteBuffer asReadOnlyByteBuffer() {
     assertRefCnt(1);
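The new asWritableByteBuffer() exposes the buffer's full capacity as a writable NIO ByteBuffer while the netty-backed CodecBuffer keeps ownership of the native memory; this is what lets ChunkBuffer hand out a plain ByteBuffer view below. A usage sketch, assuming only what this diff shows (allocateDirect, release):

import java.nio.ByteBuffer;
import org.apache.hadoop.hdds.utils.db.CodecBuffer;

class CodecBufferSketch {
  public static void main(String[] args) {
    CodecBuffer codec = CodecBuffer.allocateDirect(4096);
    try {
      ByteBuffer view = codec.asWritableByteBuffer(); // spans maxCapacity()
      view.putLong(42L); // written into the direct netty buffer
    } finally {
      codec.release(); // frees the native memory
    }
  }
}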
ChunkBuffer.java
@@ -27,10 +27,12 @@
 
 import org.apache.hadoop.hdds.scm.ByteStringConversion;
 
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.UncheckedAutoCloseable;
 
 /** Buffer for a block chunk. */
-public interface ChunkBuffer {
+public interface ChunkBuffer extends UncheckedAutoCloseable {
 
   /** Similar to {@link ByteBuffer#allocate(int)}. */
   static ChunkBuffer allocate(int capacity) {
@@ -49,7 +51,8 @@ static ChunkBuffer allocate(int capacity, int increment) {
     if (increment > 0 && increment < capacity) {
       return new IncrementalChunkBuffer(capacity, increment, false);
     }
-    return new ChunkBufferImplWithByteBuffer(ByteBuffer.allocate(capacity));
+    CodecBuffer codecBuffer = CodecBuffer.allocateDirect(capacity);
+    return new ChunkBufferImplWithByteBuffer(codecBuffer.asWritableByteBuffer(), codecBuffer);
   }
 
   /** Wrap the given {@link ByteBuffer} as a {@link ChunkBuffer}. */
@@ -86,6 +89,9 @@ default boolean hasRemaining() {
   /** Similar to {@link ByteBuffer#clear()}. */
   ChunkBuffer clear();
 
+  default void close() {
+  }
+
   /** Similar to {@link ByteBuffer#put(ByteBuffer)}. */
   ChunkBuffer put(ByteBuffer b);
 
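Because ChunkBuffer now extends Ratis' UncheckedAutoCloseable (close() declares no checked exception), allocations can be scoped with try-with-resources, while the no-op default close() keeps wrapped, caller-owned buffers behaving as before. An illustrative sketch:

import java.nio.ByteBuffer;
import org.apache.hadoop.ozone.common.ChunkBuffer;

class AllocateSketch {
  public static void main(String[] args) {
    // allocate(int) is now backed by CodecBuffer.allocateDirect(...)
    try (ChunkBuffer chunk = ChunkBuffer.allocate(1024)) {
      chunk.put(ByteBuffer.wrap(new byte[] {1, 2, 3}));
    } // close() releases the underlying CodecBuffer deterministically
  }
}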
ChunkBufferImplWithByteBuffer.java
@@ -28,13 +28,27 @@
 import java.util.function.Function;
 
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.UncheckedAutoCloseable;
 
 /** {@link ChunkBuffer} implementation using a single {@link ByteBuffer}. */
 final class ChunkBufferImplWithByteBuffer implements ChunkBuffer {
   private final ByteBuffer buffer;
+  private final UncheckedAutoCloseable underlying;
 
   ChunkBufferImplWithByteBuffer(ByteBuffer buffer) {
+    this(buffer, null);
+  }
+
+  ChunkBufferImplWithByteBuffer(ByteBuffer buffer, UncheckedAutoCloseable underlying) {
     this.buffer = Objects.requireNonNull(buffer, "buffer == null");
+    this.underlying = underlying;
   }
 
+  @Override
+  public void close() {
+    if (underlying != null) {
+      underlying.close();
+    }
+  }
+
   @Override
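Design note: the two constructors encode ownership. The one-argument form leaves underlying null, so close() stays a no-op for borrowed ByteBuffers (presumably the wrap(...) path, which is not shown here), while the two-argument form, used by ChunkBuffer.allocate above, hands the owning CodecBuffer to the wrapper so close() releases the native memory when the chunk is done.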
IncrementalChunkBuffer.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.common;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
 import java.io.IOException;
@@ -47,6 +48,8 @@ final class IncrementalChunkBuffer implements ChunkBuffer {
   private final int limitIndex;
   /** Buffer list to be allocated incrementally. */
   private final List<ByteBuffer> buffers;
+  /** The underlying buffers. */
+  private final List<CodecBuffer> underlying;
   /** Is this a duplicated buffer? (for debug only) */
   private final boolean isDuplicated;
   /** The index of the first non-full buffer. */
@@ -58,11 +61,18 @@ final class IncrementalChunkBuffer implements ChunkBuffer {
     this.limit = limit;
     this.increment = increment;
     this.limitIndex = limit / increment;
-    this.buffers = new ArrayList<>(
-        limitIndex + (limit % increment == 0 ? 0 : 1));
+    int size = limitIndex + (limit % increment == 0 ? 0 : 1);
+    this.buffers = new ArrayList<>(size);
+    this.underlying = isDuplicated ? Collections.emptyList() : new ArrayList<>(size);
     this.isDuplicated = isDuplicated;
   }
 
+  @Override
+  public void close() {
+    underlying.forEach(CodecBuffer::release);
+    underlying.clear();
+  }
+
   /** @return the capacity for the buffer at the given index. */
   private int getBufferCapacityAtIndex(int i) {
     Preconditions.checkArgument(i >= 0);
Expand Down Expand Up @@ -99,6 +109,7 @@ private ByteBuffer getAtIndex(int i) {

/** @return the i-th buffer. It may allocate buffers. */
private ByteBuffer getAndAllocateAtIndex(int index) {
Preconditions.checkState(!isDuplicated, "Duplicated buffer is readonly.");
Preconditions.checkArgument(index >= 0);
// never allocate over limit
if (limit % increment == 0) {
@@ -115,7 +126,9 @@ private ByteBuffer getAndAllocateAtIndex(int index) {
     // allocate upto the given index
     ByteBuffer b = null;
     for (; i <= index; i++) {
-      b = ByteBuffer.allocate(getBufferCapacityAtIndex(i));
+      final CodecBuffer c = CodecBuffer.allocateDirect(getBufferCapacityAtIndex(i));
+      underlying.add(c);
+      b = c.asWritableByteBuffer();
       buffers.add(b);
     }
     return b;
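IncrementalChunkBuffer now records each CodecBuffer it allocates in underlying and releases them all in close(); a duplicated buffer gets an empty list plus the new read-only guard in getAndAllocateAtIndex, since duplicates share the original's memory and must neither allocate nor free it. A lifecycle sketch under those assumptions (the class is package-private, so this only compiles from the same package):

package org.apache.hadoop.ozone.common;

import java.nio.ByteBuffer;

class IncrementalSketch {
  public static void main(String[] args) {
    // 4 KB limit, grown in 1 KB direct increments as data arrives
    try (IncrementalChunkBuffer chunk = new IncrementalChunkBuffer(4096, 1024, false)) {
      chunk.put(ByteBuffer.wrap(new byte[1500])); // should lazily allocate two CodecBuffers
    } // close() releases every CodecBuffer in `underlying`
  }
}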
TestChunkBuffer.java
@@ -29,7 +29,11 @@
 
 import org.apache.hadoop.hdds.utils.MockGatheringChannel;
 
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.hdds.utils.db.CodecTestUtil;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
@@ -47,6 +51,16 @@ private static int nextInt(int n) {
     return ThreadLocalRandom.current().nextInt(n);
   }
 
+  @BeforeAll
+  public static void beforeAll() {
+    CodecBuffer.enableLeakDetection();
+  }
+
+  @AfterEach
+  public void after() throws Exception {
+    CodecTestUtil.gc();
+  }
+
   @Test
   @Timeout(1)
   public void testImplWithByteBuffer() {
@@ -60,7 +74,9 @@ public void testImplWithByteBuffer() {
   private static void runTestImplWithByteBuffer(int n) {
     final byte[] expected = new byte[n];
     ThreadLocalRandom.current().nextBytes(expected);
-    runTestImpl(expected, 0, ChunkBuffer.allocate(n));
+    try (ChunkBuffer c = ChunkBuffer.allocate(n)) {
+      runTestImpl(expected, 0, c);
+    }
   }
 
   @Test
@@ -79,8 +95,9 @@ public void testIncrementalChunkBuffer() {
   private static void runTestIncrementalChunkBuffer(int increment, int n) {
     final byte[] expected = new byte[n];
     ThreadLocalRandom.current().nextBytes(expected);
-    runTestImpl(expected, increment,
-        new IncrementalChunkBuffer(n, increment, false));
+    try (IncrementalChunkBuffer c = new IncrementalChunkBuffer(n, increment, false)) {
+      runTestImpl(expected, increment, c);
+    }
   }
 
   @Test
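The test harness pairs CodecBuffer.enableLeakDetection() in @BeforeAll with CodecTestUtil.gc() in @AfterEach: detection tracks each direct allocation, and the forced GC after every test surfaces any buffer that became unreachable without release(). The same pattern as a standalone sketch (class and test names here are hypothetical):

import java.nio.ByteBuffer;
import org.apache.hadoop.hdds.utils.db.CodecBuffer;
import org.apache.hadoop.hdds.utils.db.CodecTestUtil;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

class LeakCheckedSketchTest {
  @BeforeAll
  static void beforeAll() {
    CodecBuffer.enableLeakDetection(); // track direct allocations from here on
  }

  @AfterEach
  void after() throws Exception {
    CodecTestUtil.gc(); // force GC so unreleased buffers get reported
  }

  @Test
  void allocateAndClose() {
    try (ChunkBuffer chunk = ChunkBuffer.allocate(128)) {
      chunk.put(ByteBuffer.wrap(new byte[64]));
    } // drop this close() and the @AfterEach gc should flag a leak
  }
}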
(a KeyValue container schema upgrade test)
@@ -22,6 +22,8 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.hdds.utils.db.CodecTestUtil;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.Checksum;
@@ -44,6 +46,8 @@
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
 import org.apache.hadoop.ozone.container.metadata.DatanodeSchemaThreeDBDefinition;
 import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaThreeImpl;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -135,6 +139,16 @@ public void setup() throws Exception {
     chunkManager = new FilePerBlockStrategy(true, blockManager, null);
   }
 
+  @BeforeAll
+  public static void beforeClass() {
+    CodecBuffer.enableLeakDetection();
+  }
+
+  @AfterEach
+  public void after() throws Exception {
+    CodecTestUtil.gc();
+  }
+
   @Test
   public void testUpgrade() throws IOException {
     int num = 2;
@@ -187,7 +201,7 @@ private Map<String, BlockData> putAnyBlockData(KeyValueContainerData data,
   private void putChunksInBlock(int numOfChunksPerBlock, int i,
       List<ContainerProtos.ChunkInfo> chunks,
       KeyValueContainer container, BlockID blockID) {
-    long chunkLength = 100;
+    final long chunkLength = 100;
     try {
       for (int k = 0; k < numOfChunksPerBlock; k++) {
         final String chunkName = String.format("%d_chunk_%d_block_%d",
@@ -199,11 +213,10 @@ private void putChunksInBlock(int numOfChunksPerBlock, int i,
             .setChecksumData(Checksum.getNoChecksumDataProto()).build();
         chunks.add(info);
         ChunkInfo chunkInfo = new ChunkInfo(chunkName, offset, chunkLength);
-        final ChunkBuffer chunkData = ChunkBuffer.allocate((int) chunkLength);
-        chunkManager
-            .writeChunk(container, blockID, chunkInfo, chunkData, WRITE_STAGE);
-        chunkManager
-            .writeChunk(container, blockID, chunkInfo, chunkData, COMMIT_STAGE);
+        try (ChunkBuffer chunkData = ChunkBuffer.allocate((int) chunkLength)) {
+          chunkManager.writeChunk(container, blockID, chunkInfo, chunkData, WRITE_STAGE);
+          chunkManager.writeChunk(container, blockID, chunkInfo, chunkData, COMMIT_STAGE);
+        }
       }
     } catch (IOException ex) {
       LOG.warn("Putting chunks in blocks was not successful for BlockID: "
TestCommitWatcher.java
@@ -235,6 +235,8 @@ public void testReleaseBuffers() throws Exception {
         assertThat(watcher.getFutureMap()).isEmpty();
         assertThat(watcher.getCommitIndexMap()).isEmpty();
       }
+    } finally {
+      bufferPool.clearBufferPool();
     }
   }
 
@@ -328,6 +330,8 @@ public void testReleaseBuffersOnException() throws Exception {
         assertThat(watcher.getCommitIndexMap()).isEmpty();
       }
     }
+    } finally {
+      bufferPool.clearBufferPool();
     }
   }
 }
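Both finally blocks make the tests return their pooled buffers even when an assertion throws mid-test; clearBufferPool() (first hunk) then closes every pooled ChunkBuffer, so the direct memory is freed immediately rather than whenever the GC happens to collect the wrappers.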