diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 4060902dd22e..529a536d0b3d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -532,12 +532,14 @@ public List getPendingContainerAction( * * @param pipelineAction PipelineAction to be added */ - public void addPipelineActionIfAbsent(PipelineAction pipelineAction) { + public boolean addPipelineActionIfAbsent(PipelineAction pipelineAction) { // Put only if the pipeline id with the same action is absent. final PipelineKey key = new PipelineKey(pipelineAction); + boolean added = false; for (InetSocketAddress endpoint : endpoints) { - pipelineActions.get(endpoint).putIfAbsent(key, pipelineAction); + added = pipelineActions.get(endpoint).putIfAbsent(key, pipelineAction) || added; } + return added; } /** @@ -958,9 +960,9 @@ synchronized int size() { return map.size(); } - synchronized void putIfAbsent(PipelineKey key, + synchronized boolean putIfAbsent(PipelineKey key, PipelineAction pipelineAction) { - map.putIfAbsent(key, pipelineAction); + return map.putIfAbsent(key, pipelineAction) == null; } synchronized List getActions(List reports, diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 88cb0c78fcde..5c5b801f8461 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -754,7 +754,11 @@ private void triggerPipelineClose(RaftGroupId groupId, String detail, .setAction(PipelineAction.Action.CLOSE) .build(); if (context != null) { - context.addPipelineActionIfAbsent(action); + if (context.addPipelineActionIfAbsent(action)) { + LOG.warn("pipeline Action {} on pipeline {}.Reason : {}", + action.getAction(), pipelineID, + action.getClosePipeline().getDetailedReason()); + } if (!activePipelines.get(groupId).isPendingClose()) { // if pipeline close action has not been triggered before, we need trigger pipeline close immediately to // prevent SCM to allocate blocks on the failed pipeline @@ -763,9 +767,6 @@ private void triggerPipelineClose(RaftGroupId groupId, String detail, (key, value) -> new ActivePipelineContext(value.isPipelineLeader(), true)); } } - LOG.error("pipeline Action {} on pipeline {}.Reason : {}", - action.getAction(), pipelineID, - action.getClosePipeline().getDetailedReason()); } @Override diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java index 25caa7e0d64d..a4c777a15d27 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java @@ -18,11 +18,13 @@ package org.apache.hadoop.hdds.scm.pipeline; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.fail; import java.io.IOException; import java.time.Duration; import java.util.List; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig; @@ -31,6 +33,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; import org.apache.ozone.test.GenericTestUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -46,6 +49,8 @@ public class TestNodeFailure { private static PipelineManager pipelineManager; private static int timeForFailure; + private static final String FLOOD_TOKEN = "pipeline Action CLOSE"; + /** * Create a MiniDFSCluster for testing. * @@ -91,6 +96,7 @@ public static void shutdown() { @Test public void testPipelineFail() { + GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer.captureLogs(XceiverServerRatis.class); ratisPipelines.forEach(pipeline -> { try { waitForPipelineCreation(pipeline.getId()); @@ -107,6 +113,9 @@ public void testPipelineFail() { fail("Test Failed: " + e.getMessage()); } }); + logCapturer.stopCapturing(); + int occurrences = StringUtils.countMatches(logCapturer.getOutput(), FLOOD_TOKEN); + assertThat(occurrences).isEqualTo(2); } /**