Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,19 @@ private void processContainer(ContainerID id) {

StorageClass storageClass =
storageClassRegistry.getStorageClass(container.getStorageClass());

int closedStateReplicationFactor =
storageClass.getClosedStateConfiguration().getReplicationFactor();
int openStateReplicationFactor =
storageClass.getOpenStateConfiguration().getReplicationFactor()
.getNumber();
/*
* We don't take any action if the container is in OPEN state and
* the container is healthy. If the container is not healthy, i.e.
* the replicas are not in OPEN state, send CLOSE_CONTAINER command.
*/
if (state == LifeCycleState.OPEN) {
if (!isContainerHealthy(container, replicas, storageClass)) {
if (!isContainerHealthy(container, replicas,
openStateReplicationFactor)) {
eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, id);
}
return;
Expand All @@ -301,7 +306,7 @@ private void processContainer(ContainerID id) {
* container if possible.
*/
if (state == LifeCycleState.QUASI_CLOSED &&
canForceCloseContainer(container, replicas)) {
canForceCloseContainer(container, replicas, storageClass)) {
forceCloseContainer(container, replicas);
return;
}
Expand All @@ -328,25 +333,28 @@ private void processContainer(ContainerID id) {
* the container is either in QUASI_CLOSED or in CLOSED state and has
* exact number of replicas in the same state.
*/
if (isContainerHealthy(container, replicas, storageClass)) {
if (isContainerHealthy(container, replicas,
closedStateReplicationFactor)) {
return;
}

/*
* Check if the container is under replicated and take appropriate
* action.
*/
if (isContainerUnderReplicated(container, replicas, storageClass)) {
handleUnderReplicatedContainer(container, replicas);
if (isContainerUnderReplicated(container, replicas,
closedStateReplicationFactor)) {
handleUnderReplicatedContainer(container, replicas, storageClass);
return;
}

/*
* Check if the container is over replicated and take appropriate
* action.
*/
if (isContainerOverReplicated(container, replicas, storageClass)) {
handleOverReplicatedContainer(container, replicas);
if (isContainerOverReplicated(container, replicas,
closedStateReplicationFactor)) {
handleOverReplicatedContainer(container, replicas, storageClass);
return;
}

Expand Down Expand Up @@ -404,10 +412,10 @@ private void updateInflightAction(final ContainerInfo container,
private boolean isContainerHealthy(
final ContainerInfo container,
final Set<ContainerReplica> replicas,
final StorageClass storageClass
final int replicationFactor
) {
return !isContainerUnderReplicated(container, replicas, storageClass) &&
!isContainerOverReplicated(container, replicas, storageClass) &&
return !isContainerUnderReplicated(container, replicas, replicationFactor) &&
!isContainerOverReplicated(container, replicas, replicationFactor) &&
replicas.stream().allMatch(
r -> compareState(container.getState(), r.getState()));
}
Expand All @@ -422,13 +430,13 @@ private boolean isContainerHealthy(
private boolean isContainerUnderReplicated(
final ContainerInfo container,
final Set<ContainerReplica> replicas,
StorageClass storageClass
int replicationFactor
) {
boolean misReplicated = !getPlacementStatus(
replicas,
storageClass.getClosedStateConfiguration().getReplicationFactor())
replicationFactor)
.isPolicySatisfied();
return storageClass.getClosedStateConfiguration().getReplicationFactor() >
return replicationFactor >
getReplicaCount(container.containerID(), replicas) || misReplicated;
}

Expand All @@ -442,9 +450,9 @@ private boolean isContainerUnderReplicated(
private boolean isContainerOverReplicated(
final ContainerInfo container,
final Set<ContainerReplica> replicas,
StorageClass storageClass
final int replicationFactor
) {
return storageClass.getClosedStateConfiguration().getReplicationFactor() <
return replicationFactor <
getReplicaCount(container.containerID(), replicas);
}

Expand Down Expand Up @@ -472,10 +480,12 @@ private int getReplicaCount(final ContainerID id,
* @return true if we can force close the container, false otherwise
*/
private boolean canForceCloseContainer(final ContainerInfo container,
final Set<ContainerReplica> replicas) {
final Set<ContainerReplica> replicas,
final StorageClass storageClass) {
Preconditions.assertTrue(container.getState() ==
LifeCycleState.QUASI_CLOSED);
final int replicationFactor = container.getReplicationFactor().getNumber();
final int replicationFactor =
storageClass.getClosedStateConfiguration().getReplicationFactor();
final long uniqueQuasiClosedReplicaCount = replicas.stream()
.filter(r -> r.getState() == State.QUASI_CLOSED)
.map(ContainerReplica::getOriginDatanodeId)
Expand Down Expand Up @@ -529,7 +539,8 @@ private void forceCloseContainer(final ContainerInfo container,
* @param replicas Set of ContainerReplicas
*/
private void handleUnderReplicatedContainer(final ContainerInfo container,
final Set<ContainerReplica> replicas) {
final Set<ContainerReplica> replicas,
final StorageClass storageClass) {
LOG.debug("Handling under-replicated container: {}",
container.getContainerID());
try {
Expand All @@ -553,10 +564,9 @@ private void handleUnderReplicatedContainer(final ContainerInfo container,
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());
if (source.size() > 0) {
final int replicationFactor = this.storageClassRegistry
.getStorageClass(container.getStorageClass())
.getOpenStateConfiguration()
.getReplicationFactor().getNumber();
final int replicationFactor = storageClass
.getClosedStateConfiguration()
.getReplicationFactor();
// Want to check if the container is mis-replicated after considering
// inflight add and delete.
// Create a new list from source (healthy replicas minus pending delete)
Expand Down Expand Up @@ -631,10 +641,12 @@ private void handleUnderReplicatedContainer(final ContainerInfo container,
* @param replicas Set of ContainerReplicas
*/
private void handleOverReplicatedContainer(final ContainerInfo container,
final Set<ContainerReplica> replicas) {
final Set<ContainerReplica> replicas,
final StorageClass storageClass) {

final ContainerID id = container.containerID();
final int replicationFactor = container.getReplicationFactor().getNumber();
final int replicationFactor = storageClass.getClosedStateConfiguration()
.getReplicationFactor();
// Don't consider inflight replication while calculating excess here.
int excess = replicas.size() - replicationFactor -
inflightDeletion.getOrDefault(id, Collections.emptyList()).size();
Expand All @@ -645,17 +657,20 @@ private void handleOverReplicatedContainer(final ContainerInfo container,
" is {}, but found {}.", id, replicationFactor,
replicationFactor + excess);

final Map<UUID, ContainerReplica> uniqueReplicas =
new LinkedHashMap<>();
final List<ContainerReplica> eligibleReplicas = new ArrayList<>(replicas);

replicas.stream()
.filter(r -> compareState(container.getState(), r.getState()))
.forEach(r -> uniqueReplicas
.putIfAbsent(r.getOriginDatanodeId(), r));
if (container.getState() != LifeCycleState.CLOSED) {
final Map<UUID, ContainerReplica> uniqueReplicas =
new LinkedHashMap<>();

// Retain one healthy replica per origin node Id.
final List<ContainerReplica> eligibleReplicas = new ArrayList<>(replicas);
eligibleReplicas.removeAll(uniqueReplicas.values());
replicas.stream()
.filter(r -> compareState(container.getState(), r.getState()))
.forEach(r -> uniqueReplicas
.putIfAbsent(r.getOriginDatanodeId(), r));

// Retain one healthy replica per origin node Id.
eligibleReplicas.removeAll(uniqueReplicas.values());
}

final List<ContainerReplica> unhealthyReplicas = eligibleReplicas
.stream()
Expand Down