Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.pipeline;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
Expand Down Expand Up @@ -49,6 +50,7 @@
import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
import org.apache.hadoop.hdds.utils.db.Proto2Codec;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -337,6 +339,22 @@ void reportDatanode(DatanodeDetails dn) throws IOException {
nodeStatus.put(dn, System.currentTimeMillis());
}

@VisibleForTesting
public DatanodeDetails removeFirstFromNodeStatus() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these two removeFirstFromNodeStatus and setInNodeStatus can be removed. To make a 2-node pipeline, use pipelineManager.deletePipeline to delete original one and add a 2-node one back through it.

Iterator<DatanodeDetails> iterator = nodeStatus.keySet().iterator();
DatanodeDetails firstKey = null;
if (iterator.hasNext()) {
firstKey = iterator.next();
nodeStatus.remove(firstKey);
}
return firstKey;
}

/**
 * Re-inserts the given datanode into the node status map, stamped with the
 * current wall-clock time. Test-only counterpart of
 * {@link #removeFirstFromNodeStatus()}.
 *
 * @param dd datanode to (re-)register in the node status map
 */
@VisibleForTesting
public void setInNodeStatus(DatanodeDetails dd) {
  // Use the same clock source as reportDatanode(), which also populates
  // nodeStatus, so the timestamps in the map stay comparable.
  // (Hadoop's Time.now() delegates to System.currentTimeMillis(), so this
  // is behaviorally equivalent to the previous Time.now() call.)
  nodeStatus.put(dd, System.currentTimeMillis());
}

public boolean isHealthy() {
// EC pipelines are not reported by the DN and do not have a leader. If a
// node goes stale or dead, EC pipelines will be closed like RATIS pipelines
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.HddsConfigKeys;
Expand Down Expand Up @@ -51,7 +52,7 @@ public class OneReplicaPipelineSafeModeRule extends
private static final String NAME = "AtleastOneDatanodeReportedRule";

private int thresholdCount;
private final Set<PipelineID> reportedPipelineIDSet = new HashSet<>();
private Set<PipelineID> reportedPipelineIDSet = new HashSet<>();
private Set<PipelineID> oldPipelineIDSet;
private int currentReportedPipelineCount = 0;
private PipelineManager pipelineManager;
Expand Down Expand Up @@ -85,11 +86,19 @@ protected TypedEvent<PipelineReportFromDatanode> getEventType() {

/**
 * The rule is satisfied once the number of reported pipelines reaches the
 * threshold. When report-based validation is disabled, the reported set is
 * first refreshed from the pipeline manager's current state.
 */
@Override
protected synchronized boolean validate() {
  if (!validateBasedOnReportProcessing()) {
    // Fallback path: derive reported pipelines from pipeline manager state
    // rather than from datanode pipeline reports.
    updateReportedPipelineSet();
  }
  return currentReportedPipelineCount >= thresholdCount;
}

@Override
protected synchronized void process(PipelineReportFromDatanode report) {
if (!validateBasedOnReportProcessing()) {
return;
}
Preconditions.checkNotNull(report);
for (PipelineReport report1 : report.getReport().getPipelineReportList()) {
Pipeline pipeline;
Expand Down Expand Up @@ -137,6 +146,11 @@ public synchronized int getCurrentReportedPipelineCount() {
return currentReportedPipelineCount;
}

/**
 * Returns the set of pipeline IDs counted as reported so far.
 * Test-only accessor.
 */
@VisibleForTesting
public Set<PipelineID> getReportedPipelineIDSet() {
  // NOTE(review): this exposes the live internal set, not a copy; test
  // callers must treat it as read-only.
  return reportedPipelineIDSet;
}

@Override
public String getStatusText() {
String status = String.format(
Expand Down Expand Up @@ -171,6 +185,25 @@ public synchronized void refresh(boolean forceRefresh) {
}
}

private void updateReportedPipelineSet() {
List<Pipeline> openRatisPipelines =
pipelineManager.getPipelines(RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
Pipeline.PipelineState.OPEN);

for (Pipeline pipeline : openRatisPipelines) {
PipelineID pipelineID = pipeline.getId();
boolean allDNsPipelineReported = pipeline.getNodeSet().size() == 3;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain why line 195 check is added? The old logic seems not check that. TIA!

Copy link
Contributor Author

@aryangupta1998 aryangupta1998 May 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check ensures that all three DN have reported that particular pipeline. We can do pipeline.getNodeSet().size() > 1 to check for at least one replica!

boolean notAlreadyReported = !reportedPipelineIDSet.contains(pipelineID);
boolean wasExistingPipeline = oldPipelineIDSet.contains(pipelineID);

if (allDNsPipelineReported && notAlreadyReported && wasExistingPipeline) {
getSafeModeMetrics().incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount();
currentReportedPipelineCount++;
reportedPipelineIDSet.add(pipelineID);
}
}
}

private void initializeRule(boolean refresh) {

oldPipelineIDSet = pipelineManager.getPipelines(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.safemode;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -193,6 +194,40 @@ public void testOneReplicaPipelineRuleMixedPipelines() throws Exception {
GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000);
}

@Test
public void testOneReplicaPipelineRuleWithReportProcessingFalse() throws Exception {
  // 30 nodes yield 7 RATIS factor-THREE pipelines: the mock node manager
  // treats 7 of every 10 nodes as healthy, 2 as stale and 1 as dead.
  final int nodeCount = 30;
  final int ratisPipelines = 7;
  final int standalonePipelines = 0;

  setup(nodeCount, ratisPipelines, standalonePipelines);

  // Make the rule validate from pipeline manager state instead of
  // datanode pipeline reports.
  rule.setValidateBasedOnReportProcessing(false);

  List<Pipeline> allPipelines = pipelineManager.getPipelines();
  assertFalse(allPipelines.isEmpty());

  // Drop one node from the first pipeline so its node set no longer has
  // all three members.
  Pipeline pipeline = allPipelines.get(0);
  DatanodeDetails dropped = pipeline.removeFirstFromNodeStatus();

  // With one under-replicated pipeline the rule must not validate.
  assertFalse(rule.validate());

  // Restore the node so the pipeline satisfies the 3-node condition again.
  pipeline.setInNodeStatus(dropped);

  // Now validation must succeed ...
  assertTrue(rule.validate());

  // ... and the restored pipeline must be in the reported set.
  assertTrue(rule.getReportedPipelineIDSet().contains(pipeline.getId()));
}

private void createPipelines(int count,
HddsProtos.ReplicationFactor factor) throws Exception {
for (int i = 0; i < count; i++) {
Expand Down