diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 3b7fd7efe037..305b7b55a229 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -43,6 +43,7 @@
 import java.util.OptionalLong;
 import java.util.Queue;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -70,6 +71,7 @@
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ClosePipelineCommandHandler;
 import org.apache.hadoop.ozone.container.common.states.DatanodeState;
 import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
 import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState;
@@ -807,6 +809,19 @@ public Map getCommandQueueSummary() {
     return summary;
   }
 
+  /**
+   * Checks whether the {@link ClosePipelineCommandHandler} of this datanode
+   * is currently processing a close for the given pipeline.
+   *
+   * @param pipelineID UUID of the pipeline to check
+   * @return true if a close of the pipeline is in progress, else false
+   */
+  public boolean isPipelineCloseInProgress(UUID pipelineID) {
+    ClosePipelineCommandHandler handler = parentDatanodeStateMachine.getCommandDispatcher()
+        .getClosePipelineCommandHandler();
+    return handler.isPipelineCloseInProgress(pipelineID);
+  }
+
   /**
    * Returns the count of the Execution.
    * @return long
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
index 2e392ccf6639..38924e9dcac3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
@@ -90,6 +90,16 @@ public ClosePipelineCommandHandler(
     this.pipelinesInProgress = ConcurrentHashMap.newKeySet();
   }
 
+  /**
+   * Returns true if pipeline close is in progress, else false.
+   *
+   * @param pipelineID UUID of the pipeline to look up
+   * @return true if the pipeline is in the set of closes in progress, else false
+   */
+  public boolean isPipelineCloseInProgress(UUID pipelineID) {
+    return pipelinesInProgress.contains(pipelineID);
+  }
+
   /**
    * Handles a given SCM command.
    *
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
index 696b04defe36..ece91ffdd1c2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
@@ -80,6 +80,10 @@ public CommandHandler getDeleteBlocksCommandHandler() {
     return handlerMap.get(Type.deleteBlocksCommand);
   }
 
+  public ClosePipelineCommandHandler getClosePipelineCommandHandler() {
+    return (ClosePipelineCommandHandler) handlerMap.get(Type.closePipelineCommand);
+  }
+
   /**
    * Dispatch the command to the correct handler.
    *
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 6661823f9a10..02167d750b04 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -741,9 +741,19 @@ private void handlePipelineFailure(RaftGroupId groupId, RoleInfoProto roleInfoPr
     triggerPipelineClose(groupId, b.toString(), ClosePipelineInfo.Reason.PIPELINE_FAILED);
   }
 
-  private void triggerPipelineClose(RaftGroupId groupId, String detail,
+  @VisibleForTesting
+  public void triggerPipelineClose(RaftGroupId groupId, String detail,
       ClosePipelineInfo.Reason reasonCode) {
     PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid());
+
+    // Nothing to trigger when the state context reports that a close of this
+    // pipeline is already in progress.
+    if (context != null && context.isPipelineCloseInProgress(pipelineID.getId())) {
+      LOG.debug("Skipped triggering pipeline close for {} as it is already in progress. Reason: {}",
+          pipelineID.getId(), detail);
+      return;
+    }
+
     ClosePipelineInfo.Builder closePipelineInfo =
         ClosePipelineInfo.newBuilder()
             .setPipelineID(pipelineID.getProtobuf())
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java
index 2120615081e8..70744efbbed7 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java
@@ -17,8 +17,12 @@
 
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
@@ -31,6 +35,11 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -132,6 +141,63 @@ void testCommandIdempotency() throws IOException {
         .remove(any(), anyBoolean(), anyBoolean());
   }
 
+  @Test
+  void testPendingPipelineClose() throws IOException, InterruptedException {
+    final List<DatanodeDetails> datanodes = getDatanodes();
+    final DatanodeDetails currentDatanode = datanodes.get(0);
+    final PipelineID pipelineID = PipelineID.randomId();
+    final UUID pipelineUUID = pipelineID.getId();
+    final SCMCommand<?> command1 = new ClosePipelineCommand(pipelineID);
+    final SCMCommand<?> command2 = new ClosePipelineCommand(pipelineID);
+    StateContext stateContext = ContainerTestUtils.getMockContext(currentDatanode, conf);
+
+    final boolean shouldDeleteRatisLogDirectory = true;
+    XceiverServerRatis writeChannel = mock(XceiverServerRatis.class);
+    when(ozoneContainer.getWriteChannel()).thenReturn(writeChannel);
+    when(writeChannel.getShouldDeleteRatisLogDirectory()).thenReturn(shouldDeleteRatisLogDirectory);
+    when(writeChannel.isExist(pipelineID.getProtobuf())).thenReturn(true);
+    Collection<RaftPeer> raftPeers = datanodes.stream()
+        .map(RatisHelper::toRaftPeer)
+        .collect(Collectors.toList());
+    when(writeChannel.getServer()).thenReturn(mock(RaftServer.class));
+    when(writeChannel.getServer().getId()).thenReturn(RatisHelper.toRaftPeerId(currentDatanode));
+    when(writeChannel.getRaftPeersInPipeline(pipelineID)).thenReturn(raftPeers);
+
+    CountDownLatch firstCommandStarted = new CountDownLatch(1);
+    CountDownLatch secondCommandSubmitted = new CountDownLatch(1);
+
+    // Hold the first close inside removeGroup until the duplicate command has
+    // been submitted.
+    doAnswer(invocation -> {
+      firstCommandStarted.countDown();
+      secondCommandSubmitted.await();
+      return null;
+    }).when(writeChannel).removeGroup(pipelineID.getProtobuf());
+
+    ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
+    try {
+      final ClosePipelineCommandHandler commandHandler =
+          new ClosePipelineCommandHandler((leader, tls) -> raftClient, singleThreadExecutor);
+      assertFalse(commandHandler.isPipelineCloseInProgress(pipelineUUID));
+      commandHandler.handle(command1, ozoneContainer, stateContext, connectionManager);
+      assertTrue(firstCommandStarted.await(5, TimeUnit.SECONDS));
+      commandHandler.handle(command2, ozoneContainer, stateContext, connectionManager);
+      secondCommandSubmitted.countDown();
+
+      singleThreadExecutor.shutdown();
+      assertTrue(singleThreadExecutor.awaitTermination(10, TimeUnit.SECONDS));
+
+      // Only one command should have been processed due to duplicate prevention
+      assertEquals(1, commandHandler.getInvocationCount());
+      assertFalse(commandHandler.isPipelineCloseInProgress(pipelineUUID));
+    } finally {
+      // Unblock and stop the worker even when an assertion above fails, so a
+      // failing test cannot leave a thread parked on the latch.
+      secondCommandSubmitted.countDown();
+      singleThreadExecutor.shutdownNow();
+    }
+  }
+
   private List<DatanodeDetails> getDatanodes() {
     final DatanodeDetails dnOne = MockDatanodeDetails.randomDatanodeDetails();
     final DatanodeDetails dnTwo = MockDatanodeDetails.randomDatanodeDetails();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index 1763d9e269b2..1e6aa4e04bcd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -27,11 +27,13 @@
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -39,6 +41,7 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -56,6 +59,8 @@
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ClosePipelineCommandHandler;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.ozone.test.GenericTestUtils;
@@ -66,6 +71,7 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.mockito.ArgumentCaptor;
+import org.slf4j.event.Level;
 
 /**
  * Tests for Pipeline Closing.
@@ -256,6 +262,53 @@ void testPipelineCloseWithLogFailure()
     verifyCloseForPipeline(openPipeline, actionsFromDatanode);
   }
 
+  @Test
+  void testPipelineCloseTriggersSkippedWhenAlreadyInProgress() throws Exception {
+    ContainerInfo allocateContainer = containerManager
+        .allocateContainer(RatisReplicationConfig.getInstance(
+            ReplicationFactor.THREE), "newTestOwner");
+    ContainerWithPipeline containerWithPipeline = new ContainerWithPipeline(allocateContainer,
+        pipelineManager.getPipeline(allocateContainer.getPipelineID()));
+
+    DatanodeStateMachine datanodeStateMachine = cluster.getHddsDatanodes().get(0).getDatanodeStateMachine();
+    XceiverServerRatis xceiverRatis = (XceiverServerRatis) datanodeStateMachine.getContainer().getWriteChannel();
+
+    GenericTestUtils.setLogLevel(XceiverServerRatis.class, Level.DEBUG);
+    GenericTestUtils.LogCapturer xceiverLogCapturer =
+        GenericTestUtils.LogCapturer.captureLogs(XceiverServerRatis.class);
+
+    RaftGroupId groupId = RaftGroupId.valueOf(containerWithPipeline.getPipeline().getId().getId());
+    PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid());
+
+    ClosePipelineCommandHandler handler = datanodeStateMachine.getCommandDispatcher().getClosePipelineCommandHandler();
+
+    // Mark the pipeline as closing by adding it to the handler's private set via reflection.
+    Field pipelinesInProgressField = handler.getClass().getDeclaredField("pipelinesInProgress");
+    pipelinesInProgressField.setAccessible(true);
+    @SuppressWarnings("unchecked")
+    Set<UUID> pipelinesInProgress = (Set<UUID>) pipelinesInProgressField.get(handler);
+
+    try {
+      pipelinesInProgress.add(pipelineID.getId());
+
+      String detail = "test duplicate trigger ";
+      int numOfDuplicateTriggers = 10;
+      for (int i = 1; i <= numOfDuplicateTriggers; i++) {
+        xceiverRatis.triggerPipelineClose(groupId, detail + i, ClosePipelineInfo.Reason.PIPELINE_FAILED);
+      }
+
+      String xceiverLogs = xceiverLogCapturer.getOutput();
+      int skippedCount = StringUtils.countMatches(xceiverLogs, "Skipped triggering pipeline close");
+      assertEquals(numOfDuplicateTriggers, skippedCount);
+    } finally {
+      pipelinesInProgress.remove(pipelineID.getId());
+      xceiverLogCapturer.stopCapturing();
+
+      pipelineManager.closePipeline(containerWithPipeline.getPipeline().getId());
+      pipelineManager.deletePipeline(containerWithPipeline.getPipeline().getId());
+    }
+  }
+
   private boolean verifyCloseForPipeline(Pipeline pipeline,
       PipelineActionsFromDatanode report) {
     UUID uuidToFind = pipeline.getId().getId();