@@ -108,6 +108,7 @@ public static class UnderReplicatedHealthResult
private final boolean dueToDecommission;
private final boolean sufficientlyReplicatedAfterPending;
private final boolean unrecoverable;
private boolean hasUnReplicatedOfflineIndexes = false;
private int requeueCount = 0;

public UnderReplicatedHealthResult(ContainerInfo containerInfo,
@@ -206,6 +207,27 @@ public boolean isReplicatedOkAfterPending() {
public boolean isUnrecoverable() {
return unrecoverable;
}

/**
* Pass true if a container has some indexes which are only on nodes
* which are DECOMMISSIONING or ENTERING_MAINTENANCE. These containers may
* need to be processed even if they are unrecoverable.
* @param val pass true if the container has indexes on nodes going offline
* or false otherwise.
*/
public void setHasUnReplicatedOfflineIndexes(boolean val) {
hasUnReplicatedOfflineIndexes = val;
}
/**
* Indicates whether a container has some indexes which are only on nodes
* which are DECOMMISSIONING or ENTERING_MAINTENANCE. These containers may
* need to be processed even if they are unrecoverable.
* @return True if the container has some decommission or maintenance only
* indexes.
*/
public boolean hasUnreplicatedOfflineIndexes() {
return hasUnReplicatedOfflineIndexes;
}
}

/**
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.scm.container.replication;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
@@ -36,6 +37,8 @@ public interface ContainerReplicaCount {

boolean isSufficientlyReplicated();

boolean isSufficientlyReplicatedForOffline(DatanodeDetails datanode);

boolean isOverReplicated();

int getDecommissionCount();
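For orientation, here is a rough sketch of how a caller such as the datanode admin monitor is expected to use the new interface method; the real call site is the checkContainersReplicatedOnNode hunk further down in this diff. Only isSufficientlyReplicatedForOffline and getContainerReplicaCount come from this PR; the wrapper class, its constructor, the containersOnNode parameter, the ContainerID/ReplicationManager import paths, and the thrown exception type are assumptions made so the sketch is self-contained.

```java
import java.util.Set;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaCount;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;

/** Illustrative only; not part of the PR. */
class OfflineReplicationCheckSketch {

  private final ReplicationManager replicationManager;

  OfflineReplicationCheckSketch(ReplicationManager replicationManager) {
    this.replicationManager = replicationManager;
  }

  /**
   * Returns true when every container on the given node is safe for the node
   * to go offline, i.e. each replica hosted on dn has a copy elsewhere.
   */
  boolean allContainersSafeToOffline(DatanodeDetails dn,
      Set<ContainerID> containersOnNode) throws Exception {
    for (ContainerID cid : containersOnNode) {
      // getContainerReplicaCount may throw; the real caller wraps it in a
      // try/catch, here we simply propagate.
      ContainerReplicaCount replicaSet =
          replicationManager.getContainerReplicaCount(cid);
      // Ask the per-replica question ("is the copy on dn available
      // elsewhere?") rather than the whole-container question ("is the
      // container fully replicated?"), so unrecoverable EC containers no
      // longer block decommission or maintenance.
      if (!replicaSet.isSufficientlyReplicatedForOffline(dn)) {
        return false;
      }
    }
    return true;
  }
}
```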
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.container.replication;

import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -36,6 +37,7 @@
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;

/**
* This class provides a set of methods to test for over / under replication of
@@ -421,6 +423,49 @@ public boolean isSufficientlyReplicated(boolean includePendingAdd) {
>= repConfig.getData() + remainingMaintenanceRedundancy;
}

/**
* If we are checking a container for sufficient replication for "offline",
* ie decommission or maintenance, it is not strictly required that all
* replicas of the container are present. Instead, it is enough that the
* replica on the node going offline has a copy elsewhere on another
* IN_SERVICE node; if it does, that replica is sufficiently replicated.
* @param datanode The datanode being checked to go offline.
* @return True if the container is sufficiently replicated or if this replica
* on the passed node is present elsewhere on an IN_SERVICE node.
*/
@Override
public boolean isSufficientlyReplicatedForOffline(DatanodeDetails datanode) {
boolean sufficientlyReplicated = isSufficientlyReplicated(false);
if (sufficientlyReplicated) {
return true;
}
// If the container is not sufficiently replicated (ie it does not have all
// replicas present), then we need to check if the replica that is on this
// node is available on another IN_SERVICE node, ie in the healthy set. This
// means we avoid blocking decommission or maintenance on un-recoverable EC
// containers.
if (datanode.getPersistedOpState() == IN_SERVICE) {
// The node passed into this method must be a node going offline, so it
// cannot be IN_SERVICE. If an IN_SERVICE node is passed, just return
// false.
return false;
}
ContainerReplica thisReplica = null;
for (ContainerReplica r : replicas) {
if (r.getDatanodeDetails().equals(datanode)) {
thisReplica = r;
break;
}
}
if (thisReplica == null) {
// From the set of replicas, none are on the passed datanode.
// This should not happen in practice but if it does we cannot indicate
// the container is sufficiently replicated.
return false;
}
return healthyIndexes.containsKey(thisReplica.getReplicaIndex());
}

@Override
public boolean isSufficientlyReplicated() {
return isSufficientlyReplicated(false);
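To make the semantics of the EC offline check concrete, here is a small self-contained sketch in plain Java (not Ozone code): when a node wants to go offline, what matters for an EC container is whether the replica index hosted on that node also exists on an IN_SERVICE node, independent of whether the container as a whole is fully replicated. The enum, helper, and data below are invented for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only; mirrors the idea behind isSufficientlyReplicatedForOffline. */
public class OfflineCheckSketch {

  enum OpState { IN_SERVICE, DECOMMISSIONING, ENTERING_MAINTENANCE }

  /**
   * @param replicaStates replica index -> operational states of the nodes
   *                      currently holding that index
   * @param leavingIndex  the index hosted on the node that wants to go offline
   * @return true when the leaving index also exists on an IN_SERVICE node
   */
  static boolean safeToTakeOffline(Map<Integer, List<OpState>> replicaStates,
      int leavingIndex) {
    return replicaStates.getOrDefault(leavingIndex, Collections.emptyList())
        .stream().anyMatch(s -> s == OpState.IN_SERVICE);
  }

  public static void main(String[] args) {
    // An EC 3-2 container with indexes 4 and 5 missing, so it is
    // under-replicated (the same per-index check applies when it is
    // unrecoverable). Index 2 has a copy on a DECOMMISSIONING node and on an
    // IN_SERVICE node; index 3 exists only on a DECOMMISSIONING node.
    Map<Integer, List<OpState>> states = new HashMap<>();
    states.put(1, List.of(OpState.IN_SERVICE));
    states.put(2, List.of(OpState.DECOMMISSIONING, OpState.IN_SERVICE));
    states.put(3, List.of(OpState.DECOMMISSIONING));

    System.out.println(safeToTakeOffline(states, 2)); // true
    System.out.println(safeToTakeOffline(states, 3)); // false
  }
}
```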
@@ -136,11 +136,6 @@ public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
container.getContainerID(), replicaCount.getReplicas());
return emptyMap();
}
-    if (replicaCount.isUnrecoverable()) {
-      LOG.warn("The container {} is unrecoverable. The available replicas" +
-          " are: {}.", container.containerID(), replicaCount.getReplicas());
-      return emptyMap();
-    }

// don't place reconstructed replicas on exclude nodes, since they already
// have replicas
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.scm.container.replication;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
@@ -247,6 +248,17 @@ public boolean isSufficientlyReplicated() {
return isSufficientlyReplicated(false);
}

/**
* For Ratis, this method is the same as isSufficientlyReplicated.
* @param datanode Not used in this implementation
* @return True if the container is sufficiently replicated and False
* otherwise.
*/
@Override
public boolean isSufficientlyReplicatedForOffline(DatanodeDetails datanode) {
return isSufficientlyReplicated();
}

/**
* Return true if the container is sufficiently replicated. Decommissioning
* and Decommissioned containers are ignored in this check, assuming they will
@@ -85,13 +85,15 @@ public boolean handle(ContainerCheckRequest request) {
ReplicationManagerReport.HealthState.MISSING, containerID);
}
if (!underHealth.isReplicatedOkAfterPending() &&
-        !underHealth.isUnrecoverable()) {
+        (!underHealth.isUnrecoverable()
+            || underHealth.hasUnreplicatedOfflineIndexes())) {
request.getReplicationQueue().enqueue(underHealth);
}
LOG.debug("Container {} is Under Replicated. isReplicatedOkAfterPending "
+ "is [{}]. isUnrecoverable is [{}]", container,
underHealth.isReplicatedOkAfterPending(),
underHealth.isUnrecoverable());
+ "is [{}]. isUnrecoverable is [{}]. hasUnreplicatedOfflineIndexes "
+ "is [{}]", container, underHealth.isReplicatedOkAfterPending(),
underHealth.isUnrecoverable(),
underHealth.hasUnreplicatedOfflineIndexes());
return true;
} else if (health.getHealthState()
== ContainerHealthResult.HealthState.OVER_REPLICATED) {
@@ -149,10 +151,16 @@ public ContainerHealthResult checkHealth(ContainerCheckRequest request) {
dueToDecommission = false;
remainingRedundancy = repConfig.getParity() - missingIndexes.size();
}
-    return new ContainerHealthResult.UnderReplicatedHealthResult(
-        container, remainingRedundancy, dueToDecommission,
-        replicaCount.isSufficientlyReplicated(true),
-        replicaCount.isUnrecoverable());
+    ContainerHealthResult.UnderReplicatedHealthResult result =
+        new ContainerHealthResult.UnderReplicatedHealthResult(
+            container, remainingRedundancy, dueToDecommission,
+            replicaCount.isSufficientlyReplicated(true),
+            replicaCount.isUnrecoverable());
+    if (replicaCount.decommissioningOnlyIndexes(true).size() > 0

siddhantsangwan (Contributor) commented on Jan 4, 2023:
These methods will also count indexes that are on DECOMMISSIONED or IN_MAINTENANCE datanodes. I guess we only want to add replicas on datanodes that haven't yet entered these states to the under rep queue.

The PR author (Contributor) replied:
For it to have reached DECOMMISSIONED or IN_MAINTENANCE, it should already have made a new copy elsewhere. The flag set here only comes into play later, and only when the container is unrecoverable. If that many replicas are missing, we should have another copy of any decommission or maintenance replicas anyway, and if we do not, it would be OK for the system to create them. I think it is OK for this to consider both decommissioning / entering_maintenance and decommissioned / in_maintenance.

+        || replicaCount.maintenanceOnlyIndexes(true).size() > 0) {
+      result.setHasUnReplicatedOfflineIndexes(true);
+    }
+    return result;
}

if (replicaCount.isOverReplicated(false)) {
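The review thread above turns on which node states count as "offline". A minimal sketch of the two groups being discussed, with the enum values redeclared locally so it compiles on its own; the real values live in HddsProtos.NodeOperationalState, and the grouping names are mine, not Ozone's.

```java
import java.util.EnumSet;

/** Illustrative only; groups of node states discussed in the review thread. */
public class NodeStateGroupsSketch {

  // Redeclared locally for the sketch; the real enum is
  // HddsProtos.NodeOperationalState.
  enum NodeOperationalState {
    IN_SERVICE, DECOMMISSIONING, DECOMMISSIONED,
    ENTERING_MAINTENANCE, IN_MAINTENANCE
  }

  // Nodes still in the process of leaving: their replicas are the ones the
  // handler wants copied before the node actually goes away.
  static final EnumSet<NodeOperationalState> GOING_OFFLINE = EnumSet.of(
      NodeOperationalState.DECOMMISSIONING,
      NodeOperationalState.ENTERING_MAINTENANCE);

  // Nodes that have already left or are in maintenance. The
  // decommissioningOnlyIndexes / maintenanceOnlyIndexes checks count these as
  // well, which the author argues is harmless: a copy should already exist,
  // and if it does not, creating one is acceptable.
  static final EnumSet<NodeOperationalState> ALREADY_OFFLINE = EnumSet.of(
      NodeOperationalState.DECOMMISSIONED,
      NodeOperationalState.IN_MAINTENANCE);

  public static void main(String[] args) {
    System.out.println("Going offline: " + GOING_OFFLINE);
    System.out.println("Already offline: " + ALREADY_OFFLINE);
  }
}
```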
@@ -345,7 +345,7 @@ private boolean checkContainersReplicatedOnNode(DatanodeDetails dn)
try {
ContainerReplicaCount replicaSet =
replicationManager.getContainerReplicaCount(cid);
-        if (replicaSet.isSufficientlyReplicated()) {
+        if (replicaSet.isSufficientlyReplicatedForOffline(dn)) {
sufficientlyReplicated++;
} else {
if (LOG.isDebugEnabled()) {
@@ -42,6 +42,7 @@
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerReplica;

/**
@@ -107,7 +108,7 @@ public void testContainerMissingReplicaDueToPendingDelete() {
public void testUnderReplicationDueToUnhealthyReplica() {
Set<ContainerReplica> replicas =
ReplicationTestUtil.createReplicas(container.containerID(),
-            ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4);
+            CLOSED, 1, 2, 3, 4);
ContainerReplica unhealthyIndex5 =
createContainerReplica(container.containerID(), 5,
IN_SERVICE, ContainerReplicaProto.State.UNHEALTHY);
@@ -572,4 +573,43 @@ public void testDecommissioningOnlyIndexes() {
Assertions
.assertEquals(ImmutableSet.of(), rcnt.decommissioningOnlyIndexes(true));
}

@Test
public void testSufficientlyReplicatedForOffline() {
Set<ContainerReplica> replica = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 2));

ContainerReplica inServiceReplica =
ReplicationTestUtil.createContainerReplica(container.containerID(),
1, IN_SERVICE, CLOSED);
replica.add(inServiceReplica);

ContainerReplica offlineReplica =
ReplicationTestUtil.createContainerReplica(container.containerID(),
1, DECOMMISSIONING, CLOSED);
replica.add(offlineReplica);

ContainerReplica offlineNotReplicated =
ReplicationTestUtil.createContainerReplica(container.containerID(),
3, DECOMMISSIONING, CLOSED);
replica.add(offlineNotReplicated);

ECContainerReplicaCount rcnt =
new ECContainerReplicaCount(container, replica, Collections.emptyList(),
1);
Assertions.assertFalse(rcnt.isSufficientlyReplicated(false));
Assertions.assertTrue(rcnt.isSufficientlyReplicatedForOffline(
offlineReplica.getDatanodeDetails()));
Assertions.assertFalse(rcnt.isSufficientlyReplicatedForOffline(
offlineNotReplicated.getDatanodeDetails()));

// A random DN not hosting a replica for this container should return false.
Assertions.assertFalse(rcnt.isSufficientlyReplicatedForOffline(
MockDatanodeDetails.randomDatanodeDetails()));

// Passing the IN_SERVICE node should return false even though the
// replica is on a healthy node
Assertions.assertFalse(rcnt.isSufficientlyReplicatedForOffline(
inServiceReplica.getDatanodeDetails()));
}
}
@@ -85,6 +85,7 @@ public class TestECUnderReplicationHandler {
private static final int DATA = 3;
private static final int PARITY = 2;
private PlacementPolicy ecPlacementPolicy;
private int remainingMaintenanceRedundancy = 1;

@BeforeEach
public void setup() {
@@ -564,6 +565,56 @@ public void testDatanodesPendingAddAreNotSelectedAsTargets()
Assertions.assertFalse(commands.containsKey(dn));
}

@Test
public void testDecommissioningIndexCopiedWhenContainerUnRecoverable()
throws IOException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 1));
ContainerReplica decomReplica = ReplicationTestUtil.createContainerReplica(
container.containerID(), 2, DECOMMISSIONING, CLOSED);
availableReplicas.add(decomReplica);
Map<DatanodeDetails, SCMCommand<?>> cmds =
testUnderReplicationWithMissingIndexes(Collections.emptyList(),
availableReplicas, 1, 0, policy);
Assertions.assertEquals(1, cmds.size());
ReplicateContainerCommand cmd =
(ReplicateContainerCommand) cmds.values().iterator().next();

List<DatanodeDetails> sources = cmd.getSourceDatanodes();
Assertions.assertEquals(1, sources.size());
Assertions.assertEquals(decomReplica.getDatanodeDetails(),
cmd.getSourceDatanodes().get(0));
}

@Test
public void testMaintenanceIndexCopiedWhenContainerUnRecoverable()
throws IOException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 1));
ContainerReplica maintReplica = ReplicationTestUtil.createContainerReplica(
container.containerID(), 2, ENTERING_MAINTENANCE, CLOSED);
availableReplicas.add(maintReplica);

Map<DatanodeDetails, SCMCommand<?>> cmds =
testUnderReplicationWithMissingIndexes(Collections.emptyList(),
availableReplicas, 0, 1, policy);
Assertions.assertEquals(0, cmds.size());

// Change the remaining redundancy to ensure something needs to be copied.
remainingMaintenanceRedundancy = 2;
cmds = testUnderReplicationWithMissingIndexes(Collections.emptyList(),
availableReplicas, 0, 1, policy);

Assertions.assertEquals(1, cmds.size());
ReplicateContainerCommand cmd =
(ReplicateContainerCommand) cmds.values().iterator().next();

List<DatanodeDetails> sources = cmd.getSourceDatanodes();
Assertions.assertEquals(1, sources.size());
Assertions.assertEquals(maintReplica.getDatanodeDetails(),
cmd.getSourceDatanodes().get(0));
}

public Map<DatanodeDetails, SCMCommand<?>>
testUnderReplicationWithMissingIndexes(
List<Integer> missingIndexes, Set<ContainerReplica> availableReplicas,
@@ -577,7 +628,6 @@ public void testDatanodesPendingAddAreNotSelectedAsTargets()
Mockito.when(result.isUnrecoverable()).thenReturn(false);
Mockito.when(result.getContainerInfo()).thenReturn(container);

-    int remainingMaintenanceRedundancy = 1;
Map<DatanodeDetails, SCMCommand<?>> datanodeDetailsSCMCommandMap = ecURH
.processAndCreateCommands(availableReplicas, ImmutableList.of(),
result, remainingMaintenanceRedundancy);