From 0c6ec377b317140376eaf40f02256e1f51ec1001 Mon Sep 17 00:00:00 2001 From: Priyesh Karatha Date: Wed, 23 Apr 2025 12:45:13 +0530 Subject: [PATCH 1/6] HDDS-11141. skipping logs for closed pipelines --- .../server/ratis/XceiverServerRatis.java | 6 +- .../pipeline/TestPipelineCloseLogsFlood.java | 103 ++++++++++++++++++ 2 files changed, 106 insertions(+), 3 deletions(-) create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineCloseLogsFlood.java diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 88cb0c78fcde..f8ee031e23f7 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -761,11 +761,11 @@ private void triggerPipelineClose(RaftGroupId groupId, String detail, context.getParent().triggerHeartbeat(); activePipelines.computeIfPresent(groupId, (key, value) -> new ActivePipelineContext(value.isPipelineLeader(), true)); - } - } - LOG.error("pipeline Action {} on pipeline {}.Reason : {}", + LOG.error("pipeline Action {} on pipeline {}.Reason : {}", action.getAction(), pipelineID, action.getClosePipeline().getDetailedReason()); + } + } } @Override diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineCloseLogsFlood.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineCloseLogsFlood.java new file mode 100644 index 000000000000..cd5b450ccb5c --- /dev/null +++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineCloseLogsFlood.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.client.ReplicationFactor; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.event.Level; + +/** 
+ * Tests pipeline close logs. + */ +@Timeout(300) +public class TestPipelineCloseLogsFlood { + private static final String FLOOD_TOKEN = "pipeline Action CLOSE"; + private static final String DATANODE_SLOWNESS_TIMEOUT = "hdds.ratis.raft.server.rpc.slowness.timeout"; + private static final String NO_LEADER_TIMEOUT = "hdds.ratis.raft.server.notification.no-leader.timeout"; + private static final String VOLUME_NAME = "vol1"; + private static final String BUCKET_NAME = "bucket1"; + + private MiniOzoneCluster cluster; + private OzoneClient client; + private OzoneConfiguration conf; + + @BeforeEach + public void setUp() throws Exception { + conf = new OzoneConfiguration(); + // Make follower‑slowness detection fire quickly so that the log floods + conf.setTimeDuration(DATANODE_SLOWNESS_TIMEOUT, 10, TimeUnit.SECONDS); + conf.setTimeDuration(NO_LEADER_TIMEOUT, 10, TimeUnit.SECONDS); + + MiniOzoneCluster.Builder builder = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(3); + cluster = builder.build(); + cluster.waitForClusterToBeReady(); + client = OzoneClientFactory.getRpcClient(conf); + } + + @AfterEach + public void tearDown() throws IOException { + if (cluster != null) { + cluster.shutdown(); + } + client.close(); + } + + @Test + public void testPipelineCloseLogFloodDoesntOccur() throws Exception { + GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer.captureLogs(XceiverServerRatis.class); + GenericTestUtils.setLogLevel(XceiverServerRatis.class, Level.ERROR); + + client.getObjectStore().createVolume(VOLUME_NAME); + client.getObjectStore().getVolume(VOLUME_NAME).createBucket(BUCKET_NAME); + OzoneBucket ozoneBucket = client.getObjectStore().getVolume(VOLUME_NAME).getBucket(BUCKET_NAME); + ReplicationConfig config = ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.THREE); + + try (OutputStream out = ozoneBucket.createKey("key", 1024, config, new HashMap<>())) { + out.write(new byte[1024]); + } + // Kill one follower DN 
so that the pipeline becomes unhealthy + cluster.shutdownHddsDatanode(1); + // Wait (30 sec > a few heartbeat cycles) for multiple slowness callbacks + Thread.sleep(30_000L); + logCapturer.stopCapturing(); + int occurrences = StringUtils.countMatches(logCapturer.getOutput(), FLOOD_TOKEN); + // We expect many duplicates when the bug is present. A threshold of 5 is safe. + assertThat(occurrences).isGreaterThan(0); + assertThat(occurrences).isLessThan(5); + } +} From dc6bcd1c449b8a7fa797da154b5ba1af6527c742 Mon Sep 17 00:00:00 2001 From: Priyesh Karatha Date: Fri, 25 Apr 2025 19:59:29 +0530 Subject: [PATCH 2/6] Addressing review comments. --- .../transport/server/ratis/XceiverServerRatis.java | 2 +- .../hdds/scm/pipeline/TestPipelineCloseLogsFlood.java | 11 ++++------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index f8ee031e23f7..df365b939f5d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -761,7 +761,7 @@ private void triggerPipelineClose(RaftGroupId groupId, String detail, context.getParent().triggerHeartbeat(); activePipelines.computeIfPresent(groupId, (key, value) -> new ActivePipelineContext(value.isPipelineLeader(), true)); - LOG.error("pipeline Action {} on pipeline {}.Reason : {}", + LOG.warn("pipeline Action {} on pipeline {}.Reason : {}", action.getAction(), pipelineID, action.getClosePipeline().getDetailedReason()); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineCloseLogsFlood.java 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineCloseLogsFlood.java index cd5b450ccb5c..12303984366c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineCloseLogsFlood.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineCloseLogsFlood.java @@ -58,7 +58,6 @@ public class TestPipelineCloseLogsFlood { @BeforeEach public void setUp() throws Exception { conf = new OzoneConfiguration(); - // Make follower‑slowness detection fire quickly so that the log floods conf.setTimeDuration(DATANODE_SLOWNESS_TIMEOUT, 10, TimeUnit.SECONDS); conf.setTimeDuration(NO_LEADER_TIMEOUT, 10, TimeUnit.SECONDS); @@ -80,7 +79,7 @@ public void tearDown() throws IOException { @Test public void testPipelineCloseLogFloodDoesntOccur() throws Exception { GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer.captureLogs(XceiverServerRatis.class); - GenericTestUtils.setLogLevel(XceiverServerRatis.class, Level.ERROR); + GenericTestUtils.setLogLevel(XceiverServerRatis.class, Level.WARN); client.getObjectStore().createVolume(VOLUME_NAME); client.getObjectStore().getVolume(VOLUME_NAME).createBucket(BUCKET_NAME); @@ -92,12 +91,10 @@ public void testPipelineCloseLogFloodDoesntOccur() throws Exception { } // Kill one follower DN so that the pipeline becomes unhealthy cluster.shutdownHddsDatanode(1); - // Wait (30 sec > a few heartbeat cycles) for multiple slowness callbacks - Thread.sleep(30_000L); + Thread.sleep(15_000L); logCapturer.stopCapturing(); int occurrences = StringUtils.countMatches(logCapturer.getOutput(), FLOOD_TOKEN); - // We expect many duplicates when the bug is present. A threshold of 5 is safe. 
- assertThat(occurrences).isGreaterThan(0); - assertThat(occurrences).isLessThan(5); + //Follower slowness will happen for 2 pipelines since we are shutting down one node + assertThat(occurrences).isEqualTo(2); } } From aec2b5bb532952de562905e4d497e3c21f729630 Mon Sep 17 00:00:00 2001 From: Priyesh Karatha Date: Mon, 28 Apr 2025 11:43:59 +0530 Subject: [PATCH 3/6] addressing review comments --- .../pipeline/TestPipelineCloseLogsFlood.java | 32 +++++++------------ 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineCloseLogsFlood.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineCloseLogsFlood.java index 12303984366c..e52658b073f3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineCloseLogsFlood.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineCloseLogsFlood.java @@ -20,15 +20,12 @@ import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; -import java.io.OutputStream; -import java.util.HashMap; -import java.util.concurrent.TimeUnit; +import java.time.Duration; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.hdds.client.ReplicationConfig; -import org.apache.hadoop.hdds.client.ReplicationFactor; -import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.TestDataUtil; import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; @@ -38,7 +35,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; -import 
org.slf4j.event.Level; /** * Tests pipeline close logs. @@ -46,21 +42,20 @@ @Timeout(300) public class TestPipelineCloseLogsFlood { private static final String FLOOD_TOKEN = "pipeline Action CLOSE"; - private static final String DATANODE_SLOWNESS_TIMEOUT = "hdds.ratis.raft.server.rpc.slowness.timeout"; - private static final String NO_LEADER_TIMEOUT = "hdds.ratis.raft.server.notification.no-leader.timeout"; private static final String VOLUME_NAME = "vol1"; private static final String BUCKET_NAME = "bucket1"; + private static final String KEY_NAME = "key1"; private MiniOzoneCluster cluster; private OzoneClient client; - private OzoneConfiguration conf; @BeforeEach public void setUp() throws Exception { - conf = new OzoneConfiguration(); - conf.setTimeDuration(DATANODE_SLOWNESS_TIMEOUT, 10, TimeUnit.SECONDS); - conf.setTimeDuration(NO_LEADER_TIMEOUT, 10, TimeUnit.SECONDS); - + OzoneConfiguration conf = new OzoneConfiguration(); + DatanodeRatisServerConfig ratisServerConfig = conf.getObject(DatanodeRatisServerConfig.class); + ratisServerConfig.setFollowerSlownessTimeout(Duration.ofSeconds(10)); + ratisServerConfig.setNoLeaderTimeout(Duration.ofMinutes(5)); + conf.setFromObject(ratisServerConfig); MiniOzoneCluster.Builder builder = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(3); cluster = builder.build(); @@ -79,19 +74,16 @@ public void tearDown() throws IOException { @Test public void testPipelineCloseLogFloodDoesntOccur() throws Exception { GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer.captureLogs(XceiverServerRatis.class); - GenericTestUtils.setLogLevel(XceiverServerRatis.class, Level.WARN); client.getObjectStore().createVolume(VOLUME_NAME); client.getObjectStore().getVolume(VOLUME_NAME).createBucket(BUCKET_NAME); OzoneBucket ozoneBucket = client.getObjectStore().getVolume(VOLUME_NAME).getBucket(BUCKET_NAME); - ReplicationConfig config = ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.THREE); - try 
(OutputStream out = ozoneBucket.createKey("key", 1024, config, new HashMap<>())) { - out.write(new byte[1024]); - } + TestDataUtil.createKey(ozoneBucket, KEY_NAME, new byte[1024]); // Kill one follower DN so that the pipeline becomes unhealthy cluster.shutdownHddsDatanode(1); - Thread.sleep(15_000L); + // wait for time > follower slowness timeout + Thread.sleep(13_000); logCapturer.stopCapturing(); int occurrences = StringUtils.countMatches(logCapturer.getOutput(), FLOOD_TOKEN); //Follower slowness will happen for 2 pipelines since we are shutting down one node From 6a90b2072dbefd079d5b26abbca2cac351c1ad07 Mon Sep 17 00:00:00 2001 From: Priyesh Karatha Date: Wed, 30 Apr 2025 09:45:34 +0530 Subject: [PATCH 4/6] log only if addPipelineActionIfAbsent returns true --- .../container/common/statemachine/StateContext.java | 10 ++++++---- .../transport/server/ratis/XceiverServerRatis.java | 9 +++++---- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 4060902dd22e..306fbf1c9dec 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -532,12 +532,14 @@ public List getPendingContainerAction( * * @param pipelineAction PipelineAction to be added */ - public void addPipelineActionIfAbsent(PipelineAction pipelineAction) { + public boolean addPipelineActionIfAbsent(PipelineAction pipelineAction) { // Put only if the pipeline id with the same action is absent. 
final PipelineKey key = new PipelineKey(pipelineAction); + boolean added = false; for (InetSocketAddress endpoint : endpoints) { - pipelineActions.get(endpoint).putIfAbsent(key, pipelineAction); + added = added || pipelineActions.get(endpoint).putIfAbsent(key, pipelineAction); } + return added; } /** @@ -958,9 +960,9 @@ synchronized int size() { return map.size(); } - synchronized void putIfAbsent(PipelineKey key, + synchronized boolean putIfAbsent(PipelineKey key, PipelineAction pipelineAction) { - map.putIfAbsent(key, pipelineAction); + return map.putIfAbsent(key, pipelineAction) == null; } synchronized List getActions(List reports, diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index df365b939f5d..5c5b801f8461 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -754,16 +754,17 @@ private void triggerPipelineClose(RaftGroupId groupId, String detail, .setAction(PipelineAction.Action.CLOSE) .build(); if (context != null) { - context.addPipelineActionIfAbsent(action); + if (context.addPipelineActionIfAbsent(action)) { + LOG.warn("pipeline Action {} on pipeline {}.Reason : {}", + action.getAction(), pipelineID, + action.getClosePipeline().getDetailedReason()); + } if (!activePipelines.get(groupId).isPendingClose()) { // if pipeline close action has not been triggered before, we need trigger pipeline close immediately to // prevent SCM to allocate blocks on the failed pipeline context.getParent().triggerHeartbeat(); activePipelines.computeIfPresent(groupId, (key, value) -> new 
ActivePipelineContext(value.isPipelineLeader(), true)); - LOG.warn("pipeline Action {} on pipeline {}.Reason : {}", - action.getAction(), pipelineID, - action.getClosePipeline().getDetailedReason()); } } } From 67324617fac279ac56746195f1c96fd885444073 Mon Sep 17 00:00:00 2001 From: Priyesh Karatha Date: Fri, 2 May 2025 10:44:06 +0530 Subject: [PATCH 5/6] Adding test to existing testcase to avoid cluster creation --- .../hdds/scm/pipeline/TestNodeFailure.java | 9 ++ .../pipeline/TestPipelineCloseLogsFlood.java | 92 ------------------- 2 files changed, 9 insertions(+), 92 deletions(-) delete mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineCloseLogsFlood.java diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java index 25caa7e0d64d..a4c777a15d27 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java @@ -18,11 +18,13 @@ package org.apache.hadoop.hdds.scm.pipeline; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.fail; import java.io.IOException; import java.time.Duration; import java.util.List; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig; @@ -31,6 +33,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; 
import org.apache.ozone.test.GenericTestUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -46,6 +49,8 @@ public class TestNodeFailure { private static PipelineManager pipelineManager; private static int timeForFailure; + private static final String FLOOD_TOKEN = "pipeline Action CLOSE"; + /** * Create a MiniDFSCluster for testing. * @@ -91,6 +96,7 @@ public static void shutdown() { @Test public void testPipelineFail() { + GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer.captureLogs(XceiverServerRatis.class); ratisPipelines.forEach(pipeline -> { try { waitForPipelineCreation(pipeline.getId()); @@ -107,6 +113,9 @@ public void testPipelineFail() { fail("Test Failed: " + e.getMessage()); } }); + logCapturer.stopCapturing(); + int occurrences = StringUtils.countMatches(logCapturer.getOutput(), FLOOD_TOKEN); + assertThat(occurrences).isEqualTo(2); } /** diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineCloseLogsFlood.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineCloseLogsFlood.java deleted file mode 100644 index e52658b073f3..000000000000 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineCloseLogsFlood.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.pipeline; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.io.IOException; -import java.time.Duration; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.TestDataUtil; -import org.apache.hadoop.ozone.client.OzoneBucket; -import org.apache.hadoop.ozone.client.OzoneClient; -import org.apache.hadoop.ozone.client.OzoneClientFactory; -import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; -import org.apache.ozone.test.GenericTestUtils; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; - -/** - * Tests pipeline close logs. 
- */ -@Timeout(300) -public class TestPipelineCloseLogsFlood { - private static final String FLOOD_TOKEN = "pipeline Action CLOSE"; - private static final String VOLUME_NAME = "vol1"; - private static final String BUCKET_NAME = "bucket1"; - private static final String KEY_NAME = "key1"; - - private MiniOzoneCluster cluster; - private OzoneClient client; - - @BeforeEach - public void setUp() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); - DatanodeRatisServerConfig ratisServerConfig = conf.getObject(DatanodeRatisServerConfig.class); - ratisServerConfig.setFollowerSlownessTimeout(Duration.ofSeconds(10)); - ratisServerConfig.setNoLeaderTimeout(Duration.ofMinutes(5)); - conf.setFromObject(ratisServerConfig); - MiniOzoneCluster.Builder builder = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(3); - cluster = builder.build(); - cluster.waitForClusterToBeReady(); - client = OzoneClientFactory.getRpcClient(conf); - } - - @AfterEach - public void tearDown() throws IOException { - if (cluster != null) { - cluster.shutdown(); - } - client.close(); - } - - @Test - public void testPipelineCloseLogFloodDoesntOccur() throws Exception { - GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer.captureLogs(XceiverServerRatis.class); - - client.getObjectStore().createVolume(VOLUME_NAME); - client.getObjectStore().getVolume(VOLUME_NAME).createBucket(BUCKET_NAME); - OzoneBucket ozoneBucket = client.getObjectStore().getVolume(VOLUME_NAME).getBucket(BUCKET_NAME); - - TestDataUtil.createKey(ozoneBucket, KEY_NAME, new byte[1024]); - // Kill one follower DN so that the pipeline becomes unhealthy - cluster.shutdownHddsDatanode(1); - // wait for time > follower slowness timeout - Thread.sleep(13_000); - logCapturer.stopCapturing(); - int occurrences = StringUtils.countMatches(logCapturer.getOutput(), FLOOD_TOKEN); - //Follower slowness will happen for 2 pipelines since we are shutting down one node - assertThat(occurrences).isEqualTo(2); - } -} 
From 8d2e9d184db9a50158f8eedc252e494f90241a79 Mon Sep 17 00:00:00 2001 From: Priyesh Karatha <35779060+priyeshkaratha@users.noreply.github.com> Date: Fri, 2 May 2025 12:51:26 +0530 Subject: [PATCH 6/6] Update hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java Co-authored-by: Doroszlai, Attila <6454655+adoroszlai@users.noreply.github.com> --- .../ozone/container/common/statemachine/StateContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 306fbf1c9dec..529a536d0b3d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -537,7 +537,7 @@ public boolean addPipelineActionIfAbsent(PipelineAction pipelineAction) { final PipelineKey key = new PipelineKey(pipelineAction); boolean added = false; for (InetSocketAddress endpoint : endpoints) { - added = added || pipelineActions.get(endpoint).putIfAbsent(key, pipelineAction); + added = pipelineActions.get(endpoint).putIfAbsent(key, pipelineAction) || added; } return added; }