Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
Expand Down Expand Up @@ -52,8 +53,12 @@ public class RatisOverReplicationHandler
public static final Logger LOG =
LoggerFactory.getLogger(RatisOverReplicationHandler.class);

public RatisOverReplicationHandler(PlacementPolicy placementPolicy) {
private final NodeManager nodeManager;

public RatisOverReplicationHandler(PlacementPolicy placementPolicy,
NodeManager nodeManager) {
super(placementPolicy);
this.nodeManager = nodeManager;
}

/**
Expand All @@ -79,6 +84,20 @@ public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
ContainerInfo containerInfo = result.getContainerInfo();
LOG.debug("Handling container {}.", containerInfo);

// We are going to check for over replication, so we should filter out any
// replicas that are not in a HEALTHY state. This is because a replica can
be healthy, stale or dead. If it is dead it will be quickly removed from
scm. If it is stale, there is a good chance the DN is offline and the
replica will go away soon. So, if we have a container that is over
replicated with a HEALTHY and STALE replica, and we decide to delete the
HEALTHY one, and then the STALE one goes away, we will lose them both.
// To avoid this, we will filter out any non-healthy replicas first.
Set<ContainerReplica> healthyReplicas = replicas.stream()
.filter(r -> ReplicationManager.getNodeStatus(
r.getDatanodeDetails(), nodeManager).isHealthy()
)
.collect(Collectors.toSet());

// count pending adds and deletes
int pendingAdd = 0, pendingDelete = 0;
for (ContainerReplicaOp op : pendingOps) {
Expand All @@ -89,8 +108,9 @@ public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
}
}
RatisContainerReplicaCount replicaCount =
new RatisContainerReplicaCount(containerInfo, replicas, pendingAdd,
pendingDelete, containerInfo.getReplicationFactor().getNumber(),
new RatisContainerReplicaCount(containerInfo, healthyReplicas,
pendingAdd, pendingDelete,
containerInfo.getReplicationFactor().getNumber(),
minHealthyForMaintenance);

// verify that this container is actually over replicated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public ReplicationManager(final ConfigurationSource conf,
ratisUnderReplicationHandler = new RatisUnderReplicationHandler(
ratisContainerPlacement, conf, nodeManager);
ratisOverReplicationHandler =
new RatisOverReplicationHandler(ratisContainerPlacement);
new RatisOverReplicationHandler(ratisContainerPlacement, nodeManager);
underReplicatedProcessor =
new UnderReplicatedProcessor(this,
rmConf.getUnderReplicatedInterval());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.ozone.test.GenericTestUtils;
Expand Down Expand Up @@ -56,6 +58,7 @@ public class TestRatisOverReplicationHandler {
private static final RatisReplicationConfig RATIS_REPLICATION_CONFIG =
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
private PlacementPolicy policy;
private NodeManager nodeManager;

@Before
public void setup() throws NodeNotFoundException {
Expand All @@ -67,6 +70,10 @@ public void setup() throws NodeNotFoundException {
Mockito.anyList(), Mockito.anyInt()))
.thenReturn(new ContainerPlacementStatusDefault(2, 2, 3));

nodeManager = Mockito.mock(NodeManager.class);
Mockito.when(nodeManager.getNodeStatus(Mockito.any()))
.thenReturn(NodeStatus.inServiceHealthy());

GenericTestUtils.setLogLevel(RatisOverReplicationHandler.LOG, Level.DEBUG);
}

Expand All @@ -88,6 +95,23 @@ public void testOverReplicatedClosedContainer() throws IOException {
1);
}

/**
* Container has 4 replicas, one of which is stale, so no replica should be deleted.
*/
@Test
public void testOverReplicatedClosedContainerWithStale() throws IOException,
NodeNotFoundException {
Set<ContainerReplica> replicas = createReplicas(container.containerID(),
ContainerReplicaProto.State.CLOSED, 0, 0, 0, 0);

ContainerReplica stale = replicas.stream().findFirst().get();
Mockito.when(nodeManager.getNodeStatus(stale.getDatanodeDetails()))
.thenReturn(NodeStatus.inServiceStale());

testProcessing(replicas, Collections.emptyList(),
getOverReplicatedHealthResult(), 0);
}

/**
* The container is quasi closed. All 4 replicas are quasi closed and
* originate from the same datanode. This container is over replicated.
Expand Down Expand Up @@ -261,7 +285,7 @@ private Map<DatanodeDetails, SCMCommand<?>> testProcessing(
ContainerHealthResult healthResult,
int expectNumCommands) throws IOException {
RatisOverReplicationHandler handler =
new RatisOverReplicationHandler(policy);
new RatisOverReplicationHandler(policy, nodeManager);

Map<DatanodeDetails, SCMCommand<?>> commands =
handler.processAndCreateCommands(replicas, pendingOps,
Expand Down