diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java
index 78e625d34a57..93f536a6442d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java
@@ -17,11 +17,8 @@
  */
 package org.apache.hadoop.hdds.scm.container.replication;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Map;
@@ -31,89 +28,32 @@
  * queue, calculate the delete commands and assign to the datanodes
  * via the eventQueue.
  */
-public class OverReplicatedProcessor implements Runnable {
-
-  private static final Logger LOG = LoggerFactory
-      .getLogger(OverReplicatedProcessor.class);
-  private final ReplicationManager replicationManager;
-  private volatile boolean runImmediately = false;
-  private final long intervalInMillis;
+public class OverReplicatedProcessor extends UnhealthyReplicationProcessor
+    <ContainerHealthResult.OverReplicatedHealthResult> {
 
   public OverReplicatedProcessor(ReplicationManager replicationManager,
       long intervalInMillis) {
-    this.replicationManager = replicationManager;
-    this.intervalInMillis = intervalInMillis;
-  }
+    super(replicationManager, intervalInMillis);
 
-  /**
-   * Read messages from the ReplicationManager over replicated queue and,
-   * form commands to correct the over replication. The commands are added
-   * to the event queue and the PendingReplicaOps are adjusted.
-   *
-   * Note: this is a temporary implementation of this feature. A future
-   * version will need to limit the amount of messages assigned to each
-   * datanode, so they are not assigned too much work.
-   */
-  public void processAll() {
-    int processed = 0;
-    int failed = 0;
-    while (true) {
-      if (!replicationManager.shouldRun()) {
-        break;
-      }
-      ContainerHealthResult.OverReplicatedHealthResult overRep =
-          replicationManager.dequeueOverReplicatedContainer();
-      if (overRep == null) {
-        break;
-      }
-      try {
-        processContainer(overRep);
-        processed++;
-      } catch (Exception e) {
-        LOG.error("Error processing over replicated container {}",
-            overRep.getContainerInfo(), e);
-        failed++;
-        replicationManager.requeueOverReplicatedContainer(overRep);
-      }
-    }
-    LOG.info("Processed {} over replicated containers, failed processing {}",
-        processed, failed);
   }
 
-  protected void processContainer(ContainerHealthResult
-      .OverReplicatedHealthResult overRep) throws IOException {
-    Map<DatanodeDetails, SCMCommand<?>> cmds = replicationManager
-        .processOverReplicatedContainer(overRep);
-    for (Map.Entry<DatanodeDetails, SCMCommand<?>> cmd : cmds.entrySet()) {
-      SCMCommand<?> scmCmd = cmd.getValue();
-      replicationManager.sendDatanodeCommand(scmCmd, overRep.getContainerInfo(),
-          cmd.getKey());
-    }
+  @Override
+  protected ContainerHealthResult.OverReplicatedHealthResult
+      dequeueHealthResultFromQueue(ReplicationManager replicationManager) {
+    return replicationManager.dequeueOverReplicatedContainer();
   }
 
   @Override
-  public void run() {
-    try {
-      while (!Thread.currentThread().isInterrupted()) {
-        if (replicationManager.shouldRun()) {
-          processAll();
-        }
-        synchronized (this) {
-          if (!runImmediately) {
-            wait(intervalInMillis);
-          }
-          runImmediately = false;
-        }
-      }
-    } catch (InterruptedException e) {
-      LOG.warn("{} interrupted. Exiting...", Thread.currentThread().getName());
-      Thread.currentThread().interrupt();
-    }
+  protected void requeueHealthResultFromQueue(
+      ReplicationManager replicationManager,
+      ContainerHealthResult.OverReplicatedHealthResult healthResult) {
+    replicationManager.requeueOverReplicatedContainer(healthResult);
   }
-
-  @VisibleForTesting
-  synchronized void runImmediately() {
-    runImmediately = true;
-    notify();
+
+  @Override
+  protected Map<DatanodeDetails, SCMCommand<?>> getDatanodeCommands(
+      ReplicationManager replicationManager,
+      ContainerHealthResult.OverReplicatedHealthResult healthResult)
+      throws IOException {
+    return replicationManager.processOverReplicatedContainer(healthResult);
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java
index aa7e28069934..429c0e14eb0c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java
@@ -17,11 +17,8 @@
  */
 package org.apache.hadoop.hdds.scm.container.replication;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Map;
@@ -31,88 +28,32 @@
  * queue, calculate the reconstruction commands and assign to the datanodes
  * via the eventQueue.
  */
-public class UnderReplicatedProcessor implements Runnable {
-
-  private static final Logger LOG = LoggerFactory
-      .getLogger(UnderReplicatedProcessor.class);
-  private final ReplicationManager replicationManager;
-  private volatile boolean runImmediately = false;
-  private final long intervalInMillis;
+public class UnderReplicatedProcessor extends UnhealthyReplicationProcessor
+    <ContainerHealthResult.UnderReplicatedHealthResult> {
 
   public UnderReplicatedProcessor(ReplicationManager replicationManager,
-      long intervalInMillis) {
-    this.replicationManager = replicationManager;
-    this.intervalInMillis = intervalInMillis;
-  }
-
-  /**
-   * Read messages from the ReplicationManager under replicated queue and,
-   * form commands to correct the under replication. The commands are added
-   * to the event queue and the PendingReplicaOps are adjusted.
-   *
-   * Note: this is a temporary implementation of this feature. A future
-   * version will need to limit the amount of messages assigned to each
-   * datanode, so they are not assigned too much work.
-   */
-  public void processAll() {
-    int processed = 0;
-    int failed = 0;
-    while (true) {
-      if (!replicationManager.shouldRun()) {
-        break;
-      }
-      ContainerHealthResult.UnderReplicatedHealthResult underRep =
-          replicationManager.dequeueUnderReplicatedContainer();
-      if (underRep == null) {
-        break;
-      }
-      try {
-        processContainer(underRep);
-        processed++;
-      } catch (Exception e) {
-        LOG.error("Error processing under replicated container {}",
-            underRep.getContainerInfo(), e);
-        failed++;
-        replicationManager.requeueUnderReplicatedContainer(underRep);
-      }
-    }
-    LOG.info("Processed {} under replicated containers, failed processing {}",
-        processed, failed);
+      long intervalInMillis) {
+    super(replicationManager, intervalInMillis);
   }
 
-  protected void processContainer(ContainerHealthResult
-      .UnderReplicatedHealthResult underRep) throws IOException {
-    Map<DatanodeDetails, SCMCommand<?>> cmds = replicationManager
-        .processUnderReplicatedContainer(underRep);
-    for (Map.Entry<DatanodeDetails, SCMCommand<?>> cmd : cmds.entrySet()) {
-      replicationManager.sendDatanodeCommand(cmd.getValue(),
-          underRep.getContainerInfo(), cmd.getKey());
-    }
+  @Override
+  protected ContainerHealthResult.UnderReplicatedHealthResult
+      dequeueHealthResultFromQueue(ReplicationManager replicationManager) {
+    return replicationManager.dequeueUnderReplicatedContainer();
   }
 
   @Override
-  public void run() {
-    try {
-      while (!Thread.currentThread().isInterrupted()) {
-        if (replicationManager.shouldRun()) {
-          processAll();
-        }
-        synchronized (this) {
-          if (!runImmediately) {
-            wait(intervalInMillis);
-          }
-          runImmediately = false;
-        }
-      }
-    } catch (InterruptedException e) {
-      LOG.warn("{} interrupted. Exiting...", Thread.currentThread().getName());
Exiting...", Thread.currentThread().getName()); - Thread.currentThread().interrupt(); - } + protected void requeueHealthResultFromQueue( + ReplicationManager replicationManager, + ContainerHealthResult.UnderReplicatedHealthResult healthResult) { + replicationManager.requeueUnderReplicatedContainer(healthResult); } - @VisibleForTesting - synchronized void runImmediately() { - runImmediately = true; - notify(); + @Override + protected Map> getDatanodeCommands( + ReplicationManager replicationManager, + ContainerHealthResult.UnderReplicatedHealthResult healthResult) + throws IOException { + return replicationManager.processUnderReplicatedContainer(healthResult); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java new file mode 100644 index 000000000000..9623222f0531 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container.replication; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; + +/** + * Class used to pick messages from the respective ReplicationManager + * unhealthy replicated queue, + * calculate the delete commands and assign to the datanodes via the eventQueue. + * + */ +public abstract class UnhealthyReplicationProcessor implements Runnable { + private static final Logger LOG = LoggerFactory + .getLogger(UnhealthyReplicationProcessor.class); + private final ReplicationManager replicationManager; + private volatile boolean runImmediately = false; + private final long intervalInMillis; + + public UnhealthyReplicationProcessor(ReplicationManager replicationManager, + long intervalInMillis) { + this.replicationManager = replicationManager; + this.intervalInMillis = intervalInMillis; + } + + /** + * Read messages from the respective queue from ReplicationManager + * for processing the health result. + * @return next HealthResult from the replication manager + */ + protected abstract HealthResult dequeueHealthResultFromQueue( + ReplicationManager rm); + + /** + * Requeue HealthResult to ReplicationManager + * for reprocessing the health result. 
+   * @param healthResult the HealthResult to be requeued
+   */
+  protected abstract void requeueHealthResultFromQueue(
+      ReplicationManager rm, HealthResult healthResult);
+
+  /**
+   * Read messages from the respective ReplicationManager queue and
+   * form commands to correct the replication. The commands are added
+   * to the event queue and the PendingReplicaOps are adjusted.
+   *
+   * Note: this is a temporary implementation of this feature. A future
+   * version will need to limit the amount of messages assigned to each
+   * datanode, so they are not assigned too much work.
+   */
+  public void processAll() {
+    int processed = 0;
+    int failed = 0;
+    Map<ContainerHealthResult.HealthState, Integer> healthStateCntMap =
+        Maps.newHashMap();
+    while (true) {
+      if (!replicationManager.shouldRun()) {
+        break;
+      }
+      HealthResult healthResult =
+          dequeueHealthResultFromQueue(replicationManager);
+      if (healthResult == null) {
+        break;
+      }
+      try {
+        processContainer(healthResult);
+        processed++;
+        healthStateCntMap.compute(healthResult.getHealthState(),
+            (healthState, cnt) -> cnt == null ? 1 : (cnt + 1));
+      } catch (Exception e) {
+        LOG.error("Error processing Health result of class: {} for " +
+            "container {}", healthResult.getClass(),
+            healthResult.getContainerInfo(), e);
+        failed++;
+        requeueHealthResultFromQueue(replicationManager, healthResult);
+      }
+    }
+    LOG.info("Processed {} containers with health state counts {}, " +
+        "failed processing {}", processed, healthStateCntMap, failed);
+  }
+
+  /**
+   * Gets the commands to be run on datanodes to process the
+   * container health result.
+   * @param rm ReplicationManager instance
+   * @param healthResult the health result to be processed
+   * @return Commands to be run on Datanodes
+   */
+  protected abstract Map<DatanodeDetails, SCMCommand<?>> getDatanodeCommands(
+      ReplicationManager rm, HealthResult healthResult)
+      throws IOException;
+  private void processContainer(HealthResult healthResult) throws IOException {
+    Map<DatanodeDetails, SCMCommand<?>> cmds = getDatanodeCommands(
+        replicationManager, healthResult);
+    for (Map.Entry<DatanodeDetails, SCMCommand<?>> cmd : cmds.entrySet()) {
+      replicationManager.sendDatanodeCommand(cmd.getValue(),
+          healthResult.getContainerInfo(), cmd.getKey());
+    }
+  }
+
+  @Override
+  public void run() {
+    try {
+      while (!Thread.currentThread().isInterrupted()) {
+        if (replicationManager.shouldRun()) {
+          processAll();
+        }
+        synchronized (this) {
+          if (!runImmediately) {
+            wait(intervalInMillis);
+          }
+          runImmediately = false;
+        }
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("{} interrupted. Exiting...", Thread.currentThread().getName());
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @VisibleForTesting
+  synchronized void runImmediately() {
+    runImmediately = true;
+    notify();
+  }
+}
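The patch above reduces both processors to thin subclasses of a template-method base class: each subclass only supplies how to dequeue, requeue, and compute datanode commands for its own queue, while UnhealthyReplicationProcessor owns the polling loop, logging, and error handling. As a minimal sketch of that extension point, a further queue type could be plugged in as shown below. The MisReplicatedProcessor class, the MisReplicatedHealthResult type parameter, and the three ReplicationManager methods it calls are hypothetical illustrations only and are not part of this patch.

package org.apache.hadoop.hdds.scm.container.replication;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;

import java.io.IOException;
import java.util.Map;

/**
 * Illustrative only: a hypothetical processor for a mis-replicated queue,
 * showing how a new queue plugs into UnhealthyReplicationProcessor.
 */
public class MisReplicatedProcessor extends UnhealthyReplicationProcessor
    <ContainerHealthResult.MisReplicatedHealthResult> {

  public MisReplicatedProcessor(ReplicationManager replicationManager,
      long intervalInMillis) {
    super(replicationManager, intervalInMillis);
  }

  @Override
  protected ContainerHealthResult.MisReplicatedHealthResult
      dequeueHealthResultFromQueue(ReplicationManager replicationManager) {
    // Pull the next result from the (hypothetical) mis-replicated queue.
    return replicationManager.dequeueMisReplicatedContainer();
  }

  @Override
  protected void requeueHealthResultFromQueue(
      ReplicationManager replicationManager,
      ContainerHealthResult.MisReplicatedHealthResult healthResult) {
    // Hand the result back so the base class loop can retry it later.
    replicationManager.requeueMisReplicatedContainer(healthResult);
  }

  @Override
  protected Map<DatanodeDetails, SCMCommand<?>> getDatanodeCommands(
      ReplicationManager replicationManager,
      ContainerHealthResult.MisReplicatedHealthResult healthResult)
      throws IOException {
    // Compute the datanode commands that would correct the mis-replication.
    return replicationManager.processMisReplicatedContainer(healthResult);
  }
}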