From f26eb2a16ff2ad41eb020c91189c4330ffc5c607 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Tue, 13 Aug 2024 11:28:16 -0700 Subject: [PATCH 1/4] HDDS-11208. Change RatisBlockOutputStream to use HDDS-11174. --- .../hdds/scm/storage/BlockOutputStream.java | 51 ++++++++++--------- .../scm/storage/RatisBlockOutputStream.java | 4 +- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index e88b097c4990..293ca0f5c37b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -479,31 +479,30 @@ void releaseBuffersOnException() { /** * Watch for a specific commit index. */ - XceiverClientReply sendWatchForCommit(long commitIndex) - throws IOException { - return null; + CompletableFuture sendWatchForCommit(long commitIndex) { + return CompletableFuture.completedFuture(null); } - private void watchForCommit(long commitIndex) throws IOException { - checkOpen(); - try { - LOG.debug("Entering watchForCommit commitIndex = {}", commitIndex); - final XceiverClientReply reply = sendWatchForCommit(commitIndex); - if (reply != null) { - List dnList = reply.getDatanodes(); - if (!dnList.isEmpty()) { - Pipeline pipe = xceiverClient.getPipeline(); - - LOG.warn("Failed to commit BlockId {} on {}. Failed nodes: {}", - blockID, pipe, dnList); - failedServers.addAll(dnList); - } - } - } catch (IOException ioe) { - setIoException(ioe); - throw getIoException(); + private CompletableFuture watchForCommit(long commitIndex) { + LOG.debug("Entering watchForCommit commitIndex = {}", commitIndex); + return sendWatchForCommit(commitIndex) + .thenAccept(this::checkReply) + .exceptionally(e -> { throw new FlushRuntimeException(setIoException(e)); }) + .whenComplete((r, e) -> LOG.debug("Leaving watchForCommit commitIndex = {}", commitIndex)); + } + + void checkReply(XceiverClientReply reply) { + if (reply == null) { + return; } - LOG.debug("Leaving watchForCommit commitIndex = {}", commitIndex); + final List dnList = reply.getDatanodes(); + if (dnList.isEmpty()) { + return; + } + + LOG.warn("Failed to commit BlockId {} on {}. Failed nodes: {}", + blockID, xceiverClient.getPipeline(), dnList); + failedServers.addAll(dnList); } void updateCommitInfo(XceiverClientReply reply, List buffers) { @@ -724,12 +723,13 @@ private synchronized CompletableFuture handleFlushInternalSynchronized(boo } private CompletableFuture watchForCommitAsync(CompletableFuture putBlockResultFuture) { - return putBlockResultFuture.thenAccept(x -> { + return putBlockResultFuture.thenCompose(x -> { try { - watchForCommit(x.commitIndex); + checkOpen(); } catch (IOException e) { throw new FlushRuntimeException(e); } + return watchForCommit(x.commitIndex); }); } @@ -771,7 +771,7 @@ void validateResponse( } - public void setIoException(Exception e) { + public IOException setIoException(Throwable e) { IOException ioe = getIoException(); if (ioe == null) { IOException exception = new IOException(EXCEPTION_MSG + e.toString(), e); @@ -782,6 +782,7 @@ public void setIoException(Exception e) { "so subsequent request also encounters " + "Storage Container Exception {}", ioe, e); } + return getIoException(); } void cleanup() { diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java index d32c37eba6c3..c420a2dcca29 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java @@ -102,8 +102,8 @@ void releaseBuffersOnException() { } @Override - XceiverClientReply sendWatchForCommit(long commitIndex) throws IOException { - return commitWatcher.watchForCommit(commitIndex); + CompletableFuture sendWatchForCommit(long commitIndex) { + return commitWatcher.watchForCommitAsync(commitIndex); } @Override From 6774d55d355a0daff18558991b1473c71b421d8a Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Tue, 13 Aug 2024 12:57:13 -0700 Subject: [PATCH 2/4] Fix checkstyle. --- .../org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 293ca0f5c37b..bdba4a10db0f 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -487,7 +487,9 @@ private CompletableFuture watchForCommit(long commitIndex) { LOG.debug("Entering watchForCommit commitIndex = {}", commitIndex); return sendWatchForCommit(commitIndex) .thenAccept(this::checkReply) - .exceptionally(e -> { throw new FlushRuntimeException(setIoException(e)); }) + .exceptionally(e -> { + throw new FlushRuntimeException(setIoException(e)); + }) .whenComplete((r, e) -> LOG.debug("Leaving watchForCommit commitIndex = {}", commitIndex)); } From a9c54621934abe566c01967207c1e5edc2787e96 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Fri, 16 Aug 2024 13:36:30 -0700 Subject: [PATCH 3/4] Address review comments. --- .../scm/storage/AbstractCommitWatcher.java | 1 - .../hdds/scm/storage/BlockOutputStream.java | 31 +++++++++---------- .../scm/storage/RatisBlockOutputStream.java | 4 +-- 3 files changed, 16 insertions(+), 20 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java index 61bc73420e65..7641de1274d8 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java @@ -124,7 +124,6 @@ XceiverClientReply watchOnLastIndex() throws IOException { * * @param commitIndex log index to watch for * @return minimum commit index replicated to all nodes - * @throws IOException IOException in case watch gets timed out */ CompletableFuture watchForCommitAsync(long commitIndex) { final MemoizedSupplier> supplier diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index bdba4a10db0f..00b854721c18 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -375,10 +375,8 @@ private void doFlushOrWatchIfNeeded() throws IOException { } private void recordWatchForCommitAsync(CompletableFuture putBlockResultFuture) { - recordFlushFuture(watchForCommitAsync(putBlockResultFuture)); - } + final CompletableFuture flushFuture = putBlockResultFuture.thenCompose(x -> watchForCommit(x.commitIndex)); - private void recordFlushFuture(CompletableFuture flushFuture) { Preconditions.checkState(Thread.holdsLock(this)); this.lastFlushFuture = flushFuture; this.allPendingFlushFutures = allPendingFlushFutures.thenCombine(flushFuture, (last, curr) -> null); @@ -477,13 +475,23 @@ void releaseBuffersOnException() { } /** - * Watch for a specific commit index. + * Send a watch request to wait until the given index became committed. + * When watch is not needed (e.g. EC), this is a NOOP. + * + * @param index the log index to wait for. + * @return the future of the reply. */ - CompletableFuture sendWatchForCommit(long commitIndex) { + CompletableFuture sendWatchForCommit(long index) { return CompletableFuture.completedFuture(null); } private CompletableFuture watchForCommit(long commitIndex) { + try { + checkOpen(); + } catch (IOException e) { + throw new FlushRuntimeException(e); + } + LOG.debug("Entering watchForCommit commitIndex = {}", commitIndex); return sendWatchForCommit(commitIndex) .thenAccept(this::checkReply) @@ -493,7 +501,7 @@ private CompletableFuture watchForCommit(long commitIndex) { .whenComplete((r, e) -> LOG.debug("Leaving watchForCommit commitIndex = {}", commitIndex)); } - void checkReply(XceiverClientReply reply) { + private void checkReply(XceiverClientReply reply) { if (reply == null) { return; } @@ -724,17 +732,6 @@ private synchronized CompletableFuture handleFlushInternalSynchronized(boo return lastFlushFuture; } - private CompletableFuture watchForCommitAsync(CompletableFuture putBlockResultFuture) { - return putBlockResultFuture.thenCompose(x -> { - try { - checkOpen(); - } catch (IOException e) { - throw new FlushRuntimeException(e); - } - return watchForCommit(x.commitIndex); - }); - } - @Override public void close() throws IOException { if (xceiverClientFactory != null && xceiverClient != null) { diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java index c420a2dcca29..0f95716bf9a4 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java @@ -102,8 +102,8 @@ void releaseBuffersOnException() { } @Override - CompletableFuture sendWatchForCommit(long commitIndex) { - return commitWatcher.watchForCommitAsync(commitIndex); + CompletableFuture sendWatchForCommit(long index) { + return commitWatcher.watchForCommitAsync(index); } @Override From fed414cdb80f8cbfec221cd63c507fc4b30023e3 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Fri, 23 Aug 2024 15:57:27 -0700 Subject: [PATCH 4/4] Fix rebase Change-Id: If922e51e4d42507e8a4a86cfa0a504f88ebea966 --- .../org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 00b854721c18..ca3e4e53743e 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -442,7 +442,8 @@ public synchronized void writeOnRetry(long len) throws IOException { writeChunk(buffer); putBlockFuture = executePutBlock(false, false); } - CompletableFuture watchForCommitAsync = watchForCommitAsync(putBlockFuture); + CompletableFuture watchForCommitAsync = + putBlockFuture.thenCompose(x -> watchForCommit(x.commitIndex)); try { watchForCommitAsync.get(); } catch (InterruptedException e) {