diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 84968b68d5b0..6ef59dd6d859 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -133,6 +133,8 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
   private long syncPosition = 0;
   private StreamBuffer currentBuffer;
   private XceiverClientMetrics metrics;
+  // buffers for which putBlock is yet to be executed
+  private List<StreamBuffer> buffersForPutBlock;
   /**
    * Creates a new BlockDataStreamOutput.
    *
@@ -287,6 +289,10 @@ private void writeChunkIfNeeded() throws IOException {
 
   private void writeChunk(StreamBuffer sb) throws IOException {
     bufferList.add(sb);
+    if (buffersForPutBlock == null) {
+      buffersForPutBlock = new ArrayList<>();
+    }
+    buffersForPutBlock.add(sb);
     ByteBuffer dup = sb.duplicate();
     dup.position(0);
     dup.limit(sb.position());
@@ -392,7 +398,8 @@ private void executePutBlock(boolean close,
     final List<StreamBuffer> byteBufferList;
     if (!force) {
       Preconditions.checkNotNull(bufferList);
-      byteBufferList = bufferList;
+      byteBufferList = buffersForPutBlock;
+      buffersForPutBlock = null;
       Preconditions.checkNotNull(byteBufferList);
     } else {
       byteBufferList = null;
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
index 2cd5630549c3..4e5a35a539ce 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
@@ -156,7 +156,7 @@ long getWrittenDataLength() {
     }
   }
 
-  long getTotalAckDataLength() {
+  public long getTotalAckDataLength() {
     if (byteBufferStreamOutput != null) {
       BlockDataStreamOutput out =
           (BlockDataStreamOutput) this.byteBufferStreamOutput;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index c9242df8b1a4..c6a3c32d2bb9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.BlockDataStreamOutputEntry;
 import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
@@ -256,4 +257,22 @@ public void testMinPacketSize() throws Exception {
     validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
   }
 
+  @Test
+  public void testTotalAckDataLength() throws Exception {
+    int dataLength = 400;
+    String keyName = getKeyName();
+    OzoneDataStreamOutput key = createKey(
+        keyName, ReplicationType.RATIS, 0);
+    byte[] data =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+            .getBytes(UTF_8);
+    KeyDataStreamOutput keyDataStreamOutput =
+        (KeyDataStreamOutput) key.getByteBufStreamOutput();
+    BlockDataStreamOutputEntry stream =
+        keyDataStreamOutput.getStreamEntries().get(0);
+    key.write(ByteBuffer.wrap(data));
+    key.close();
+    Assert.assertEquals(dataLength, stream.getTotalAckDataLength());
+  }
+
 }
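
For context: the change above makes each putBlock carry only the StreamBuffers written since the previous putBlock, instead of resubmitting the whole bufferList. Below is a minimal standalone sketch of that bookkeeping under those assumptions; PendingPutBlockTracker, onChunkWritten, and drainForPutBlock are hypothetical names for illustration, not the Ozone API.

import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of tracking buffers pending putBlock. */
class PendingPutBlockTracker<B> {
  // every buffer written on this block stream (analogous to bufferList)
  private final List<B> allBuffers = new ArrayList<>();
  // only the buffers written since the last putBlock was issued
  private List<B> buffersForPutBlock;

  /** Mirrors writeChunk(): record the buffer in both lists. */
  void onChunkWritten(B buffer) {
    allBuffers.add(buffer);
    if (buffersForPutBlock == null) {
      buffersForPutBlock = new ArrayList<>();
    }
    buffersForPutBlock.add(buffer);
  }

  /** Mirrors executePutBlock(): hand out only the pending buffers, then reset. */
  List<B> drainForPutBlock() {
    if (buffersForPutBlock == null) {
      return new ArrayList<>();
    }
    List<B> pending = buffersForPutBlock;
    buffersForPutBlock = null;  // the next putBlock starts from a clean list
    return pending;
  }
}

With this split, the acknowledged length advances by exactly the bytes carried in each putBlock, which is what the new testTotalAckDataLength test asserts (400 bytes written, 400 bytes acknowledged).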