diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 3fb6d519afa8..cfd57112bf64 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -239,6 +239,14 @@ public enum ChecksumCombineMode {
       tags = ConfigTag.CLIENT)
   private boolean incrementalChunkList = true;
 
+  @Config(key = "stream.putblock.piggybacking",
+      defaultValue = "false",
+      type = ConfigType.BOOLEAN,
+      description = "Allow PutBlock to be piggybacked in WriteChunk " +
+          "requests if the chunk is small.",
+      tags = ConfigTag.CLIENT)
+  private boolean enablePutblockPiggybacking = false;
+
   @PostConstruct
   public void validate() {
     Preconditions.checkState(streamBufferSize > 0);
@@ -421,6 +429,14 @@ public String getFsDefaultBucketLayout() {
     return fsDefaultBucketLayout;
   }
 
+  public void setEnablePutblockPiggybacking(boolean enablePutblockPiggybacking) {
+    this.enablePutblockPiggybacking = enablePutblockPiggybacking;
+  }
+
+  public boolean getEnablePutblockPiggybacking() {
+    return enablePutblockPiggybacking;
+  }
+
   public boolean isDatastreamPipelineMode() {
     return datastreamPipelineMode;
   }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 239800746c8b..f29bf490382f 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -55,6 +55,8 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+
+import static org.apache.hadoop.hdds.DatanodeVersion.COMBINED_PUTBLOCK_WRITECHUNK_RPC;
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.writeChunkAsync;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -140,6 +142,7 @@ public class BlockOutputStream extends OutputStream {
   private int replicationIndex;
   private Pipeline pipeline;
   private final ContainerClientMetrics clientMetrics;
+  private boolean allowPutBlockPiggybacking;
 
   /**
    * Creates a new BlockOutputStream.
@@ -211,6 +214,20 @@ public BlockOutputStream(
     this.clientMetrics = clientMetrics;
     this.pipeline = pipeline;
     this.streamBufferArgs = streamBufferArgs;
+    this.allowPutBlockPiggybacking = config.getEnablePutblockPiggybacking() &&
+        allDataNodesSupportPiggybacking();
+  }
+
+  private boolean allDataNodesSupportPiggybacking() {
+    // return true only if all DataNodes in the pipeline are on a version
+    // that supports PutBlock piggybacking.
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      if (dn.getCurrentVersion() <
+          COMBINED_PUTBLOCK_WRITECHUNK_RPC.toProtoValue()) {
+        return false;
+      }
+    }
+    return true;
   }
 
   void refreshCurrentBuffer() {
@@ -499,22 +516,8 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
         }
         // if the ioException is not set, putBlock is successful
         if (getIoException() == null && !force) {
-          BlockID responseBlockID = BlockID.getFromProtobuf(
-              e.getPutBlock().getCommittedBlockLength().getBlockID());
-          Preconditions.checkState(blockID.get().getContainerBlockID()
-              .equals(responseBlockID.getContainerBlockID()));
-          // updates the bcsId of the block
-          blockID.set(responseBlockID);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(
-                "Adding index " + asyncReply.getLogIndex() + " flushLength "
-                    + flushPos + " numBuffers " + byteBufferList.size()
-                    + " blockID " + blockID + " bufferPool size" + bufferPool
-                    .getSize() + " currentBufferIndex " + bufferPool
-                    .getCurrentBufferIndex());
-          }
-          // for standalone protocol, logIndex will always be 0.
-          updateCommitInfo(asyncReply, byteBufferList);
+          handleSuccessfulPutBlock(e.getPutBlock().getCommittedBlockLength(),
+              asyncReply, flushPos, byteBufferList);
         }
         return e;
       }, responseExecutor).exceptionally(e -> {
@@ -551,7 +554,7 @@ public void flush() throws IOException {
     }
   }
 
-  private void writeChunk(ChunkBuffer buffer)
+  private void writeChunkCommon(ChunkBuffer buffer)
       throws IOException {
     // This data in the buffer will be pushed to datanode and a reference will
     // be added to the bufferList. Once putBlock gets executed, this list will
@@ -562,7 +565,18 @@ private void writeChunk(ChunkBuffer buffer)
       bufferList = new ArrayList<>();
     }
     bufferList.add(buffer);
-    writeChunkToContainer(buffer.duplicate(0, buffer.position()));
+  }
+
+  private void writeChunk(ChunkBuffer buffer)
+      throws IOException {
+    writeChunkCommon(buffer);
+    writeChunkToContainer(buffer.duplicate(0, buffer.position()), false);
+  }
+
+  private void writeChunkAndPutBlock(ChunkBuffer buffer)
+      throws IOException {
+    writeChunkCommon(buffer);
+    writeChunkToContainer(buffer.duplicate(0, buffer.position()), true);
   }
 
   /**
@@ -594,14 +608,23 @@ private void handleFlushInternal(boolean close)
     if (totalDataFlushedLength < writtenDataLength) {
       refreshCurrentBuffer();
       Preconditions.checkArgument(currentBuffer.position() > 0);
-      if (currentBuffer.hasRemaining()) {
-        writeChunk(currentBuffer);
-      }
+
       // This can be a partially filled chunk. Since we are flushing the buffer
       // here, we just limit this buffer to the current position. So that next
       // write will happen in new buffer
-      updateFlushLength();
-      executePutBlock(close, false);
+      if (currentBuffer.hasRemaining()) {
+        if (allowPutBlockPiggybacking) {
+          updateFlushLength();
+          writeChunkAndPutBlock(currentBuffer);
+        } else {
+          writeChunk(currentBuffer);
+          updateFlushLength();
+          executePutBlock(close, false);
+        }
+      } else {
+        updateFlushLength();
+        executePutBlock(close, false);
+      }
     } else if (close) {
       // forcing an "empty" putBlock if stream is being closed without new
       // data since latest flush - we need to send the "EOF" flag
@@ -713,7 +736,7 @@ public boolean isClosed() {
    * @return
    */
   CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
-      ChunkBuffer chunk) throws IOException {
+      ChunkBuffer chunk, boolean putBlockPiggybacking) throws IOException {
     int effectiveChunkSize = chunk.remaining();
     final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
     final ByteString data = chunk.toByteString(
@@ -726,6 +749,8 @@ CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
         .setChecksumData(checksumData.getProtoBufMessage())
         .build();
 
+    long flushPos = totalDataFlushedLength;
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("Writing chunk {} length {} at offset {}",
           chunkInfo.getChunkName(), effectiveChunkSize, offset);
     }
@@ -743,42 +768,93 @@ CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
               + ", previous = " + previous);
     }
 
+    final List<ChunkBuffer> byteBufferList;
+    CompletableFuture<ContainerCommandResponseProto>
+        validateFuture = null;
     try {
-      XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
-          blockID.get(), data, tokenString, replicationIndex);
-      CompletableFuture<ContainerCommandResponseProto>
-          respFuture = asyncReply.getResponse();
-      CompletableFuture<ContainerCommandResponseProto>
-          validateFuture = respFuture.thenApplyAsync(e -> {
-            try {
-              validateResponse(e);
-            } catch (IOException sce) {
-              respFuture.completeExceptionally(sce);
-            }
-            return e;
-          }, responseExecutor).exceptionally(e -> {
-            String msg = "Failed to write chunk " + chunkInfo.getChunkName() +
-                " into block " + blockID;
-            LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
-            CompletionException ce = new CompletionException(msg, e);
-            setIoException(ce);
-            throw ce;
-          });
+      BlockData blockData = null;
+
       if (config.getIncrementalChunkList()) {
         updateBlockDataForWriteChunk(chunk);
       } else {
         containerBlockData.addChunks(chunkInfo);
       }
+      if (putBlockPiggybacking) {
+        Preconditions.checkNotNull(bufferList);
+        byteBufferList = bufferList;
+        bufferList = null;
+        Preconditions.checkNotNull(byteBufferList);
+
+        blockData = containerBlockData.build();
+        LOG.debug("piggyback chunk list {}", blockData);
+
+        if (config.getIncrementalChunkList()) {
+          // remove any chunks in the containerBlockData list,
+          // since they have been sent.
+          containerBlockData.clearChunks();
+        }
+      } else {
+        byteBufferList = null;
+      }
+      XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
+          blockID.get(), data, tokenString, replicationIndex, blockData);
+      CompletableFuture<ContainerCommandResponseProto>
+          respFuture = asyncReply.getResponse();
+      validateFuture = respFuture.thenApplyAsync(e -> {
+        try {
+          validateResponse(e);
+        } catch (IOException sce) {
+          respFuture.completeExceptionally(sce);
+        }
+        // if the ioException is not set, putBlock is successful
+        if (getIoException() == null && putBlockPiggybacking) {
+          handleSuccessfulPutBlock(e.getWriteChunk().getCommittedBlockLength(),
+              asyncReply, flushPos, byteBufferList);
+        }
+        return e;
+      }, responseExecutor).exceptionally(e -> {
+        String msg = "Failed to write chunk " + chunkInfo.getChunkName() +
+            " into block " + blockID;
+        LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
+        CompletionException ce = new CompletionException(msg, e);
+        setIoException(ce);
+        throw ce;
+      });
       clientMetrics.recordWriteChunk(pipeline, chunkInfo.getLen());
-      return validateFuture;
+
     } catch (IOException | ExecutionException e) {
       throw new IOException(EXCEPTION_MSG + e.toString(), e);
     } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
       handleInterruptedException(ex, false);
     }
-    return null;
+    if (putBlockPiggybacking) {
+      putFlushFuture(flushPos, validateFuture);
+    }
+    return validateFuture;
+  }
+
+  private void handleSuccessfulPutBlock(
+      ContainerProtos.GetCommittedBlockLengthResponseProto e,
+      XceiverClientReply asyncReply, long flushPos,
+      List<ChunkBuffer> byteBufferList) {
+    BlockID responseBlockID = BlockID.getFromProtobuf(
+        e.getBlockID());
+    Preconditions.checkState(blockID.get().getContainerBlockID()
+        .equals(responseBlockID.getContainerBlockID()));
+    // updates the bcsId of the block
+    blockID.set(responseBlockID);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "Adding index " + asyncReply.getLogIndex() + " flushLength "
+              + flushPos + " numBuffers " + byteBufferList.size()
+              + " blockID " + blockID + " bufferPool size" + bufferPool
+              .getSize() + " currentBufferIndex " + bufferPool
+              .getCurrentBufferIndex());
+    }
+    // for standalone protocol, logIndex will always be 0.
+    updateCommitInfo(asyncReply, byteBufferList);
   }
 
   /**
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index adecc3e4c1e2..c8bfaf3e1bce 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -89,13 +89,14 @@ public ECBlockOutputStream(
   @Override
   public void write(byte[] b, int off, int len) throws IOException {
     this.currentChunkRspFuture =
-        writeChunkToContainer(ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)));
+        writeChunkToContainer(
+            ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)), false);
     updateWrittenDataLength(len);
   }
 
   public CompletableFuture<ContainerCommandResponseProto> write(
       ByteBuffer buff) throws IOException {
-    return writeChunkToContainer(ChunkBuffer.wrap(buff));
+    return writeChunkToContainer(ChunkBuffer.wrap(buff), false);
   }
 
   public CompletableFuture
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java
       finalizeBlocksTableWithIterator;
-  static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(AbstractDatanodeStore.class);
   private volatile DBStore store;
   private final AbstractDatanodeDBDefinition dbDef;
diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 0206a8ea71d4..3c0eab906db8 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -423,9 +423,11 @@ message WriteChunkRequestProto {
   required DatanodeBlockID blockID = 1;
   optional ChunkInfo chunkData = 2;
   optional bytes data = 3;
+  optional PutBlockRequestProto block = 4;
 }
 
 message WriteChunkResponseProto {
+  optional GetCommittedBlockLengthResponseProto committedBlockLength = 1;
 }
 
 enum ReadChunkVersion {
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
index 7e5de329d129..0d82f0f8bbb2 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
@@ -129,21 +129,26 @@ private ContainerProtos.ListBlockResponseProto listBlock(long containerID) {
   }
 
   private PutBlockResponseProto putBlock(PutBlockRequestProto putBlock) {
+    return PutBlockResponseProto.newBuilder()
+        .setCommittedBlockLength(
+            doPutBlock(putBlock.getBlockData()))
+        .build();
+  }
+
+  private GetCommittedBlockLengthResponseProto doPutBlock(
+      ContainerProtos.BlockData blockData) {
     long length = 0;
-    for (ChunkInfo chunk : putBlock.getBlockData().getChunksList()) {
+    for (ChunkInfo chunk : blockData.getChunksList()) {
       length += chunk.getLen();
     }
-    datanodeStorage.putBlock(putBlock.getBlockData().getBlockID(),
-        putBlock.getBlockData());
+    datanodeStorage.putBlock(blockData.getBlockID(),
+        blockData);
 
-    return PutBlockResponseProto.newBuilder()
-        .setCommittedBlockLength(
-            GetCommittedBlockLengthResponseProto.newBuilder()
-                .setBlockID(putBlock.getBlockData().getBlockID())
+    return GetCommittedBlockLengthResponseProto.newBuilder()
+        .setBlockID(blockData.getBlockID())
                 .setBlockLength(length)
-                .build())
-        .build();
+        .build();
   }
 
   private XceiverClientReply result(
@@ -166,8 +171,15 @@ private WriteChunkResponseProto writeChunk(
     datanodeStorage
         .writeChunk(writeChunk.getBlockID(), writeChunk.getChunkData(),
             writeChunk.getData());
-    return WriteChunkResponseProto.newBuilder()
-        .build();
+
+    WriteChunkResponseProto.Builder builder =
+        WriteChunkResponseProto.newBuilder();
+    if (writeChunk.hasBlock()) {
+      ContainerProtos.BlockData
+          blockData = writeChunk.getBlock().getBlockData();
+      builder.setCommittedBlockLength(doPutBlock(blockData));
+    }
+    return builder.build();
   }
 
   @Override
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index 015eaa2916a6..daa433f68f8a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -33,27 +33,27 @@
 import java.util.stream.Stream;
 
 import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.crypto.CryptoCodec;
 import org.apache.hadoop.crypto.CryptoOutputStream;
 import org.apache.hadoop.crypto.Encryptor;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.StorageType;
-import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
-import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -73,6 +73,9 @@
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl;
+import org.apache.hadoop.ozone.container.metadata.AbstractDatanodeStore;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OzoneManager;
@@ -81,9 +84,6 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 
-import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequest;
-import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequestWithFSO;
-import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
 import org.apache.ozone.test.GenericTestUtils;
@@ -156,6 +156,7 @@ public static void init() throws Exception {
     // Reduce KeyDeletingService interval
     CONF.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS);
     CONF.setBoolean("ozone.client.incremental.chunk.list", true);
+    CONF.setBoolean("ozone.client.stream.putblock.piggybacking", true);
     CONF.setBoolean(OZONE_CHUNK_LIST_INCREMENTAL, true);
     ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
         .setBlockSize(BLOCK_SIZE)
@@ -177,11 +178,11 @@ public static void init() throws Exception {
     bucket = TestDataUtil.createVolumeAndBucket(client, layout);
 
     // Enable DEBUG level logging for relevant classes
-    GenericTestUtils.setLogLevel(OMKeyRequest.LOG, Level.DEBUG);
-    GenericTestUtils.setLogLevel(OMKeyCommitRequest.LOG, Level.DEBUG);
-    GenericTestUtils.setLogLevel(OMKeyCommitRequestWithFSO.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(BlockManagerImpl.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(AbstractDatanodeStore.LOG, Level.DEBUG);
     GenericTestUtils.setLogLevel(BlockOutputStream.LOG, Level.DEBUG);
     GenericTestUtils.setLogLevel(BlockInputStream.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(KeyValueHandler.LOG, Level.DEBUG);
   }
 
   @AfterAll
@@ -549,7 +550,8 @@ static void runTestHSync(FileSystem fs, Path file,
           break;
         }
         for (int i = 0; i < n; i++) {
-          assertEquals(data[offset + i], buffer[i]);
+          assertEquals(data[offset + i], buffer[i],
+              "expected at offset " + offset + " i=" + i);
         }
         offset += n;
       }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index cc51218b288b..012b2545321b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -275,7 +275,7 @@ abstract class Builder {
     protected boolean includeRecon = false;
 
     protected int dnInitialVersion = DatanodeVersion.FUTURE_VERSION.toProtoValue();
-    protected int dnCurrentVersion = DatanodeVersion.FUTURE_VERSION.toProtoValue();
+    protected int dnCurrentVersion = DatanodeVersion.COMBINED_PUTBLOCK_WRITECHUNK_RPC.toProtoValue();
 
     protected int numOfDatanodes = 3;
     protected boolean startDataNodes = true;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index e15e1e4d63ba..ce5432739cbd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -80,8 +80,13 @@ class TestBlockOutputStream {
 
   static MiniOzoneCluster createCluster() throws IOException,
       InterruptedException, TimeoutException {
-
     OzoneConfiguration conf = new OzoneConfiguration();
+    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+    clientConfig.setChecksumType(ChecksumType.NONE);
+    clientConfig.setStreamBufferFlushDelay(false);
+    clientConfig.setEnablePutblockPiggybacking(true);
+    conf.setFromObject(clientConfig);
+
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, TimeUnit.SECONDS);
     conf.setQuietMode(false);
@@ -397,7 +402,7 @@ void testWriteMoreThanChunkSize(boolean flushDelay) throws Exception {
       key.flush();
       assertEquals(writeChunkCount + 2,
           metrics.getContainerOpCountMetrics(WriteChunk));
-      assertEquals(putBlockCount + 1,
+      assertEquals(putBlockCount,
           metrics.getContainerOpCountMetrics(PutBlock));
       assertEquals(pendingWriteChunkCount,
           metrics.getPendingContainerOpCountMetrics(WriteChunk));
@@ -426,9 +431,9 @@ void testWriteMoreThanChunkSize(boolean flushDelay) throws Exception {
           metrics.getPendingContainerOpCountMetrics(PutBlock));
       assertEquals(writeChunkCount + 2,
           metrics.getContainerOpCountMetrics(WriteChunk));
-      assertEquals(putBlockCount + 2,
+      assertEquals(putBlockCount + 1,
          metrics.getContainerOpCountMetrics(PutBlock));
-      assertEquals(totalOpCount + 4, metrics.getTotalOpCount());
+      assertEquals(totalOpCount + 3, metrics.getTotalOpCount());
       assertEquals(0, keyOutputStream.getStreamEntries().size());
 
       validateData(keyName, data1, client.getObjectStore(), VOLUME, BUCKET);
@@ -493,9 +498,9 @@ void testWriteMoreThanFlushSize(boolean flushDelay) throws Exception {
           metrics.getPendingContainerOpCountMetrics(PutBlock));
       assertEquals(writeChunkCount + 3,
           metrics.getContainerOpCountMetrics(WriteChunk));
-      assertEquals(putBlockCount + 2,
+      assertEquals(putBlockCount + 1,
           metrics.getContainerOpCountMetrics(PutBlock));
-      assertEquals(totalOpCount + 5, metrics.getTotalOpCount());
+      assertEquals(totalOpCount + 4, metrics.getTotalOpCount());
       assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
       // make sure the bufferPool is empty
       assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
@@ -686,9 +691,9 @@ void testWriteMoreThanMaxFlushSize(boolean flushDelay) throws Exception {
       assertEquals(writeChunkCount + 5,
           metrics.getContainerOpCountMetrics(WriteChunk));
       // The previous flush did not trigger any action with flushDelay enabled
-      assertEquals(putBlockCount + (flushDelay ? 3 : 4),
+      assertEquals(putBlockCount + (flushDelay ? 2 : 3),
           metrics.getContainerOpCountMetrics(PutBlock));
-      assertEquals(totalOpCount + (flushDelay ? 8 : 9),
+      assertEquals(totalOpCount + (flushDelay ? 7 : 8),
           metrics.getTotalOpCount());
       assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
       assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
index 2ae69dc3c96f..e773bf7ed7f2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
@@ -607,18 +607,40 @@ private ContainerProtos.DatanodeBlockID createBlockId(long containerId,
 
   private void mockWriteChunkResponse(XceiverClientSpi mockDnProtocol)
       throws IOException, ExecutionException, InterruptedException {
-    ContainerCommandResponseProto writeResponse =
-        ContainerCommandResponseProto.newBuilder()
-            .setWriteChunk(WriteChunkResponseProto.newBuilder().build())
-            .setResult(Result.SUCCESS)
-            .setCmdType(Type.WriteChunk)
-            .build();
     doAnswer(invocation ->
-        new XceiverClientReply(completedFuture(writeResponse)))
+        new XceiverClientReply(
+            completedFuture(
+                createWriteChunkResponse(
+                    (ContainerCommandRequestProto)invocation.getArgument(0)))))
         .when(mockDnProtocol)
         .sendCommandAsync(argThat(matchCmd(Type.WriteChunk)));
   }
 
+  ContainerCommandResponseProto createWriteChunkResponse(
+      ContainerCommandRequestProto request) {
+    ContainerProtos.WriteChunkRequestProto writeChunk = request.getWriteChunk();
+
+    WriteChunkResponseProto.Builder builder =
+        WriteChunkResponseProto.newBuilder();
+    if (writeChunk.hasBlock()) {
+      ContainerProtos.BlockData
+          blockData = writeChunk.getBlock().getBlockData();
+
+      GetCommittedBlockLengthResponseProto response =
+          GetCommittedBlockLengthResponseProto.newBuilder()
+              .setBlockID(blockData.getBlockID())
+              .setBlockLength(blockData.getSize())
+              .build();
+
+      builder.setCommittedBlockLength(response);
+    }
+    return ContainerCommandResponseProto.newBuilder()
+        .setWriteChunk(builder.build())
+        .setResult(Result.SUCCESS)
+        .setCmdType(Type.WriteChunk)
+        .build();
+  }
+
   private ArgumentMatcher<ContainerCommandRequestProto> matchCmd(Type type) {
     return argument -> argument != null && argument.getCmdType() == type;
   }
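
Usage note (not part of the patch): the feature is off by default and is enabled through OzoneClientConfig, mirroring the test setup in TestBlockOutputStream and TestHSync above. A minimal sketch:

    OzoneConfiguration conf = new OzoneConfiguration();
    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
    // Opt in to piggybacking PutBlock on small WriteChunk requests; the client
    // still falls back to separate PutBlock calls if any datanode in the
    // pipeline is older than COMBINED_PUTBLOCK_WRITECHUNK_RPC.
    clientConfig.setEnablePutblockPiggybacking(true);
    conf.setFromObject(clientConfig);
    // Equivalent configuration key: ozone.client.stream.putblock.piggybacking = true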