diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 1c55941e3e4b..0df37437c746 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -69,6 +69,14 @@ public class OzoneClientConfig { tags = ConfigTag.CLIENT) private long dataStreamBufferFlushSize = 16 * 1024 * 1024; + @Config(key = "datastream.min.packet.size", + defaultValue = "1MB", + type = ConfigType.SIZE, + description = "The minimum packet size for write data " + + "(used via ratis streaming)", + tags = ConfigTag.CLIENT) + private int dataStreamMinPacketSize = 1024 * 1024; + @Config(key = "stream.buffer.increment", defaultValue = "0B", type = ConfigType.SIZE, @@ -207,6 +215,14 @@ public void setStreamBufferMaxSize(long streamBufferMaxSize) { this.streamBufferMaxSize = streamBufferMaxSize; } + public int getDataStreamMinPacketSize() { + return dataStreamMinPacketSize; + } + + public void setDataStreamMinPacketSize(int dataStreamMinPacketSize) { + this.dataStreamMinPacketSize = dataStreamMinPacketSize; + } + public int getMaxRetryCount() { return maxRetryCount; } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index 6f5a54354a31..9fb1340527a3 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -29,6 +29,8 @@ import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientMetrics; import org.apache.hadoop.hdds.scm.XceiverClientRatis; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; @@ -125,7 +127,8 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { private List> futures = new ArrayList<>(); private final long syncSize = 0; // TODO: disk sync is disabled for now private long syncPosition = 0; - + private StreamBuffer currentBuffer; + private XceiverClientMetrics metrics; /** * Creates a new BlockDataStreamOutput. 
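Note: the same getObject/setFromObject round trip that MiniOzoneClusterImpl uses further down also works for a regular client that wants a larger (or smaller) minimum packet size than the 1MB default. A minimal sketch, assuming only what the hunks in this patch show; the class name and the 4MB value are illustrative, not part of the change:

  import org.apache.hadoop.hdds.conf.OzoneConfiguration;
  import org.apache.hadoop.hdds.scm.OzoneClientConfig;

  public class MinPacketSizeConfigSketch {
    public static void main(String[] args) {
      OzoneConfiguration conf = new OzoneConfiguration();
      // Read the client config group, raise the minimum packet size, write it back
      // so clients built from this conf pick up the new datastream.min.packet.size.
      OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
      clientConfig.setDataStreamMinPacketSize(4 * 1024 * 1024); // example value: 4MB
      conf.setFromObject(clientConfig);
    }
  }
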
* @@ -172,6 +175,7 @@ public BlockDataStreamOutput( ioException = new AtomicReference<>(null); checksum = new Checksum(config.getChecksumType(), config.getBytesPerChecksum()); + metrics = XceiverClientManager.getXceiverClientMetrics(); } private DataStreamOutput setupStream(Pipeline pipeline) throws IOException { @@ -257,27 +261,47 @@ public void write(ByteBuffer b, int off, int len) throws IOException { if (len == 0) { return; } - int curLen = len; - // set limit on the number of bytes that a ByteBuffer(StreamBuffer) can hold - int maxBufferLen = config.getDataStreamMaxBufferSize(); - while (curLen > 0) { - int writeLen = Math.min(curLen, maxBufferLen); + while (len > 0) { + allocateNewBufferIfNeeded(); + int writeLen = Math.min(len, currentBuffer.length()); final StreamBuffer buf = new StreamBuffer(b, off, writeLen); + currentBuffer.put(buf); + writeChunkIfNeeded(); off += writeLen; - bufferList.add(buf); - writeChunkToContainer(buf.duplicate()); - curLen -= writeLen; writtenDataLength += writeLen; + len -= writeLen; doFlushIfNeeded(); } } + private void writeChunkIfNeeded() throws IOException { + if (currentBuffer.length()==0) { + writeChunk(currentBuffer); + currentBuffer = null; + } + } + + private void writeChunk(StreamBuffer sb) throws IOException { + bufferList.add(sb); + ByteBuffer dup = sb.duplicate(); + dup.position(0); + dup.limit(sb.position()); + writeChunkToContainer(dup); + } + + private void allocateNewBufferIfNeeded() { + if (currentBuffer==null) { + currentBuffer = + StreamBuffer.allocate(config.getDataStreamMinPacketSize()); + } + } + private void doFlushIfNeeded() throws IOException { Preconditions.checkArgument(config.getDataStreamBufferFlushSize() > config .getDataStreamMaxBufferSize()); long boundary = config.getDataStreamBufferFlushSize() / config .getDataStreamMaxBufferSize(); - if (bufferList.size() % boundary == 0) { + if (!bufferList.isEmpty() && bufferList.size() % boundary == 0) { updateFlushLength(); executePutBlock(false, false); } @@ -308,11 +332,10 @@ public void writeOnRetry(long len) throws IOException { int count = 0; while (len > 0) { final StreamBuffer buf = bufferList.get(count); - final long writeLen = Math.min(buf.length(), len); + final long writeLen = Math.min(buf.position(), len); final ByteBuffer duplicated = buf.duplicate(); - if (writeLen != buf.length()) { - duplicated.limit(Math.toIntExact(len)); - } + duplicated.position(0); + duplicated.limit(buf.position()); writeChunkToContainer(duplicated); len -= writeLen; count++; @@ -449,6 +472,11 @@ private void handleFlush(boolean close) // This can be a partially filled chunk. Since we are flushing the buffer // here, we just limit this buffer to the current position. 
So that next // write will happen in new buffer + + if (currentBuffer!=null) { + writeChunk(currentBuffer); + currentBuffer = null; + } updateFlushLength(); executePutBlock(close, false); } else if (close) { @@ -584,6 +612,7 @@ private void writeChunkToContainer(ByteBuffer buf) .setLen(effectiveChunkSize) .setChecksumData(checksumData.getProtoBufMessage()) .build(); + metrics.incrPendingContainerOpsMetrics(ContainerProtos.Type.WriteChunk); if (LOG.isDebugEnabled()) { LOG.debug("Writing chunk {} length {} at offset {}", diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java index f36019e2aeb8..5118ea5ead3a 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java @@ -27,7 +27,7 @@ public class StreamBuffer { private final ByteBuffer buffer; public StreamBuffer(ByteBuffer buffer) { - this.buffer = buffer.asReadOnlyBuffer(); + this.buffer = buffer; } public StreamBuffer(ByteBuffer buffer, int offset, int length) { @@ -43,4 +43,17 @@ public int length() { return buffer.limit() - buffer.position(); } + public int position() { + return buffer.position(); + } + + + public void put(StreamBuffer sb){ + buffer.put(sb.buffer); + } + + public static StreamBuffer allocate(int size){ + return new StreamBuffer(ByteBuffer.allocate(size)); + } + } \ No newline at end of file diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java index 3a59d0757105..9ae604e95111 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java @@ -178,7 +178,7 @@ private long releaseBuffers(List indexes) { Preconditions.checkState(commitIndexMap.containsKey(index)); final List buffers = commitIndexMap.remove(index); final long length = - buffers.stream().mapToLong(StreamBuffer::length).sum(); + buffers.stream().mapToLong(StreamBuffer::position).sum(); totalAckDataLength += length; // clear the future object from the future Map final CompletableFuture remove = diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java index e49b0b79adf6..24a046f62395 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java @@ -309,7 +309,7 @@ boolean isEmpty() { long computeBufferData() { long totalDataLen =0; for (StreamBuffer b : bufferList){ - totalDataLen += b.length(); + totalDataLen += b.position(); } return totalDataLen; } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index 94d66d4244c4..acea53cbf923 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -323,6 +323,7 @@ 
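Note: the rewritten write() path accumulates user data into currentBuffer, a StreamBuffer of dataStreamMinPacketSize bytes, and only hands a packet to ratis once that buffer is full (or on flush/close). That is why StreamBuffer now exposes position() (bytes written) in addition to length() (room left), and why writeChunk duplicates the buffer with position 0 and limit at the write position. A standalone, JDK-only sketch of that ByteBuffer bookkeeping; names and the 40-byte size are illustrative, not from the patch:

  import java.nio.ByteBuffer;
  import java.nio.charset.StandardCharsets;

  public class PacketAccumulationSketch {
    public static void main(String[] args) {
      ByteBuffer packet = ByteBuffer.allocate(40);   // plays the role of currentBuffer
      ByteBuffer user = ByteBuffer.wrap(
          "twenty bytes of data".getBytes(StandardCharsets.UTF_8));

      packet.put(user);                              // what StreamBuffer#put does
      System.out.println(packet.position());                   // 20 -> bytes buffered (position())
      System.out.println(packet.limit() - packet.position());  // 20 -> room left (length())

      // What writeChunk(StreamBuffer) does before shipping the packet:
      ByteBuffer dup = packet.duplicate();           // shared bytes, independent pointers
      dup.position(0);
      dup.limit(packet.position());                  // expose only the 20 written bytes
      System.out.println(dup.remaining());           // 20
    }
  }
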
abstract class Builder { protected Optional dataStreamBufferFlushSize= Optional.empty(); protected OptionalInt dataStreamMaxBufferSize = OptionalInt.empty(); protected Optional streamBufferMaxSize = Optional.empty(); + protected OptionalInt dataStreamMinPacketSize = OptionalInt.empty(); protected Optional blockSize = Optional.empty(); protected Optional streamBufferSizeUnit = Optional.empty(); protected boolean includeRecon = false; @@ -565,6 +566,11 @@ public Builder setDataStreamBufferFlushize(long size) { return this; } + public Builder setDataStreamMinPacketSize(int size) { + dataStreamMinPacketSize = OptionalInt.of(size); + return this; + } + /** * Sets the block size for stream buffer. * diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index e291d4ecd5aa..d4cdaf0112f3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -664,6 +664,9 @@ protected void initializeConfiguration() throws IOException { if (!dataStreamMaxBufferSize.isPresent()) { dataStreamMaxBufferSize = OptionalInt.of(chunkSize.get()); } + if (!dataStreamMinPacketSize.isPresent()) { + dataStreamMinPacketSize = OptionalInt.of(chunkSize.get()/4); + } if (!blockSize.isPresent()) { blockSize = Optional.of(2 * streamBufferMaxSize.get()); } @@ -685,6 +688,9 @@ protected void initializeConfiguration() throws IOException { clientConfig.setDataStreamMaxBufferSize((int) Math.round( streamBufferSizeUnit.get() .toBytes(dataStreamMaxBufferSize.getAsInt()))); + clientConfig.setDataStreamMinPacketSize((int) Math.round( + streamBufferSizeUnit.get() + .toBytes(dataStreamMinPacketSize.getAsInt()))); conf.setFromObject(clientConfig); conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java index 5eb38a00de5f..c9242df8b1a4 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java @@ -107,6 +107,7 @@ public static void init() throws Exception { .setDataStreamBufferFlushize(maxFlushSize) .setDataStreamBufferMaxSize(chunkSize) .setStreamBufferSizeUnit(StorageUnit.BYTES) + .setDataStreamMinPacketSize(2*chunkSize/5) .build(); cluster.waitForClusterToBeReady(); //the easiest way to create an open container is creating a key @@ -193,7 +194,7 @@ private void testWriteWithFailure(int dataLength) throws Exception { @Test public void testPutBlockAtBoundary() throws Exception { - int dataLength = 500; + int dataLength = 200; XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); long putBlockCount = metrics.getContainerOpCountMetrics( @@ -211,8 +212,8 @@ public void testPutBlockAtBoundary() throws Exception { metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock) <= pendingPutBlockCount + 1); key.close(); - // Since data length is 500 , first putBlock will be at 400(flush boundary) - // and the other at 500 + // Since data length is 200 , first putBlock will be at 160(flush boundary) + // and 
the other at 200 Assert.assertTrue( metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock) == putBlockCount + 2); @@ -230,4 +231,29 @@ private void validateData(String keyName, byte[] data) throws Exception { .validateData(keyName, data, objectStore, volumeName, bucketName); } + + @Test + public void testMinPacketSize() throws Exception { + String keyName = getKeyName(); + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + OzoneDataStreamOutput key = createKey(keyName, ReplicationType.RATIS, 0); + long writeChunkCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); + byte[] data = + ContainerTestHelper.getFixedLengthString(keyString, chunkSize / 5) + .getBytes(UTF_8); + key.write(ByteBuffer.wrap(data)); + // minPacketSize= 40, so first write of 20 wont trigger a writeChunk + Assert.assertEquals(writeChunkCount, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + key.write(ByteBuffer.wrap(data)); + Assert.assertEquals(writeChunkCount + 1, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + // now close the stream, It will update the key length. + key.close(); + String dataString = new String(data, UTF_8); + validateData(keyName, dataString.concat(dataString).getBytes(UTF_8)); + } + } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java index 3b174503767a..f4c756bccd14 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java @@ -119,6 +119,7 @@ public void setup() throws Exception { conf.setQuietMode(false); cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(200) + .setDataStreamMinPacketSize(1024) .build(); cluster.waitForClusterToBeReady(); cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.ONE, 60000); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java index 222e352154eb..89d7b50d49ac 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java @@ -191,6 +191,7 @@ static void startCluster(OzoneConfiguration conf) throws Exception { .setTotalPipelineNumLimit(10) .setScmId(scmId) .setClusterId(clusterId) + .setDataStreamMinPacketSize(1024) .build(); cluster.waitForClusterToBeReady(); ozClient = OzoneClientFactory.getRpcClient(conf);
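
Note: a small sketch of the arithmetic the two tests above rely on. chunkSize and the configured dataStreamBufferFlushSize are not visible in these hunks, so the concrete values below (chunkSize = 100, flush size = 4 * chunkSize) are assumptions read off the in-test comments ("minPacketSize= 40", "first putBlock will be at 160"), not quoted from the setup code:

  public class StreamPacketArithmeticSketch {
    public static void main(String[] args) {
      int chunkSize = 100;                          // assumed test value
      int minPacketSize = 2 * chunkSize / 5;        // 40, as set in init()
      int singleWrite = chunkSize / 5;              // 20, payload of each testMinPacketSize write
      System.out.println(singleWrite < minPacketSize); // true -> first write only buffers, no WriteChunk

      int flushSize = 4 * chunkSize;                // assumed dataStreamBufferFlushSize
      int maxBufferSize = chunkSize;                // dataStreamMaxBufferSize, set to chunkSize in init()
      long boundary = flushSize / maxBufferSize;    // 4 buffered packets per PutBlock (doFlushIfNeeded)
      System.out.println(boundary * minPacketSize); // 160 -> offset of the first PutBlock in testPutBlockAtBoundary
    }
  }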