@@ -470,10 +470,10 @@ && getReplicas().stream()
 
   /**
    * QUASI_CLOSED containers that have a mix of healthy and UNHEALTHY
-   * replicas require special treatment. If the healthy replicas don't have
-   * the same BCSID as the container, but the UNHEALTHY ones do, then we need
-   * to save at least one copy of each such UNHEALTHY replica. This method
-   * finds such UNHEALTHY replicas.
+   * replicas require special treatment. If the UNHEALTHY replicas have the
+   * correct sequence ID and have unique origins, then we need to save at least
+   * one copy of each such UNHEALTHY replica. This method finds such UNHEALTHY
+   * replicas.
    *
    * @param nodeStatusFn a function used to check the {@link NodeStatus} of a node,
    *                     accepting a {@link DatanodeDetails} and returning {@link NodeStatus}
@@ -498,11 +498,6 @@ public List<ContainerReplica> getVulnerableUnhealthyReplicas(Function<DatanodeDe
       if (replica.getSequenceId() == container.getSequenceId()) {
         if (replica.getState() == ContainerReplicaProto.State.UNHEALTHY && !replica.isEmpty()) {
           unhealthyReplicas.add(replica);
-        } else if (replica.getState() ==
-            ContainerReplicaProto.State.QUASI_CLOSED) {
-          // don't need to save UNHEALTHY replicas if there's a QUASI_CLOSED
-          // replica with the greatest Sequence ID.
-          return Collections.emptyList();
         }
       }
     }
Expand All @@ -517,6 +512,11 @@ public List<ContainerReplica> getVulnerableUnhealthyReplicas(Function<DatanodeDe
NodeStatus status = nodeStatusFn.apply(replica.getDatanodeDetails());
return status == null || !status.isHealthy();
});
replicas.removeIf(
replica -> {
NodeStatus status = nodeStatusFn.apply(replica.getDatanodeDetails());
return status == null || !status.isHealthy();
});
/*
At this point, the list of unhealthyReplicas contains all UNHEALTHY non-empty
replicas with the greatest Sequence ID that are on healthy Datanodes.
@@ -531,9 +531,9 @@ public List<ContainerReplica> getVulnerableUnhealthyReplicas(Function<DatanodeDe
      */
     // TODO should we also consider pending deletes?
     Set<UUID> originsOfInServiceReplicas = new HashSet<>();
-    for (ContainerReplica replica : unhealthyReplicas) {
+    for (ContainerReplica replica : replicas) {
       if (replica.getDatanodeDetails().getPersistedOpState()
-          .equals(IN_SERVICE)) {
+          .equals(IN_SERVICE) && replica.getSequenceId().equals(container.getSequenceId())) {
         originsOfInServiceReplicas.add(replica.getOriginDatanodeId());
       }
     }
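Read together, the four hunks above change the selection logic in two ways: the early return on seeing a QUASI_CLOSED replica with the greatest sequence ID is removed, and the origin scan now runs over all replicas, counting only IN_SERVICE ones whose sequence ID matches the container's. The following is a minimal sketch of the patched method under those assumptions; code outside the visible hunks (the loop boundaries and the final origin filter) is inferred, not quoted from the PR.

  // Minimal sketch, assuming the surrounding class exposes container and
  // getReplicas() as the diff suggests; parts outside the hunks are inferred.
  public List<ContainerReplica> getVulnerableUnhealthyReplicas(
      Function<DatanodeDetails, NodeStatus> nodeStatusFn) {
    List<ContainerReplica> unhealthyReplicas = new ArrayList<>();
    for (ContainerReplica replica : getReplicas()) {
      // keep non-empty UNHEALTHY replicas matching the container's sequence ID
      if (replica.getSequenceId() == container.getSequenceId()
          && replica.getState() == ContainerReplicaProto.State.UNHEALTHY
          && !replica.isEmpty()) {
        unhealthyReplicas.add(replica);
      }
    }
    // ignore replicas hosted on datanodes that are dead or otherwise unhealthy
    unhealthyReplicas.removeIf(replica -> {
      NodeStatus status = nodeStatusFn.apply(replica.getDatanodeDetails());
      return status == null || !status.isHealthy();
    });
    // an origin already protected by an IN_SERVICE replica with the correct
    // sequence ID does not need an UNHEALTHY copy preserved
    Set<UUID> originsOfInServiceReplicas = new HashSet<>();
    for (ContainerReplica replica : getReplicas()) {
      if (replica.getDatanodeDetails().getPersistedOpState().equals(IN_SERVICE)
          && replica.getSequenceId().equals(container.getSequenceId())) {
        originsOfInServiceReplicas.add(replica.getOriginDatanodeId());
      }
    }
    // whatever remains is vulnerable: the last copy of its origin, on a node
    // that may be going away
    unhealthyReplicas.removeIf(replica ->
        originsOfInServiceReplicas.contains(replica.getOriginDatanodeId()));
    return unhealthyReplicas;
  }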
--------------------------------------------------------------------------------
@@ -37,9 +37,8 @@
 
 /**
  * A QUASI_CLOSED container may have some UNHEALTHY replicas with the
- * same Sequence ID as the container. RM should try to maintain one
- * copy of such replicas when there are no healthy replicas that
- * match the container's Sequence ID.
+ * same Sequence ID as the container and on unique origins. RM should try to maintain one
+ * copy of such replicas.
  */
 public class VulnerableUnhealthyReplicasHandler extends AbstractCheck {
   public static final Logger LOG = LoggerFactory.getLogger(VulnerableUnhealthyReplicasHandler.class);
@@ -52,8 +51,8 @@ public VulnerableUnhealthyReplicasHandler(ReplicationManager replicationManager)
   /**
    * Checks if the container is QUASI_CLOSED and has some vulnerable UNHEALTHY replicas that need to be replicated
    * to other Datanodes. These replicas have the same sequence ID as the container while other healthy replicas don't.
-   * If the node hosting such a replica is being taken offline, then the replica may have to be replicated to another
-   * node.
+   * Or, these replicas are on unique origin Datanodes. If the node hosting such a replica is being taken offline,
+   * then the replica may have to be replicated to another node.
    * @param request ContainerCheckRequest object representing the container
    * @return true if some vulnerable UNHEALTHY replicas were found, else false
    */
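The handler's rule can also be modelled without the Ozone types. The snippet below is a self-contained illustration, not code from this PR; the Replica record and the method name vulnerable are invented for the example.

  import java.util.ArrayList;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;
  import java.util.UUID;

  // Illustrative model (hypothetical names): an UNHEALTHY replica is vulnerable
  // when no IN_SERVICE replica with the container's sequence ID shares its origin.
  final class OriginRuleDemo {
    record Replica(UUID origin, boolean inService, boolean unhealthy, long seqId) { }

    static List<Replica> vulnerable(long containerSeqId, List<Replica> replicas) {
      // origins that will keep a copy on a node staying in service
      Set<UUID> coveredOrigins = new HashSet<>();
      for (Replica r : replicas) {
        if (r.inService() && r.seqId() == containerSeqId) {
          coveredOrigins.add(r.origin());
        }
      }
      // an UNHEALTHY replica with an uncovered origin must be replicated
      // before its node can be taken offline
      List<Replica> result = new ArrayList<>();
      for (Replica r : replicas) {
        if (r.unhealthy() && r.seqId() == containerSeqId
            && !coveredOrigins.contains(r.origin())) {
          result.add(r);
        }
      }
      return result;
    }
  }

With three QUASI_CLOSED replicas sharing one origin and one UNHEALTHY replica on a decommissioning node with its own origin (the setup used in the tests that follow), vulnerable returns exactly that UNHEALTHY replica, so RM must copy it elsewhere before decommissioning can complete.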
--------------------------------------------------------------------------------
@@ -39,9 +39,11 @@
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -581,6 +583,35 @@ public void testUnderReplicationWithVulnerableReplicas() throws IOException {
     assertEquals(unhealthyReplica.getDatanodeDetails(), commands.iterator().next().getKey());
   }
 
+  /**
+   * A QUASI_CLOSED container may have UNHEALTHY replicas with the correct sequence ID and unique origin
+   * Datanodes. If the node hosting such a replica is being taken offline, the replica needs to be replicated to
+   * another DN for decommissioning to progress. This test asserts that a replicate command is sent for one such replica.
+   */
+  @Test
+  public void testUnderReplicationWithVulnerableReplicasOnUniqueOrigins() throws IOException {
+    final long sequenceID = 20;
+    container = ReplicationTestUtil.createContainerInfo(RATIS_REPLICATION_CONFIG, 1,
+        HddsProtos.LifeCycleState.QUASI_CLOSED, sequenceID);
+
+    final Set<ContainerReplica> replicas = new HashSet<>(4);
+    for (int i = 0; i < 3; i++) {
+      replicas.add(createContainerReplica(container.containerID(), 0, IN_SERVICE, State.QUASI_CLOSED,
+          sequenceID));
+    }
+
+    // create an UNHEALTHY replica with a unique origin on a DECOMMISSIONING node
+    final ContainerReplica unhealthyReplica = createContainerReplica(container.containerID(), 0,
+        DECOMMISSIONING, State.UNHEALTHY, sequenceID);
+    replicas.add(unhealthyReplica);
+    UnderReplicatedHealthResult result = getUnderReplicatedHealthResult();
+    Mockito.when(result.hasVulnerableUnhealthy()).thenReturn(true);
+
+    final Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = testProcessing(replicas, Collections.emptyList(),
+        result, 2, 1);
+    Assertions.assertEquals(unhealthyReplica.getDatanodeDetails(), commands.iterator().next().getKey());
+  }
+
   /**
    * In the push replication model, a replicate command is sent to the DN hosting the replica, and that DN is
    * expected to "push" the replica to another DN. If the DN hosting the replica has too many commands already, an
--------------------------------------------------------------------------------
@@ -56,6 +56,7 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.time.Instant;
@@ -508,6 +509,67 @@ public void testQuasiClosedContainerWithVulnerableUnhealthyReplica()
     assertEquals(decommissioning.getDatanodeDetails().getUuid(), command.getKey());
   }
 
+
+  /**
+   * There is a QUASI_CLOSED container with some UNHEALTHY replicas on unique origin nodes. If the datanode hosting
+   * one such replica is being taken offline, then the UNHEALTHY replica needs to be replicated to another node.
+   */
+  @Test
+  public void testQuasiClosedContainerWithUnhealthyReplicaOnDecommissioningNodeWithUniqueOrigin()
+      throws IOException, NodeNotFoundException {
+    RatisReplicationConfig ratisRepConfig =
+        RatisReplicationConfig.getInstance(THREE);
+    // create a QUASI_CLOSED container with 3 QUASI_CLOSED replicas on the same origin and 1 UNHEALTHY on a unique one
+    ContainerInfo container = createContainerInfo(ratisRepConfig, 1,
+        HddsProtos.LifeCycleState.QUASI_CLOSED);
+    Set<ContainerReplica> replicas =
+        createReplicasWithSameOrigin(container.containerID(),
+            ContainerReplicaProto.State.QUASI_CLOSED, 0, 0, 0);
+    ContainerReplica unhealthy =
+        createContainerReplica(container.containerID(), 0, DECOMMISSIONING,
+            ContainerReplicaProto.State.UNHEALTHY);
+    replicas.add(unhealthy);
+    storeContainerAndReplicas(container, replicas);
+    Mockito.when(replicationManager.getNodeStatus(any(DatanodeDetails.class)))
+        .thenAnswer(invocation -> {
+          DatanodeDetails dn = invocation.getArgument(0);
+          if (dn.equals(unhealthy.getDatanodeDetails())) {
+            return new NodeStatus(DECOMMISSIONING, HddsProtos.NodeState.HEALTHY);
+          }
+
+          return NodeStatus.inServiceHealthy();
+        });
+
+    // the container should be under-replicated and queued to the under-replication queue
+    replicationManager.processContainer(container, repQueue, repReport);
+    assertEquals(1, repReport.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+    assertEquals(0, repReport.getStat(
+        ReplicationManagerReport.HealthState.OVER_REPLICATED));
+    assertEquals(1, repQueue.underReplicatedQueueSize());
+    assertEquals(0, repQueue.overReplicatedQueueSize());
+
+    // next, set up some mocks to test whether RatisUnderReplicationHandler handles this container correctly
+    Mockito.when(ratisPlacementPolicy.chooseDatanodes(anyList(), anyList(), eq(null), eq(1), anyLong(),
+        anyLong())).thenAnswer(invocation -> ImmutableList.of(MockDatanodeDetails.randomDatanodeDetails()));
+    Mockito.when(nodeManager.getTotalDatanodeCommandCounts(any(DatanodeDetails.class), any(), any()))
+        .thenAnswer(invocation -> {
+          Map<SCMCommandProto.Type, Integer> map = new HashMap<>();
+          map.put(SCMCommandProto.Type.replicateContainerCommand, 0);
+          map.put(SCMCommandProto.Type.reconstructECContainersCommand, 0);
+          return map;
+        });
+    RatisUnderReplicationHandler handler =
+        new RatisUnderReplicationHandler(ratisPlacementPolicy, configuration, replicationManager);
+
+    handler.processAndSendCommands(replicas, Collections.emptyList(), repQueue.dequeueUnderReplicatedContainer(), 2);
+    assertEquals(1, commandsSent.size());
+    Pair<UUID, SCMCommand<?>> command = commandsSent.iterator().next();
+    // a replicate command should have been sent for the UNHEALTHY replica
+    assertEquals(SCMCommandProto.Type.replicateContainerCommand, command.getValue().getType());
+    assertEquals(unhealthy.getDatanodeDetails().getUuid(), command.getKey());
+  }
+
   /**
    * When there is a QUASI_CLOSED replica with an incorrect sequence id
    * for a CLOSED container, it's treated as unhealthy and deleted.
--------------------------------------------------------------------------------
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 import java.util.Collections;
 import java.util.HashSet;
@@ -128,6 +129,11 @@ public void testReturnsFalseForQuasiClosedContainerWithNoVulnerableReplicas() {
     assertEquals(0, repQueue.overReplicatedQueueSize());
   }
 
+  /**
+   * A QUASI_CLOSED container with 3 QUASI_CLOSED replicas that have an incorrect sequence id; they're on unique
+   * origin nodes. There's an UNHEALTHY replica on a decommissioning node, which has the correct sequence ID and a
+   * unique origin. It's expected that the UNHEALTHY replica is queued for under-replication.
+   */
   @Test
   public void testReturnsTrueForQuasiClosedContainerWithVulnerableReplica() throws NodeNotFoundException {
     long sequenceId = 10;
@@ -156,6 +162,49 @@ public void testReturnsTrueForQuasiClosedContainerWithVulnerableReplica() throws
     assertEquals(1, repQueue.underReplicatedQueueSize());
     assertEquals(0, repQueue.overReplicatedQueueSize());
 
+  /**
+   * A QUASI_CLOSED container with 3 QUASI_CLOSED replicas with the correct sequence id. They're on unique origin
+   * nodes. There's an UNHEALTHY replica on a decommissioning node, which also has the correct sequence ID and a
+   * unique origin. It's expected that the UNHEALTHY replica is queued for under-replication. This is a variation of
+   * the situation where the healthy replicas have an incorrect sequence id and the unhealthy ones have the correct
+   * one. Here, all the replicas have the correct sequence id, but the unhealthy ones still need to be saved because
+   * they're on unique origin nodes.
+   * <p>
+   * Why save the UNHEALTHY replicas when there are enough unique QUASI_CLOSED replicas to form a quorum?
+   * Simply because we're ensuring redundancy of replicas having unique origin node IDs. Once HDDS gains the ability
+   * to restore UNHEALTHY replicas to a healthy state, they can also be used to form a quorum. In any case, when the
+   * container transitions to CLOSED, any UNHEALTHY replicas will be deleted.
+   * </p>
+   */
+  @Test
+  public void testReturnsTrueForQuasiClosedContainerWithVulnerableReplicaWhenAllReplicasHaveCorrectSequence()
+      throws NodeNotFoundException {
+    long sequenceId = 10;
+    ContainerInfo container = createContainerInfo(repConfig, 1, LifeCycleState.QUASI_CLOSED, sequenceId);
+    Set<ContainerReplica> replicas = new HashSet<>(4);
+    for (int i = 0; i < 3; i++) {
+      replicas.add(createContainerReplica(container.containerID(), 0, IN_SERVICE, State.QUASI_CLOSED,
+          container.getSequenceId()));
+    }
+    // create an UNHEALTHY replica with a unique origin id on a DECOMMISSIONING node
+    ContainerReplica unhealthy =
+        createContainerReplica(container.containerID(), 0, DECOMMISSIONING, State.UNHEALTHY, sequenceId);
+    replicas.add(unhealthy);
+    Mockito.when(replicationManager.getNodeStatus(Mockito.any(DatanodeDetails.class)))
+        .thenAnswer(invocation -> {
+          DatanodeDetails dn = invocation.getArgument(0);
+          if (dn.equals(unhealthy.getDatanodeDetails())) {
+            return new NodeStatus(DECOMMISSIONING, HEALTHY);
+          }
+          return NodeStatus.inServiceHealthy();
+        });
+    requestBuilder.setContainerReplicas(replicas).setContainerInfo(container);
+
+    assertTrue(handler.handle(requestBuilder.build()));
+    assertEquals(1, repQueue.underReplicatedQueueSize());
+    assertEquals(0, repQueue.overReplicatedQueueSize());
+  }
+
   @Test
   public void testReturnsFalseForVulnerableReplicaWithAnotherCopy() throws NodeNotFoundException {
     long sequenceId = 10;
--------------------------------------------------------------------------------
@@ -41,6 +41,8 @@
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
@@ -374,6 +376,70 @@ public void testDecommissionWaitsForUnhealthyReplicaToReplicateNewRM()
         nodeManager.getNodeStatus(dn1).getOperationalState());
   }
 
+  /**
+   * Situation: A QUASI_CLOSED container has an UNHEALTHY replica with a unique origin and three QUASI_CLOSED
+   * replicas. All the replicas have the correct Sequence ID. The UNHEALTHY replica is on a decommissioning node,
+   * and there are no other copies of it, that is, no other replicas with the same origin ID.
+   *
+   * Expectation: Decommissioning should not complete until the UNHEALTHY replica has been replicated to another node.
+   */
+  @Test
+  public void testDecommissionWaitsForUnhealthyReplicaWithUniqueOriginToReplicateNewRM()
+      throws NodeNotFoundException, ContainerNotFoundException {
+    DatanodeDetails dn1 = MockDatanodeDetails.randomDatanodeDetails();
+    nodeManager.register(dn1,
+        new NodeStatus(HddsProtos.NodeOperationalState.DECOMMISSIONING, HddsProtos.NodeState.HEALTHY));
+
+    // create a container and 3 QUASI_CLOSED replicas with containerID 1 and the same origin ID
+    ContainerID containerID = ContainerID.valueOf(1);
+    ContainerInfo container = ReplicationTestUtil.createContainerInfo(RatisReplicationConfig.getInstance(
+        HddsProtos.ReplicationFactor.THREE), containerID.getId(), HddsProtos.LifeCycleState.QUASI_CLOSED);
+    Set<ContainerReplica> replicas =
+        ReplicationTestUtil.createReplicasWithSameOrigin(containerID, State.QUASI_CLOSED, 0, 0, 0);
+
+    // the UNHEALTHY replica has a unique origin and the same sequence id as the container
+    ContainerReplica unhealthy =
+        ReplicationTestUtil.createContainerReplica(containerID, 0,
+            dn1.getPersistedOpState(), State.UNHEALTHY,
+            container.getNumberOfKeys(), container.getUsedBytes(), dn1,
+            dn1.getUuid(), container.getSequenceId());
+    replicas.add(unhealthy);
+    nodeManager.setContainers(dn1, ImmutableSet.of(containerID));
+
+    Mockito.when(repManager.getContainerReplicaCount(Mockito.eq(containerID)))
+        .thenReturn(new RatisContainerReplicaCount(container, replicas,
+            Collections.emptyList(), 2, false));
+    DatanodeAdminMonitorTestUtil.mockCheckContainerState(repManager, true);
+
+    // start monitoring dn1
+    monitor.startMonitoring(dn1);
+    monitor.run();
+    assertEquals(1, monitor.getTrackedNodeCount());
+    assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONING,
+        nodeManager.getNodeStatus(dn1).getOperationalState());
+
+    // running the monitor again causes it to remain DECOMMISSIONING
+    // as nothing has changed
+    monitor.run();
+    assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONING,
+        nodeManager.getNodeStatus(dn1).getOperationalState());
+
+    // add a copy of the UNHEALTHY replica on a new node; dn1 should get
+    // decommissioned now
+    ContainerReplica copyOfUnhealthyOnNewNode = unhealthy.toBuilder()
+        .setDatanodeDetails(MockDatanodeDetails.randomDatanodeDetails())
+        .build();
+    replicas.add(copyOfUnhealthyOnNewNode);
+    Mockito.when(repManager.getContainerReplicaCount(Mockito.eq(containerID)))
+        .thenReturn(new RatisContainerReplicaCount(container, replicas,
+            Collections.emptyList(), 2, false));
+    DatanodeAdminMonitorTestUtil.mockCheckContainerState(repManager, false);
+    monitor.run();
+    assertEquals(0, monitor.getTrackedNodeCount());
+    assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONED,
+        nodeManager.getNodeStatus(dn1).getOperationalState());
+  }
+
   /**
    * Consider a QUASI_CLOSED container with only UNHEALTHY replicas. If one
    * of its nodes is decommissioned, the decommissioning should succeed.