Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,20 @@ public boolean isPlacementStatusActuallyEqualAfterRemove(
return isPlacementStatusActuallyEqual(currentCPS, newCPS);
}

/**
 * Allow the placement policy to indicate which replicas can be removed for
 * an over replicated container, so that the placement policy is not violated
 * by removing them.
 * @param replicas the candidate replicas of the over replicated container
 * @param expectedCountPerUniqueReplica the number of copies expected to
 *     remain for each unique replica once over replication is resolved
 * @return the set of replicas the placement policy selected for removal;
 *     may be empty if none can be removed without violating the policy
 */
protected Set<ContainerReplica> selectReplicasToRemove(
    Set<ContainerReplica> replicas, int expectedCountPerUniqueReplica) {
  return placementPolicy.replicasToRemoveToFixOverreplication(
      replicas, expectedCountPerUniqueReplica);
}

/**
* Given a set of ContainerReplica, transform it to a list of DatanodeDetails
* and then check if the list meets the container placement policy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,24 @@
package org.apache.hadoop.hdds.scm.container.replication;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;

Expand Down Expand Up @@ -74,12 +74,34 @@ public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
ContainerHealthResult result, int remainingMaintenanceRedundancy) {
ContainerInfo container = result.getContainerInfo();

// We are going to check for over replication, so we should filter out any
// replicas that are not in a HEALTHY state. This is because a replica can
// be healthy, stale or dead. If it is dead it will be quickly removed from
// scm. If it is stale, there is a good chance the DN is offline and the
// replica will go away soon. So, if we have a container that is over
// replicated with a HEALTHY and STALE replica, and we decide to delete the
// HEALTHY one, and then the STALE one goes away, we will lose them both.
// To avoid this, we will filter out any non-healthy replicas first.
// EcContainerReplicaCount will ignore nodes which are not IN_SERVICE for
// over replication checks, but we need to filter these out later in this
// method anyway, so it makes sense to filter them here too, to avoid a
// second lookup of the NodeStatus
Set<ContainerReplica> healthyReplicas = replicas.stream()
.filter(r -> {
NodeStatus ns = ReplicationManager.getNodeStatus(
r.getDatanodeDetails(), nodeManager);
return ns.isHealthy() && ns.getOperationalState() ==
HddsProtos.NodeOperationalState.IN_SERVICE;
})
.collect(Collectors.toSet());

final ECContainerReplicaCount replicaCount =
new ECContainerReplicaCount(container, replicas, pendingOps,
new ECContainerReplicaCount(container, healthyReplicas, pendingOps,
remainingMaintenanceRedundancy);
if (!replicaCount.isOverReplicated()) {
LOG.info("The container {} state changed and it's not in over"
+ " replication any more", container.getContainerID());
LOG.info("The container {} state changed and it is no longer over"
+ " replication. Replica count: {}, healthy replica count: {}",
container.getContainerID(), replicas.size(), healthyReplicas.size());
return emptyMap();
}

Expand All @@ -93,10 +115,9 @@ public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
replicaCount.overReplicatedIndexes(true);
//sanity check
if (overReplicatedIndexes.size() == 0) {
LOG.warn("The container {} with replicas {} is found over replicated " +
"by ContainerHealthCheck, but found not over replicated by " +
"ECContainerReplicaCount",
container.getContainerID(), replicas);
LOG.warn("The container {} with replicas {} was found over replicated "
+ "by EcContainerReplicaCount, but there are no over replicated "
+ "indexes returned", container.getContainerID(), replicas);
return emptyMap();
}

Expand All @@ -106,48 +127,51 @@ public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
deletionInFlight.add(op.getTarget());
}
}
Map<Integer, List<ContainerReplica>> index2replicas = new HashMap<>();
replicas.stream()
.filter(r -> overReplicatedIndexes.contains(r.getReplicaIndex()))
.filter(r -> r
.getState() == StorageContainerDatanodeProtocolProtos
.ContainerReplicaProto.State.CLOSED)
.filter(r -> ReplicationManager
.getNodeStatus(r.getDatanodeDetails(), nodeManager).isHealthy())

Set<ContainerReplica> candidates = healthyReplicas.stream()
.filter(r -> !deletionInFlight.contains(r.getDatanodeDetails()))
.forEach(r -> {
int index = r.getReplicaIndex();
index2replicas.computeIfAbsent(index, k -> new LinkedList<>());
index2replicas.get(index).add(r);
});

if (index2replicas.size() > 0) {
final Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
final int replicationFactor =
container.getReplicationConfig().getRequiredNodes();
index2replicas.values().forEach(l -> {
Iterator<ContainerReplica> it = l.iterator();
Set<ContainerReplica> tempReplicaSet = new HashSet<>(replicas);
while (it.hasNext() && l.size() > 1) {
ContainerReplica r = it.next();
if (isPlacementStatusActuallyEqualAfterRemove(
tempReplicaSet, r, replicationFactor)) {
DeleteContainerCommand deleteCommand =
new DeleteContainerCommand(container.getContainerID(), true);
deleteCommand.setReplicaIndex(r.getReplicaIndex());
commands.put(r.getDatanodeDetails(), deleteCommand);
it.remove();
tempReplicaSet.remove(r);
}
}
});
if (commands.size() == 0) {
LOG.info("With the current state of avilable replicas {}, no" +
" commands to process due to over replication.", replicas);
.filter(r -> r.getState() == StorageContainerDatanodeProtocolProtos
.ContainerReplicaProto.State.CLOSED)
.collect(Collectors.toSet());

Set<ContainerReplica> replicasToRemove =
selectReplicasToRemove(candidates, 1);

if (replicasToRemove.size() == 0) {
LOG.warn("The container {} is over replicated, but no replicas were "
+ "selected to remove by the placement policy. Replicas: {}",
container, replicas);
return emptyMap();
}

final Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
// As a sanity check, sum up the current counts of each replica index. When
// processing replicasToRemove, ensure that removing the replica would not
// drop the count of that index to zero.
Map<Integer, Integer> replicaIndexCounts = new HashMap<>();
for (ContainerReplica r : candidates) {
replicaIndexCounts.put(r.getReplicaIndex(),
replicaIndexCounts.getOrDefault(r.getReplicaIndex(), 0) + 1);
}
for (ContainerReplica r : replicasToRemove) {
int currentCount = replicaIndexCounts.getOrDefault(
r.getReplicaIndex(), 0);
if (currentCount < 2) {
LOG.warn("The replica {} selected to remove would reduce the count " +
"for that index to zero. Candidate Replicas: {}", r, candidates);
continue;
}
return commands;
replicaIndexCounts.put(r.getReplicaIndex(), currentCount - 1);
DeleteContainerCommand deleteCommand =
new DeleteContainerCommand(container.getContainerID(), true);
deleteCommand.setReplicaIndex(r.getReplicaIndex());
commands.put(r.getDatanodeDetails(), deleteCommand);
}

return emptyMap();
if (commands.size() == 0) {
LOG.warn("With the current state of available replicas {}, no" +
" commands were created to remove excess replicas.", replicas);
}
return commands;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.Node;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;

Expand Down Expand Up @@ -198,6 +199,9 @@ public static Set<ContainerReplica> createReplicas(

public static PlacementPolicy getSimpleTestPlacementPolicy(
final NodeManager nodeManager, final OzoneConfiguration conf) {

final Node rackNode = MockDatanodeDetails.randomDatanodeDetails();

return new SCMCommonPlacementPolicy(nodeManager, conf) {
@Override
protected List<DatanodeDetails> chooseDatanodesInternal(
Expand All @@ -216,6 +220,12 @@ protected List<DatanodeDetails> chooseDatanodesInternal(
// Test stub: node selection is not exercised through this path in the
// mock policy, so no datanode is ever chosen.
// NOTE(review): assumes callers tolerate a null return here — confirm.
public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) {
  return null;
}

@Override
protected Node getPlacementGroup(DatanodeDetails dn) {
  // Return the same fixed node for every datanode so the whole cluster
  // appears to the policy as a single rack (one placement group).
  return rackNode;
}
};
}

Expand Down
Loading