Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm.container.replication;

import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;

/**
* Class to correct over replicated QuasiClosed Stuck Ratis containers.
*/
public class QuasiClosedStuckOverReplicationHandler implements UnhealthyReplicationHandler {

private static final org.slf4j.Logger LOG =
org.slf4j.LoggerFactory.getLogger(QuasiClosedStuckOverReplicationHandler.class);
private final ReplicationManager replicationManager;
private final ReplicationManagerMetrics metrics;

public QuasiClosedStuckOverReplicationHandler(final ReplicationManager replicationManager) {
this.replicationManager = replicationManager;
this.metrics = replicationManager.getMetrics();
}

@Override
public int processAndSendCommands(Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
ContainerHealthResult result, int remainingMaintenanceRedundancy)
throws IOException {

ContainerInfo containerInfo = result.getContainerInfo();
LOG.debug("Handling over replicated QuasiClosed Stuck Ratis container {}", containerInfo);

int pendingDelete = 0;
for (ContainerReplicaOp op : pendingOps) {
if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
pendingDelete++;
}
}

if (pendingDelete > 0) {
LOG.debug("Container {} has pending delete operations. No more over replication will be scheduled until they " +
"complete", containerInfo);
return 0;
}

// Filter out any STALE replicas, as they may go dead soon. If so, we don't want to remove other healthy replicas
// instead of them, as they could result in under replication.
Set<ContainerReplica> healthyReplicas = replicas.stream()
.filter(replica -> {
try {
return replicationManager.getNodeStatus(
replica.getDatanodeDetails()).getHealth() == HddsProtos.NodeState.HEALTHY;
} catch (NodeNotFoundException e) {
return false;
}
})
.collect(Collectors.toSet());

QuasiClosedStuckReplicaCount replicaCount =
new QuasiClosedStuckReplicaCount(healthyReplicas, remainingMaintenanceRedundancy);

List<QuasiClosedStuckReplicaCount.MisReplicatedOrigin> misReplicatedOrigins
= replicaCount.getOverReplicatedOrigins();

if (misReplicatedOrigins.isEmpty()) {
LOG.debug("Container {} is not over replicated", containerInfo);
return 0;
}

int totalCommandsSent = 0;
IOException firstException = null;
for (QuasiClosedStuckReplicaCount.MisReplicatedOrigin origin : misReplicatedOrigins) {
List<ContainerReplica> sortedReplicas = getSortedReplicas(origin.getSources());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove any replicas that are on stale Datanodes first and then check if the container is still over replicated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. I have made that change too.

for (int i = 0; i < origin.getReplicaDelta(); i++) {
try {
replicationManager.sendThrottledDeleteCommand(
containerInfo, 0, sortedReplicas.get(i).getDatanodeDetails(), true);
totalCommandsSent++;
} catch (CommandTargetOverloadedException e) {
LOG.debug("Unable to send delete command for container {} to {} as it has too many pending delete commands",
containerInfo, sortedReplicas.get(i).getDatanodeDetails());
firstException = e;
}
}
}

if (firstException != null) {
// Some nodes were overloaded when attempting to send commands.
if (totalCommandsSent > 0) {
metrics.incrPartialReplicationTotal();
}
throw firstException;
}
return totalCommandsSent;
}

