Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ public static class UnderReplicatedHealthResult
private final int remainingRedundancy;
private final boolean dueToDecommission;
private final boolean sufficientlyReplicatedAfterPending;
private boolean dueToMisReplication = false;
private boolean isMisReplicated = false;
private boolean isMisReplicatedAfterPending = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have another constructor which initializes the following arguments?
public UnderReplicatedHealthResult(ContainerInfo containerInfo,
int remainingRedundancy, boolean dueToDecommission, boolean replicatedOkWithPending, boolean unrecoverable,boolean dueToMisReplication, boolean isMisReplicated, boolean isMisReplicatedAfterPending)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't want to change the existing constructor, as then we need to change it everywhere it is used. Adding a new constructor starts a bad pattern where each new parameter needs a new constructor, and what we really need is a builder.

At the moment I think these 3 parameters have good defaults for the common case and then using the settings when needed is a good compromise.

private final boolean unrecoverable;
private int requeueCount = 0;

Expand All @@ -119,6 +122,44 @@ public UnderReplicatedHealthResult(ContainerInfo containerInfo,
this.unrecoverable = unrecoverable;
}

/**
 * Records whether the container is mis-replicated, meaning its replicas do
 * not meet the placement policy.
 * @param isMisRep true when the container is mis-replicated, false
 *                 otherwise.
 * @return this object, so that calls can be chained.
 */
public UnderReplicatedHealthResult setMisReplicated(
    boolean isMisRep) {
  isMisReplicated = isMisRep;
  return this;
}

/**
 * Records whether the container is still mis-replicated once the pending
 * replicas scheduled for create or delete are taken into account.
 * @param isMisRep true when the container is mis-replicated considering
 *                 pending replicas, false otherwise.
 * @return this object, so that calls can be chained.
 */
public UnderReplicatedHealthResult setMisReplicatedAfterPending(
    boolean isMisRep) {
  isMisReplicatedAfterPending = isMisRep;
  return this;
}

/**
 * Records whether the container is under replicated ONLY because of
 * mis-replication.
 * @param dueToMisRep true when the container has enough replicas but does
 *                    not meet the placement policy, false otherwise.
 * @return this object, so that calls can be chained.
 */
public UnderReplicatedHealthResult setDueToMisReplication(
    boolean dueToMisRep) {
  dueToMisReplication = dueToMisRep;
  return this;
}

/**
* How many more replicas can be lost before the container is
* unreadable. For containers which are under-replicated due to decommission
Expand Down Expand Up @@ -187,6 +228,34 @@ public boolean isSufficientlyReplicatedAfterPending() {
return sufficientlyReplicatedAfterPending;
}

/**
 * Indicates whether the container is mis-replicated when any pending
 * replicas scheduled to be created are ignored.
 * @return true if mis-replicated, ignoring pending replicas.
 */
public boolean isMisReplicated() {
  return this.isMisReplicated;
}

/**
 * Indicates whether the container is mis-replicated after taking account
 * of pending replicas which are scheduled to be created.
 * @return true if mis-replicated after pending replicas are considered.
 */
public boolean isMisReplicatedAfterPending() {
  return this.isMisReplicatedAfterPending;
}

/**
 * Indicates whether the under replication is caused only by
 * mis-replication, that is, the container has enough replicas but they do
 * not meet the placement policy.
 * @return true if the under-replication is only due to mis-replication.
 */
public boolean isDueToMisReplication() {
  return this.dueToMisReplication;
}

/**
* Indicates whether a container has enough replicas to be read. For Ratis
* at least one replica must be available. For EC, at least dataNum replicas
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.RatisContainerReplicaCount;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container;
package org.apache.hadoop.hdds.scm.container.replication;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaCount;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;

import java.util.Set;

Expand Down Expand Up @@ -243,11 +244,27 @@ private int missingReplicas() {
*/
@Override
public boolean isSufficientlyReplicated() {
return missingReplicas() + inFlightDel <= 0;
return isSufficientlyReplicated(false);
}

/**
* Return true is the container is over replicated. Decommission and
* Return true if the container is sufficiently replicated. Decommissioning
* and Decommissioned containers are ignored in this check, assuming they will
* eventually be removed from the cluster.
* This check ignores inflight additions, if includePendingAdd is false,
* otherwise it will assume they complete ok.
*
* @return True if the container is sufficiently replicated and False
* otherwise.
*/
public boolean isSufficientlyReplicated(boolean includePendingAdd) {
  // The delta is positive when under-replicated and negative when
  // over-replicated. Pending deletes are always included in this check.
  return redundancyDelta(true, includePendingAdd) <= 0;
}

/**
* Return true if the container is over replicated. Decommission and
* maintenance containers are ignored for this check.
* The check ignores inflight additions, as they may fail, but it does
* consider inflight deletes, as they would reduce the over replication when
Expand All @@ -257,7 +274,67 @@ public boolean isSufficientlyReplicated() {
*/
@Override
public boolean isOverReplicated() {
return missingReplicas() + inFlightDel < 0;
return isOverReplicated(true);
}

/**
* Return true if the container is over replicated. Decommission and
* maintenance containers are ignored for this check.
* The check ignores inflight additions, as they may fail, but it does
* consider inflight deletes if includePendingDelete is true.
*
* @return True if the container is over replicated, false otherwise.
*/
public boolean isOverReplicated(boolean includePendingDelete) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be changed to `getExcessRedundancy(includePending) > 0` to avoid duplicating the logic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be changed to `getExcessRedundancy(includePending) > 0` to avoid duplicating the logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I have changed this.

