Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,15 @@ public static class UnderReplicatedHealthResult
public UnderReplicatedHealthResult(ContainerInfo containerInfo,
int remainingRedundancy, boolean dueToDecommission,
boolean replicatedOkWithPending, boolean unrecoverable) {
super(containerInfo, HealthState.UNDER_REPLICATED);
this(containerInfo, remainingRedundancy, dueToDecommission,
replicatedOkWithPending, unrecoverable, HealthState.UNDER_REPLICATED);
}

protected UnderReplicatedHealthResult(ContainerInfo containerInfo,
int remainingRedundancy, boolean dueToDecommission,
boolean replicatedOkWithPending, boolean unrecoverable,
HealthState healthState) {
super(containerInfo, healthState);
this.remainingRedundancy = remainingRedundancy;
this.dueToDecommission = dueToDecommission;
this.sufficientlyReplicatedAfterPending = replicatedOkWithPending;
Expand Down Expand Up @@ -148,7 +156,7 @@ public int getWeightedRedundancy() {
if (dueToDecommission) {
result += DECOMMISSION_REDUNDANCY;
} else {
result += remainingRedundancy;
result += getRemainingRedundancy();
}
return result;
}
Expand Down Expand Up @@ -207,19 +215,28 @@ public boolean isUnrecoverable() {
* containers are not spread across enough racks.
*/
public static class MisReplicatedHealthResult
extends ContainerHealthResult {
extends UnderReplicatedHealthResult {

private final boolean replicatedOkAfterPending;
/**
* In UnderReplicatedHealthResult, DECOMMISSION_REDUNDANCY is defined as
* 5 so that containers which are really under replicated get fixed as a
* priority over decommissioning hosts. We have defined that a container
* can only be mis replicated if it is not over or under replicated. Fixing
* mis replication is arguably less important than completing a decommission.
* So that a lot of mis replicated containers do not block decommission, we
* set the redundancy of mis replicated containers to 6 so they sort after
* under / over replicated and decommissioning replicas in the under
* replication queue.
*/
private static final int MIS_REP_REDUNDANCY = 6;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of a constant value, can we have some priority based on the mis-replication count?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not really convinced that the mis-rep count means much, or that it is worth ordering them further, as the container already has enough replicas. We want these to start with a lower priority than anything else (under rep, decommission), which setting it to 6 does. After that, we can only lower the priority further. If we have a mis rep count of 1 or 2 for a 3-2 and a 6-3 container, which is higher priority? I'm not sure about a good way to do this.


public MisReplicatedHealthResult(ContainerInfo containerInfo,
boolean replicatedOkAfterPending) {
super(containerInfo, HealthState.MIS_REPLICATED);
this.replicatedOkAfterPending = replicatedOkAfterPending;
super(containerInfo, MIS_REP_REDUNDANCY, false,
replicatedOkAfterPending, false,
HealthState.MIS_REPLICATED);
}

public boolean isReplicatedOkAfterPending() {
return replicatedOkAfterPending;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ public class ReplicationManager implements SCMService {
private ReplicationQueue replicationQueue;
private final ECUnderReplicationHandler ecUnderReplicationHandler;
private final ECOverReplicationHandler ecOverReplicationHandler;
private final ECMisReplicationHandler ecMisReplicationHandler;
private final RatisUnderReplicationHandler ratisUnderReplicationHandler;
private final RatisOverReplicationHandler ratisOverReplicationHandler;
private final int maintenanceRedundancy;
Expand Down Expand Up @@ -223,6 +224,8 @@ public ReplicationManager(final ConfigurationSource conf,
ecContainerPlacement, conf, nodeManager, this);
ecOverReplicationHandler =
new ECOverReplicationHandler(ecContainerPlacement, nodeManager);
ecMisReplicationHandler = new ECMisReplicationHandler(ecContainerPlacement,
conf, nodeManager);
ratisUnderReplicationHandler = new RatisUnderReplicationHandler(
ratisContainerPlacement, conf, nodeManager);
ratisOverReplicationHandler =
Expand Down Expand Up @@ -525,8 +528,18 @@ public Map<DatanodeDetails, SCMCommand<?>> processUnderReplicatedContainer(
List<ContainerReplicaOp> pendingOps =
containerReplicaPendingOps.getPendingOps(containerID);
if (result.getContainerInfo().getReplicationType() == EC) {
return ecUnderReplicationHandler.processAndCreateCommands(replicas,
pendingOps, result, maintenanceRedundancy);
if (result.getHealthState()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we create a separate method to processMisreplicatedContainer & have this if case in UnderReplicatedProcessor class?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: making it into a switch case would probably be good.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually think we should have a single processUnhealthyContainer method, and in it, have a switch / IF that caters for EC / Ratis and then Over / Under / Mis replicated. But we should also add some tests for that, to ensure the correct handler is called in the correct circumstances. However, with the way RM is currently, we would need to change it to inject the handlers, so I think we should clean this up in a separate refactoring Jira.

== ContainerHealthResult.HealthState.UNDER_REPLICATED) {
return ecUnderReplicationHandler.processAndCreateCommands(replicas,
pendingOps, result, maintenanceRedundancy);
} else if (result.getHealthState()
== ContainerHealthResult.HealthState.MIS_REPLICATED) {
return ecMisReplicationHandler.processAndCreateCommands(replicas,
pendingOps, result, maintenanceRedundancy);
} else {
throw new IllegalArgumentException("Unexpected health state: "
+ result.getHealthState());
}
}
return ratisUnderReplicationHandler.processAndCreateCommands(replicas,
pendingOps, result, ratisMaintenanceMinReplicas);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ public class ReplicationQueue {
underRepQueue;
private final Queue<ContainerHealthResult.OverReplicatedHealthResult>
overRepQueue;
private final Queue<ContainerHealthResult.MisReplicatedHealthResult>
misRepQueue;

public ReplicationQueue() {
underRepQueue = new PriorityQueue<>(
Expand All @@ -42,7 +40,6 @@ public ReplicationQueue() {
.thenComparing(ContainerHealthResult
.UnderReplicatedHealthResult::getRequeueCount));
overRepQueue = new LinkedList<>();
misRepQueue = new LinkedList<>();
}

public void enqueue(ContainerHealthResult.UnderReplicatedHealthResult
Expand All @@ -55,11 +52,6 @@ public void enqueue(ContainerHealthResult.OverReplicatedHealthResult
overRepQueue.add(overReplicatedHealthResult);
}

public void enqueue(ContainerHealthResult.MisReplicatedHealthResult
misReplicatedHealthResult) {
misRepQueue.add(misReplicatedHealthResult);
}

public ContainerHealthResult.UnderReplicatedHealthResult
dequeueUnderReplicatedContainer() {
return underRepQueue.poll();
Expand All @@ -70,11 +62,6 @@ public void enqueue(ContainerHealthResult.MisReplicatedHealthResult
return overRepQueue.poll();
}

public ContainerHealthResult.MisReplicatedHealthResult
dequeueMisReplicatedContainer() {
return misRepQueue.poll();
}

public int underReplicatedQueueSize() {
return underRepQueue.size();
}
Expand All @@ -83,8 +70,4 @@ public int overReplicatedQueueSize() {
return overRepQueue.size();
}

public int misReplicatedQueueSize() {
return misRepQueue.size();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,13 @@ public void testOverReplicatedFixByPending()

@Test
public void testUnderReplicationQueuePopulated() {
// Make it always return mis-replicated. Only a perfectly replicated
// container should make it to the mis-replicated state as under / over
// replicated take precedence.
Mockito.when(ecPlacementPolicy.validateContainerPlacement(
anyList(), anyInt()))
.thenReturn(new ContainerPlacementStatusDefault(1, 2, 3));

ContainerInfo decomContainer = createContainerInfo(repConfig, 1,
HddsProtos.LifeCycleState.CLOSED);
addReplicas(decomContainer, ContainerReplicaProto.State.CLOSED,
Expand All @@ -404,6 +411,10 @@ public void testUnderReplicationQueuePopulated() {
HddsProtos.LifeCycleState.CLOSED);
addReplicas(underRep0, ContainerReplicaProto.State.CLOSED, 1, 2, 3);

ContainerInfo misRep = createContainerInfo(repConfig, 4,
HddsProtos.LifeCycleState.CLOSED);
addReplicas(misRep, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4, 5);

enableProcessAll();
replicationManager.processAll();

Expand Down Expand Up @@ -438,6 +449,10 @@ public void testUnderReplicationQueuePopulated() {
res = replicationManager.dequeueUnderReplicatedContainer();
Assert.assertEquals(underRep0, res.getContainerInfo());

// Next is the mis-rep container, which has a remaining redundancy of 6.
res = replicationManager.dequeueUnderReplicatedContainer();
Assert.assertEquals(misRep, res.getContainerInfo());

res = replicationManager.dequeueUnderReplicatedContainer();
Assert.assertNull(res);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,9 +481,8 @@ public void testMisReplicatedContainer() {
Assert.assertEquals(HealthState.MIS_REPLICATED, result.getHealthState());

Assert.assertTrue(healthCheck.handle(request));
Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
Assert.assertEquals(1, repQueue.misReplicatedQueueSize());
Assert.assertEquals(0, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
Assert.assertEquals(0, report.getStat(
Expand Down Expand Up @@ -531,7 +530,6 @@ public void testMisReplicatedContainerFixedByPending() {
Assert.assertTrue(healthCheck.handle(request));
Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
Assert.assertEquals(0, repQueue.misReplicatedQueueSize());
Assert.assertEquals(0, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
Assert.assertEquals(0, report.getStat(
Expand Down Expand Up @@ -567,7 +565,6 @@ public void testUnderAndMisReplicatedContainer() {
Assert.assertTrue(healthCheck.handle(request));
Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
Assert.assertEquals(0, repQueue.misReplicatedQueueSize());
Assert.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
Assert.assertEquals(0, report.getStat(
Expand Down Expand Up @@ -604,7 +601,6 @@ public void testOverAndMisReplicatedContainer() {
Assert.assertTrue(healthCheck.handle(request));
Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
Assert.assertEquals(1, repQueue.overReplicatedQueueSize());
Assert.assertEquals(0, repQueue.misReplicatedQueueSize());
Assert.assertEquals(0, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
Assert.assertEquals(1, report.getStat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,6 @@ public void testUnderReplicatedWithMisReplication() {
Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
Assert.assertEquals(0, repQueue.misReplicatedQueueSize());
Assert.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
Assert.assertEquals(0, report.getStat(
Expand Down Expand Up @@ -468,7 +467,6 @@ public void testUnderReplicatedWithMisReplicationFixedByPending() {
Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
Assert.assertEquals(0, repQueue.misReplicatedQueueSize());
Assert.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
Assert.assertEquals(0, report.getStat(
Expand All @@ -494,9 +492,8 @@ public void testMisReplicated() {
Assert.assertFalse(result.isReplicatedOkAfterPending());

Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
Assert.assertEquals(1, repQueue.misReplicatedQueueSize());
Assert.assertEquals(0, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
Assert.assertEquals(1, report.getStat(
Expand Down