diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index aab70a692e30..220e5481cbd3 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -27,6 +27,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -94,6 +95,9 @@ public class BlockOutputStream extends OutputStream { KeyValue.newBuilder().setKey(FULL_CHUNK).build(); private AtomicReference blockID; + // planned block full size + private long blockSize; + private AtomicBoolean eofSent = new AtomicBoolean(false); private final AtomicReference previousChunkInfo = new AtomicReference<>(); @@ -164,6 +168,7 @@ public class BlockOutputStream extends OutputStream { @SuppressWarnings("checkstyle:ParameterNumber") public BlockOutputStream( BlockID blockID, + long blockSize, XceiverClientFactory xceiverClientManager, Pipeline pipeline, BufferPool bufferPool, @@ -175,6 +180,7 @@ public BlockOutputStream( this.xceiverClientFactory = xceiverClientManager; this.config = config; this.blockID = new AtomicReference<>(blockID); + this.blockSize = blockSize; replicationIndex = pipeline.getReplicaIndex(pipeline.getClosestNode()); KeyValue keyValue = KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build(); @@ -530,7 +536,7 @@ CompletableFuture executePutBlock(boolean close, final XceiverClientReply asyncReply; try { BlockData blockData = containerBlockData.build(); - LOG.debug("sending PutBlock {}", blockData); + LOG.debug("sending PutBlock {} flushPos {}", blockData, flushPos); if (config.getIncrementalChunkList()) { // remove any chunks in the containerBlockData list. @@ -538,7 +544,9 @@ CompletableFuture executePutBlock(boolean close, containerBlockData.clearChunks(); } - asyncReply = putBlockAsync(xceiverClient, blockData, close, tokenString); + // if block is full, send the eof + boolean isBlockFull = (blockSize != -1 && flushPos == blockSize); + asyncReply = putBlockAsync(xceiverClient, blockData, close || isBlockFull, tokenString); CompletableFuture future = asyncReply.getResponse(); flushFuture = future.thenApplyAsync(e -> { try { @@ -550,6 +558,7 @@ CompletableFuture executePutBlock(boolean close, if (getIoException() == null && !force) { handleSuccessfulPutBlock(e.getPutBlock().getCommittedBlockLength(), asyncReply, flushPos, byteBufferList); + eofSent.set(close || isBlockFull); } return e; }, responseExecutor).exceptionally(e -> { @@ -690,7 +699,7 @@ private synchronized CompletableFuture handleFlushInternalSynchronized(boo // There're no pending written data, but there're uncommitted data. updatePutBlockLength(); putBlockResultFuture = executePutBlock(close, false); - } else if (close) { + } else if (close && !eofSent.get()) { // forcing an "empty" putBlock if stream is being closed without new // data since latest flush - we need to send the "EOF" flag updatePutBlockLength(); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java index bbb3f30687af..12ca9978c685 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java @@ -80,7 +80,7 @@ public ECBlockOutputStream( ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs, Supplier executorServiceSupplier ) throws IOException { - super(blockID, xceiverClientManager, + super(blockID, -1, xceiverClientManager, pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, executorServiceSupplier); // In EC stream, there will be only one node in pipeline. this.datanodeDetails = pipeline.getClosestNode(); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java index c0e99a5b4a08..d32c37eba6c3 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java @@ -72,6 +72,7 @@ public class RatisBlockOutputStream extends BlockOutputStream @SuppressWarnings("checkstyle:ParameterNumber") public RatisBlockOutputStream( BlockID blockID, + long blockSize, XceiverClientFactory xceiverClientManager, Pipeline pipeline, BufferPool bufferPool, @@ -80,7 +81,7 @@ public RatisBlockOutputStream( ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs, Supplier blockOutputStreamResourceProvider ) throws IOException { - super(blockID, xceiverClientManager, pipeline, + super(blockID, blockSize, xceiverClientManager, pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, blockOutputStreamResourceProvider); this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient()); } diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java index df4d1cb3f8aa..d3425b7d2b0f 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java @@ -174,6 +174,7 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool) return new RatisBlockOutputStream( new BlockID(1L, 1L), + -1, xcm, pipeline, bufferPool, diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index 18a9231c66f7..5e6ecceefa1e 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -108,7 +108,7 @@ void checkStream() throws IOException { * @throws IOException */ void createOutputStream() throws IOException { - outputStream = new RatisBlockOutputStream(blockID, xceiverClientManager, + outputStream = new RatisBlockOutputStream(blockID, length, xceiverClientManager, pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, executorServiceSupplier); }