diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index f39ec8613307..1c55941e3e4b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -54,6 +54,21 @@ public class OzoneClientConfig { tags = ConfigTag.CLIENT) private int streamBufferSize = 4 * 1024 * 1024; + @Config(key = "datastream.max.buffer.size", + defaultValue = "4MB", + type = ConfigType.SIZE, + description = "The maximum size of the ByteBuffer " + + "(used via ratis streaming)", + tags = ConfigTag.CLIENT) + private int dataStreamMaxBufferSize = 4 * 1024 * 1024; + + @Config(key = "datastream.buffer.flush.size", + defaultValue = "16MB", + type = ConfigType.SIZE, + description = "The boundary at which putBlock is executed", + tags = ConfigTag.CLIENT) + private long dataStreamBufferFlushSize = 16 * 1024 * 1024; + @Config(key = "stream.buffer.increment", defaultValue = "0B", type = ConfigType.SIZE, @@ -168,6 +183,14 @@ public void setStreamBufferSize(int streamBufferSize) { this.streamBufferSize = streamBufferSize; } + public int getDataStreamMaxBufferSize() { + return dataStreamMaxBufferSize; + } + + public void setDataStreamMaxBufferSize(int dataStreamMaxBufferSize) { + this.dataStreamMaxBufferSize = dataStreamMaxBufferSize; + } + public boolean isStreamBufferFlushDelay() { return streamBufferFlushDelay; } @@ -227,4 +250,12 @@ public void setChecksumVerify(boolean checksumVerify) { public int getBufferIncrement() { return bufferIncrement; } + + public long getDataStreamBufferFlushSize() { + return dataStreamBufferFlushSize; + } + + public void setDataStreamBufferFlushSize(long dataStreamBufferFlushSize) { + this.dataStreamBufferFlushSize = dataStreamBufferFlushSize; + } } diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index aada48e2f596..6f5a54354a31 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -257,13 +257,30 @@ public void write(ByteBuffer b, int off, int len) throws IOException { if (len == 0) { return; } + int curLen = len; + // set limit on the number of bytes that a ByteBuffer(StreamBuffer) can hold + int maxBufferLen = config.getDataStreamMaxBufferSize(); + while (curLen > 0) { + int writeLen = Math.min(curLen, maxBufferLen); + final StreamBuffer buf = new StreamBuffer(b, off, writeLen); + off += writeLen; + bufferList.add(buf); + writeChunkToContainer(buf.duplicate()); + curLen -= writeLen; + writtenDataLength += writeLen; + doFlushIfNeeded(); + } + } - final StreamBuffer buf = new StreamBuffer(b, off, len); - bufferList.add(buf); - - writeChunkToContainer(buf.duplicate()); - - writtenDataLength += len; + private void doFlushIfNeeded() throws IOException { + Preconditions.checkArgument(config.getDataStreamBufferFlushSize() > config + .getDataStreamMaxBufferSize()); + long boundary = config.getDataStreamBufferFlushSize() / config + .getDataStreamMaxBufferSize(); + if (bufferList.size() % boundary == 0) { + updateFlushLength(); + executePutBlock(false, false); + } } private void updateFlushLength() { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index 9fbba8c7a0ee..94d66d4244c4 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java 
@@ -320,6 +320,8 @@ abstract class Builder { protected Optional chunkSize = Optional.empty(); protected OptionalInt streamBufferSize = OptionalInt.empty(); protected Optional streamBufferFlushSize = Optional.empty(); + protected Optional dataStreamBufferFlushSize = Optional.empty(); + protected OptionalInt dataStreamMaxBufferSize = OptionalInt.empty(); protected Optional streamBufferMaxSize = Optional.empty(); protected Optional blockSize = Optional.empty(); protected Optional streamBufferSizeUnit = Optional.empty(); @@ -553,6 +555,16 @@ public Builder setStreamBufferMaxSize(long size) { return this; } + public Builder setDataStreamBufferMaxSize(int size) { + dataStreamMaxBufferSize = OptionalInt.of(size); + return this; + } + + public Builder setDataStreamBufferFlushSize(long size) { + dataStreamBufferFlushSize = Optional.of(size); + return this; + } + /** * Sets the block size for stream buffer. * diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index cff7c3576b44..e291d4ecd5aa 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -658,6 +658,12 @@ protected void initializeConfiguration() throws IOException { if (!streamBufferMaxSize.isPresent()) { streamBufferMaxSize = Optional.of(2 * streamBufferFlushSize.get()); } + if (!dataStreamBufferFlushSize.isPresent()) { + dataStreamBufferFlushSize = Optional.of((long) 4 * chunkSize.get()); + } + if (!dataStreamMaxBufferSize.isPresent()) { + dataStreamMaxBufferSize = OptionalInt.of(chunkSize.get()); + } if (!blockSize.isPresent()) { blockSize = Optional.of(2 * streamBufferMaxSize.get()); } @@ -674,6 +680,11 @@ protected void initializeConfiguration() throws IOException {
streamBufferSizeUnit.get().toBytes(streamBufferMaxSize.get()))); clientConfig.setStreamBufferFlushSize(Math.round( streamBufferSizeUnit.get().toBytes(streamBufferFlushSize.get()))); + clientConfig.setDataStreamBufferFlushSize(Math.round( + streamBufferSizeUnit.get().toBytes(dataStreamBufferFlushSize.get()))); + clientConfig.setDataStreamMaxBufferSize((int) Math.round( + streamBufferSizeUnit.get() + .toBytes(dataStreamMaxBufferSize.getAsInt()))); conf.setFromObject(clientConfig); conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java index 05a101951b80..5eb38a00de5f 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java @@ -20,7 +20,10 @@ import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientMetrics; import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput; import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput; import org.apache.hadoop.ozone.MiniOzoneCluster; @@ -101,6 +104,8 @@ public static void init() throws Exception { .setChunkSize(chunkSize) .setStreamBufferFlushSize(flushSize) .setStreamBufferMaxSize(maxFlushSize) + .setDataStreamBufferFlushSize(maxFlushSize) + .setDataStreamBufferMaxSize(chunkSize) .setStreamBufferSizeUnit(StorageUnit.BYTES) .build(); cluster.waitForClusterToBeReady(); @@ -186,6 +191,35
@@ private void testWriteWithFailure(int dataLength) throws Exception { validateData(keyName, dataString.concat(dataString).getBytes(UTF_8)); } + @Test + public void testPutBlockAtBoundary() throws Exception { + int dataLength = 500; + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + String keyName = getKeyName(); + OzoneDataStreamOutput key = createKey( + keyName, ReplicationType.RATIS, 0); + byte[] data = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + key.write(ByteBuffer.wrap(data)); + Assert.assertTrue( + metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock) + <= pendingPutBlockCount + 1); + key.close(); + // Since data length is 500, first putBlock will be at 400 (flush boundary) + // and the other at 500 + Assert.assertEquals(putBlockCount + 2, + metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock)); + validateData(keyName, data); + } + + private OzoneDataStreamOutput createKey(String keyName, ReplicationType type, long size) throws Exception { return TestHelper.createStreamKey(