-
Notifications
You must be signed in to change notification settings - Fork 588
HDDS-11200. Hsync client-side metrics #7371
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -62,7 +62,9 @@ | |
| import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync; | ||
| import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.writeChunkAsync; | ||
| import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST; | ||
| import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs; | ||
|
|
||
| import org.apache.hadoop.util.Time; | ||
| import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
@@ -360,6 +362,7 @@ public void write(int b) throws IOException { | |
| private void writeChunkIfNeeded() throws IOException { | ||
| if (currentBufferRemaining == 0) { | ||
| LOG.debug("WriteChunk from write(), buffer = {}", currentBuffer); | ||
| clientMetrics.getWriteChunksDuringWrite().incr(); | ||
| writeChunk(currentBuffer); | ||
| updateWriteChunkLength(); | ||
| } | ||
|
|
@@ -404,6 +407,7 @@ private void doFlushOrWatchIfNeeded() throws IOException { | |
| updatePutBlockLength(); | ||
| CompletableFuture<PutBlockResult> putBlockFuture = executePutBlock(false, false); | ||
| recordWatchForCommitAsync(putBlockFuture); | ||
| clientMetrics.getFlushesDuringWrite().incr(); | ||
| } | ||
|
|
||
| if (bufferPool.isAtCapacity()) { | ||
|
|
@@ -532,12 +536,16 @@ private CompletableFuture<Void> watchForCommit(long commitIndex) { | |
| } | ||
|
|
||
| LOG.debug("Entering watchForCommit commitIndex = {}", commitIndex); | ||
| final long start = Time.monotonicNowNanos(); | ||
| return sendWatchForCommit(commitIndex) | ||
| .thenAccept(this::checkReply) | ||
| .exceptionally(e -> { | ||
| throw new FlushRuntimeException(setIoException(e)); | ||
| }) | ||
| .whenComplete((r, e) -> LOG.debug("Leaving watchForCommit commitIndex = {}", commitIndex)); | ||
| .whenComplete((r, e) -> { | ||
| LOG.debug("Leaving watchForCommit commitIndex = {}", commitIndex); | ||
| clientMetrics.getHsyncWatchForCommitNs().add(Time.monotonicNowNanos() - start); | ||
| }); | ||
| } | ||
|
|
||
| private void checkReply(XceiverClientReply reply) { | ||
|
|
@@ -693,12 +701,15 @@ private void handleFlushInternal(boolean close) | |
| throws IOException, InterruptedException, ExecutionException { | ||
| checkOpen(); | ||
| LOG.debug("Start handleFlushInternal close={}", close); | ||
| CompletableFuture<Void> toWaitFor = handleFlushInternalSynchronized(close); | ||
| CompletableFuture<Void> toWaitFor = captureLatencyNs(clientMetrics.getHsyncSynchronizedWorkNs(), | ||
| () -> handleFlushInternalSynchronized(close)); | ||
|
|
||
| if (toWaitFor != null) { | ||
| LOG.debug("Waiting for flush"); | ||
| try { | ||
| long startWaiting = Time.monotonicNowNanos(); | ||
| toWaitFor.get(); | ||
| clientMetrics.getHsyncWaitForFlushNs().add(Time.monotonicNowNanos() - startWaiting); | ||
| } catch (ExecutionException ex) { | ||
| if (ex.getCause() instanceof FlushRuntimeException) { | ||
| throw ((FlushRuntimeException) ex.getCause()).cause; | ||
|
|
@@ -727,6 +738,7 @@ public void waitForAllPendingFlushes() throws IOException { | |
| } | ||
|
|
||
| private synchronized CompletableFuture<Void> handleFlushInternalSynchronized(boolean close) throws IOException { | ||
| long start = Time.monotonicNowNanos(); | ||
| CompletableFuture<PutBlockResult> putBlockResultFuture = null; | ||
| // flush the last chunk data residing on the currentBuffer | ||
| if (totalWriteChunkLength < writtenDataLength) { | ||
|
|
@@ -768,6 +780,7 @@ private synchronized CompletableFuture<Void> handleFlushInternalSynchronized(boo | |
| if (putBlockResultFuture != null) { | ||
| recordWatchForCommitAsync(putBlockResultFuture); | ||
| } | ||
| clientMetrics.getHsyncSendWriteChunkNs().add(Time.monotonicNowNanos() - start); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this approach can miss the invocations that throws exception.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's ok, we want to see normal metrics. This method is long, I didn't wanna add indentation to it. |
||
| return lastFlushFuture; | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.