@@ -77,6 +77,7 @@ public class ECContainerReplicaCount implements ContainerReplicaCount {
private final Map<Integer, Integer> healthyIndexes = new HashMap<>();
private final Map<Integer, Integer> decommissionIndexes = new HashMap<>();
private final Map<Integer, Integer> maintenanceIndexes = new HashMap<>();
private final Set<DatanodeDetails> unhealthyReplicaDNs;
private final List<ContainerReplica> replicas;

public ECContainerReplicaCount(ContainerInfo containerInfo,
@@ -97,25 +98,15 @@ public ECContainerReplicaCount(ContainerInfo containerInfo,
this.remainingMaintenanceRedundancy
= Math.min(repConfig.getParity(), remainingMaintenanceRedundancy);

Set<DatanodeDetails> unhealthyReplicaDNs = new HashSet<>();
unhealthyReplicaDNs = new HashSet<>();
for (ContainerReplica r : replicas) {
if (r.getState() == ContainerReplicaProto.State.UNHEALTHY) {
unhealthyReplicaDNs.add(r.getDatanodeDetails());
}
}

for (ContainerReplicaOp op : replicaPendingOps) {
if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
pendingAdd.add(op.getReplicaIndex());
} else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
if (!unhealthyReplicaDNs.contains(op.getTarget())) {
// We ignore unhealthy replicas later in this method, so we also
// need to ignore pending deletes on those unhealthy replicas,
// otherwise the pending delete will decrement the healthy count and
// make the container appear under-replicated when it is not.
pendingDelete.add(op.getReplicaIndex());
}
}
processPendingOp(op);
}

for (ContainerReplica replica : replicas) {
@@ -157,20 +148,7 @@ public ECContainerReplicaCount(ContainerInfo containerInfo,
// will eventually be removed and reduce the count for this replica. If the
// count goes to zero, remove it from the map.
for (Integer i : pendingDelete) {
ensureIndexWithinBounds(i, "pendingDelete");
Integer count = healthyIndexes.get(i);
if (count != null) {
count = count - 1;
if (count < 1) {
healthyIndexes.remove(i);
} else {
healthyIndexes.put(i, count);
}
}
}
// Ensure any pending adds are within bounds
for (Integer i : pendingAdd) {
ensureIndexWithinBounds(i, "pendingAdd");
adjustHealthyCountWithPendingDelete(i);
}
}

@@ -194,6 +172,58 @@ public int getMaintenanceCount() {
return maintenanceIndexes.size();
}

/**
* Given a ContainerReplicaOp, check that its index is within the expected
* bounds and then add it to the relevant pending list.
* @param op The pending op to process.
*/
private void processPendingOp(ContainerReplicaOp op) {
ensureIndexWithinBounds(op.getReplicaIndex(), "pending" + op.getOpType());
if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
pendingAdd.add(op.getReplicaIndex());
} else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
if (!unhealthyReplicaDNs.contains(op.getTarget())) {
// We ignore unhealthy replicas when counting healthy indexes, so we
// also need to ignore pending deletes on those unhealthy replicas;
// otherwise the pending delete would decrement the healthy count and
// make the container appear under-replicated when it is not.
pendingDelete.add(op.getReplicaIndex());
}
}
}
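The comment inside processPendingOp is easier to follow with a concrete scenario. The sketch below is purely illustrative and is not part of the PR; the datanode variables are invented, and it reuses the ContainerReplicaOp constructor and the MockDatanodeDetails helper that appear in the tests later in this diff.

// Hypothetical scenario: index 4 has a CLOSED replica on dnClosed and an
// UNHEALTHY replica on dnUnhealthy, and SCM has queued a delete of the
// unhealthy copy.
DatanodeDetails dnClosed = MockDatanodeDetails.randomDatanodeDetails();
DatanodeDetails dnUnhealthy = MockDatanodeDetails.randomDatanodeDetails();
ContainerReplicaOp deleteOnUnhealthy = new ContainerReplicaOp(
    ContainerReplicaOp.PendingOpType.DELETE, dnUnhealthy, 4, Long.MAX_VALUE);

// The UNHEALTHY replica never contributes to healthyIndexes, so if this
// delete were added to pendingDelete it would decrement the count that the
// CLOSED replica contributed, and index 4 would wrongly look
// under-replicated. Because dnUnhealthy is in unhealthyReplicaDNs,
// processPendingOp() skips the op and the healthy count for index 4 stays
// at 1.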

/**
* Decrement the healthy count for the given index, as we assume a pending
* delete will eventually remove one of its replicas. If the count drops to
* zero, remove the index from the map.
* @param index The replica index which is pending delete.
*/
private void adjustHealthyCountWithPendingDelete(int index) {
Integer count = healthyIndexes.get(index);
if (count != null) {
count = count - 1;
if (count < 1) {
healthyIndexes.remove(index);
} else {
healthyIndexes.put(index, count);
}
}
}

/**
* Add a pending op to the object. This allows the same
* ECContainerReplicaCount object to be used for multiple processing stages
* where commands are created at each stage. The addition of a new pending
* op could influence what further replicas are needed in subsequent stages.
* @param op The ContainerReplicaOp to add.
*/
public void addPendingOp(ContainerReplicaOp op) {
processPendingOp(op);
if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
adjustHealthyCountWithPendingDelete(op.getReplicaIndex());
}
}

/**
* Get a set containing all decommissioning indexes, or an empty set if none
* are decommissioning. Note it is possible for an index to be
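The multi-stage reuse that the addPendingOp Javadoc describes can be sketched roughly as follows. This is a hypothetical illustration rather than code from the PR: the replicaCount and targetDatanode variables are assumed to exist in the handler, and the assertions reuse methods exercised by the tests further down.

// One ECContainerReplicaCount is built per handler invocation and then
// updated in place as each processing stage sends commands.
// Stage 1: reconstruction picks a target for missing index 5, sends the
// command, and records the in-flight work on the shared object.
replicaCount.addPendingOp(new ContainerReplicaOp(
    ContainerReplicaOp.PendingOpType.ADD, targetDatanode, 5, Long.MAX_VALUE));

// Stage 2: later stages in the same invocation now see index 5 as handled,
// so they do not schedule a duplicate command for it.
assert replicaCount.isSufficientlyReplicated(true);
assert replicaCount.unavailableIndexes(true).isEmpty();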
@@ -317,6 +317,12 @@ private int processMissingIndexes(
repConfig);
replicationManager.sendDatanodeCommand(reconstructionCommand,
container, selectedDatanodes.get(0));
// For each index that is going to be reconstructed with this command,
// adjust the replica count to reflect the pending operation.
for (int i = 0; i < missingIndexes.size(); i++) {
adjustPendingOps(
replicaCount, selectedDatanodes.get(i), missingIndexes.get(i));
}
commandsSent++;
}
} else {
@@ -367,7 +373,8 @@ private int processDecommissioningIndexes(
selectedDatanodes, excludedNodes, decomIndexes);
break;
}
createReplicateCommand(container, iterator, sourceReplica);
createReplicateCommand(
container, iterator, sourceReplica, replicaCount);
commandsSent++;
}
}
@@ -429,7 +436,7 @@ private int processMaintenanceOnlyIndexes(
targets, excludedNodes, maintIndexes);
break;
}
createReplicateCommand(container, iterator, sourceReplica);
createReplicateCommand(container, iterator, sourceReplica, replicaCount);
commandsSent++;
additionalMaintenanceCopiesNeeded -= 1;
}
@@ -438,7 +445,7 @@

private void createReplicateCommand(
ContainerInfo container, Iterator<DatanodeDetails> iterator,
ContainerReplica replica)
ContainerReplica replica, ECContainerReplicaCount replicaCount)
throws AllSourcesOverloadedException, NotLeaderException {
final boolean push = replicationManager.getConfig().isPush();
DatanodeDetails source = replica.getDatanodeDetails();
Expand All @@ -459,6 +466,14 @@ private void createReplicateCommand(
replicationManager.sendDatanodeCommand(replicateCommand, container,
target);
}
adjustPendingOps(replicaCount, target, replica.getReplicaIndex());
}

private void adjustPendingOps(ECContainerReplicaCount replicaCount,
DatanodeDetails target, int replicaIndex) {
replicaCount.addPendingOp(new ContainerReplicaOp(
ContainerReplicaOp.PendingOpType.ADD, target, replicaIndex,
Long.MAX_VALUE));
}

private static byte[] int2byte(List<Integer> src) {
@@ -86,6 +86,15 @@ public void testContainerMissingReplica() {
Assertions.assertEquals(1, rcnt.unavailableIndexes(true).size());
Assertions.assertEquals(5,
rcnt.unavailableIndexes(true).get(0).intValue());

// Add a pending add op for the missing replica and ensure it no longer
// appears missing.
ContainerReplicaOp op = new ContainerReplicaOp(
ContainerReplicaOp.PendingOpType.ADD,
MockDatanodeDetails.randomDatanodeDetails(), 5, Long.MAX_VALUE);
rcnt.addPendingOp(op);
Assertions.assertTrue(rcnt.isSufficientlyReplicated(true));
Assertions.assertEquals(0, rcnt.unavailableIndexes(true).size());
}

@Test
@@ -201,6 +210,13 @@ public void testOverReplicatedContainer() {
Assertions.assertEquals(1, rcnt.overReplicatedIndexes(true).size());
Assertions.assertTrue(rcnt.isOverReplicated(false));
Assertions.assertEquals(2, rcnt.overReplicatedIndexes(false).size());

// Add a pending delete op for the excess replica and ensure it now reports
// as not over replicated.
rcnt.addPendingOp(new ContainerReplicaOp(
ContainerReplicaOp.PendingOpType.DELETE,
MockDatanodeDetails.randomDatanodeDetails(), 2, Long.MAX_VALUE));
Assertions.assertFalse(rcnt.isOverReplicated(true));
}

@Test
@@ -186,7 +186,8 @@ public void testUnderReplicationWithDecomIndex1() throws IOException {
}


// Test used to reproduce the issue reported in HDDS-8171
// Test used to reproduce the issue reported in HDDS-8171 and then adjusted
// to ensure only a single command is sent for HDDS-8172.
@Test
public void testUnderReplicationWithDecomIndexAndMaintOnSameIndex()
throws IOException {
@@ -214,10 +215,13 @@ public NodeStatus getNodeStatus(DatanodeDetails dd) {
Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
Pair.of(IN_SERVICE, 5)));

// Note that maintenanceIndexes is set to zero because we do not expect any
// maintenance commands to be created: the maintenance copy is already
// satisfied by the earlier decommission command.
Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds =
testUnderReplicationWithMissingIndexes(
Lists.emptyList(), availableReplicas, 1, 2, policy);
Assertions.assertEquals(2, cmds.size());
Lists.emptyList(), availableReplicas, 1, 0, policy);
Assertions.assertEquals(1, cmds.size());
// Check the replicate command has index 1 set
for (Pair<DatanodeDetails, SCMCommand<?>> c : cmds) {
// Ensure neither of the commands are for the dead maintenance node
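For context on the updated expectations above, here is a brief walkthrough of the HDDS-8172 scenario the test now encodes. It is a hypothetical summary of the handler flow, not code from the PR; replicaCount and newTarget are invented names.

// Index 1 is served only by a decommissioning node plus a dead maintenance
// node, so before this change both the decommissioning stage and the
// maintenance-only stage of ECUnderReplicationHandler produced a command.

// Stage 1 (decommissioning): one replicate command is sent for index 1 and
// the pending ADD is recorded on the shared ECContainerReplicaCount.
replicaCount.addPendingOp(new ContainerReplicaOp(
    ContainerReplicaOp.PendingOpType.ADD, newTarget, 1, Long.MAX_VALUE));

// Stage 2 (maintenance-only): with the pending ADD in place, index 1 no
// longer needs an extra copy, so no second command is created. The test
// therefore expects a single command and passes 0 as the expected number of
// maintenance indexes.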