private List<ContainerReplica> getSortedReplicas(
Set<ContainerReplica> replicas) {
// sort replicas so that they can be selected in a deterministic way
return replicas.stream()
.sorted(Comparator.comparingLong(ContainerReplica::hashCode))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ public class ReplicationManager implements SCMService, ContainerReplicaPendingOp
private final RatisOverReplicationHandler ratisOverReplicationHandler;
private final RatisMisReplicationHandler ratisMisReplicationHandler;
private final QuasiClosedStuckUnderReplicationHandler quasiClosedStuckUnderReplicationHandler;
private final QuasiClosedStuckOverReplicationHandler quasiClosedStuckOverReplicationHandler;
private Thread underReplicatedProcessorThread;
private Thread overReplicatedProcessorThread;
private final UnderReplicatedProcessor underReplicatedProcessor;
Expand Down Expand Up @@ -252,6 +253,7 @@ public ReplicationManager(final ConfigurationSource conf,
ratisContainerPlacement, conf, this);
quasiClosedStuckUnderReplicationHandler =
new QuasiClosedStuckUnderReplicationHandler(ratisContainerPlacement, conf, this);
quasiClosedStuckOverReplicationHandler = new QuasiClosedStuckOverReplicationHandler(this);
underReplicatedProcessor =
new UnderReplicatedProcessor(this, rmConf::getUnderReplicatedInterval);
overReplicatedProcessor =
Expand Down Expand Up @@ -781,8 +783,16 @@ int processOverReplicatedContainer(
containerReplicaPendingOps.getPendingOps(containerID);

final boolean isEC = isEC(result.getContainerInfo().getReplicationConfig());
final UnhealthyReplicationHandler handler = isEC ? ecOverReplicationHandler
: ratisOverReplicationHandler;
UnhealthyReplicationHandler handler;
if (isEC) {
handler = ecOverReplicationHandler;
} else {
if (QuasiClosedStuckReplicationCheck.shouldHandleAsQuasiClosedStuck(result.getContainerInfo(), replicas)) {
handler = quasiClosedStuckOverReplicationHandler;
} else {
handler = ratisOverReplicationHandler;
}
}

return handler.processAndSendCommands(replicas,
pendingOps, result, getRemainingMaintenanceRedundancy(isEC));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,9 +504,29 @@ public static void mockRMSendDeleteCommand(ReplicationManager mock,
* @param commandsSent Set to add the command to rather than sending it.
*/
public static void mockRMSendThrottledDeleteCommand(ReplicationManager mock,
Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent)
Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent)
throws NotLeaderException, CommandTargetOverloadedException {
mockRMSendThrottledDeleteCommand(mock, commandsSent, new AtomicBoolean(false));
}

/**
* Given a Mockito mock of ReplicationManager, this method will mock the
* sendThrottledDeleteCommand method so that it adds the command created to
* the commandsSent set.
* @param mock Mock of ReplicationManager
* @param commandsSent Set to add the command to rather than sending it.
* @param throwOverloaded If the atomic boolean is true, throw a
* CommandTargetOverloadedException and set the boolean
* to false, instead of creating the replicate command.
*/
public static void mockRMSendThrottledDeleteCommand(ReplicationManager mock,
Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent, AtomicBoolean throwOverloaded)
throws NotLeaderException, CommandTargetOverloadedException {
doAnswer((Answer<Void>) invocationOnMock -> {
if (throwOverloaded.get()) {
throwOverloaded.set(false);
throw new CommandTargetOverloadedException("Overloaded");
}
ContainerInfo containerInfo = invocationOnMock.getArgument(0);
int replicaIndex = invocationOnMock.getArgument(1);
DatanodeDetails target = invocationOnMock.getArgument(2);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm.container.replication;

import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
* Test for QuasiClosedStuckOverReplicationHandler.
*/
public class TestQuasiClosedStuckOverReplicationHandler {

private static final RatisReplicationConfig RATIS_REPLICATION_CONFIG = RatisReplicationConfig.getInstance(THREE);
private ContainerInfo container;
private ReplicationManager replicationManager;
private ReplicationManagerMetrics metrics;
private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
private QuasiClosedStuckOverReplicationHandler handler;
private UUID origin1 = UUID.randomUUID();
private UUID origin2 = UUID.randomUUID();

@BeforeEach
void setup() throws NodeNotFoundException,
CommandTargetOverloadedException, NotLeaderException {
container = ReplicationTestUtil.createContainer(
HddsProtos.LifeCycleState.QUASI_CLOSED, RATIS_REPLICATION_CONFIG);

replicationManager = mock(ReplicationManager.class);
OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
ozoneConfiguration.setBoolean("hdds.scm.replication.push", true);
when(replicationManager.getConfig())
.thenReturn(ozoneConfiguration.getObject(
ReplicationManager.ReplicationManagerConfiguration.class));
metrics = ReplicationManagerMetrics.create(replicationManager);
when(replicationManager.getMetrics()).thenReturn(metrics);

/*
Return NodeStatus with NodeOperationalState as specified in
DatanodeDetails, and NodeState as HEALTHY.
*/
when(
replicationManager.getNodeStatus(any(DatanodeDetails.class)))
.thenAnswer(invocationOnMock -> {
DatanodeDetails dn = invocationOnMock.getArgument(0);
return new NodeStatus(dn.getPersistedOpState(),
HddsProtos.NodeState.HEALTHY);
});

commandsSent = new HashSet<>();
ReplicationTestUtil.mockRMSendThrottledDeleteCommand(
replicationManager, commandsSent);
handler = new QuasiClosedStuckOverReplicationHandler(replicationManager);
}

@Test
public void testReturnsZeroIfNotOverReplicated() throws IOException {
Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(container.containerID(),
StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED,
Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE),
Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE));

int count = handler.processAndSendCommands(replicas, Collections.emptyList(), getOverReplicatedHealthResult(), 1);
assertEquals(0, count);
}

@Test
public void testNoCommandsScheduledIfPendingOps() throws IOException {
Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(container.containerID(),
StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED,
Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE),
Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE),
Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE));
List<ContainerReplicaOp> pendingOps = new ArrayList<>();
pendingOps.add(ContainerReplicaOp.create(
ContainerReplicaOp.PendingOpType.DELETE, MockDatanodeDetails.randomDatanodeDetails(), 0));

int count = handler.processAndSendCommands(replicas, pendingOps, getOverReplicatedHealthResult(), 1);
assertEquals(0, count);
}

@Test
public void testCommandScheduledForOverReplicatedContainer() throws IOException {
Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(container.containerID(),
StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED,
Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE),
Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE));

int count = handler.processAndSendCommands(replicas, Collections.emptyList(), getOverReplicatedHealthResult(), 1);
assertEquals(1, count);
SCMCommand<?> command = commandsSent.iterator().next().getRight();
assertEquals(StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteContainerCommand, command.getType());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest also verifying that the delete was sent for one of the origin1 replicas.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have enhanced the scenario tests to perform this sort of validation. I had avoided that before because a command could be sent to dn1 or dn2, and the framework didn't support that. However now you can give "dn1|dn2" in the scenario spec and it will check it goes to one of the listed DNs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice idea, LGTM.

}

@Test
public void testOverloadedExceptionContinuesAndThrows() throws NotLeaderException, CommandTargetOverloadedException {
Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(container.containerID(),
StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED,
Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE),
Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE),
Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE));

ReplicationTestUtil.mockRMSendThrottledDeleteCommand(replicationManager, commandsSent, new AtomicBoolean(true));

assertThrows(CommandTargetOverloadedException.class, () ->
handler.processAndSendCommands(replicas, Collections.emptyList(), getOverReplicatedHealthResult(), 1));
assertEquals(1, commandsSent.size());
}


private ContainerHealthResult.OverReplicatedHealthResult getOverReplicatedHealthResult() {
ContainerHealthResult.OverReplicatedHealthResult
healthResult = mock(ContainerHealthResult.OverReplicatedHealthResult.class);
when(healthResult.getContainerInfo()).thenReturn(container);
return healthResult;
}

}
Loading