diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java index ae8b5764d3f1..a3f1cdee5120 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java @@ -55,14 +55,22 @@ public GrpcContainerUploader( public OutputStream startUpload(long containerId, DatanodeDetails target, CompletableFuture callback, CopyContainerCompression compression) throws IOException { - GrpcReplicationClient client = null; + GrpcReplicationClient client = createReplicationClient(target, compression); try { - client = createReplicationClient(target, compression); StreamObserver requestStream = client.upload( new SendContainerResponseStreamObserver(containerId, target, callback)); - return new SendContainerOutputStream(client, requestStream, containerId, - GrpcReplicationService.BUFFER_SIZE, compression); + return new SendContainerOutputStream(requestStream, containerId, + GrpcReplicationService.BUFFER_SIZE, compression) { + @Override + public void close() throws IOException { + try { + super.close(); + } finally { + IOUtils.close(LOG, client); + } + } + }; } catch (Exception e) { IOUtils.close(LOG, client); throw e; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java index 5de63958fe53..b24dcc10f5dd 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java @@ -18,31 +18,21 @@ package org.apache.hadoop.ozone.container.replication; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest; -import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; /** * Output stream adapter for SendContainerResponse. */ class SendContainerOutputStream extends GrpcOutputStream { - private static final Logger LOG = - LoggerFactory.getLogger(SendContainerOutputStream.class); - private final CopyContainerCompression compression; - private final AutoCloseable client; SendContainerOutputStream( - AutoCloseable client, StreamObserver streamObserver, + StreamObserver streamObserver, long containerId, int bufferSize, CopyContainerCompression compression) { super(streamObserver, containerId, bufferSize); this.compression = compression; - this.client = client; } @Override @@ -55,13 +45,4 @@ protected void sendPart(boolean eof, int length, ByteString data) { .build(); getStreamObserver().onNext(request); } - - @Override - public void close() throws IOException { - try { - super.close(); - } finally { - IOUtils.close(LOG, client); - } - } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerOutputStream.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerOutputStream.java index cfd5552d5663..747a4d293502 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerOutputStream.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerOutputStream.java @@ -21,7 +21,6 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; -import org.mockito.Mock; import java.io.OutputStream; @@ -35,23 +34,20 @@ class TestSendContainerOutputStream extends GrpcOutputStreamTest { - @Mock - private AutoCloseable client; - TestSendContainerOutputStream() { super(SendContainerRequest.class); } @Override protected OutputStream createSubject() { - return new SendContainerOutputStream(client, getObserver(), + return new SendContainerOutputStream(getObserver(), getContainerId(), getBufferSize(), NO_COMPRESSION); } @ParameterizedTest @EnumSource void usesCompression(CopyContainerCompression compression) throws Exception { - OutputStream subject = new SendContainerOutputStream(client, + OutputStream subject = new SendContainerOutputStream( getObserver(), getContainerId(), getBufferSize(), compression); byte[] bytes = getRandomBytes(16);