From 69ef7b4a53a32055411da7cd953e598eb5eae409 Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Fri, 12 Sep 2025 13:57:21 +0530 Subject: [PATCH 1/6] HDDS-13618. Avoid frequent pipeline close action from DN --- .../common/statemachine/StateContext.java | 8 +++ .../ClosePipelineCommandHandler.java | 9 ++++ .../commandhandler/CommandDispatcher.java | 5 ++ .../server/ratis/XceiverServerRatis.java | 9 ++++ .../TestClosePipelineCommandHandler.java | 51 +++++++++++++++++++ 5 files changed, 82 insertions(+) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 3b7fd7efe037..d51a8d51fae1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -43,6 +43,7 @@ import java.util.OptionalLong; import java.util.Queue; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -70,6 +71,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ClosePipelineCommandHandler; import org.apache.hadoop.ozone.container.common.states.DatanodeState; import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState; import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState; @@ -807,6 +809,12 @@ public Map getCommandQueueSummary() { return summary; } + public boolean isPipelineCloseInProgress(UUID pipelineID) { + ClosePipelineCommandHandler handler = parentDatanodeStateMachine.getCommandDispatcher() + .getClosePipelineCommandHandler(); + return handler.isPipelineInProgress(pipelineID); + } + /** * Returns the count of the Execution. * @return long diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java index 2e392ccf6639..0611efe6858a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java @@ -90,6 +90,15 @@ public ClosePipelineCommandHandler( this.pipelinesInProgress = ConcurrentHashMap.newKeySet(); } + /** + * Returns true if pipeline is in progress, else false. + * + * @return boolean + */ + public boolean isPipelineInProgress(UUID pipelineID) { + return pipelinesInProgress.contains(pipelineID); + } + /** * Handles a given SCM command. * diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java index 696b04defe36..63cb935cbb42 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java @@ -80,6 +80,11 @@ public CommandHandler getDeleteBlocksCommandHandler() { return handlerMap.get(Type.deleteBlocksCommand); } + @VisibleForTesting + public ClosePipelineCommandHandler getClosePipelineCommandHandler() { + return (ClosePipelineCommandHandler) handlerMap.get(Type.closePipelineCommand); + } + /** * Dispatch the command to the correct handler. * diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 6661823f9a10..7659aa9240dc 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -744,6 +744,15 @@ private void handlePipelineFailure(RaftGroupId groupId, RoleInfoProto roleInfoPr private void triggerPipelineClose(RaftGroupId groupId, String detail, ClosePipelineInfo.Reason reasonCode) { PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid()); + + if (context != null) { + if (context.isPipelineCloseInProgress(pipelineID.getId())) { + LOG.debug("Skipped triggering pipeline close for {} as it is already in progress. Reason: {}", + pipelineID.getId(), detail); + return; + } + } + ClosePipelineInfo.Builder closePipelineInfo = ClosePipelineInfo.newBuilder() .setPipelineID(pipelineID.getProtobuf()) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java index 2120615081e8..928150a2f566 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java @@ -17,6 +17,9 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.eq; @@ -31,6 +34,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -132,6 +139,50 @@ void testCommandIdempotency() throws IOException { .remove(any(), anyBoolean(), anyBoolean()); } + @Test + void testPendingPipelineClose() throws IOException, InterruptedException { + final List datanodes = getDatanodes(); + final DatanodeDetails currentDatanode = datanodes.get(0); + final PipelineID pipelineID = PipelineID.randomId(); + final UUID pipelineUUID = pipelineID.getId(); + final SCMCommand command1 = new ClosePipelineCommand(pipelineID); + final SCMCommand command2 = new ClosePipelineCommand(pipelineID); + StateContext stateContext = ContainerTestUtils.getMockContext(currentDatanode, conf); + + final boolean shouldDeleteRatisLogDirectory = true; + XceiverServerRatis writeChannel = mock(XceiverServerRatis.class); + when(ozoneContainer.getWriteChannel()).thenReturn(writeChannel); + when(writeChannel.getShouldDeleteRatisLogDirectory()).thenReturn(shouldDeleteRatisLogDirectory); + when(writeChannel.isExist(pipelineID.getProtobuf())).thenReturn(true); + Collection raftPeers = datanodes.stream() + .map(RatisHelper::toRaftPeer) + .collect(Collectors.toList()); + when(writeChannel.getServer()).thenReturn(mock(RaftServer.class)); + when(writeChannel.getServer().getId()).thenReturn(RatisHelper.toRaftPeerId(currentDatanode)); + when(writeChannel.getRaftPeersInPipeline(pipelineID)).thenReturn(raftPeers); + + lenient().doAnswer(invocation -> { + Thread.sleep(200); + return null; + }).when(writeChannel).removeGroup(pipelineID.getProtobuf()); + + ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); + + final ClosePipelineCommandHandler commandHandler = + new ClosePipelineCommandHandler((leader, tls) -> raftClient, singleThreadExecutor); + assertFalse(commandHandler.isPipelineInProgress(pipelineUUID)); + commandHandler.handle(command1, ozoneContainer, stateContext, connectionManager); + Thread.sleep(50); + commandHandler.handle(command2, ozoneContainer, stateContext, connectionManager); + + singleThreadExecutor.shutdown(); + assertTrue(singleThreadExecutor.awaitTermination(10, TimeUnit.SECONDS)); + + // Only one command should have been processed due to duplicate prevention + assertEquals(1, commandHandler.getInvocationCount()); + assertFalse(commandHandler.isPipelineInProgress(pipelineUUID)); + } + private List getDatanodes() { final DatanodeDetails dnOne = MockDatanodeDetails.randomDatanodeDetails(); final DatanodeDetails dnTwo = MockDatanodeDetails.randomDatanodeDetails(); From a9f736d3b7f75ca2c58d1791d23ec9a202974a79 Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Mon, 15 Sep 2025 17:21:44 +0530 Subject: [PATCH 2/6] Used CountDownLatch in test --- .../TestClosePipelineCommandHandler.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java index 928150a2f566..fbd6fb3fce29 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java @@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; @@ -35,6 +36,7 @@ import java.util.Collection; import java.util.List; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -161,8 +163,12 @@ void testPendingPipelineClose() throws IOException, InterruptedException { when(writeChannel.getServer().getId()).thenReturn(RatisHelper.toRaftPeerId(currentDatanode)); when(writeChannel.getRaftPeersInPipeline(pipelineID)).thenReturn(raftPeers); - lenient().doAnswer(invocation -> { - Thread.sleep(200); + CountDownLatch firstCommandStarted = new CountDownLatch(1); + CountDownLatch secondCommandSubmitted = new CountDownLatch(1); + + doAnswer(invocation -> { + firstCommandStarted.countDown(); + secondCommandSubmitted.await(); return null; }).when(writeChannel).removeGroup(pipelineID.getProtobuf()); @@ -172,8 +178,9 @@ void testPendingPipelineClose() throws IOException, InterruptedException { new ClosePipelineCommandHandler((leader, tls) -> raftClient, singleThreadExecutor); assertFalse(commandHandler.isPipelineInProgress(pipelineUUID)); commandHandler.handle(command1, ozoneContainer, stateContext, connectionManager); - Thread.sleep(50); + assertTrue(firstCommandStarted.await(5, TimeUnit.SECONDS)); commandHandler.handle(command2, ozoneContainer, stateContext, connectionManager); + secondCommandSubmitted.countDown(); singleThreadExecutor.shutdown(); assertTrue(singleThreadExecutor.awaitTermination(10, TimeUnit.SECONDS)); From 27600d2f6ff70d9c4a7270640025b44e6fb69113 Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Wed, 17 Sep 2025 13:54:25 +0530 Subject: [PATCH 3/6] Added integration test and addressed comments --- .../common/statemachine/StateContext.java | 2 +- .../ClosePipelineCommandHandler.java | 4 +- .../commandhandler/CommandDispatcher.java | 1 - .../server/ratis/XceiverServerRatis.java | 3 +- .../TestClosePipelineCommandHandler.java | 4 +- .../hdds/scm/pipeline/TestPipelineClose.java | 39 +++++++++++++++++++ 6 files changed, 46 insertions(+), 7 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index d51a8d51fae1..305b7b55a229 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -812,7 +812,7 @@ public Map getCommandQueueSummary() { public boolean isPipelineCloseInProgress(UUID pipelineID) { ClosePipelineCommandHandler handler = parentDatanodeStateMachine.getCommandDispatcher() .getClosePipelineCommandHandler(); - return handler.isPipelineInProgress(pipelineID); + return handler.isPipelineCloseInProgress(pipelineID); } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java index 0611efe6858a..38924e9dcac3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java @@ -91,11 +91,11 @@ public ClosePipelineCommandHandler( } /** - * Returns true if pipeline is in progress, else false. + * Returns true if pipeline close is in progress, else false. * * @return boolean */ - public boolean isPipelineInProgress(UUID pipelineID) { + public boolean isPipelineCloseInProgress(UUID pipelineID) { return pipelinesInProgress.contains(pipelineID); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java index 63cb935cbb42..ece91ffdd1c2 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java @@ -80,7 +80,6 @@ public CommandHandler getDeleteBlocksCommandHandler() { return handlerMap.get(Type.deleteBlocksCommand); } - @VisibleForTesting public ClosePipelineCommandHandler getClosePipelineCommandHandler() { return (ClosePipelineCommandHandler) handlerMap.get(Type.closePipelineCommand); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 7659aa9240dc..02167d750b04 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -741,7 +741,8 @@ private void handlePipelineFailure(RaftGroupId groupId, RoleInfoProto roleInfoPr triggerPipelineClose(groupId, b.toString(), ClosePipelineInfo.Reason.PIPELINE_FAILED); } - private void triggerPipelineClose(RaftGroupId groupId, String detail, + @VisibleForTesting + public void triggerPipelineClose(RaftGroupId groupId, String detail, ClosePipelineInfo.Reason reasonCode) { PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid()); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java index fbd6fb3fce29..70744efbbed7 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java @@ -176,7 +176,7 @@ void testPendingPipelineClose() throws IOException, InterruptedException { final ClosePipelineCommandHandler commandHandler = new ClosePipelineCommandHandler((leader, tls) -> raftClient, singleThreadExecutor); - assertFalse(commandHandler.isPipelineInProgress(pipelineUUID)); + assertFalse(commandHandler.isPipelineCloseInProgress(pipelineUUID)); commandHandler.handle(command1, ozoneContainer, stateContext, connectionManager); assertTrue(firstCommandStarted.await(5, TimeUnit.SECONDS)); commandHandler.handle(command2, ozoneContainer, stateContext, connectionManager); @@ -187,7 +187,7 @@ void testPendingPipelineClose() throws IOException, InterruptedException { // Only one command should have been processed due to duplicate prevention assertEquals(1, commandHandler.getInvocationCount()); - assertFalse(commandHandler.isPipelineInProgress(pipelineUUID)); + assertFalse(commandHandler.isPipelineCloseInProgress(pipelineUUID)); } private List getDatanodes() { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java index 1763d9e269b2..06e7250f5346 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java @@ -27,11 +27,13 @@ import static org.mockito.Mockito.verify; import java.io.IOException; +import java.lang.reflect.Field; import java.util.List; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -39,6 +41,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.ScmConfigKeys; @@ -56,6 +59,8 @@ import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ClosePipelineCommandHandler; import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.ozone.test.GenericTestUtils; @@ -66,6 +71,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.mockito.ArgumentCaptor; +import org.slf4j.event.Level; /** * Tests for Pipeline Closing. @@ -256,6 +262,39 @@ void testPipelineCloseWithLogFailure() verifyCloseForPipeline(openPipeline, actionsFromDatanode); } + @Test + @SuppressWarnings("unchecked") + void testDuplicatePipelineClosePrevention() throws Exception { + DatanodeStateMachine datanodeStateMachine = cluster.getHddsDatanodes().get(0).getDatanodeStateMachine(); + XceiverServerRatis xceiverRatis = (XceiverServerRatis) datanodeStateMachine.getContainer().getWriteChannel(); + + GenericTestUtils.setLogLevel(XceiverServerRatis.class, Level.DEBUG); + GenericTestUtils.LogCapturer xceiverLogCapturer = + GenericTestUtils.LogCapturer.captureLogs(XceiverServerRatis.class); + + RaftGroupId groupId = RaftGroupId.valueOf(ratisContainer.getPipeline().getId().getId()); + PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid()); + + ClosePipelineCommandHandler handler = datanodeStateMachine.getCommandDispatcher().getClosePipelineCommandHandler(); + + Field pipelinesInProgressField = handler.getClass().getDeclaredField("pipelinesInProgress"); + pipelinesInProgressField.setAccessible(true); + Set pipelinesInProgress = (Set) pipelinesInProgressField.get(handler); + pipelinesInProgress.add(pipelineID.getId()); + + String detail = "test duplicate trigger "; + int numOfDuplicateTriggers = 10; + for (int i = 1; i <= numOfDuplicateTriggers; i++) { + xceiverRatis.triggerPipelineClose(groupId, detail + i, ClosePipelineInfo.Reason.PIPELINE_FAILED); + } + + String xceiverLogs = xceiverLogCapturer.getOutput(); + int skippedCount = StringUtils.countMatches(xceiverLogs.toLowerCase(), "skipped triggering pipeline close"); + assertEquals(numOfDuplicateTriggers, skippedCount); + + xceiverLogCapturer.stopCapturing(); + } + private boolean verifyCloseForPipeline(Pipeline pipeline, PipelineActionsFromDatanode report) { UUID uuidToFind = pipeline.getId().getId(); From dce1e89885ed02ad0a3c6c07abf9df90405d01c8 Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Wed, 17 Sep 2025 14:12:16 +0530 Subject: [PATCH 4/6] Use org.apache.commons.lang3.StringUtils --- .../org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java index 06e7250f5346..8c80e5f871f7 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java @@ -33,7 +33,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; From 8bbb9f1dce19b030cbc771c29eff9245666e2b82 Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Wed, 17 Sep 2025 16:14:15 +0530 Subject: [PATCH 5/6] Fixed TestPipelineClose test --- .../hdds/scm/pipeline/TestPipelineClose.java | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java index 8c80e5f871f7..23e4716db155 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java @@ -264,7 +264,7 @@ void testPipelineCloseWithLogFailure() @Test @SuppressWarnings("unchecked") - void testDuplicatePipelineClosePrevention() throws Exception { + void testPipelineCloseTriggersSkippedWhenAlreadyInProgress() throws Exception { DatanodeStateMachine datanodeStateMachine = cluster.getHddsDatanodes().get(0).getDatanodeStateMachine(); XceiverServerRatis xceiverRatis = (XceiverServerRatis) datanodeStateMachine.getContainer().getWriteChannel(); @@ -280,19 +280,23 @@ void testDuplicatePipelineClosePrevention() throws Exception { Field pipelinesInProgressField = handler.getClass().getDeclaredField("pipelinesInProgress"); pipelinesInProgressField.setAccessible(true); Set pipelinesInProgress = (Set) pipelinesInProgressField.get(handler); - pipelinesInProgress.add(pipelineID.getId()); - String detail = "test duplicate trigger "; - int numOfDuplicateTriggers = 10; - for (int i = 1; i <= numOfDuplicateTriggers; i++) { - xceiverRatis.triggerPipelineClose(groupId, detail + i, ClosePipelineInfo.Reason.PIPELINE_FAILED); - } + try { + pipelinesInProgress.add(pipelineID.getId()); - String xceiverLogs = xceiverLogCapturer.getOutput(); - int skippedCount = StringUtils.countMatches(xceiverLogs.toLowerCase(), "skipped triggering pipeline close"); - assertEquals(numOfDuplicateTriggers, skippedCount); - - xceiverLogCapturer.stopCapturing(); + String detail = "test duplicate trigger "; + int numOfDuplicateTriggers = 10; + for (int i = 1; i <= numOfDuplicateTriggers; i++) { + xceiverRatis.triggerPipelineClose(groupId, detail + i, ClosePipelineInfo.Reason.PIPELINE_FAILED); + } + + String xceiverLogs = xceiverLogCapturer.getOutput(); + int skippedCount = StringUtils.countMatches(xceiverLogs.toLowerCase(), "skipped triggering pipeline close"); + assertEquals(numOfDuplicateTriggers, skippedCount); + } finally { + pipelinesInProgress.remove(pipelineID.getId()); + xceiverLogCapturer.stopCapturing(); + } } private boolean verifyCloseForPipeline(Pipeline pipeline, From ccc95cde13e1d1d2a41c0b45c858997a2478f0fc Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Wed, 17 Sep 2025 20:51:48 +0530 Subject: [PATCH 6/6] Fixed flakiness in TestPipelineClose --- .../hadoop/hdds/scm/pipeline/TestPipelineClose.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java index 23e4716db155..1e6aa4e04bcd 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java @@ -265,6 +265,12 @@ void testPipelineCloseWithLogFailure() @Test @SuppressWarnings("unchecked") void testPipelineCloseTriggersSkippedWhenAlreadyInProgress() throws Exception { + ContainerInfo allocateContainer = containerManager + .allocateContainer(RatisReplicationConfig.getInstance( + ReplicationFactor.THREE), "newTestOwner"); + ContainerWithPipeline containerWithPipeline = new ContainerWithPipeline(allocateContainer, + pipelineManager.getPipeline(allocateContainer.getPipelineID())); + DatanodeStateMachine datanodeStateMachine = cluster.getHddsDatanodes().get(0).getDatanodeStateMachine(); XceiverServerRatis xceiverRatis = (XceiverServerRatis) datanodeStateMachine.getContainer().getWriteChannel(); @@ -272,7 +278,7 @@ void testPipelineCloseTriggersSkippedWhenAlreadyInProgress() throws Exception { GenericTestUtils.LogCapturer xceiverLogCapturer = GenericTestUtils.LogCapturer.captureLogs(XceiverServerRatis.class); - RaftGroupId groupId = RaftGroupId.valueOf(ratisContainer.getPipeline().getId().getId()); + RaftGroupId groupId = RaftGroupId.valueOf(containerWithPipeline.getPipeline().getId().getId()); PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid()); ClosePipelineCommandHandler handler = datanodeStateMachine.getCommandDispatcher().getClosePipelineCommandHandler(); @@ -296,6 +302,9 @@ void testPipelineCloseTriggersSkippedWhenAlreadyInProgress() throws Exception { } finally { pipelinesInProgress.remove(pipelineID.getId()); xceiverLogCapturer.stopCapturing(); + + pipelineManager.closePipeline(containerWithPipeline.getPipeline().getId()); + pipelineManager.deletePipeline(containerWithPipeline.getPipeline().getId()); } }