@@ -77,10 +77,24 @@ public static class HealthyResult extends ContainerHealthResult {
public static class UnderReplicatedHealthResult
extends ContainerHealthResult {

// For under replicated containers, the best remaining redundancy we can
// have is 3 for EC-10-4, 2 for EC-6-3, 1 for EC-3-2 and 2 for Ratis.
// A container which is under-replicated due to decommission will have one
// more, ie 4, 3, 2, 3 respectively. Ideally we want to sort decommission
Contributor

one more means weight, right?

Contributor Author

Under-replicated due to decommission is not missing any replicas - so its remaining redundancy is still the same as if it was not under-replicated at all.

// only under-replication after all other under-replicated containers.
// It may also make sense to allow under-replicated containers a chance to
// retry once before processing the decommission only under replication.
// Therefore we should adjust the weighted remaining redundancy of
// decommission only under-replicated containers to a floor of 5 so they
// sort after an under-replicated container with 3 remaining replicas (
// EC-10-4) plus one retry.
private static final int DECOMMISSION_REDUNDANCY = 5;
Contributor

Basically, if the idea is to give the decommission elements lower priority than under-replication, will that cause decommission to take a very long time if there are a lot of under-replication items in that cluster?

Contributor Author

Yes, but that is how it should be. Decommission is less important than repairing containers which are at risk of data loss.

Contributor

I am not sure we should block decommission tasks. Decommission tasks (replicate commands) are lighter weight compared to reconstruction tasks. If the cluster has too many reconstruction tasks (maybe due to a rack going down), decommission may take a very long time. I just looked at HDFS, and it seems there is no separate queue for decommission there. Probably let's move ahead with the current plan and revisit based on how this goes with decommission in practice. I am wondering whether there may be complaints about decommission taking a long time in practice.
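
For illustration of the weighting this constant produces: an EC-10-4 container missing four replicas has a weighted redundancy of 0 plus its requeue count, one missing a single replica has 3 plus its requeue count, and a decommission-only container starts at DECOMMISSION_REDUNDANCY (5) plus its requeue count. A decommission-only result is therefore only dequeued ahead of a redundancy-0 result once that result has been requeued five times, at which point the tie is broken in favour of the entry with fewer requeues.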


private final int remainingRedundancy;
private final boolean dueToDecommission;
private final boolean sufficientlyReplicatedAfterPending;
private final boolean unrecoverable;
private int requeueCount = 0;

UnderReplicatedHealthResult(ContainerInfo containerInfo,
int remainingRedundancy, boolean dueToDecommission,
@@ -93,7 +107,7 @@ public static class UnderReplicatedHealthResult
}

/**
* How many more replicas can be lost before the the container is
* How many more replicas can be lost before the container is
* unreadable. For containers which are under-replicated due to decommission
* or maintenance only, the remaining redundancy will include those
* decommissioning or maintenance replicas, as they are technically still
@@ -104,6 +118,41 @@ public int getRemainingRedundancy() {
return remainingRedundancy;
}

/**
* The weightedRedundancy is the remaining redundancy + the requeue count.
* When this value is used for ordering in a priority queue it ensures the
* priority is reduced each time it is requeued, to prevent it from blocking
* other containers from being processed.
* Additionally, so that decommission and maintenance replicas are not
* ordered ahead of under-replicated replicas, a redundancy of
* DECOMMISSION_REDUNDANCY is used for the decommission redundancy rather
* than its real redundancy.
* @return The weightedRedundancy of this result.
*/
public int getWeightedRedundancy() {
int result = requeueCount;
if (dueToDecommission) {
result += DECOMMISSION_REDUNDANCY;
} else {
result += remainingRedundancy;
}
return result;
}

/**
* If there is an attempt to process this under-replicated result, and it
* fails and has to be requeued, this method should be called to increment
* the requeue count to ensure the result is not placed back at the head
* of the queue.
*/
public void incrementRequeueCount() {
++requeueCount;
}

public int getRequeueCount() {
return requeueCount;
}

/**
* Indicates whether the under-replication is caused only by replicas
* being decommissioned or entering maintenance. Ie, there are not replicas
@@ -48,8 +48,11 @@
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -130,6 +133,9 @@ public class ReplicationManager implements SCMService {
private final ContainerReplicaPendingOps containerReplicaPendingOps;
private final ContainerHealthCheck ecContainerHealthCheck;
private final EventPublisher eventPublisher;
private final ReentrantLock lock = new ReentrantLock();
private Queue<ContainerHealthResult.UnderReplicatedHealthResult>
underRepQueue;

/**
* Constructs ReplicationManager instance with the given configuration.
@@ -166,6 +172,7 @@ public ReplicationManager(final ConfigurationSource conf,
this.legacyReplicationManager = legacyReplicationManager;
this.ecContainerHealthCheck = new ECContainerHealthCheck();
this.nodeManager = nodeManager;
this.underRepQueue = createUnderReplicatedQueue();
start();
}

@@ -232,8 +239,8 @@ public synchronized void processAll() {
final List<ContainerInfo> containers =
containerManager.getContainers();
ReplicationManagerReport report = new ReplicationManagerReport();
List<ContainerHealthResult.UnderReplicatedHealthResult> underReplicated =
new ArrayList<>();
Queue<ContainerHealthResult.UnderReplicatedHealthResult>
underReplicated = createUnderReplicatedQueue();
List<ContainerHealthResult.OverReplicatedHealthResult> overReplicated =
new ArrayList<>();

@@ -254,16 +261,61 @@ public synchronized void processAll() {
}
}
report.setComplete();
// TODO - Sort the pending lists by priority and assign to the main queue,
// which is yet to be defined.
lock.lock();
try {
underRepQueue = underReplicated;
} finally {
lock.unlock();
}
this.containerReport = report;
LOG.info("Replication Monitor Thread took {} milliseconds for" +
" processing {} containers.", clock.millis() - start,
containers.size());
}

/**
* Retrieve the next highest priority container to be replicated from the
* under replicated queue.
* @return The next underReplicated container to be processed, or null if the
* queue is empty.
*/
public ContainerHealthResult.UnderReplicatedHealthResult
dequeueUnderReplicatedContainer() {
lock.lock();
try {
return underRepQueue.poll();
} finally {
lock.unlock();
}
}

/**
* Add an under replicated container back to the queue if it was unable to
* be processed. Its retry count will be incremented before it is re-queued,
* reducing its priority.
* Note that the queue could have been rebuilt and replaced after this
* message was removed but before it is added back. This will result in a
* duplicate entry on the queue. However, when it is processed again, the
* result of the processing will end up with pending replicas scheduled. If
* instance 1 is processed and creates the pending replicas, when instance 2
* is processed, it will find the pending containers and know it has no work
* to do, and be discarded. Additionally, the queue will be refreshed
* periodically removing any duplicates.
* @param underReplicatedHealthResult
*/
public void requeueUnderReplicatedContainer(ContainerHealthResult
.UnderReplicatedHealthResult underReplicatedHealthResult) {
underReplicatedHealthResult.incrementRequeueCount();
lock.lock();
try {
underRepQueue.add(underReplicatedHealthResult);
} finally {
lock.unlock();
}
}
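
As a usage illustration of these two methods (a sketch only, not part of this change; the driver class, its limit parameter and the tryProcess step are hypothetical placeholders, and the class is assumed to sit in the same package as ReplicationManager), a caller could drain the queue along these lines:

// Illustrative sketch only - not part of this diff.
final class UnderReplicationDriverSketch {
  private final ReplicationManager replicationManager;

  UnderReplicationDriverSketch(ReplicationManager replicationManager) {
    this.replicationManager = replicationManager;
  }

  void processUpTo(int limit) {
    for (int i = 0; i < limit; i++) {
      ContainerHealthResult.UnderReplicatedHealthResult result =
          replicationManager.dequeueUnderReplicatedContainer();
      if (result == null) {
        // The queue is empty - nothing further to do this round.
        break;
      }
      if (!tryProcess(result)) {
        // Processing failed - requeue it. Its requeue count is incremented,
        // so it sorts behind containers which have not yet been retried.
        replicationManager.requeueUnderReplicatedContainer(result);
      }
    }
  }

  private boolean tryProcess(
      ContainerHealthResult.UnderReplicatedHealthResult result) {
    // Placeholder for the real replication handling, which this change
    // does not add.
    return true;
  }
}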

protected ContainerHealthResult processContainer(ContainerInfo containerInfo,
List<ContainerHealthResult.UnderReplicatedHealthResult> underRep,
Queue<ContainerHealthResult.UnderReplicatedHealthResult> underRep,
List<ContainerHealthResult.OverReplicatedHealthResult> overRep,
ReplicationManagerReport report) throws ContainerNotFoundException {
Set<ContainerReplica> replicas = containerManager.getContainerReplicas(
@@ -302,6 +354,21 @@ protected ContainerHealthResult processContainer(ContainerInfo containerInfo,
return health;
}

/**
* Creates a priority queue of UnderReplicatedHealthResult, where the elements
* are ordered by the weighted redundancy of the container. This means that
* containers with the least remaining redundancy are at the front of the
* queue, and will be processed first.
* @return An empty instance of a PriorityQueue.
*/
protected PriorityQueue<ContainerHealthResult.UnderReplicatedHealthResult>
createUnderReplicatedQueue() {
return new PriorityQueue<>(Comparator.comparing(ContainerHealthResult
.UnderReplicatedHealthResult::getWeightedRedundancy)
.thenComparing(ContainerHealthResult
.UnderReplicatedHealthResult::getRequeueCount));
}

public ReplicationManagerReport getContainerReport() {
return containerReport;
}
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.scm.container.replication;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -30,6 +31,7 @@
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.ozone.test.TestClock;
@@ -43,11 +45,14 @@
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerInfo;
import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;

@@ -68,9 +73,10 @@ public class TestReplicationManager {
private ContainerReplicaPendingOps containerReplicaPendingOps;

private Map<ContainerID, Set<ContainerReplica>> containerReplicaMap;
private Set<ContainerInfo> containerInfoSet;
private ReplicationConfig repConfig;
private ReplicationManagerReport repReport;
private List<ContainerHealthResult.UnderReplicatedHealthResult> underRep;
private Queue<ContainerHealthResult.UnderReplicatedHealthResult> underRep;
private List<ContainerHealthResult.OverReplicatedHealthResult> overRep;

@Before
@@ -94,6 +100,9 @@ public void setup() throws IOException {
return containerReplicaMap.get(cid);
});

Mockito.when(containerManager.getContainers()).thenAnswer(
invocation -> new ArrayList<>(containerInfoSet));

replicationManager = new ReplicationManager(
configuration,
containerManager,
@@ -105,10 +114,18 @@ public void setup() throws IOException {
legacyReplicationManager,
containerReplicaPendingOps);
containerReplicaMap = new HashMap<>();
containerInfoSet = new HashSet<>();
repConfig = new ECReplicationConfig(3, 2);
repReport = new ReplicationManagerReport();
underRep = new ArrayList<>();
underRep = replicationManager.createUnderReplicatedQueue();
overRep = new ArrayList<>();

// Ensure that RM will run when asked.
Mockito.when(scmContext.isLeaderReady()).thenReturn(true);
Mockito.when(scmContext.isInSafeMode()).thenReturn(false);
SCMServiceManager serviceManager = new SCMServiceManager();
serviceManager.register(replicationManager);
serviceManager.notifyStatusChanged();
}

@Test
@@ -245,12 +262,78 @@ public void testOverReplicatedFixByPending()
ReplicationManagerReport.HealthState.OVER_REPLICATED));
}

@Test
public void testUnderReplicationQueuePopulated() {
ContainerInfo decomContainer = createContainerInfo(repConfig, 1,
HddsProtos.LifeCycleState.CLOSED);
addReplicas(decomContainer, Pair.of(DECOMMISSIONING, 1),
Pair.of(DECOMMISSIONING, 2), Pair.of(DECOMMISSIONING, 3),
Pair.of(DECOMMISSIONING, 4), Pair.of(DECOMMISSIONING, 5));

ContainerInfo underRep1 = createContainerInfo(repConfig, 2,
HddsProtos.LifeCycleState.CLOSED);
addReplicas(underRep1, 1, 2, 3, 4);
ContainerInfo underRep0 = createContainerInfo(repConfig, 3,
HddsProtos.LifeCycleState.CLOSED);
addReplicas(underRep0, 1, 2, 3);

replicationManager.processAll();

// Get the first message off the queue - it should be underRep0.
ContainerHealthResult.UnderReplicatedHealthResult res
= replicationManager.dequeueUnderReplicatedContainer();
Assert.assertEquals(underRep0, res.getContainerInfo());

// Now requeue it
replicationManager.requeueUnderReplicatedContainer(res);

// Now get the next message. It should be underRep1, as it has remaining
// redundancy 1 + zero retries. UnderRep0 will have remaining redundancy 0
// and 1 retry. They will have the same weighted redundancy so lesser
// retries should come first
res = replicationManager.dequeueUnderReplicatedContainer();
Assert.assertEquals(underRep1, res.getContainerInfo());

// Next message is underRep0. It starts with a weighted redundancy of 0 + 1
// retry. The other message on the queue is a decommission only with a
// weighted redundancy of 5 + 0. So let's dequeue and requeue the message 4
// times. Then the weighted redundancy will be equal and the decommission
// one will be next due to having less retries.
for (int i = 0; i < 4; i++) {
res = replicationManager.dequeueUnderReplicatedContainer();
Assert.assertEquals(underRep0, res.getContainerInfo());
replicationManager.requeueUnderReplicatedContainer(res);
}
res = replicationManager.dequeueUnderReplicatedContainer();
Assert.assertEquals(decomContainer, res.getContainerInfo());

res = replicationManager.dequeueUnderReplicatedContainer();
Assert.assertEquals(underRep0, res.getContainerInfo());

res = replicationManager.dequeueUnderReplicatedContainer();
Assert.assertNull(res);
}

private Set<ContainerReplica> addReplicas(ContainerInfo container,
Pair<HddsProtos.NodeOperationalState, Integer>... nodes) {
final Set<ContainerReplica> replicas =
createReplicas(container.containerID(), nodes);
storeContainerAndReplicas(container, replicas);
return replicas;
}

private Set<ContainerReplica> addReplicas(ContainerInfo container,
int... indexes) {
final Set<ContainerReplica> replicas =
createReplicas(container.containerID(), indexes);
containerReplicaMap.put(container.containerID(), replicas);
storeContainerAndReplicas(container, replicas);
return replicas;
}

private void storeContainerAndReplicas(ContainerInfo container,
Set<ContainerReplica> replicas) {
containerReplicaMap.put(container.containerID(), replicas);
containerInfoSet.add(container);
}

}