return getExcessRedundancy(includePendingDelete) > 0;
}

/**
* @return Return Excess Redundancy replica nums.
*/
public int getExcessRedundancy(boolean includePendingDelete) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add a boolean with includePendingAdd as well. I see redundant duplicate code logic in sufficientlyReplicated & isOverReplicated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea the logic is very similar. I have added a new private method both can call.

int excessRedundancy = redundancyDelta(includePendingDelete, false);
if (excessRedundancy >= 0) {
// either perfectly replicated or under replicated
return 0;
}
return -excessRedundancy;
}

/**
 * Computes the delta from the expected number of replicas, optionally
 * considering inflight adds and deletes.
 * @param includePendingDelete if true, inflight deletes are added to the
 *                             delta.
 * @param includePendingAdd if true, inflight adds are subtracted from the
 *                          delta.
 * @return zero if perfectly replicated, a negative value for over
 *         replication and a positive value for under replication. The
 *         magnitude of the return value indicates how many replicas the
 *         container is over or under replicated by.
 */
private int redundancyDelta(boolean includePendingDelete,
    boolean includePendingAdd) {
  // Start from the replicas currently missing, then adjust for the
  // requested inflight operations.
  int delta = missingReplicas();
  delta += includePendingDelete ? inFlightDel : 0;
  delta -= includePendingAdd ? inFlightAdd : 0;
  return delta;
}

/**
 * Reports how many more replicas can be lost before the container becomes
 * unreadable, assuming any inflight deletes will complete. For containers
 * which are under-replicated due to decommission or maintenance only, the
 * remaining redundancy includes those decommissioning or maintenance
 * replicas, as they are technically still available until the datanode
 * processes are stopped.
 * @return Count of remaining redundant replicas, never less than zero.
 */
public int getRemainingRedundancy() {
  int availableAfterDeletes =
      healthyCount + decommissionCount + maintenanceCount - inFlightDel;
  // One replica is needed to read the container; the rest is redundancy.
  return Math.max(0, availableAfterDeletes - 1);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hdds.scm.container.replication.health.HealthCheck;
import org.apache.hadoop.hdds.scm.container.replication.health.OpenContainerHandler;
import org.apache.hadoop.hdds.scm.container.replication.health.QuasiClosedContainerHandler;
import org.apache.hadoop.hdds.scm.container.replication.health.RatisReplicationCheckHandler;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMService;
Expand Down Expand Up @@ -72,6 +73,7 @@
import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;

/**
* Replication Manager (RM) is the one which is responsible for making sure
Expand Down Expand Up @@ -142,12 +144,14 @@ public class ReplicationManager implements SCMService {
private final Clock clock;
private final ContainerReplicaPendingOps containerReplicaPendingOps;
private final ECReplicationCheckHandler ecReplicationCheckHandler;
private final RatisReplicationCheckHandler ratisReplicationCheckHandler;
private final EventPublisher eventPublisher;
private final ReentrantLock lock = new ReentrantLock();
private ReplicationQueue replicationQueue;
private final ECUnderReplicationHandler ecUnderReplicationHandler;
private final ECOverReplicationHandler ecOverReplicationHandler;
private final int maintenanceRedundancy;
private final int ratisMaintenanceMinReplicas;
private Thread underReplicatedProcessorThread;
private Thread overReplicatedProcessorThread;
private final UnderReplicatedProcessor underReplicatedProcessor;
Expand Down Expand Up @@ -188,9 +192,13 @@ public ReplicationManager(final ConfigurationSource conf,
this.containerReplicaPendingOps = replicaPendingOps;
this.legacyReplicationManager = legacyReplicationManager;
this.ecReplicationCheckHandler = new ECReplicationCheckHandler();
this.ratisReplicationCheckHandler =
new RatisReplicationCheckHandler(containerPlacement);
this.nodeManager = nodeManager;
this.replicationQueue = new ReplicationQueue();
this.maintenanceRedundancy = rmConf.maintenanceRemainingRedundancy;
this.ratisMaintenanceMinReplicas = rmConf.getMaintenanceReplicaMinimum();

ecUnderReplicationHandler = new ECUnderReplicationHandler(
ecReplicationCheckHandler, containerPlacement, conf, nodeManager);
ecOverReplicationHandler =
Expand All @@ -210,7 +218,8 @@ public ReplicationManager(final ConfigurationSource conf,
.addNext(new ClosingContainerHandler(this))
.addNext(new QuasiClosedContainerHandler(this))
.addNext(new ClosedWithMismatchedReplicasHandler(this))
.addNext(ecReplicationCheckHandler);
.addNext(ecReplicationCheckHandler)
.addNext(ratisReplicationCheckHandler);
start();
}

Expand Down Expand Up @@ -433,10 +442,16 @@ protected void processContainer(ContainerInfo containerInfo,
List<ContainerReplicaOp> pendingOps =
containerReplicaPendingOps.getPendingOps(containerID);

// There is a different config for EC and Ratis maintenance
// minimum replicas, so we must pass through the correct one.
int maintRedundancy = maintenanceRedundancy;
if (containerInfo.getReplicationType() == RATIS) {
maintRedundancy = ratisMaintenanceMinReplicas;
}
ContainerCheckRequest checkRequest = new ContainerCheckRequest.Builder()
.setContainerInfo(containerInfo)
.setContainerReplicas(replicas)
.setMaintenanceRedundancy(maintenanceRedundancy)
.setMaintenanceRedundancy(maintRedundancy)
.setReport(report)
.setPendingOps(pendingOps)
.setReplicationQueue(repQueue)
Expand Down
Loading