Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -532,12 +532,14 @@ public List<ContainerAction> getPendingContainerAction(
*
* @param pipelineAction PipelineAction to be added
*/
public void addPipelineActionIfAbsent(PipelineAction pipelineAction) {
public boolean addPipelineActionIfAbsent(PipelineAction pipelineAction) {
// Put only if the pipeline id with the same action is absent.
final PipelineKey key = new PipelineKey(pipelineAction);
boolean added = false;
for (InetSocketAddress endpoint : endpoints) {
pipelineActions.get(endpoint).putIfAbsent(key, pipelineAction);
added = added || pipelineActions.get(endpoint).putIfAbsent(key, pipelineAction);
}
return added;
}

/**
Expand Down Expand Up @@ -958,9 +960,9 @@ synchronized int size() {
return map.size();
}

synchronized void putIfAbsent(PipelineKey key,
synchronized boolean putIfAbsent(PipelineKey key,
PipelineAction pipelineAction) {
map.putIfAbsent(key, pipelineAction);
return map.putIfAbsent(key, pipelineAction) == null;
}

synchronized List<PipelineAction> getActions(List<PipelineReport> reports,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,11 @@ private void triggerPipelineClose(RaftGroupId groupId, String detail,
.setAction(PipelineAction.Action.CLOSE)
.build();
if (context != null) {
context.addPipelineActionIfAbsent(action);
if (context.addPipelineActionIfAbsent(action)) {
LOG.warn("pipeline Action {} on pipeline {}.Reason : {}",
action.getAction(), pipelineID,
action.getClosePipeline().getDetailedReason());
}
if (!activePipelines.get(groupId).isPendingClose()) {
// if pipeline close action has not been triggered before, we need trigger pipeline close immediately to
// prevent SCM to allocate blocks on the failed pipeline
Expand All @@ -763,9 +767,6 @@ private void triggerPipelineClose(RaftGroupId groupId, String detail,
(key, value) -> new ActivePipelineContext(value.isPipelineLeader(), true));
}
}
LOG.error("pipeline Action {} on pipeline {}.Reason : {}",
action.getAction(), pipelineID,
action.getClosePipeline().getDetailedReason());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
package org.apache.hadoop.hdds.scm.pipeline;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
Expand All @@ -31,6 +33,7 @@
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -46,6 +49,8 @@ public class TestNodeFailure {
private static PipelineManager pipelineManager;
private static int timeForFailure;

private static final String FLOOD_TOKEN = "pipeline Action CLOSE";

/**
* Create a MiniDFSCluster for testing.
*
Expand Down Expand Up @@ -91,6 +96,7 @@ public static void shutdown() {

@Test
public void testPipelineFail() {
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer.captureLogs(XceiverServerRatis.class);
ratisPipelines.forEach(pipeline -> {
try {
waitForPipelineCreation(pipeline.getId());
Expand All @@ -107,6 +113,9 @@ public void testPipelineFail() {
fail("Test Failed: " + e.getMessage());
}
});
logCapturer.stopCapturing();
int occurrences = StringUtils.countMatches(logCapturer.getOutput(), FLOOD_TOKEN);
assertThat(occurrences).isEqualTo(2);
}

/**
Expand Down