diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
index 19a5a9cad5da..0333f4758cef 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.metrics2.lib.MutableRate;
 import org.apache.hadoop.ozone.OzoneConsts;
 
 import java.util.Map;
@@ -52,6 +53,21 @@ public final class ContainerClientMetrics {
   private MutableCounterLong totalWriteChunkCalls;
   @Metric
   private MutableCounterLong totalWriteChunkBytes;
+
+  @Metric
+  private MutableRate hsyncSynchronizedWorkNs;
+  @Metric
+  private MutableRate hsyncSendWriteChunkNs;
+  @Metric
+  private MutableRate hsyncWaitForFlushNs;
+  @Metric
+  private MutableRate hsyncWatchForCommitNs;
+  @Metric
+  private MutableCounterLong writeChunksDuringWrite;
+  @Metric
+  private MutableCounterLong flushesDuringWrite;
+
+
   private MutableQuantiles[] listBlockLatency;
   private MutableQuantiles[] getBlockLatency;
   private MutableQuantiles[] getCommittedBlockLengthLatency;
@@ -249,4 +265,28 @@ Map<PipelineID, MutableCounterLong> getWriteChunkCallsByPipeline() {
   Map<UUID, MutableCounterLong> getWriteChunksCallsByLeaders() {
     return writeChunksCallsByLeaders;
   }
+
+  public MutableRate getHsyncSynchronizedWorkNs() {
+    return hsyncSynchronizedWorkNs;
+  }
+
+  public MutableRate getHsyncSendWriteChunkNs() {
+    return hsyncSendWriteChunkNs;
+  }
+
+  public MutableRate getHsyncWaitForFlushNs() {
+    return hsyncWaitForFlushNs;
+  }
+
+  public MutableRate getHsyncWatchForCommitNs() {
+    return hsyncWatchForCommitNs;
+  }
+
+  public MutableCounterLong getWriteChunksDuringWrite() {
+    return writeChunksDuringWrite;
+  }
+
+  public MutableCounterLong getFlushesDuringWrite() {
+    return flushesDuringWrite;
+  }
 }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 59795dd0f051..b3a1dc608944 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -62,7 +62,9 @@
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.writeChunkAsync;
 import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
+import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs;
 
+import org.apache.hadoop.util.Time;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -360,6 +362,7 @@ public void write(int b) throws IOException {
   private void writeChunkIfNeeded() throws IOException {
     if (currentBufferRemaining == 0) {
       LOG.debug("WriteChunk from write(), buffer = {}", currentBuffer);
+      clientMetrics.getWriteChunksDuringWrite().incr();
       writeChunk(currentBuffer);
       updateWriteChunkLength();
     }
   }
@@ -404,6 +407,7 @@ private void doFlushOrWatchIfNeeded() throws IOException {
         updatePutBlockLength();
         CompletableFuture<PutBlockResult> putBlockFuture = executePutBlock(false, false);
         recordWatchForCommitAsync(putBlockFuture);
+        clientMetrics.getFlushesDuringWrite().incr();
       }
 
       if (bufferPool.isAtCapacity()) {
@@ -532,12 +536,16 @@ private CompletableFuture<Void> watchForCommit(long commitIndex) {
     }
 
     LOG.debug("Entering watchForCommit commitIndex = {}", commitIndex);
+    final long start = Time.monotonicNowNanos();
     return sendWatchForCommit(commitIndex)
         .thenAccept(this::checkReply)
         .exceptionally(e -> {
          throw new FlushRuntimeException(setIoException(e));
        })
-        .whenComplete((r, e) -> LOG.debug("Leaving watchForCommit commitIndex = {}", commitIndex));
+        .whenComplete((r, e) -> {
+          LOG.debug("Leaving watchForCommit commitIndex = {}", commitIndex);
+          clientMetrics.getHsyncWatchForCommitNs().add(Time.monotonicNowNanos() - start);
+        });
   }
 
   private void checkReply(XceiverClientReply reply) {
@@ -693,12 +701,15 @@ private void handleFlushInternal(boolean close)
       throws IOException, InterruptedException, ExecutionException {
     checkOpen();
     LOG.debug("Start handleFlushInternal close={}", close);
-    CompletableFuture<Void> toWaitFor = handleFlushInternalSynchronized(close);
+    CompletableFuture<Void> toWaitFor = captureLatencyNs(clientMetrics.getHsyncSynchronizedWorkNs(),
+        () -> handleFlushInternalSynchronized(close));
     if (toWaitFor != null) {
       LOG.debug("Waiting for flush");
       try {
+        long startWaiting = Time.monotonicNowNanos();
         toWaitFor.get();
+        clientMetrics.getHsyncWaitForFlushNs().add(Time.monotonicNowNanos() - startWaiting);
       } catch (ExecutionException ex) {
         if (ex.getCause() instanceof FlushRuntimeException) {
           throw ((FlushRuntimeException) ex.getCause()).cause;
@@ -727,6 +738,7 @@ public void waitForAllPendingFlushes() throws IOException {
   }
 
   private synchronized CompletableFuture<Void> handleFlushInternalSynchronized(boolean close) throws IOException {
+    long start = Time.monotonicNowNanos();
     CompletableFuture<PutBlockResult> putBlockResultFuture = null;
     // flush the last chunk data residing on the currentBuffer
     if (totalWriteChunkLength < writtenDataLength) {
@@ -768,6 +780,7 @@ private synchronized CompletableFuture<Void> handleFlushInternalSynchronized(boo
     if (putBlockResultFuture != null) {
       recordWatchForCommitAsync(putBlockResultFuture);
     }
+    clientMetrics.getHsyncSendWriteChunkNs().add(Time.monotonicNowNanos() - start);
     return lastFlushFuture;
   }
 
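Note: the patch breaks an hsync/flush into four timed phases: work done while holding the stream lock (hsyncSynchronizedWorkNs, which also covers sending the remaining WriteChunk/PutBlock as hsyncSendWriteChunkNs), waiting for the flush future (hsyncWaitForFlushNs), and the watch-for-commit round trip (hsyncWatchForCommitNs). `MetricUtil.captureLatencyNs` is an existing Ozone helper whose implementation is not part of this diff; the sketch below shows the timing pattern it implies, with a hypothetical `CheckedSupplier` interface and class/registry names invented for illustration, not Ozone's actual `MetricUtil` code.

```java
// Hypothetical sketch, NOT Ozone's MetricUtil implementation: the pattern
// implied by captureLatencyNs(metric, supplier) as used in the patch above.
import java.io.IOException;

import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.util.Time;

public final class LatencyCaptureSketch {

  /** Supplier that may throw IOException, matching the flush call site. */
  @FunctionalInterface
  interface CheckedSupplier<T> {
    T get() throws IOException;
  }

  /** Time the supplier and record the elapsed nanoseconds in the rate metric. */
  static <T> T captureLatencyNs(MutableRate metric, CheckedSupplier<T> supplier)
      throws IOException {
    final long start = Time.monotonicNowNanos();
    try {
      return supplier.get();
    } finally {
      // Record even if the supplier throws, so failed flushes are still timed.
      metric.add(Time.monotonicNowNanos() - start);
    }
  }

  public static void main(String[] args) throws IOException {
    // Hypothetical registry name; in the patch the metric comes from
    // ContainerClientMetrics.getHsyncSynchronizedWorkNs().
    MetricsRegistry registry = new MetricsRegistry("BlockOutputStreamSketch");
    MutableRate syncWork = registry.newRate("hsyncSynchronizedWorkNs");
    String result = captureLatencyNs(syncWork, () -> "synchronized work done");
    System.out.println(result);
  }
}
```

Since a `MutableRate` tracks both the sample count and the mean, each phase exports an operation count plus an average latency in nanoseconds per reporting interval; the two new counters (`writeChunksDuringWrite`, `flushesDuringWrite`) separately count how often a plain `write()` triggers a chunk write or an intermediate flush.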