diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
index 274b977ef623..b68b56f67c72 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
@@ -111,6 +111,7 @@ void releaseBuffer(ChunkBuffer chunkBuffer) {
   }
 
   public void clearBufferPool() {
+    bufferList.forEach(ChunkBuffer::close);
     bufferList.clear();
     currentBufferIndex = -1;
   }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
index 97311b921c82..d78730550371 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
@@ -28,6 +28,7 @@
 import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.UncheckedAutoCloseable;
 import org.apache.ratis.util.function.CheckedFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +51,7 @@
  * A buffer used by {@link Codec}
  * for supporting RocksDB direct {@link ByteBuffer} APIs.
  */
-public class CodecBuffer implements AutoCloseable {
+public class CodecBuffer implements UncheckedAutoCloseable {
   public static final Logger LOG = LoggerFactory.getLogger(CodecBuffer.class);
 
   /** To create {@link CodecBuffer} instances. */
@@ -340,6 +341,12 @@ public int readableBytes() {
     return buf.readableBytes();
   }
 
+  /** @return a writable {@link ByteBuffer}. */
+  public ByteBuffer asWritableByteBuffer() {
+    assertRefCnt(1);
+    return buf.nioBuffer(0, buf.maxCapacity());
+  }
+
   /** @return a readonly {@link ByteBuffer} view of this buffer. */
   public ByteBuffer asReadOnlyByteBuffer() {
     assertRefCnt(1);
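[Reviewer sketch - not part of the patch] The CodecBuffer change above makes it an UncheckedAutoCloseable and adds a writable ByteBuffer view over its direct memory. A minimal usage sketch of the allocate/write/release cycle, mirroring the pattern ChunkUtils.readData adopts later in this diff; the class name, capacity and payload below are made up for illustration:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.hdds.utils.db.CodecBuffer;

    public final class WritableViewSketch {
      public static void main(String[] args) {
        // Direct (off-heap) allocation: the buffer must be released explicitly.
        final CodecBuffer codecBuffer = CodecBuffer.allocateDirect(16);
        try {
          // Fill the memory through the new writable ByteBuffer view.
          final ByteBuffer writable = codecBuffer.asWritableByteBuffer();
          writable.put("hello".getBytes(StandardCharsets.UTF_8));
          writable.flip(); // expose only the written region to the consumer
          System.out.println(writable.remaining() + " bytes written");
        } finally {
          codecBuffer.release(); // otherwise the leak detection used by the tests flags it
        }
      }
    }
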
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
index 3948b5f04fc0..20d89ad86e57 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
@@ -20,17 +20,22 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.scm.ByteStringConversion;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.UncheckedAutoCloseable;
 
 /** Buffer for a block chunk. */
-public interface ChunkBuffer {
+public interface ChunkBuffer extends UncheckedAutoCloseable {
 
   /** Similar to {@link ByteBuffer#allocate(int)}. */
   static ChunkBuffer allocate(int capacity) {
@@ -52,6 +57,31 @@ static ChunkBuffer allocate(int capacity, int increment) {
     return new ChunkBufferImplWithByteBuffer(ByteBuffer.allocate(capacity));
   }
 
+  /** Preallocate the entire buffer. */
+  static ChunkBuffer preallocate(long capacity, int increment) {
+    Preconditions.assertTrue(increment > 0);
+    if (capacity <= increment) {
+      final CodecBuffer c = CodecBuffer.allocateDirect(Math.toIntExact(capacity));
+      return new ChunkBufferImplWithByteBuffer(c.asWritableByteBuffer(), c);
+    }
+
+    final List<CodecBuffer> buffers = new ArrayList<>();
+    try {
+      for (int size = 0; size < capacity; size += increment) {
+        final int n = Math.toIntExact(Math.min(increment, capacity - size));
+        buffers.add(CodecBuffer.allocateDirect(n));
+      }
+    } catch (Throwable t) {
+      buffers.forEach(CodecBuffer::release);
+      throw t;
+    }
+    final List<ByteBuffer> list = buffers.stream()
+        .map(CodecBuffer::asWritableByteBuffer)
+        .collect(Collectors.toList());
+    return new ChunkBufferImplWithByteBufferList(list,
+        () -> buffers.forEach(CodecBuffer::close));
+  }
+
   /** Wrap the given {@link ByteBuffer} as a {@link ChunkBuffer}. */
   static ChunkBuffer wrap(ByteBuffer buffer) {
     return new ChunkBufferImplWithByteBuffer(buffer);
@@ -129,6 +158,9 @@ default ChunkBuffer put(ByteString b) {
 
   List<ByteBuffer> asByteBufferList();
 
+  /** Release the underlying buffer(s). */
+  void close();
+
   /**
    * Write the contents of the buffer from the current position to the limit
    * to {@code channel}.
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
index 0cf49681cb16..fe2ee5fa8acb 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
@@ -28,13 +28,27 @@
 import java.util.function.Function;
 
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.UncheckedAutoCloseable;
 
 /** {@link ChunkBuffer} implementation using a single {@link ByteBuffer}. */
 final class ChunkBufferImplWithByteBuffer implements ChunkBuffer {
   private final ByteBuffer buffer;
+  private final UncheckedAutoCloseable underlying;
 
   ChunkBufferImplWithByteBuffer(ByteBuffer buffer) {
+    this(buffer, null);
+  }
+
+  ChunkBufferImplWithByteBuffer(ByteBuffer buffer, UncheckedAutoCloseable underlying) {
     this.buffer = Objects.requireNonNull(buffer, "buffer == null");
+    this.underlying = underlying;
+  }
+
+  @Override
+  public void close() {
+    if (underlying != null) {
+      underlying.close();
+    }
   }
 
   @Override
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
index 7c3a0c7d2d56..314f644fc6ff 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
@@ -23,7 +23,9 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.UncheckedAutoCloseable;
 
 import java.io.IOException;
 import java.nio.BufferOverflowException;
@@ -45,15 +47,21 @@ public class ChunkBufferImplWithByteBufferList implements ChunkBuffer {
 
   /** Buffer list backing the ChunkBuffer. */
   private final List<ByteBuffer> buffers;
+  private final UncheckedAutoCloseable underlying;
 
   private final int limit;
   private int limitPrecedingCurrent;
   private int currentIndex;
 
   ChunkBufferImplWithByteBufferList(List<ByteBuffer> buffers) {
+    this(buffers, null);
+  }
+
+  ChunkBufferImplWithByteBufferList(List<ByteBuffer> buffers,
+      UncheckedAutoCloseable underlying) {
     Objects.requireNonNull(buffers, "buffers == null");
     this.buffers = !buffers.isEmpty() ? ImmutableList.copyOf(buffers)
         : EMPTY_BUFFER;
+    this.underlying = underlying;
     this.limit = buffers.stream().mapToInt(ByteBuffer::limit).sum();
 
     findCurrent();
@@ -211,6 +219,13 @@ public List<ByteBuffer> asByteBufferList() {
     return buffers;
   }
 
+  @Override
+  public void close() {
+    if (underlying != null) {
+      underlying.close();
+    }
+  }
+
   @Override
   public long writeTo(GatheringByteChannel channel) throws IOException {
     long bytes = channel.write(buffers.toArray(new ByteBuffer[0]));
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
index 5a63c09f1234..7a14f1aa6805 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.common;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
 import java.io.IOException;
@@ -47,6 +48,8 @@
   private final int limitIndex;
   /** Buffer list to be allocated incrementally. */
   private final List<ByteBuffer> buffers;
+  /** The underlying buffers. */
+  private final List<CodecBuffer> underlying;
   /** Is this a duplicated buffer? (for debug only) */
   private final boolean isDuplicated;
   /** The index of the first non-full buffer. */
@@ -58,11 +61,20 @@
     this.limit = limit;
     this.increment = increment;
     this.limitIndex = limit / increment;
-    this.buffers = new ArrayList<>(
-        limitIndex + (limit % increment == 0 ? 0 : 1));
+    final int size = limitIndex + (limit % increment == 0 ? 0 : 1);
+    this.buffers = new ArrayList<>(size);
+    this.underlying = isDuplicated ? null : new ArrayList<>(size);
     this.isDuplicated = isDuplicated;
   }
 
+  @Override
+  public void close() {
+    underlying.forEach(CodecBuffer::release);
+    underlying.clear();
+    buffers.clear();
+    clear();
+  }
+
   /** @return the capacity for the buffer at the given index. */
   private int getBufferCapacityAtIndex(int i) {
     Preconditions.checkArgument(i >= 0);
@@ -99,6 +111,7 @@ private ByteBuffer getAtIndex(int i) {
   /** @return the i-th buffer. It may allocate buffers. */
   private ByteBuffer getAndAllocateAtIndex(int index) {
+    Preconditions.checkState(!isDuplicated, "Duplicated buffer is readonly.");
     Preconditions.checkArgument(index >= 0);
     // never allocate over limit
     if (limit % increment == 0) {
@@ -115,7 +128,9 @@ private ByteBuffer getAndAllocateAtIndex(int index) {
     // allocate upto the given index
     ByteBuffer b = null;
     for (; i <= index; i++) {
-      b = ByteBuffer.allocate(getBufferCapacityAtIndex(i));
+      final CodecBuffer c = CodecBuffer.allocateDirect(getBufferCapacityAtIndex(i));
+      underlying.add(c);
+      b = c.asWritableByteBuffer();
       buffers.add(b);
     }
     return b;
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
index c405b830123d..1a40611d90d6 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
@@ -29,7 +29,11 @@
 import org.apache.hadoop.hdds.utils.MockGatheringChannel;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.hdds.utils.db.CodecTestUtil;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -42,6 +46,16 @@ private static int nextInt(int n) {
     return ThreadLocalRandom.current().nextInt(n);
   }
 
+  @BeforeAll
+  public static void beforeAll() {
+    CodecBuffer.enableLeakDetection();
+  }
+
+  @AfterEach
+  public void after() throws Exception {
+    CodecTestUtil.gc();
+  }
+
   @Test
   @Timeout(1)
   public void testImplWithByteBuffer() {
@@ -55,7 +69,9 @@ public void testImplWithByteBuffer() {
   private static void runTestImplWithByteBuffer(int n) {
     final byte[] expected = new byte[n];
     ThreadLocalRandom.current().nextBytes(expected);
-    runTestImpl(expected, 0, ChunkBuffer.allocate(n));
+    try (ChunkBuffer c = ChunkBuffer.allocate(n)) {
+      runTestImpl(expected, 0, c);
+    }
   }
 
   @Test
@@ -74,8 +90,9 @@ public void testIncrementalChunkBuffer() {
   private static void runTestIncrementalChunkBuffer(int increment, int n) {
     final byte[] expected = new byte[n];
     ThreadLocalRandom.current().nextBytes(expected);
-    runTestImpl(expected, increment,
-        new IncrementalChunkBuffer(n, increment, false));
+    try (ChunkBuffer buffer = new IncrementalChunkBuffer(n, increment, false)) {
+      runTestImpl(expected, increment, buffer);
+    }
   }
 
   @Test
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
index 9c3f29d0f0cc..7a0850422f56 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller;
 import org.apache.ratis.thirdparty.com.google.protobuf.MessageLite;
 import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor;
@@ -107,9 +108,20 @@ public StreamObserver<ContainerCommandRequestProto> send(
       @Override
       public void onNext(ContainerCommandRequestProto request) {
+        final DispatcherContext context;
+        switch (request.getCmdType()) {
+        case ReadChunk:
+          context = DispatcherContext.getHandleReadChunk();
+          break;
+        case GetSmallFile:
+          context = DispatcherContext.getHandleGetSmallFile();
+          break;
+        default:
+          context = null;
+        }
+
         try {
-          ContainerCommandResponseProto resp =
-              dispatcher.dispatch(request, null);
+          final ContainerCommandResponseProto resp = dispatcher.dispatch(request, context);
           responseObserver.onNext(resp);
         } catch (Throwable e) {
           LOG.error("Got exception when processing"
@@ -121,6 +133,9 @@ public void onNext(ContainerCommandRequestProto request) {
           if (popStream != null) {
            IOUtils.close(LOG, popStream);
           }
+          if (context != null) {
+            context.close();
+          }
         }
       }
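[Reviewer sketch - not part of the patch] With the GrpcXceiverService change above, every ReadChunk/GetSmallFile request gets its own DispatcherContext, and close() releases whatever buffer the handler parks in it via setResource(). A condensed sketch of that per-request lifecycle, assuming only the ContainerDispatcher API already imported by this file; the class and method names are illustrative:

    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
    import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
    import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;

    final class ReadDispatchSketch {
      private final ContainerDispatcher dispatcher;

      ReadDispatchSketch(ContainerDispatcher dispatcher) {
        this.dispatcher = dispatcher;
      }

      ContainerCommandResponseProto readChunk(ContainerCommandRequestProto request) {
        // One context per read request; the handler stores the ChunkBuffer in it.
        final DispatcherContext context = DispatcherContext.getHandleReadChunk();
        try {
          return dispatcher.dispatch(request, context);
        } finally {
          context.close(); // releases the buffer once the response has been handed off
        }
      }
    }
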
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
index d6c976cb389e..ca154e7fb8f5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
@@ -20,9 +20,12 @@
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.UncheckedAutoCloseable;
 
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * DispatcherContext class holds transport protocol specific context info
@@ -30,18 +33,14 @@
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public final class DispatcherContext {
-  private static final DispatcherContext HANDLE_READ_CHUNK
-      = newBuilder(Op.HANDLE_READ_CHUNK).build();
+public final class DispatcherContext implements UncheckedAutoCloseable {
   private static final DispatcherContext HANDLE_WRITE_CHUNK
       = newBuilder(Op.HANDLE_WRITE_CHUNK).build();
-  private static final DispatcherContext HANDLE_GET_SMALL_FILE
-      = newBuilder(Op.HANDLE_GET_SMALL_FILE).build();
   private static final DispatcherContext HANDLE_PUT_SMALL_FILE
       = newBuilder(Op.HANDLE_PUT_SMALL_FILE).build();
 
   public static DispatcherContext getHandleReadChunk() {
-    return HANDLE_READ_CHUNK;
+    return newBuilder(Op.HANDLE_READ_CHUNK).build();
   }
 
   public static DispatcherContext getHandleWriteChunk() {
@@ -49,7 +48,7 @@ public static DispatcherContext getHandleWriteChunk() {
   }
 
   public static DispatcherContext getHandleGetSmallFile() {
-    return HANDLE_GET_SMALL_FILE;
+    return newBuilder(Op.HANDLE_GET_SMALL_FILE).build();
  }
 
   public static DispatcherContext getHandlePutSmallFile() {
@@ -108,6 +107,11 @@ public static Op op(DispatcherContext context) {
     return context == null ? Op.NULL : context.getOp();
   }
 
+  public static void assertOp(DispatcherContext context, Op expected) {
+    Objects.requireNonNull(context, "context == null");
+    Preconditions.assertSame(expected, context.getOp(), "op");
+  }
+
   private final Op op;
   // whether the chunk data needs to be written or committed or both
   private final WriteChunkStage stage;
@@ -118,6 +122,8 @@ public static Op op(DispatcherContext context) {
 
   private final Map<Long, Long> container2BCSIDMap;
 
+  private final AtomicReference<UncheckedAutoCloseable> resource = new AtomicReference<>();
+
   private DispatcherContext(Builder b) {
     this.op = Objects.requireNonNull(b.op, "op == null");
     this.term = b.term;
@@ -147,6 +153,19 @@ public Map<Long, Long> getContainer2BCSIDMap() {
     return container2BCSIDMap;
   }
 
+  public void setResource(UncheckedAutoCloseable closeable) {
+    final UncheckedAutoCloseable previous = resource.getAndSet(closeable);
+    Preconditions.assertNull(previous, "Resource is already set");
+  }
+
+  @Override
+  public void close() {
+    final UncheckedAutoCloseable closeable = resource.getAndSet(null);
+    if (closeable != null) {
+      closeable.close();
+    }
+  }
+
   @Override
   public String toString() {
     return op + "-" + stage + TermIndex.valueOf(term, logIndex);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 362c08c6a94b..57ee3c426811 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -74,6 +74,7 @@
 import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.Op;
 import org.apache.hadoop.ozone.container.common.utils.ContainerLogger;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
@@ -268,10 +269,9 @@ static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler,
     case CompactChunk:
       return handler.handleUnsupportedOp(request);
     case PutSmallFile:
-      return handler
-          .handlePutSmallFile(request, kvContainer, dispatcherContext);
+      return handler.handlePutSmallFile(request, kvContainer, dispatcherContext);
     case GetSmallFile:
-      return handler.handleGetSmallFile(request, kvContainer);
+      return handler.handleGetSmallFile(request, kvContainer, dispatcherContext);
     case GetCommittedBlockLength:
       return handler.handleGetCommittedBlockLength(request, kvContainer);
     default:
@@ -686,6 +686,7 @@ ContainerCommandResponseProto handleDeleteBlock(
   ContainerCommandResponseProto handleReadChunk(
       ContainerCommandRequestProto request, KeyValueContainer kvContainer,
       DispatcherContext dispatcherContext) {
+    DispatcherContext.assertOp(dispatcherContext, Op.HANDLE_READ_CHUNK);
 
     if (!request.hasReadChunk()) {
       if (LOG.isDebugEnabled()) {
@@ -695,7 +696,7 @@ ContainerCommandResponseProto handleReadChunk(
       return malformedRequest(request);
     }
 
-    ChunkBuffer data;
+    ChunkBuffer data = null;
     try {
       BlockID blockID = BlockID.getFromProtobuf(
          request.getReadChunk().getBlockID());
@@ -705,9 +706,6 @@
       checkContainerIsHealthy(kvContainer, blockID, Type.ReadChunk);
       BlockUtils.verifyBCSId(kvContainer, blockID);
 
-      if (dispatcherContext == null) {
-        dispatcherContext = DispatcherContext.getHandleReadChunk();
-      }
 
       boolean isReadChunkV0 = getReadChunkVersion(request.getReadChunk())
           .equals(ContainerProtos.ReadChunkVersion.V0);
@@ -727,26 +725,19 @@
       if (DispatcherContext.op(dispatcherContext).readFromTmpFile()) {
         validateChunkChecksumData(data, chunkInfo);
       }
+      dispatcherContext.setResource(data);
       metrics.incContainerBytesStats(Type.ReadChunk, chunkInfo.getLen());
-    } catch (StorageContainerException ex) {
+      return getReadChunkResponse(request, data, byteBufferToByteString);
+    } catch (Exception e) {
+      if (data != null) {
+        data.close();
+      }
+      final StorageContainerException ex = e instanceof StorageContainerException ?
+          (StorageContainerException) e : new StorageContainerException("Read Chunk failed", e, IO_EXCEPTION);
       return ContainerUtils.logAndReturnError(LOG, ex, request);
-    } catch (IOException ex) {
-      return ContainerUtils.logAndReturnError(LOG,
-          new StorageContainerException("Read Chunk failed", ex, IO_EXCEPTION),
-          request);
     }
-
-    Preconditions.checkNotNull(data, "Chunk data is null");
-
-    return getReadChunkResponse(request, data, byteBufferToByteString);
   }
 
-  /**
-   * Throw an exception if the container is unhealthy.
-   *
-   * @throws StorageContainerException if the container is unhealthy.
-   */
-  @VisibleForTesting
   void checkContainerIsHealthy(KeyValueContainer kvContainer, BlockID blockID,
       Type cmd) {
     kvContainer.readLock();
@@ -908,7 +899,9 @@ ContainerCommandResponseProto handlePutSmallFile(
    * ChunkManager to process the request.
    */
   ContainerCommandResponseProto handleGetSmallFile(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer,
+      DispatcherContext dispatcherContext) {
+    DispatcherContext.assertOp(dispatcherContext, Op.HANDLE_GET_SMALL_FILE);
 
     if (!request.hasGetSmallFile()) {
       if (LOG.isDebugEnabled()) {
@@ -928,8 +921,7 @@ ContainerCommandResponseProto handleGetSmallFile(
 
       ContainerProtos.ChunkInfo chunkInfoProto = null;
       List<ByteString> dataBuffers = new ArrayList<>();
-      final DispatcherContext dispatcherContext
-          = DispatcherContext.getHandleGetSmallFile();
+      List<ChunkBuffer> chunkBuffers = new ArrayList<>();
       for (ContainerProtos.ChunkInfo chunk : responseData.getChunks()) {
         // if the block is committed, all chunks must have been committed.
         // Tmp chunk files won't exist here.
@@ -946,10 +938,12 @@ ContainerCommandResponseProto handleGetSmallFile(
         ChunkBuffer data = chunkManager.readChunk(kvContainer, blockID,
             chunkInfo, dispatcherContext);
         dataBuffers.addAll(data.toByteStringList(byteBufferToByteString));
+        chunkBuffers.add(data);
         chunkInfoProto = chunk;
       }
       metrics.incContainerBytesStats(Type.GetSmallFile,
          BufferUtils.getBuffersLen(dataBuffers));
+      dispatcherContext.setResource(() -> chunkBuffers.forEach(ChunkBuffer::close));
       return getGetSmallFileResponseSuccess(request, dataBuffers,
          chunkInfoProto);
     } catch (StorageContainerException e) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
index 7266904139df..ed5fcd65b264 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
@@ -47,7 +47,6 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
-import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.util.Time;
@@ -196,11 +195,16 @@ public static ChunkBuffer readData(long len, int bufferCapacity,
       return ChunkBuffer.wrap(Collections.emptyList());
     }
 
-    final ByteBuffer[] buffers = BufferUtils.assignByteBuffers(len,
-        bufferCapacity);
-    readData(file, off, len, c -> c.position(off).read(buffers), volume);
-    Arrays.stream(buffers).forEach(ByteBuffer::flip);
-    return ChunkBuffer.wrap(Arrays.asList(buffers));
+    final ChunkBuffer chunkBuffer = ChunkBuffer.preallocate(len, bufferCapacity);
+    try {
+      final ByteBuffer[] buffers = chunkBuffer.asByteBufferList().toArray(new ByteBuffer[0]);
+      readData(file, off, len, c -> c.position(off).read(buffers), volume);
+      Arrays.stream(buffers).forEach(ByteBuffer::flip);
+      return chunkBuffer;
+    } catch (Exception e) {
+      chunkBuffer.close();
+      throw e;
+    }
   }
 
   private static void readData(File file, long offset, long len,
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDummyImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDummyImpl.java
index 40361774e059..d898dcda66af 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDummyImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDummyImpl.java
@@ -32,8 +32,6 @@
 import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 
-import java.nio.ByteBuffer;
-
 import static org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils.limitReadSize;
 
 /**
@@ -76,7 +74,7 @@ public ChunkBuffer readChunk(Container container, BlockID blockID,
     limitReadSize(info.getLen());
 
     // stats are handled in ChunkManagerImpl
-    return ChunkBuffer.wrap(ByteBuffer.allocate((int) info.getLen()));
+    return ChunkBuffer.allocate(Math.toIntExact(info.getLen()));
   }
 
   @Override
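[Reviewer sketch - not part of the patch] The ChunkManager changes above mean readChunk() now returns a ChunkBuffer that may be backed by direct memory, so the caller owns the release. A sketch of the caller-side contract that the test updates below exercise, reusing only APIs visible in this diff (Checksum.computeChecksum consuming a ChunkBuffer, and a null DispatcherContext as several tests pass); the import paths and class name are assumed for illustration:

    import org.apache.hadoop.hdds.client.BlockID;
    import org.apache.hadoop.ozone.common.Checksum;
    import org.apache.hadoop.ozone.common.ChecksumData;
    import org.apache.hadoop.ozone.common.ChunkBuffer;
    import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
    import org.apache.hadoop.ozone.container.common.interfaces.Container;
    import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;

    final class ReadChunkCallerSketch {
      static ChecksumData readAndChecksum(ChunkManager chunkManager, Container container,
          BlockID blockID, ChunkInfo info, Checksum checksum) throws Exception {
        // Consume the data while the buffer is open; try-with-resources then
        // releases the underlying buffers via ChunkBuffer.close().
        try (ChunkBuffer data = chunkManager.readChunk(container, blockID, info, null)) {
          return checksum.computeChecksum(data);
        }
      }
    }
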
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index a3312f6773de..8f2db523f9ab 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -41,6 +41,8 @@
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.hdds.utils.db.CodecTestUtil;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -144,6 +146,7 @@ private static Iterable versionInfo() {
 
   @BeforeAll
   public static void init() {
+    CodecBuffer.enableLeakDetection();
     conf = new OzoneConfiguration();
     hddsPath = GenericTestUtils
         .getTempPath(TestContainerPersistence.class.getSimpleName());
@@ -180,7 +183,7 @@ public void setupPaths() throws IOException {
   }
 
   @AfterEach
-  public void cleanupDir() throws IOException {
+  public void cleanupDir() throws Exception {
     // Cleanup cache
     BlockUtils.shutdownCache(conf);
 
@@ -193,6 +196,7 @@ public void cleanupDir() throws IOException {
       StorageLocation location = StorageLocation.parse(dir);
       FileUtils.deleteDirectory(new File(location.getNormalizedUri()));
     }
+    CodecTestUtil.gc();
   }
 
   private long getTestContainerID() {
@@ -692,10 +696,11 @@ public void testWritReadManyChunks(ContainerTestVersionInfo versionInfo)
 
     // Read chunk via ReadChunk call.
     for (int x = 0; x < chunkCount; x++) {
       ChunkInfo info = chunks.get(x);
-      final ChunkBuffer data = chunkManager.readChunk(container, blockID, info,
-          DispatcherContext.getHandleReadChunk());
-      ChecksumData checksumData = checksum.computeChecksum(data);
-      assertEquals(info.getChecksumData(), checksumData);
+      try (ChunkBuffer data = chunkManager.readChunk(
+          container, blockID, info, DispatcherContext.getHandleReadChunk())) {
+        final ChecksumData checksumData = checksum.computeChecksum(data);
+        assertEquals(info.getChecksumData(), checksumData);
+      }
     }
   }
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index 76cc5f25f010..9e533f944d32 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -247,7 +247,7 @@ public void testHandlerCommandHandling() throws Exception {
     KeyValueHandler
         .dispatchRequest(handler, getSmallFileRequest, container, null);
     Mockito.verify(handler, times(1)).handleGetSmallFile(
-        any(ContainerCommandRequestProto.class), any());
+        any(ContainerCommandRequestProto.class), any(), any());
   }
 
   @Test
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
index 8fd8d08f24fd..6d30349fd284 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
@@ -23,6 +23,8 @@
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.hdds.utils.db.CodecTestUtil;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
@@ -30,8 +32,10 @@
 import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
 
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
@@ -66,6 +70,12 @@ public class TestKeyValueHandlerWithUnhealthyContainer {
   @BeforeEach
   public void init() {
     mockIcrSender = Mockito.mock(IncrementalReportSender.class);
+    CodecBuffer.enableLeakDetection();
   }
+
+  @AfterEach
+  public void check() throws Exception {
+    CodecTestUtil.gc();
+  }
 
   @Test
@@ -110,12 +120,14 @@ public void testReadChunk() {
     KeyValueContainer container = getMockUnhealthyContainer();
     KeyValueHandler handler = getDummyHandler();
 
-    ContainerProtos.ContainerCommandResponseProto response =
-        handler.handleReadChunk(
-            getDummyCommandRequestProto(
-                ContainerProtos.Type.ReadChunk),
-            container, null);
-    assertEquals(UNKNOWN_BCSID, response.getResult());
+    try (DispatcherContext ctx = DispatcherContext.getHandleReadChunk()) {
+      ContainerProtos.ContainerCommandResponseProto response =
+          handler.handleReadChunk(
+              getDummyCommandRequestProto(
+                  ContainerProtos.Type.ReadChunk),
+              container, ctx);
+      assertEquals(UNKNOWN_BCSID, response.getResult());
+    }
   }
 
   @Test
@@ -123,12 +135,14 @@ public void testGetSmallFile() {
     KeyValueContainer container = getMockUnhealthyContainer();
     KeyValueHandler handler = getDummyHandler();
 
-    ContainerProtos.ContainerCommandResponseProto response =
-        handler.handleGetSmallFile(
-            getDummyCommandRequestProto(
-                ContainerProtos.Type.GetSmallFile),
-            container);
-    assertEquals(UNKNOWN_BCSID, response.getResult());
+    try (DispatcherContext ctx = DispatcherContext.getHandleGetSmallFile()) {
+      ContainerProtos.ContainerCommandResponseProto response =
+          handler.handleGetSmallFile(
+              getDummyCommandRequestProto(
+                  ContainerProtos.Type.GetSmallFile),
+              container, ctx);
+      assertEquals(UNKNOWN_BCSID, response.getResult());
+    }
   }
 
   @Test
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/CommonChunkManagerTestCases.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/CommonChunkManagerTestCases.java
index ad859704946f..70ab926dfae3 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/CommonChunkManagerTestCases.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/CommonChunkManagerTestCases.java
@@ -22,11 +22,16 @@
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.hdds.utils.db.CodecTestUtil;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
@@ -48,6 +53,16 @@
 public abstract class CommonChunkManagerTestCases
     extends AbstractTestChunkManager {
 
+  @BeforeAll
+  public static void beforeAll() {
+    CodecBuffer.enableLeakDetection();
+  }
+
+  @AfterEach
+  public void after() throws Exception {
+    CodecTestUtil.gc();
+  }
+
   @Test
   public void testWriteChunkIncorrectLength() {
     // GIVEN
@@ -95,6 +110,7 @@ public void testReadOversizeChunk() throws IOException {
     // WHEN+THEN
     assertThrows(StorageContainerException.class, () ->
         chunkManager.readChunk(container, blockID, chunkInfo, null)
+            .close()
     );
   }
 
@@ -133,14 +149,14 @@ chunkInfo, getData(),
     blockData.addChunk(chunkInfo.getProtoBufMessage());
     getBlockManager().putBlock(container, blockData);
 
-    ByteBuffer expectedData = chunkManager
-        .readChunk(container, blockID, chunkInfo, null)
-        .toByteString().asReadOnlyByteBuffer();
-
     // THEN
-    assertEquals(chunkInfo.getLen(), expectedData.remaining());
-    assertEquals(expectedData.rewind(), rewindBufferToDataStart());
-    checkReadIOStats(expectedData.limit(), 1);
+    try (ChunkBuffer chunk = chunkManager.readChunk(
+        container, blockID, chunkInfo, null)) {
+      final ByteBuffer bytes = chunk.toByteString().asReadOnlyByteBuffer();
+      assertEquals(chunkInfo.getLen(), bytes.remaining());
+      assertEquals(rewindBufferToDataStart(), bytes.rewind());
+      checkReadIOStats(bytes.limit(), 1);
+    }
   }
 
   @Test
@@ -186,11 +202,10 @@ public void testDeletePartialChunkUnsupportedRequest() {
   public void testReadChunkFileNotExists() {
     // GIVEN
     ChunkManager chunkManager = createTestSubject();
-    try {
-      // WHEN
-      chunkManager.readChunk(getKeyValueContainer(),
-          getBlockID(), getChunkInfo(), null);
 
+    // WHEN
+    try (ChunkBuffer ignored = chunkManager.readChunk(getKeyValueContainer(),
+        getBlockID(), getChunkInfo(), null)) {
       // THEN
       fail("testReadChunkFileNotExists failed");
@@ -228,7 +243,7 @@ public void testWriteAndReadChunkMultipleTimes() throws Exception {
     for (int i = 0; i < count; i++) {
       ChunkInfo info = new ChunkInfo(String.format("%d.data.%d", localID, i),
          i * len, len);
-      chunkManager.readChunk(container, blockID, info, null);
+      chunkManager.readChunk(container, blockID, info, null).close();
     }
 
     // THEN
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestChunkManagerDummyImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestChunkManagerDummyImpl.java
index 714426108bc6..e3c28dc358f4 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestChunkManagerDummyImpl.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestChunkManagerDummyImpl.java
@@ -50,9 +50,9 @@ public void dummyManagerDoesNotWriteToFile() throws Exception {
   public void dummyManagerReadsAnyChunk() throws Exception {
     ChunkManager dummy = createTestSubject();
 
-    ChunkBuffer dataRead = dummy.readChunk(getKeyValueContainer(),
-        getBlockID(), getChunkInfo(), null);
-
-    assertNotNull(dataRead);
+    try (ChunkBuffer dataRead = dummy.readChunk(getKeyValueContainer(),
+        getBlockID(), getChunkInfo(), null)) {
+      assertNotNull(dataRead);
+    }
   }
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java
index 304bfa7f2065..899cce0f104c 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java
@@ -30,7 +30,6 @@
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.junit.jupiter.api.Test;
 
-import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 
 import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
@@ -96,12 +95,11 @@ public void testMultipleWriteSingleRead() throws Exception {
 
     // Request to read the whole data in a single go.
     ChunkInfo largeChunk = getChunk(blockID.getLocalID(), 0, 0,
        datalen * chunkCount);
-    ChunkBuffer chunk =
-        subject.readChunk(container, blockID, largeChunk,
-            null);
-    ByteBuffer newdata = chunk.toByteString().asReadOnlyByteBuffer();
     MessageDigest newSha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-    newSha.update(newdata);
+    try (ChunkBuffer chunk = subject.readChunk(
+        container, blockID, largeChunk, null)) {
+      chunk.asByteBufferList().forEach(newSha::update);
+    }
     assertEquals(Hex.encodeHexString(oldSha.digest()),
         Hex.encodeHexString(newSha.digest()));
   }
@@ -123,18 +121,23 @@ public void testPartialRead() throws Exception {
     ChunkManager subject = createTestSubject();
     subject.writeChunk(container, blockID, info, data, WRITE_STAGE);
 
-    ChunkBuffer readData = subject.readChunk(container, blockID, info, null);
     // data will be ChunkBufferImplWithByteBuffer and readData will return
     // ChunkBufferImplWithByteBufferList. Hence, convert both ByteStrings
     // before comparing.
-    assertEquals(data.rewind().toByteString(),
-        readData.rewind().toByteString());
+    try (ChunkBuffer readData = subject.readChunk(
+        container, blockID, info, null)) {
+      assertEquals(data.rewind().toByteString(),
+          readData.rewind().toByteString());
+    }
 
     ChunkInfo info2 = getChunk(blockID.getLocalID(), 0, start, length);
-    ChunkBuffer readData2 = subject.readChunk(container, blockID, info2, null);
-    assertEquals(length, info2.getLen());
-    assertEquals(data.rewind().toByteString().substring(start, start + length),
-        readData2.rewind().toByteString());
+    try (ChunkBuffer readData2 = subject.readChunk(
+        container, blockID, info2, null)) {
+      assertEquals(length, info2.getLen());
+      assertEquals(
+          data.rewind().toByteString().substring(start, start + length),
+          readData2.rewind().toByteString());
+    }
   }
 
   @Override
diff --git a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/upgrade/TestUpgradeManager.java b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/upgrade/TestUpgradeManager.java
index 9f4c0aa26fa7..94b92270acc5 100644
--- a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/upgrade/TestUpgradeManager.java
+++ b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/upgrade/TestUpgradeManager.java
@@ -23,6 +23,8 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.hdds.utils.db.CodecTestUtil;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.Checksum;
@@ -47,6 +49,7 @@
 import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaThreeImpl;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
@@ -143,9 +146,15 @@ public void setup() throws Exception {
     chunkManager = new FilePerBlockStrategy(true, blockManager, null);
   }
 
+  @BeforeAll
+  public static void beforeClass() {
+    CodecBuffer.enableLeakDetection();
+  }
+
   @AfterEach
-  public void after() throws IOException {
+  public void after() throws Exception {
     FileUtils.deleteDirectory(testRoot);
+    CodecTestUtil.gc();
   }
 
   @Test
@@ -200,23 +209,24 @@ private Map putAnyBlockData(KeyValueContainerData data,
   private void putChunksInBlock(int numOfChunksPerBlock, int i,
       List<ContainerProtos.ChunkInfo> chunks, KeyValueContainer container,
       BlockID blockID) {
-    long chunkLength = 100;
+    final int chunkLength = 100;
     try {
       for (int k = 0; k < numOfChunksPerBlock; k++) {
         final String chunkName = String.format("%d_chunk_%d_block_%d",
             blockID.getContainerBlockID().getLocalID(), k, i);
-        final long offset = k * chunkLength;
+        final long offset = k * (long) chunkLength;
         ContainerProtos.ChunkInfo info =
             ContainerProtos.ChunkInfo.newBuilder().setChunkName(chunkName)
                 .setLen(chunkLength).setOffset(offset)
                 .setChecksumData(Checksum.getNoChecksumDataProto()).build();
         chunks.add(info);
         ChunkInfo chunkInfo = new ChunkInfo(chunkName, offset, chunkLength);
-        final ChunkBuffer chunkData = ChunkBuffer.allocate((int) chunkLength);
-        chunkManager
-            .writeChunk(container, blockID, chunkInfo, chunkData, WRITE_STAGE);
-        chunkManager
-            .writeChunk(container, blockID, chunkInfo, chunkData, COMMIT_STAGE);
+        try (ChunkBuffer chunkData = ChunkBuffer.allocate(chunkLength)) {
+          chunkManager.writeChunk(container, blockID, chunkInfo, chunkData,
+              WRITE_STAGE);
+          chunkManager.writeChunk(container, blockID, chunkInfo, chunkData,
+              COMMIT_STAGE);
+        }
       }
     } catch (IOException ex) {
       LOG.warn("Putting chunks in blocks was not successful for BlockID: "
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithInAdequateDN.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithInAdequateDN.java
index 131ce7055394..2f61116886fc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithInAdequateDN.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithInAdequateDN.java
@@ -39,6 +39,8 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.hdds.utils.db.CodecTestUtil;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -68,6 +70,8 @@
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -91,11 +95,10 @@ public class TestDeleteWithInAdequateDN {
 
   /**
    * Create a MiniDFSCluster for testing.
-   *
-   * @throws IOException
    */
   @BeforeClass
   public static void init() throws Exception {
+    CodecBuffer.enableLeakDetection();
     conf = new OzoneConfiguration();
     path = GenericTestUtils
         .getTempPath(TestContainerStateMachineFailures.class.getSimpleName());
@@ -170,6 +173,11 @@ public static void init() throws Exception {
     objectStore.getVolume(volumeName).createBucket(bucketName);
   }
 
+  @After
+  public void after() throws Exception {
+    CodecTestUtil.gc();
+  }
+
   /**
    * Shutdown MiniDFSCluster.
   */
@@ -282,9 +290,9 @@ public void testDeleteKeyWithInAdequateDN() throws Exception {
     // deleteBlock handler is invoked
     try {
       for (ContainerProtos.ChunkInfo chunkInfo : blockData.getChunks()) {
-        keyValueHandler.getChunkManager()
-            .readChunk(container, blockID, ChunkInfo.getFromProtoBuf(chunkInfo),
-                null);
+        final ChunkInfo i = ChunkInfo.getFromProtoBuf(chunkInfo);
+        keyValueHandler.getChunkManager().readChunk(
+            container, blockID, i, null).close();
       }
     } catch (IOException ioe) {
       Assert.fail("Exception should not be thrown.");
@@ -314,8 +322,9 @@ public void testDeleteKeyWithInAdequateDN() throws Exception {
       // make sure the chunk is now deleted on the all dns
       try {
         for (ContainerProtos.ChunkInfo chunkInfo : blockData.getChunks()) {
-          keyValueHandler.getChunkManager().readChunk(container, blockID,
-              ChunkInfo.getFromProtoBuf(chunkInfo), null);
+          final ChunkInfo i = ChunkInfo.getFromProtoBuf(chunkInfo);
+          keyValueHandler.getChunkManager().readChunk(
+              container, blockID, i, null).close();
         }
         Assert.fail("Expected exception is not thrown");
       } catch (IOException ioe) {
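
[Reviewer sketch - not part of the patch] The test changes throughout this patch follow one scaffolding pattern: arm CodecBuffer leak detection once, and call CodecTestUtil.gc() after each test so a ChunkBuffer/CodecBuffer that was never closed fails the run. Condensed into one self-contained JUnit 5 sketch; the exact failure behaviour of CodecTestUtil.gc() is assumed from its usage in this diff, not from its source:

    import org.apache.hadoop.hdds.utils.db.CodecBuffer;
    import org.apache.hadoop.hdds.utils.db.CodecTestUtil;
    import org.apache.hadoop.ozone.common.ChunkBuffer;
    import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeAll;
    import org.junit.jupiter.api.Test;

    class LeakDetectionSketchTest {
      @BeforeAll
      static void beforeAll() {
        CodecBuffer.enableLeakDetection();
      }

      @AfterEach
      void after() throws Exception {
        CodecTestUtil.gc(); // expected to fail the test if a tracked buffer leaked
      }

      @Test
      void allocateWriteClose() {
        // increment < capacity yields an IncrementalChunkBuffer, which now
        // allocates direct CodecBuffers on demand (see the changes above).
        try (ChunkBuffer buffer = ChunkBuffer.allocate(8, 4)) {
          buffer.put(ByteString.copyFrom(new byte[] {1, 2, 3, 4, 5}));
        } // close() releases those buffers and keeps the gc() check above happy
      }
    }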