From a13845660d2bc01151572e91f110906b12b1330b Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Fri, 7 Mar 2025 18:58:08 +0530 Subject: [PATCH 01/13] mark statemachine unhealthy for write operation timeout --- .../apache/hadoop/hdds/scm/ScmConfigKeys.java | 4 +- .../server/ratis/ContainerStateMachine.java | 76 ++++++++++++++++++- 2 files changed, 75 insertions(+), 5 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 941df45c2df6..82125c73b481 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -632,7 +632,9 @@ public final class ScmConfigKeys { public static final String NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY = "net.topology.node.switch.mapping.impl"; - + public static final String HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL + = "hdds.container.ratis.statemachine.write.wait.interval"; + public static final long HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_DEFAULT = 10 * 1000L; /** * Never constructed. */ diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 0a1df1088d68..26b5feeca5b5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -187,13 +187,39 @@ long getStartTime() { } } + static class WriteFutureContext { + private final CompletableFuture writeChunkFuture; + private final CompletableFuture raftFuture; + private final long startTime; + + WriteFutureContext(CompletableFuture writeChunkFuture, + CompletableFuture raftFuture, long startTime) { + this.writeChunkFuture = writeChunkFuture; + this.raftFuture = raftFuture; + this.startTime = startTime; + } + + public CompletableFuture getWriteChunkFuture() { + return writeChunkFuture; + } + + public CompletableFuture getRaftFuture() { + return raftFuture; + } + + long getStartTime() { + return startTime; + } + } + private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); private final ContainerDispatcher dispatcher; private final ContainerController containerController; private final XceiverServerRatis ratisServer; - private final ConcurrentHashMap> writeChunkFutureMap; + private final ConcurrentHashMap writeChunkFutureMap; + private final long writeChunkWaitMaxMs; + private long writeFutureMinIndex = 0; // keeps track of the containers created per pipeline private final Map container2BCSIDMap; @@ -273,6 +299,8 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI this.waitOnBothFollowers = conf.getObject( DatanodeConfiguration.class).waitOnAllFollowers(); + this.writeChunkWaitMaxMs = conf.getLong(ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL, + ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_DEFAULT); } private void validatePeers() throws IOException { @@ -542,6 +570,15 @@ private ContainerCommandResponseProto dispatchCommand( private CompletableFuture writeStateMachineData( ContainerCommandRequestProto requestProto, long entryIndex, long term, long startTime) { + if (writeChunkFutureMap.containsKey(entryIndex)) { + // generally state machine will wait forever, for precaution, a check is added if retry happens. + return writeChunkFutureMap.get(entryIndex).getRaftFuture(); + } + try { + validateLongRunningWrite(entryIndex); + } catch (IOException e) { + return completeExceptionally(e); + } final WriteChunkRequestProto write = requestProto.getWriteChunk(); RaftServer server = ratisServer.getServer(); Preconditions.checkArgument(!write.getData().isEmpty()); @@ -591,7 +628,7 @@ private CompletableFuture writeStateMachineData( } }, getChunkExecutor(requestProto.getWriteChunk())); - writeChunkFutureMap.put(entryIndex, writeChunkFuture); + writeChunkFutureMap.put(entryIndex, new WriteFutureContext(writeChunkFuture, raftFuture, startTime)); if (LOG.isDebugEnabled()) { LOG.debug("{}: writeChunk writeStateMachineData : blockId" + "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(), @@ -639,6 +676,37 @@ private CompletableFuture writeStateMachineData( return raftFuture; } + private void validateLongRunningWrite(long currIndex) throws IOException { + // get min valid write chunk operation's future context + WriteFutureContext writeFutureContext = null; + for (long i = writeFutureMinIndex; i < currIndex; ++i) { + if (writeChunkFutureMap.containsKey(i)) { + writeFutureContext = writeChunkFutureMap.get(i); + writeFutureMinIndex = i; + break; + } + } + if (null == writeFutureContext) { + return; + } + // validate for timeout in milli second + long waitTime = Time.monotonicNow() - writeFutureContext.getStartTime() / 1000000; + IOException ex = new StorageContainerException("Write chunk has taken " + waitTime + " crossing threshold " + + writeChunkWaitMaxMs + " for groupId " + getGroupId(), ContainerProtos.Result.CONTAINER_INTERNAL_ERROR); + if (waitTime > writeChunkWaitMaxMs) { + LOG.error("Write chunk has taken {}ms crossing threshold {}ms for groupId {}", waitTime, writeChunkWaitMaxMs, + getGroupId()); + stateMachineHealthy.set(false); + writeChunkFutureMap.forEach((key, value) -> { + LOG.error("Cancelling write chunk for transaction {}, groupId {}", key, getGroupId()); + value.getWriteChunkFuture().cancel(true); + value.getRaftFuture().completeExceptionally(ex); + }); + writeFutureContext.getRaftFuture().completeExceptionally(ex); + throw ex; + } + } + private StateMachine.DataChannel getStreamDataChannel( ContainerCommandRequestProto requestProto, DispatcherContext context) throws StorageContainerException { @@ -821,7 +889,7 @@ private ByteString readStateMachineData( public CompletableFuture flush(long index) { return CompletableFuture.allOf( writeChunkFutureMap.entrySet().stream().filter(x -> x.getKey() <= index) - .map(Map.Entry::getValue).toArray(CompletableFuture[]::new)); + .map(e -> e.getValue().getWriteChunkFuture()).toArray(CompletableFuture[]::new)); } /** From 93b9dc48edc1f75c6370bb4acb7662e1e0d7e741 Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Sat, 8 Mar 2025 19:31:13 +0530 Subject: [PATCH 02/13] review comment fix --- .../server/ratis/ContainerStateMachine.java | 27 +++++++------------ 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 26b5feeca5b5..ef74659db87e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -35,12 +35,14 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; @@ -217,9 +219,8 @@ long getStartTime() { private final ContainerDispatcher dispatcher; private final ContainerController containerController; private final XceiverServerRatis ratisServer; - private final ConcurrentHashMap writeChunkFutureMap; + private final NavigableMap writeChunkFutureMap; private final long writeChunkWaitMaxMs; - private long writeFutureMinIndex = 0; // keeps track of the containers created per pipeline private final Map container2BCSIDMap; @@ -255,7 +256,7 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI this.containerController = containerController; this.ratisServer = ratisServer; metrics = CSMMetrics.create(gid); - this.writeChunkFutureMap = new ConcurrentHashMap<>(); + this.writeChunkFutureMap = new ConcurrentSkipListMap<>(); applyTransactionCompletionMap = new ConcurrentHashMap<>(); this.unhealthyContainers = ConcurrentHashMap.newKeySet(); long pendingRequestsBytesLimit = (long)conf.getStorageSize( @@ -678,21 +679,12 @@ private CompletableFuture writeStateMachineData( private void validateLongRunningWrite(long currIndex) throws IOException { // get min valid write chunk operation's future context - WriteFutureContext writeFutureContext = null; - for (long i = writeFutureMinIndex; i < currIndex; ++i) { - if (writeChunkFutureMap.containsKey(i)) { - writeFutureContext = writeChunkFutureMap.get(i); - writeFutureMinIndex = i; - break; - } - } - if (null == writeFutureContext) { + Map.Entry longWriteFutureContextEntry = writeChunkFutureMap.firstEntry(); + if (null == longWriteFutureContextEntry) { return; } // validate for timeout in milli second - long waitTime = Time.monotonicNow() - writeFutureContext.getStartTime() / 1000000; - IOException ex = new StorageContainerException("Write chunk has taken " + waitTime + " crossing threshold " - + writeChunkWaitMaxMs + " for groupId " + getGroupId(), ContainerProtos.Result.CONTAINER_INTERNAL_ERROR); + long waitTime = Time.monotonicNow() - longWriteFutureContextEntry.getValue().getStartTime() / 1000000; if (waitTime > writeChunkWaitMaxMs) { LOG.error("Write chunk has taken {}ms crossing threshold {}ms for groupId {}", waitTime, writeChunkWaitMaxMs, getGroupId()); @@ -700,10 +692,9 @@ private void validateLongRunningWrite(long currIndex) throws IOException { writeChunkFutureMap.forEach((key, value) -> { LOG.error("Cancelling write chunk for transaction {}, groupId {}", key, getGroupId()); value.getWriteChunkFuture().cancel(true); - value.getRaftFuture().completeExceptionally(ex); }); - writeFutureContext.getRaftFuture().completeExceptionally(ex); - throw ex; + throw new StorageContainerException("Write chunk has taken " + waitTime + " crossing threshold " + + writeChunkWaitMaxMs + " for groupId " + getGroupId(), ContainerProtos.Result.CONTAINER_INTERNAL_ERROR); } } From 75bb186daefa730f112bfdf47cc2d39ee4230513 Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Sun, 9 Mar 2025 19:43:30 +0530 Subject: [PATCH 03/13] add test case and fix failure --- .../server/ratis/ContainerStateMachine.java | 144 +++++++++--------- .../ratis/TestContainerStateMachine.java | 52 +++++++ 2 files changed, 126 insertions(+), 70 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index ef74659db87e..64df0285025e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -45,6 +45,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -190,18 +191,18 @@ long getStartTime() { } static class WriteFutureContext { - private final CompletableFuture writeChunkFuture; + private final Future writeChunkFuture; private final CompletableFuture raftFuture; private final long startTime; - WriteFutureContext(CompletableFuture writeChunkFuture, + WriteFutureContext(Future writeChunkFuture, CompletableFuture raftFuture, long startTime) { this.writeChunkFuture = writeChunkFuture; this.raftFuture = raftFuture; this.startTime = startTime; } - public CompletableFuture getWriteChunkFuture() { + public Future getWriteChunkFuture() { return writeChunkFuture; } @@ -576,7 +577,7 @@ private CompletableFuture writeStateMachineData( return writeChunkFutureMap.get(entryIndex).getRaftFuture(); } try { - validateLongRunningWrite(entryIndex); + validateLongRunningWrite(); } catch (IOException e) { return completeExceptionally(e); } @@ -602,82 +603,85 @@ private CompletableFuture writeStateMachineData( .setContainer2BCSIDMap(container2BCSIDMap) .build(); CompletableFuture raftFuture = new CompletableFuture<>(); - // ensure the write chunk happens asynchronously in writeChunkExecutor pool - // thread. - CompletableFuture writeChunkFuture = - CompletableFuture.supplyAsync(() -> { - try { - try { - checkContainerHealthy(write.getBlockID().getContainerID(), true); - } catch (StorageContainerException e) { - return ContainerUtils.logAndReturnError(LOG, e, requestProto); - } - metrics.recordWriteStateMachineQueueingLatencyNs( - Time.monotonicNowNanos() - startTime); - return dispatchCommand(requestProto, context); - } catch (Exception e) { - LOG.error("{}: writeChunk writeStateMachineData failed: blockId" + + // ensure the write chunk happens asynchronously in writeChunkExecutor pool thread. + Future future = getChunkExecutor(requestProto.getWriteChunk()).submit(() -> { + try { + try { + checkContainerHealthy(write.getBlockID().getContainerID(), true); + } catch (StorageContainerException e) { + ContainerCommandResponseProto result = ContainerUtils.logAndReturnError(LOG, e, requestProto); + handleCommandResult(requestProto, entryIndex, startTime, result, write, raftFuture); + return result; + } + metrics.recordWriteStateMachineQueueingLatencyNs( + Time.monotonicNowNanos() - startTime); + ContainerCommandResponseProto result = dispatchCommand(requestProto, context); + handleCommandResult(requestProto, entryIndex, startTime, result, write, raftFuture); + return result; + } catch (Exception e) { + LOG.error("{}: writeChunk writeStateMachineData failed: blockId" + "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(), - entryIndex, write.getChunkData().getChunkName(), e); - metrics.incNumWriteDataFails(); - // write chunks go in parallel. It's possible that one write chunk - // see the stateMachine is marked unhealthy by other parallel thread - unhealthyContainers.add(write.getBlockID().getContainerID()); - stateMachineHealthy.set(false); - raftFuture.completeExceptionally(e); - throw e; - } - }, getChunkExecutor(requestProto.getWriteChunk())); + entryIndex, write.getChunkData().getChunkName(), e); + metrics.incNumWriteDataFails(); + // write chunks go in parallel. It's possible that one write chunk + // see the stateMachine is marked unhealthy by other parallel thread + unhealthyContainers.add(write.getBlockID().getContainerID()); + stateMachineHealthy.set(false); + raftFuture.completeExceptionally(e); + throw e; + } finally { + // Remove the future once it finishes execution from the + writeChunkFutureMap.remove(entryIndex); + } + }); - writeChunkFutureMap.put(entryIndex, new WriteFutureContext(writeChunkFuture, raftFuture, startTime)); + writeChunkFutureMap.put(entryIndex, new WriteFutureContext(future, raftFuture, startTime)); if (LOG.isDebugEnabled()) { LOG.debug("{}: writeChunk writeStateMachineData : blockId" + "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(), entryIndex, write.getChunkData().getChunkName()); } - // Remove the future once it finishes execution from the - // writeChunkFutureMap. - writeChunkFuture.thenApply(r -> { - if (r.getResult() != ContainerProtos.Result.SUCCESS - && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN - && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO - // After concurrent flushes are allowed on the same key, chunk file inconsistencies can happen and - // that should not crash the pipeline. - && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) { - StorageContainerException sce = - new StorageContainerException(r.getMessage(), r.getResult()); - LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: blockId" + + return raftFuture; + } + + private void handleCommandResult(ContainerCommandRequestProto requestProto, long entryIndex, long startTime, + ContainerCommandResponseProto r, WriteChunkRequestProto write, + CompletableFuture raftFuture) { + if (r.getResult() != ContainerProtos.Result.SUCCESS + && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN + && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO + // After concurrent flushes are allowed on the same key, chunk file inconsistencies can happen and + // that should not crash the pipeline. + && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) { + StorageContainerException sce = + new StorageContainerException(r.getMessage(), r.getResult()); + LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: blockId" + + write.getBlockID() + " logIndex " + entryIndex + " chunkName " + + write.getChunkData().getChunkName() + " Error message: " + + r.getMessage() + " Container Result: " + r.getResult()); + metrics.incNumWriteDataFails(); + // If the write fails currently we mark the stateMachine as unhealthy. + // This leads to pipeline close. Any change in that behavior requires + // handling the entry for the write chunk in cache. + stateMachineHealthy.set(false); + unhealthyContainers.add(write.getBlockID().getContainerID()); + raftFuture.completeExceptionally(sce); + } else { + metrics.incNumBytesWrittenCount( + requestProto.getWriteChunk().getChunkData().getLen()); + if (LOG.isDebugEnabled()) { + LOG.debug(getGroupId() + + ": writeChunk writeStateMachineData completed: blockId" + write.getBlockID() + " logIndex " + entryIndex + " chunkName " + - write.getChunkData().getChunkName() + " Error message: " + - r.getMessage() + " Container Result: " + r.getResult()); - metrics.incNumWriteDataFails(); - // If the write fails currently we mark the stateMachine as unhealthy. - // This leads to pipeline close. Any change in that behavior requires - // handling the entry for the write chunk in cache. - stateMachineHealthy.set(false); - unhealthyContainers.add(write.getBlockID().getContainerID()); - raftFuture.completeExceptionally(sce); - } else { - metrics.incNumBytesWrittenCount( - requestProto.getWriteChunk().getChunkData().getLen()); - if (LOG.isDebugEnabled()) { - LOG.debug(getGroupId() + - ": writeChunk writeStateMachineData completed: blockId" + - write.getBlockID() + " logIndex " + entryIndex + " chunkName " + - write.getChunkData().getChunkName()); - } - raftFuture.complete(r::toByteString); - metrics.recordWriteStateMachineCompletionNs( - Time.monotonicNowNanos() - startTime); + write.getChunkData().getChunkName()); } - - writeChunkFutureMap.remove(entryIndex); - return r; - }); - return raftFuture; + raftFuture.complete(r::toByteString); + metrics.recordWriteStateMachineCompletionNs( + Time.monotonicNowNanos() - startTime); + } } - private void validateLongRunningWrite(long currIndex) throws IOException { + private void validateLongRunningWrite() throws IOException { // get min valid write chunk operation's future context Map.Entry longWriteFutureContextEntry = writeChunkFutureMap.firstEntry(); if (null == longWriteFutureContextEntry) { @@ -880,7 +884,7 @@ private ByteString readStateMachineData( public CompletableFuture flush(long index) { return CompletableFuture.allOf( writeChunkFutureMap.entrySet().stream().filter(x -> x.getKey() <= index) - .map(e -> e.getValue().getWriteChunkFuture()).toArray(CompletableFuture[]::new)); + .map(e -> e.getValue().getRaftFuture()).toArray(CompletableFuture[]::new)); } /** diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java index 8a5cf4b91ad7..e47380414428 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java @@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; @@ -30,8 +31,10 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; +import java.lang.reflect.Field; import java.util.List; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -57,6 +60,7 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -263,4 +267,52 @@ public void testApplyTransactionFailure(boolean failWithException) throws Execut ContainerProtos.ContainerCommandResponseProto.parseFrom(succcesfulTransaction.getContent()); assertEquals(ContainerProtos.Result.SUCCESS, resp.getResult()); } + + @Test + public void testWriteTimout() throws Exception { + RaftProtos.LogEntryProto entry = mock(RaftProtos.LogEntryProto.class); + when(entry.getTerm()).thenReturn(1L); + when(entry.getIndex()).thenReturn(1L); + RaftProtos.LogEntryProto entryNext = mock(RaftProtos.LogEntryProto.class); + when(entryNext.getTerm()).thenReturn(1L); + when(entryNext.getIndex()).thenReturn(2L); + TransactionContext trx = mock(TransactionContext.class); + ContainerStateMachine.Context context = mock(ContainerStateMachine.Context.class); + when(trx.getStateMachineContext()).thenReturn(context); + doAnswer(e -> { + try { + Thread.sleep(200000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw ie; + } + return null; + }).when(dispatcher).dispatch(any(), any()); + + when(context.getRequestProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder() + .setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk( + ContainerProtos.WriteChunkRequestProto.newBuilder().setData(ByteString.copyFromUtf8("Test Data")) + .setBlockID( + ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(1).setLocalID(1).build()).build()) + .setContainerID(1) + .setDatanodeUuid(UUID.randomUUID().toString()).build()); + AtomicReference throwable = new AtomicReference<>(null); + Function throwableSetter = t -> { + throwable.set(t); + return null; + }; + Field writeChunkWaitMaxMs = stateMachine.getClass().getDeclaredField("writeChunkWaitMaxMs"); + writeChunkWaitMaxMs.setAccessible(true); + writeChunkWaitMaxMs.set(stateMachine, 1000); + CompletableFuture firstWrite = stateMachine.write(entry, trx); + Thread.sleep(2000); + CompletableFuture secondWrite = stateMachine.write(entryNext, trx); + firstWrite.exceptionally(throwableSetter).get(); + assertNotNull(throwable.get()); + assertInstanceOf(InterruptedException.class, throwable.get()); + + secondWrite.exceptionally(throwableSetter).get(); + assertNotNull(throwable.get()); + assertInstanceOf(IOException.class, throwable.get()); + } } From d6a6a0f74a0145a55f6f88a75d8a4a174a2af16f Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Sun, 9 Mar 2025 22:20:06 +0530 Subject: [PATCH 04/13] fix test failures --- .../java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 2 +- hadoop-hdds/common/src/main/resources/ozone-default.xml | 8 ++++++++ .../transport/server/ratis/ContainerStateMachine.java | 5 +++-- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 82125c73b481..25d08f76b943 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -634,7 +634,7 @@ public final class ScmConfigKeys { "net.topology.node.switch.mapping.impl"; public static final String HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL = "hdds.container.ratis.statemachine.write.wait.interval"; - public static final long HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_DEFAULT = 10 * 1000L; + public static final long HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_DEFAULT = 10 * 60 * 1000L; /** * Never constructed. */ diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index f20d606d4365..9a636a913536 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -3548,6 +3548,14 @@ Timeout for the request submitted directly to Ratis in datanode. + + hdds.container.ratis.statemachine.write.wait.interval + OZONE, DATANODE + 10m + + Timeout for the write path for container blocks. + + hdds.datanode.slow.op.warning.threshold OZONE, DATANODE, PERFORMANCE diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 64df0285025e..2c54a7cca841 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -49,6 +49,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import org.apache.hadoop.hdds.HddsUtils; @@ -301,8 +302,8 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI this.waitOnBothFollowers = conf.getObject( DatanodeConfiguration.class).waitOnAllFollowers(); - this.writeChunkWaitMaxMs = conf.getLong(ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL, - ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_DEFAULT); + this.writeChunkWaitMaxMs = conf.getTimeDuration(ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL, + ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); } private void validatePeers() throws IOException { From 71191ba64b40d53e487e83aa87ac5e71d6a41057 Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Mon, 10 Mar 2025 09:00:14 +0530 Subject: [PATCH 05/13] fix test failure --- .../server/ratis/ContainerStateMachine.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 2c54a7cca841..3c33f4f45a7f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -684,12 +684,20 @@ private void handleCommandResult(ContainerCommandRequestProto requestProto, long private void validateLongRunningWrite() throws IOException { // get min valid write chunk operation's future context - Map.Entry longWriteFutureContextEntry = writeChunkFutureMap.firstEntry(); - if (null == longWriteFutureContextEntry) { + Map.Entry writeFutureContextEntry = null; + while (!writeChunkFutureMap.isEmpty()) { + writeFutureContextEntry = writeChunkFutureMap.firstEntry(); + // there is a possibility of entry being removed before added in map, cleanup those + if (!writeFutureContextEntry.getValue().getWriteChunkFuture().isDone()) { + break; + } + writeChunkFutureMap.remove(writeFutureContextEntry.getKey()); + } + if (null == writeFutureContextEntry) { return; } // validate for timeout in milli second - long waitTime = Time.monotonicNow() - longWriteFutureContextEntry.getValue().getStartTime() / 1000000; + long waitTime = Time.monotonicNow() - writeFutureContextEntry.getValue().getStartTime() / 1000000; if (waitTime > writeChunkWaitMaxMs) { LOG.error("Write chunk has taken {}ms crossing threshold {}ms for groupId {}", waitTime, writeChunkWaitMaxMs, getGroupId()); From bf48067fdf76a7ea33a90eac6f7759328d568f39 Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Mon, 10 Mar 2025 12:27:37 +0530 Subject: [PATCH 06/13] test case fix --- .../common/transport/server/ratis/ContainerStateMachine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 3c33f4f45a7f..898feff9970e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -688,7 +688,7 @@ private void validateLongRunningWrite() throws IOException { while (!writeChunkFutureMap.isEmpty()) { writeFutureContextEntry = writeChunkFutureMap.firstEntry(); // there is a possibility of entry being removed before added in map, cleanup those - if (!writeFutureContextEntry.getValue().getWriteChunkFuture().isDone()) { + if (null == writeFutureContextEntry || !writeFutureContextEntry.getValue().getWriteChunkFuture().isDone()) { break; } writeChunkFutureMap.remove(writeFutureContextEntry.getKey()); From b9dbb66d7fdcffde0dce68019b83f17f099a77a4 Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Mon, 10 Mar 2025 13:42:44 +0530 Subject: [PATCH 07/13] review fix --- .../transport/server/ratis/TestContainerStateMachine.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java index e47380414428..442ae0248464 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java @@ -313,6 +313,8 @@ public void testWriteTimout() throws Exception { secondWrite.exceptionally(throwableSetter).get(); assertNotNull(throwable.get()); - assertInstanceOf(IOException.class, throwable.get()); + assertInstanceOf(StorageContainerException.class, throwable.get()); + StorageContainerException sce = (StorageContainerException) throwable.get(); + assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, sce.getResult()); } } From 3b076866901a944422a6dbb55b50db64fa6b274a Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Tue, 11 Mar 2025 11:59:37 +0530 Subject: [PATCH 08/13] review fix --- .../apache/hadoop/hdds/scm/ScmConfigKeys.java | 2 +- .../server/ratis/ContainerStateMachine.java | 129 ++++++++++-------- 2 files changed, 72 insertions(+), 59 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 25d08f76b943..8f9dc256690c 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -634,7 +634,7 @@ public final class ScmConfigKeys { "net.topology.node.switch.mapping.impl"; public static final String HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL = "hdds.container.ratis.statemachine.write.wait.interval"; - public static final long HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_DEFAULT = 10 * 60 * 1000L; + public static final long HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_NS_DEFAULT = 10 * 60 * 1000_1000_1000L; /** * Never constructed. */ diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 898feff9970e..03d97e9253f8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -39,6 +39,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.SortedMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; @@ -191,13 +192,13 @@ long getStartTime() { } } - static class WriteFutureContext { + static class WriteFutures { private final Future writeChunkFuture; private final CompletableFuture raftFuture; private final long startTime; - WriteFutureContext(Future writeChunkFuture, - CompletableFuture raftFuture, long startTime) { + WriteFutures(Future writeChunkFuture, + CompletableFuture raftFuture, long startTime) { this.writeChunkFuture = writeChunkFuture; this.raftFuture = raftFuture; this.startTime = startTime; @@ -221,8 +222,8 @@ long getStartTime() { private final ContainerDispatcher dispatcher; private final ContainerController containerController; private final XceiverServerRatis ratisServer; - private final NavigableMap writeChunkFutureMap; - private final long writeChunkWaitMaxMs; + private final NavigableMap writeChunkFutureMap; + private final long writeChunkWaitMaxNs; // keeps track of the containers created per pipeline private final Map container2BCSIDMap; @@ -302,8 +303,8 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI this.waitOnBothFollowers = conf.getObject( DatanodeConfiguration.class).waitOnAllFollowers(); - this.writeChunkWaitMaxMs = conf.getTimeDuration(ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL, - ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); + this.writeChunkWaitMaxNs = conf.getTimeDuration(ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL, + ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_NS_DEFAULT, TimeUnit.NANOSECONDS); } private void validatePeers() throws IOException { @@ -573,13 +574,14 @@ private ContainerCommandResponseProto dispatchCommand( private CompletableFuture writeStateMachineData( ContainerCommandRequestProto requestProto, long entryIndex, long term, long startTime) { - if (writeChunkFutureMap.containsKey(entryIndex)) { + final WriteFutures previous = writeChunkFutureMap.get(entryIndex); + if (previous != null) { // generally state machine will wait forever, for precaution, a check is added if retry happens. - return writeChunkFutureMap.get(entryIndex).getRaftFuture(); + return previous.getRaftFuture(); } try { validateLongRunningWrite(); - } catch (IOException e) { + } catch (StorageContainerException e) { return completeExceptionally(e); } final WriteChunkRequestProto write = requestProto.getWriteChunk(); @@ -605,38 +607,39 @@ private CompletableFuture writeStateMachineData( .build(); CompletableFuture raftFuture = new CompletableFuture<>(); // ensure the write chunk happens asynchronously in writeChunkExecutor pool thread. - Future future = getChunkExecutor(requestProto.getWriteChunk()).submit(() -> { - try { - try { - checkContainerHealthy(write.getBlockID().getContainerID(), true); - } catch (StorageContainerException e) { - ContainerCommandResponseProto result = ContainerUtils.logAndReturnError(LOG, e, requestProto); - handleCommandResult(requestProto, entryIndex, startTime, result, write, raftFuture); - return result; - } - metrics.recordWriteStateMachineQueueingLatencyNs( - Time.monotonicNowNanos() - startTime); - ContainerCommandResponseProto result = dispatchCommand(requestProto, context); - handleCommandResult(requestProto, entryIndex, startTime, result, write, raftFuture); - return result; - } catch (Exception e) { - LOG.error("{}: writeChunk writeStateMachineData failed: blockId" + - "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(), - entryIndex, write.getChunkData().getChunkName(), e); - metrics.incNumWriteDataFails(); - // write chunks go in parallel. It's possible that one write chunk - // see the stateMachine is marked unhealthy by other parallel thread - unhealthyContainers.add(write.getBlockID().getContainerID()); - stateMachineHealthy.set(false); - raftFuture.completeExceptionally(e); - throw e; - } finally { - // Remove the future once it finishes execution from the - writeChunkFutureMap.remove(entryIndex); - } - }); + Future future = getChunkExecutor( + requestProto.getWriteChunk()).submit(() -> { + try { + try { + checkContainerHealthy(write.getBlockID().getContainerID(), true); + } catch (StorageContainerException e) { + ContainerCommandResponseProto result = ContainerUtils.logAndReturnError(LOG, e, requestProto); + handleCommandResult(requestProto, entryIndex, startTime, result, write, raftFuture); + return result; + } + metrics.recordWriteStateMachineQueueingLatencyNs( + Time.monotonicNowNanos() - startTime); + ContainerCommandResponseProto result = dispatchCommand(requestProto, context); + handleCommandResult(requestProto, entryIndex, startTime, result, write, raftFuture); + return result; + } catch (Exception e) { + LOG.error("{}: writeChunk writeStateMachineData failed: blockId" + + "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(), + entryIndex, write.getChunkData().getChunkName(), e); + metrics.incNumWriteDataFails(); + // write chunks go in parallel. It's possible that one write chunk + // see the stateMachine is marked unhealthy by other parallel thread + unhealthyContainers.add(write.getBlockID().getContainerID()); + stateMachineHealthy.set(false); + raftFuture.completeExceptionally(e); + throw e; + } finally { + // Remove the future once it finishes execution from the + writeChunkFutureMap.remove(entryIndex); + } + }); - writeChunkFutureMap.put(entryIndex, new WriteFutureContext(future, raftFuture, startTime)); + writeChunkFutureMap.put(entryIndex, new WriteFutures(future, raftFuture, startTime)); if (LOG.isDebugEnabled()) { LOG.debug("{}: writeChunk writeStateMachineData : blockId" + "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(), @@ -682,32 +685,38 @@ private void handleCommandResult(ContainerCommandRequestProto requestProto, long } } - private void validateLongRunningWrite() throws IOException { + private void validateLongRunningWrite() throws StorageContainerException { // get min valid write chunk operation's future context - Map.Entry writeFutureContextEntry = null; - while (!writeChunkFutureMap.isEmpty()) { + Map.Entry writeFutureContextEntry = null; + for (boolean found = false; !found;) { writeFutureContextEntry = writeChunkFutureMap.firstEntry(); - // there is a possibility of entry being removed before added in map, cleanup those - if (null == writeFutureContextEntry || !writeFutureContextEntry.getValue().getWriteChunkFuture().isDone()) { - break; + if (null == writeFutureContextEntry) { + return; + } + if (writeFutureContextEntry.getValue().getWriteChunkFuture().isDone()) { + // there is a possibility that writeChunkFutureMap may have dangling entry, as remove is done before add future + writeChunkFutureMap.remove(writeFutureContextEntry.getKey()); + } else { + found = true; } - writeChunkFutureMap.remove(writeFutureContextEntry.getKey()); } if (null == writeFutureContextEntry) { return; } // validate for timeout in milli second - long waitTime = Time.monotonicNow() - writeFutureContextEntry.getValue().getStartTime() / 1000000; - if (waitTime > writeChunkWaitMaxMs) { - LOG.error("Write chunk has taken {}ms crossing threshold {}ms for groupId {}", waitTime, writeChunkWaitMaxMs, - getGroupId()); + long waitTime = Time.monotonicNowNanos() - writeFutureContextEntry.getValue().getStartTime(); + if (waitTime > writeChunkWaitMaxNs) { + LOG.error("Write chunk has taken {}ns crossing threshold {}ns for index {} groupId {}", waitTime, + writeChunkWaitMaxNs, writeFutureContextEntry.getKey(), getGroupId()); stateMachineHealthy.set(false); writeChunkFutureMap.forEach((key, value) -> { - LOG.error("Cancelling write chunk for transaction {}, groupId {}", key, getGroupId()); + LOG.error("Cancelling write chunk due to timeout {}ns crossing {}ns for index {}, groupId {}", waitTime, + writeChunkWaitMaxNs, key, getGroupId()); value.getWriteChunkFuture().cancel(true); }); - throw new StorageContainerException("Write chunk has taken " + waitTime + " crossing threshold " - + writeChunkWaitMaxMs + " for groupId " + getGroupId(), ContainerProtos.Result.CONTAINER_INTERNAL_ERROR); + throw new StorageContainerException("Write chunk has taken " + waitTime + "ns crossing threshold " + + writeChunkWaitMaxNs + "ns for index " + writeFutureContextEntry.getKey() + " groupId " + getGroupId(), + ContainerProtos.Result.CONTAINER_INTERNAL_ERROR); } } @@ -891,9 +900,13 @@ private ByteString readStateMachineData( */ @Override public CompletableFuture flush(long index) { - return CompletableFuture.allOf( - writeChunkFutureMap.entrySet().stream().filter(x -> x.getKey() <= index) - .map(e -> e.getValue().getRaftFuture()).toArray(CompletableFuture[]::new)); + final SortedMap head = writeChunkFutureMap.headMap(index + 1); + if (head.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + return CompletableFuture.allOf(head.values().stream() + .map(WriteFutures::getRaftFuture) + .toArray(CompletableFuture[]::new)); } /** From 2b37bbe676ddf08b43ffb423c09f79c627493da8 Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Tue, 11 Mar 2025 12:47:54 +0530 Subject: [PATCH 09/13] test fix --- .../transport/server/ratis/TestContainerStateMachine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java index 442ae0248464..e3b37c96f74f 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java @@ -301,7 +301,7 @@ public void testWriteTimout() throws Exception { throwable.set(t); return null; }; - Field writeChunkWaitMaxMs = stateMachine.getClass().getDeclaredField("writeChunkWaitMaxMs"); + Field writeChunkWaitMaxMs = stateMachine.getClass().getDeclaredField("writeChunkWaitMaxNs"); writeChunkWaitMaxMs.setAccessible(true); writeChunkWaitMaxMs.set(stateMachine, 1000); CompletableFuture firstWrite = stateMachine.write(entry, trx); From 29da870c0dc634376a7e3b9daa3e63d693877b29 Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Wed, 12 Mar 2025 14:29:22 +0530 Subject: [PATCH 10/13] review fix --- .../server/ratis/ContainerStateMachine.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 03d97e9253f8..e3d7487c2628 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -624,7 +624,7 @@ private CompletableFuture writeStateMachineData( return result; } catch (Exception e) { LOG.error("{}: writeChunk writeStateMachineData failed: blockId" + - "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(), + "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(), entryIndex, write.getChunkData().getChunkName(), e); metrics.incNumWriteDataFails(); // write chunks go in parallel. It's possible that one write chunk @@ -700,18 +700,14 @@ private void validateLongRunningWrite() throws StorageContainerException { found = true; } } - if (null == writeFutureContextEntry) { - return; - } // validate for timeout in milli second long waitTime = Time.monotonicNowNanos() - writeFutureContextEntry.getValue().getStartTime(); if (waitTime > writeChunkWaitMaxNs) { - LOG.error("Write chunk has taken {}ns crossing threshold {}ns for index {} groupId {}", waitTime, - writeChunkWaitMaxNs, writeFutureContextEntry.getKey(), getGroupId()); + LOG.error("Write chunk has taken {}ns crossing threshold {}ns for index {} groupId {}, " + + "cancelling pending write chunk for this group", waitTime, writeChunkWaitMaxNs, + writeFutureContextEntry.getKey(), getGroupId()); stateMachineHealthy.set(false); writeChunkFutureMap.forEach((key, value) -> { - LOG.error("Cancelling write chunk due to timeout {}ns crossing {}ns for index {}, groupId {}", waitTime, - writeChunkWaitMaxNs, key, getGroupId()); value.getWriteChunkFuture().cancel(true); }); throw new StorageContainerException("Write chunk has taken " + waitTime + "ns crossing threshold " From c743146ab0158754732bad27f7850e3dc62dbf61 Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Wed, 12 Mar 2025 14:30:57 +0530 Subject: [PATCH 11/13] fix review comment --- .../common/transport/server/ratis/ContainerStateMachine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index e3d7487c2628..0b1b29474141 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -624,7 +624,7 @@ private CompletableFuture writeStateMachineData( return result; } catch (Exception e) { LOG.error("{}: writeChunk writeStateMachineData failed: blockId" + - "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(), + "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(), entryIndex, write.getChunkData().getChunkName(), e); metrics.incNumWriteDataFails(); // write chunks go in parallel. It's possible that one write chunk From c5afb02d6373078316ec4f5e349c3d2ff53beca1 Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Wed, 12 Mar 2025 19:49:27 +0530 Subject: [PATCH 12/13] fix test --- .../transport/server/ratis/TestContainerStateMachine.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java index e3b37c96f74f..eea237226d9c 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java @@ -301,9 +301,9 @@ public void testWriteTimout() throws Exception { throwable.set(t); return null; }; - Field writeChunkWaitMaxMs = stateMachine.getClass().getDeclaredField("writeChunkWaitMaxNs"); - writeChunkWaitMaxMs.setAccessible(true); - writeChunkWaitMaxMs.set(stateMachine, 1000); + Field writeChunkWaitMaxNs = stateMachine.getClass().getDeclaredField("writeChunkWaitMaxNs"); + writeChunkWaitMaxNs.setAccessible(true); + writeChunkWaitMaxNs.set(stateMachine, 1000_000_000); CompletableFuture firstWrite = stateMachine.write(entry, trx); Thread.sleep(2000); CompletableFuture secondWrite = stateMachine.write(entryNext, trx); From b8eae7e530ba967788eef4108a01bec6a1a00f73 Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Thu, 13 Mar 2025 08:48:20 +0530 Subject: [PATCH 13/13] fix comment --- .../src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 2 +- .../common/transport/server/ratis/ContainerStateMachine.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 8f9dc256690c..c2f79a786570 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -634,7 +634,7 @@ public final class ScmConfigKeys { "net.topology.node.switch.mapping.impl"; public static final String HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL = "hdds.container.ratis.statemachine.write.wait.interval"; - public static final long HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_NS_DEFAULT = 10 * 60 * 1000_1000_1000L; + public static final long HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_NS_DEFAULT = 10 * 60 * 1000_000_000L; /** * Never constructed. */ diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 0b1b29474141..2a9fe61d17f3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -896,7 +896,7 @@ private ByteString readStateMachineData( */ @Override public CompletableFuture flush(long index) { - final SortedMap head = writeChunkFutureMap.headMap(index + 1); + final SortedMap head = writeChunkFutureMap.headMap(index, true); if (head.isEmpty()) { return CompletableFuture.completedFuture(null); }