@@ -111,6 +111,7 @@ void releaseBuffer(ChunkBuffer chunkBuffer) {
}

public void clearBufferPool() {
bufferList.forEach(ChunkBuffer::close);
bufferList.clear();
currentBufferIndex = -1;
}
@@ -28,6 +28,7 @@
import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.UncheckedAutoCloseable;
import org.apache.ratis.util.function.CheckedFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +51,7 @@
* A buffer used by {@link Codec}
* for supporting RocksDB direct {@link ByteBuffer} APIs.
*/
- public class CodecBuffer implements AutoCloseable {
+ public class CodecBuffer implements UncheckedAutoCloseable {
public static final Logger LOG = LoggerFactory.getLogger(CodecBuffer.class);

/** To create {@link CodecBuffer} instances. */
@@ -340,6 +341,12 @@ public int readableBytes() {
return buf.readableBytes();
}

/** @return a writable {@link ByteBuffer}. */
public ByteBuffer asWritableByteBuffer() {
assertRefCnt(1);
return buf.nioBuffer(0, buf.maxCapacity());
}

/** @return a readonly {@link ByteBuffer} view of this buffer. */
public ByteBuffer asReadOnlyByteBuffer() {
assertRefCnt(1);
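Since CodecBuffer now implements UncheckedAutoCloseable and exposes asWritableByteBuffer(), callers can manage the direct memory with try-with-resources. A minimal usage sketch; the demo class, the 1024-byte capacity, and the putLong write are illustrative only, not code from this PR:

```java
import java.nio.ByteBuffer;
import org.apache.hadoop.hdds.utils.db.CodecBuffer;

final class CodecBufferUsageSketch {  // hypothetical class, not part of this PR
  static void writeDemo() {
    // allocate a direct CodecBuffer; the (now unchecked) close() releases it
    try (CodecBuffer codec = CodecBuffer.allocateDirect(1024)) {
      final ByteBuffer writable = codec.asWritableByteBuffer();
      writable.putLong(0L);  // write through the writable direct view
    } // no checked exception to handle: UncheckedAutoCloseable.close() does not throw
  }
}
```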
@@ -20,17 +20,22 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.hadoop.hdds.scm.ByteStringConversion;

import org.apache.hadoop.hdds.utils.db.CodecBuffer;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.UncheckedAutoCloseable;

/** Buffer for a block chunk. */
- public interface ChunkBuffer {
+ public interface ChunkBuffer extends UncheckedAutoCloseable {

/** Similar to {@link ByteBuffer#allocate(int)}. */
static ChunkBuffer allocate(int capacity) {
@@ -52,6 +57,30 @@ static ChunkBuffer allocate(int capacity, int increment) {
return new ChunkBufferImplWithByteBuffer(ByteBuffer.allocate(capacity));
}

/** Preallocate the entire buffer. */
static ChunkBuffer preallocate(long capacity, int increment) {
Preconditions.assertTrue(increment > 0);
if (capacity <= increment) {
final CodecBuffer c = CodecBuffer.allocateDirect(Math.toIntExact(capacity));
Contributor review comment: Would be cleaner if we directly deal with ByteBufAllocator and ByteBuf in ChunkBuffer. CodecBuffer logic doesn't provide much, but adds an unnecessary dependency. (See the sketch after this file's diff.)

return new ChunkBufferImplWithByteBuffer(c.asWritableByteBuffer(), c);
}

final List<CodecBuffer> buffers = new ArrayList<>();
try {
for (int size = 0; size < capacity; size += increment) {
final int n = Math.toIntExact(Math.min(increment, capacity - size));
buffers.add(CodecBuffer.allocateDirect(n));
}
} catch (Throwable t) {
// release any buffers already allocated before propagating the failure
buffers.forEach(CodecBuffer::release);
throw t;
}
final List<ByteBuffer> list = buffers.stream()
.map(CodecBuffer::asWritableByteBuffer)
.collect(Collectors.toList());
return new ChunkBufferImplWithByteBufferList(list,
() -> buffers.forEach(CodecBuffer::close));
}

/** Wrap the given {@link ByteBuffer} as a {@link ChunkBuffer}. */
static ChunkBuffer wrap(ByteBuffer buffer) {
return new ChunkBufferImplWithByteBuffer(buffer);
@@ -129,6 +158,9 @@ default ChunkBuffer put(ByteString b) {

List<ByteBuffer> asByteBufferList();

/** Release the underlying buffer(s). */
void close();

/**
* Write the contents of the buffer from the current position to the limit
* to {@code channel}.
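On the review comment above about dealing with ByteBufAllocator and ByteBuf directly in ChunkBuffer: a rough sketch of that alternative, reusing the two-argument ChunkBufferImplWithByteBuffer constructor added in this PR. The helper class, the factory name, and the choice of PooledByteBufAllocator are assumptions for illustration; it would also have to sit in the same package, since the implementation classes are package-private:

```java
import java.nio.ByteBuffer;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;

final class ByteBufChunkBuffers {  // hypothetical helper, illustration only
  static ChunkBuffer preallocateDirect(int capacity) {
    // allocate pooled direct memory without going through CodecBuffer
    final ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(capacity);
    final ByteBuffer writable = buf.nioBuffer(0, capacity);
    // release the ByteBuf when the ChunkBuffer is closed
    return new ChunkBufferImplWithByteBuffer(writable, buf::release);
  }
}
```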
@@ -28,13 +28,27 @@
import java.util.function.Function;

import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.UncheckedAutoCloseable;

/** {@link ChunkBuffer} implementation using a single {@link ByteBuffer}. */
final class ChunkBufferImplWithByteBuffer implements ChunkBuffer {
private final ByteBuffer buffer;
private final UncheckedAutoCloseable underlying;

ChunkBufferImplWithByteBuffer(ByteBuffer buffer) {
this(buffer, null);
}

ChunkBufferImplWithByteBuffer(ByteBuffer buffer, UncheckedAutoCloseable underlying) {
this.buffer = Objects.requireNonNull(buffer, "buffer == null");
this.underlying = underlying;
}

@Override
public void close() {
if (underlying != null) {
underlying.close();
}
}

@Override
@@ -23,7 +23,9 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.UncheckedAutoCloseable;

import java.io.IOException;
import java.nio.BufferOverflowException;
@@ -45,15 +47,21 @@ public class ChunkBufferImplWithByteBufferList implements ChunkBuffer {

/** Buffer list backing the ChunkBuffer. */
private final List<ByteBuffer> buffers;
private final UncheckedAutoCloseable underlying;
private final int limit;

private int limitPrecedingCurrent;
private int currentIndex;

ChunkBufferImplWithByteBufferList(List<ByteBuffer> buffers) {
this(buffers, null);
}

ChunkBufferImplWithByteBufferList(List<ByteBuffer> buffers, UncheckedAutoCloseable underlying) {
Objects.requireNonNull(buffers, "buffers == null");
this.buffers = !buffers.isEmpty() ? ImmutableList.copyOf(buffers) :
EMPTY_BUFFER;
this.underlying = underlying;
this.limit = buffers.stream().mapToInt(ByteBuffer::limit).sum();

findCurrent();
@@ -211,6 +219,13 @@ public List<ByteBuffer> asByteBufferList() {
return buffers;
}

@Override
public void close() {
if (underlying != null) {
underlying.close();
}
}

@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
long bytes = channel.write(buffers.toArray(new ByteBuffer[0]));
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.common;

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.utils.db.CodecBuffer;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

import java.io.IOException;
@@ -47,6 +48,8 @@ final class IncrementalChunkBuffer implements ChunkBuffer {
private final int limitIndex;
/** Buffer list to be allocated incrementally. */
private final List<ByteBuffer> buffers;
/** The underlying buffers. */
private final List<CodecBuffer> underlying;
/** Is this a duplicated buffer? (for debug only) */
private final boolean isDuplicated;
/** The index of the first non-full buffer. */
@@ -58,11 +61,20 @@ final class IncrementalChunkBuffer implements ChunkBuffer {
this.limit = limit;
this.increment = increment;
this.limitIndex = limit / increment;
- this.buffers = new ArrayList<>(
- limitIndex + (limit % increment == 0 ? 0 : 1));
+ final int size = limitIndex + (limit % increment == 0 ? 0 : 1);
+ this.buffers = new ArrayList<>(size);
this.underlying = isDuplicated ? null : new ArrayList<>(size);
this.isDuplicated = isDuplicated;
}

@Override
public void close() {
// underlying is null for duplicated buffers, which do not own any memory
if (underlying != null) {
underlying.forEach(CodecBuffer::release);
underlying.clear();
}
buffers.clear();
clear();
}

/** @return the capacity for the buffer at the given index. */
private int getBufferCapacityAtIndex(int i) {
Preconditions.checkArgument(i >= 0);
@@ -99,6 +111,7 @@ private ByteBuffer getAtIndex(int i) {

/** @return the i-th buffer. It may allocate buffers. */
private ByteBuffer getAndAllocateAtIndex(int index) {
Preconditions.checkState(!isDuplicated, "Duplicated buffer is readonly.");
Preconditions.checkArgument(index >= 0);
// never allocate over limit
if (limit % increment == 0) {
@@ -115,7 +128,9 @@ private ByteBuffer getAndAllocateAtIndex(int index) {
// allocate upto the given index
ByteBuffer b = null;
for (; i <= index; i++) {
- b = ByteBuffer.allocate(getBufferCapacityAtIndex(i));
+ final CodecBuffer c = CodecBuffer.allocateDirect(getBufferCapacityAtIndex(i));
+ underlying.add(c);
+ b = c.asWritableByteBuffer();
buffers.add(b);
}
return b;
@@ -29,7 +29,11 @@

import org.apache.hadoop.hdds.utils.MockGatheringChannel;

import org.apache.hadoop.hdds.utils.db.CodecBuffer;
import org.apache.hadoop.hdds.utils.db.CodecTestUtil;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -42,6 +46,16 @@ private static int nextInt(int n) {
return ThreadLocalRandom.current().nextInt(n);
}

@BeforeAll
public static void beforeAll() {
CodecBuffer.enableLeakDetection();
}

@AfterEach
public void after() throws Exception {
CodecTestUtil.gc();
}

@Test
@Timeout(1)
public void testImplWithByteBuffer() {
@@ -55,7 +69,9 @@ public void testImplWithByteBuffer() {
private static void runTestImplWithByteBuffer(int n) {
final byte[] expected = new byte[n];
ThreadLocalRandom.current().nextBytes(expected);
- runTestImpl(expected, 0, ChunkBuffer.allocate(n));
+ try (ChunkBuffer c = ChunkBuffer.allocate(n)) {
+ runTestImpl(expected, 0, c);
+ }
}

@Test
@@ -74,8 +90,9 @@ public void testIncrementalChunkBuffer() {
private static void runTestIncrementalChunkBuffer(int increment, int n) {
final byte[] expected = new byte[n];
ThreadLocalRandom.current().nextBytes(expected);
- runTestImpl(expected, increment,
- new IncrementalChunkBuffer(n, increment, false));
+ try (ChunkBuffer buffer = new IncrementalChunkBuffer(n, increment, false)) {
+ runTestImpl(expected, increment, buffer);
+ }
}

@Test
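The tests above wrap ChunkBuffer.allocate and IncrementalChunkBuffer in try-with-resources so the leak detection enabled in beforeAll stays quiet; the new preallocate factory would be exercised the same way. A hedged sketch using the existing runTestImpl helper (whether its increment argument applies unchanged to preallocated buffers is an assumption, not verified against this PR):

```java
import java.util.concurrent.ThreadLocalRandom;

// Illustrative only: same pattern as runTestIncrementalChunkBuffer above.
private static void runTestPreallocate(int increment, int n) {
  final byte[] expected = new byte[n];
  ThreadLocalRandom.current().nextBytes(expected);
  // close() releases the direct CodecBuffers backing the preallocated ChunkBuffer
  try (ChunkBuffer buffer = ChunkBuffer.preallocate(n, increment)) {
    runTestImpl(expected, increment, buffer);
  }
}
```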
@@ -22,6 +22,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller;
import org.apache.ratis.thirdparty.com.google.protobuf.MessageLite;
import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor;
@@ -107,9 +108,20 @@ public StreamObserver<ContainerCommandRequestProto> send(

@Override
public void onNext(ContainerCommandRequestProto request) {
final DispatcherContext context;
Contributor review comment: I think we have to do the same in ContainerStateMachine.readStateMachineData; otherwise there will be a memory leak. Not sure if I should put this comment in #5805. (See the sketch after this file's diff.)

switch (request.getCmdType()) {
case ReadChunk:
context = DispatcherContext.getHandleReadChunk();
break;
case GetSmallFile:
context = DispatcherContext.getHandleGetSmallFile();
break;
default:
context = null;
}

try {
- ContainerCommandResponseProto resp =
- dispatcher.dispatch(request, null);
+ final ContainerCommandResponseProto resp = dispatcher.dispatch(request, context);
responseObserver.onNext(resp);
} catch (Throwable e) {
LOG.error("Got exception when processing"
@@ -121,6 +133,9 @@ public void onNext(ContainerCommandRequestProto request) {
if (popStream != null) {
IOUtils.close(LOG, popStream);
}
if (context != null) {
context.close();
}
}
}

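On the review comment above about ContainerStateMachine.readStateMachineData: a hedged sketch of applying the same acquire-and-close pattern there. The method shape and the dispatcher field are assumptions about that class; only DispatcherContext.getHandleReadChunk() and the two-argument dispatch call come from this diff:

```java
// Illustrative only: mirror the GrpcXceiverService change inside
// ContainerStateMachine so pooled read buffers are released after use.
private ContainerCommandResponseProto dispatchReadChunk(
    ContainerCommandRequestProto request) {
  final DispatcherContext context = DispatcherContext.getHandleReadChunk();
  try {
    return dispatcher.dispatch(request, context);
  } finally {
    context.close();  // release buffers acquired for this read
  }
}
```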