diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 0df37437c746..01f0ee0543c4 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -54,14 +54,6 @@ public class OzoneClientConfig { tags = ConfigTag.CLIENT) private int streamBufferSize = 4 * 1024 * 1024; - @Config(key = "datastream.max.buffer.size", - defaultValue = "4MB", - type = ConfigType.SIZE, - description = "The maximum size of the ByteBuffer " - + "(used via ratis streaming)", - tags = ConfigTag.CLIENT) - private int dataStreamMaxBufferSize = 4 * 1024 * 1024; - @Config(key = "datastream.buffer.flush.size", defaultValue = "16MB", type = ConfigType.SIZE, @@ -77,6 +69,15 @@ public class OzoneClientConfig { tags = ConfigTag.CLIENT) private int dataStreamMinPacketSize = 1024 * 1024; + @Config(key = "datastream.window.size", + defaultValue = "64MB", + type = ConfigType.SIZE, + description = "Maximum size of BufferList(used for retry) size per " + + "BlockDataStreamOutput instance", + tags = ConfigTag.CLIENT) + private long streamWindowSize = 64 * 1024 * 1024; + + @Config(key = "stream.buffer.increment", defaultValue = "0B", type = ConfigType.SIZE, @@ -191,14 +192,6 @@ public void setStreamBufferSize(int streamBufferSize) { this.streamBufferSize = streamBufferSize; } - public int getDataStreamMaxBufferSize() { - return dataStreamMaxBufferSize; - } - - public void setDataStreamMaxBufferSize(int dataStreamMaxBufferSize) { - this.dataStreamMaxBufferSize = dataStreamMaxBufferSize; - } - public boolean isStreamBufferFlushDelay() { return streamBufferFlushDelay; } @@ -223,6 +216,14 @@ public void setDataStreamMinPacketSize(int dataStreamMinPacketSize) { this.dataStreamMinPacketSize = dataStreamMinPacketSize; } + public long getStreamWindowSize() { + return 
streamWindowSize; + } + + public void setStreamWindowSize(long streamWindowSize) { + this.streamWindowSize = streamWindowSize; + } + public int getMaxRetryCount() { return maxRetryCount; } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index f37cd1c94ae8..2ad6b690644e 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -122,7 +122,7 @@ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType, this.ozoneConfiguration = configuration; } - private void updateCommitInfosMap( + public void updateCommitInfosMap( Collection commitInfoProtos) { // if the commitInfo map is empty, just update the commit indexes for each // of the servers diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index ec925d1e6ac3..a3fe1c247951 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -52,7 +52,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; +import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; @@ -116,9 +118,9 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { // Also, corresponding to the logIndex, the corresponding list of buffers will // be released from the buffer pool. 
private final StreamCommitWatcher commitWatcher; - private final AtomicReference> putBlockFuture - = new AtomicReference<>(CompletableFuture.completedFuture(null)); + + private Queue> + putBlockFutures = new LinkedList<>(); private final List failedServers; private final Checksum checksum; @@ -307,14 +309,33 @@ private void allocateNewBufferIfNeeded() { } private void doFlushIfNeeded() throws IOException { - Preconditions.checkArgument(config.getDataStreamBufferFlushSize() > config - .getDataStreamMaxBufferSize()); long boundary = config.getDataStreamBufferFlushSize() / config - .getDataStreamMaxBufferSize(); + .getDataStreamMinPacketSize(); + // streamWindow is the maximum number of buffers that + // are allowed to exist in the bufferList. If buffers in + // the list exceed this limit, client will wait till it gets + // one putBlockResponse (first index). This is similar to + // the bufferFull condition in async write path. + long streamWindow = config.getStreamWindowSize() / config + .getDataStreamMinPacketSize(); if (!bufferList.isEmpty() && bufferList.size() % boundary == 0) { updateFlushLength(); executePutBlock(false, false); } + if (bufferList.size()==streamWindow){ + try { + checkOpen(); + if (!putBlockFutures.isEmpty()) { + putBlockFutures.remove().get(); + } + } catch (ExecutionException e) { + handleExecutionException(e); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + handleInterruptedException(ex, true); + } + watchForCommit(true); + } } private void updateFlushLength() { @@ -453,8 +474,7 @@ private void executePutBlock(boolean close, setIoException(ce); throw ce; }); - putBlockFuture.updateAndGet(f -> f.thenCombine(flushFuture, - (previous, current) -> current)); + putBlockFutures.add(flushFuture); } catch (IOException | ExecutionException e) { throw new IOException(EXCEPTION_MSG + e.toString(), e); } catch (InterruptedException ex) { @@ -496,7 +516,7 @@ private void handleFlush(boolean close) // data since latest flush - we 
need to send the "EOF" flag executePutBlock(true, true); } - putBlockFuture.get().get(); + CompletableFuture.allOf(putBlockFutures.toArray(EMPTY_FUTURE_ARRAY)).get(); watchForCommit(false); // just check again if the exception is hit while waiting for the // futures to ensure flush has indeed succeeded @@ -638,6 +658,8 @@ private void writeChunkToContainer(ByteBuffer buf) CompletionException ce = new CompletionException(msg, e); setIoException(ce); throw ce; + } else if (r.isSuccess()) { + xceiverClient.updateCommitInfosMap(r.getCommitInfos()); } }, responseExecutor); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index fc86ff3185f8..058ba6af2c24 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -322,7 +322,7 @@ abstract class Builder { protected OptionalInt streamBufferSize = OptionalInt.empty(); protected Optional streamBufferFlushSize = Optional.empty(); protected Optional dataStreamBufferFlushSize= Optional.empty(); - protected OptionalInt dataStreamMaxBufferSize = OptionalInt.empty(); + protected Optional datastreamWindowSize= Optional.empty(); protected Optional streamBufferMaxSize = Optional.empty(); protected OptionalInt dataStreamMinPacketSize = OptionalInt.empty(); protected Optional blockSize = Optional.empty(); @@ -558,11 +558,6 @@ public Builder setStreamBufferMaxSize(long size) { return this; } - public Builder setDataStreamBufferMaxSize(int size) { - dataStreamMaxBufferSize = OptionalInt.of(size); - return this; - } - public Builder setDataStreamBufferFlushize(long size) { dataStreamBufferFlushSize = Optional.of(size); return this; @@ -573,6 +568,11 @@ public Builder setDataStreamMinPacketSize(int size) { return this; } + public Builder 
setDataStreamStreamWindowSize(long size) { + datastreamWindowSize = Optional.of(size); + return this; + } + /** * Sets the block size for stream buffer. * diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index c30dbada9e4c..68ec88c1b0db 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -645,12 +645,12 @@ protected void initializeConfiguration() throws IOException { if (!dataStreamBufferFlushSize.isPresent()) { dataStreamBufferFlushSize = Optional.of((long) 4 * chunkSize.get()); } - if (!dataStreamMaxBufferSize.isPresent()) { - dataStreamMaxBufferSize = OptionalInt.of(chunkSize.get()); - } if (!dataStreamMinPacketSize.isPresent()) { dataStreamMinPacketSize = OptionalInt.of(chunkSize.get()/4); } + if (!datastreamWindowSize.isPresent()) { + datastreamWindowSize = Optional.of((long) 8 * chunkSize.get()); + } if (!blockSize.isPresent()) { blockSize = Optional.of(2 * streamBufferMaxSize.get()); } @@ -669,12 +669,11 @@ protected void initializeConfiguration() throws IOException { streamBufferSizeUnit.get().toBytes(streamBufferFlushSize.get()))); clientConfig.setDataStreamBufferFlushSize(Math.round( streamBufferSizeUnit.get().toBytes(dataStreamBufferFlushSize.get()))); - clientConfig.setDataStreamMaxBufferSize((int) Math.round( - streamBufferSizeUnit.get() - .toBytes(dataStreamMaxBufferSize.getAsInt()))); clientConfig.setDataStreamMinPacketSize((int) Math.round( streamBufferSizeUnit.get() .toBytes(dataStreamMinPacketSize.getAsInt()))); + clientConfig.setStreamWindowSize(Math.round( + streamBufferSizeUnit.get().toBytes(datastreamWindowSize.get()))); conf.setFromObject(clientConfig); conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java index c6a3c32d2bb9..696ab92ab78d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java @@ -106,9 +106,9 @@ public static void init() throws Exception { .setStreamBufferFlushSize(flushSize) .setStreamBufferMaxSize(maxFlushSize) .setDataStreamBufferFlushize(maxFlushSize) - .setDataStreamBufferMaxSize(chunkSize) .setStreamBufferSizeUnit(StorageUnit.BYTES) - .setDataStreamMinPacketSize(2*chunkSize/5) + .setDataStreamMinPacketSize(chunkSize) + .setDataStreamStreamWindowSize(5*chunkSize) .build(); cluster.waitForClusterToBeReady(); //the easiest way to create an open container is creating a key @@ -195,7 +195,7 @@ private void testWriteWithFailure(int dataLength) throws Exception { @Test public void testPutBlockAtBoundary() throws Exception { - int dataLength = 200; + int dataLength = 500; XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); long putBlockCount = metrics.getContainerOpCountMetrics( @@ -213,8 +213,8 @@ public void testPutBlockAtBoundary() throws Exception { metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock) <= pendingPutBlockCount + 1); key.close(); - // Since data length is 200 , first putBlock will be at 160(flush boundary) - // and the other at 200 + // Since data length is 500 , first putBlock will be at 400(flush boundary) + // and the other at 500 Assert.assertTrue( metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock) == putBlockCount + 2); @@ -242,10 +242,10 @@ public void testMinPacketSize() throws Exception { long writeChunkCount = 
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); byte[] data = - ContainerTestHelper.getFixedLengthString(keyString, chunkSize / 5) + ContainerTestHelper.getFixedLengthString(keyString, chunkSize / 2) .getBytes(UTF_8); key.write(ByteBuffer.wrap(data)); - // minPacketSize= 40, so first write of 20 wont trigger a writeChunk + // minPacketSize= 100, so first write of 50 wont trigger a writeChunk Assert.assertEquals(writeChunkCount, metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); key.write(ByteBuffer.wrap(data));