OverReplicatedProcessor.java
@@ -17,11 +17,8 @@
*/
package org.apache.hadoop.hdds.scm.container.replication;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
@@ -31,89 +28,32 @@
* queue, calculate the delete commands and assign to the datanodes
* via the eventQueue.
*/
public class OverReplicatedProcessor implements Runnable {

private static final Logger LOG = LoggerFactory
.getLogger(OverReplicatedProcessor.class);
private final ReplicationManager replicationManager;
private volatile boolean runImmediately = false;
private final long intervalInMillis;
public class OverReplicatedProcessor extends UnhealthyReplicationProcessor
<ContainerHealthResult.OverReplicatedHealthResult> {

public OverReplicatedProcessor(ReplicationManager replicationManager,
long intervalInMillis) {
this.replicationManager = replicationManager;
this.intervalInMillis = intervalInMillis;
}
super(replicationManager, intervalInMillis);

/**
* Read messages from the ReplicationManager over replicated queue and,
* form commands to correct the over replication. The commands are added
* to the event queue and the PendingReplicaOps are adjusted.
*
* Note: this is a temporary implementation of this feature. A future
* version will need to limit the amount of messages assigned to each
* datanode, so they are not assigned too much work.
*/
public void processAll() {
int processed = 0;
int failed = 0;
while (true) {
if (!replicationManager.shouldRun()) {
break;
}
ContainerHealthResult.OverReplicatedHealthResult overRep =
replicationManager.dequeueOverReplicatedContainer();
if (overRep == null) {
break;
}
try {
processContainer(overRep);
processed++;
} catch (Exception e) {
LOG.error("Error processing over replicated container {}",
overRep.getContainerInfo(), e);
failed++;
replicationManager.requeueOverReplicatedContainer(overRep);
}
}
LOG.info("Processed {} over replicated containers, failed processing {}",
processed, failed);
}

protected void processContainer(ContainerHealthResult
.OverReplicatedHealthResult overRep) throws IOException {
Map<DatanodeDetails, SCMCommand<?>> cmds = replicationManager
.processOverReplicatedContainer(overRep);
for (Map.Entry<DatanodeDetails, SCMCommand<?>> cmd : cmds.entrySet()) {
SCMCommand<?> scmCmd = cmd.getValue();
replicationManager.sendDatanodeCommand(scmCmd, overRep.getContainerInfo(),
cmd.getKey());
}
@Override
protected ContainerHealthResult.OverReplicatedHealthResult
dequeueHealthResultFromQueue(ReplicationManager replicationManager) {
return replicationManager.dequeueOverReplicatedContainer();
}

@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
if (replicationManager.shouldRun()) {
processAll();
}
synchronized (this) {
if (!runImmediately) {
wait(intervalInMillis);
}
runImmediately = false;
}
}
} catch (InterruptedException e) {
LOG.warn("{} interrupted. Exiting...", Thread.currentThread().getName());
Thread.currentThread().interrupt();
}
protected void requeueHealthResultFromQueue(
ReplicationManager replicationManager,
ContainerHealthResult.OverReplicatedHealthResult healthResult) {
replicationManager.requeueOverReplicatedContainer(healthResult);
}

@VisibleForTesting
synchronized void runImmediately() {
runImmediately = true;
notify();
@Override
protected Map<DatanodeDetails, SCMCommand<?>> getDatanodeCommands(
ReplicationManager replicationManager,
ContainerHealthResult.OverReplicatedHealthResult healthResult)
throws IOException {
return replicationManager.processOverReplicatedContainer(healthResult);
}
}
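Since the dequeue/requeue/command hooks are now the only behaviour this class adds, a unit test can drive the inherited processAll() loop through them with a mocked ReplicationManager. A minimal sketch, assuming JUnit 5 and Mockito on the test classpath; the test class below is illustrative and not part of this PR:

package org.apache.hadoop.hdds.scm.container.replication;

import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.Collections;

// Illustrative test sketch, not from the PR.
public class TestOverReplicatedProcessorSketch {

  @Test
  public void processAllDrainsTheOverReplicatedQueue() throws Exception {
    ReplicationManager rm = Mockito.mock(ReplicationManager.class);
    ContainerHealthResult.OverReplicatedHealthResult result =
        Mockito.mock(ContainerHealthResult.OverReplicatedHealthResult.class);

    Mockito.when(rm.shouldRun()).thenReturn(true);
    // One queued result, then an empty queue to end the loop.
    Mockito.when(rm.dequeueOverReplicatedContainer())
        .thenReturn(result)
        .thenReturn(null);
    // Returning no commands keeps the sketch independent of command details.
    Mockito.when(rm.processOverReplicatedContainer(result))
        .thenReturn(Collections.emptyMap());

    new OverReplicatedProcessor(rm, 1000).processAll();

    Mockito.verify(rm, Mockito.times(1))
        .processOverReplicatedContainer(result);
    Mockito.verify(rm, Mockito.never())
        .requeueOverReplicatedContainer(result);
  }
}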
UnderReplicatedProcessor.java
@@ -17,11 +17,8 @@
*/
package org.apache.hadoop.hdds.scm.container.replication;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
@@ -31,88 +28,32 @@
* queue, calculate the reconstruction commands and assign to the datanodes
* via the eventQueue.
*/
public class UnderReplicatedProcessor implements Runnable {

private static final Logger LOG = LoggerFactory
.getLogger(UnderReplicatedProcessor.class);
private final ReplicationManager replicationManager;
private volatile boolean runImmediately = false;
private final long intervalInMillis;
public class UnderReplicatedProcessor extends UnhealthyReplicationProcessor
<ContainerHealthResult.UnderReplicatedHealthResult> {

public UnderReplicatedProcessor(ReplicationManager replicationManager,
long intervalInMillis) {
this.replicationManager = replicationManager;
this.intervalInMillis = intervalInMillis;
}

/**
* Read messages from the ReplicationManager under replicated queue and,
* form commands to correct the under replication. The commands are added
* to the event queue and the PendingReplicaOps are adjusted.
*
* Note: this is a temporary implementation of this feature. A future
* version will need to limit the amount of messages assigned to each
* datanode, so they are not assigned too much work.
*/
public void processAll() {
int processed = 0;
int failed = 0;
while (true) {
if (!replicationManager.shouldRun()) {
break;
}
ContainerHealthResult.UnderReplicatedHealthResult underRep =
replicationManager.dequeueUnderReplicatedContainer();
if (underRep == null) {
break;
}
try {
processContainer(underRep);
processed++;
} catch (Exception e) {
LOG.error("Error processing under replicated container {}",
underRep.getContainerInfo(), e);
failed++;
replicationManager.requeueUnderReplicatedContainer(underRep);
}
}
LOG.info("Processed {} under replicated containers, failed processing {}",
processed, failed);
long intervalInMillis) {
super(replicationManager, intervalInMillis);
}

protected void processContainer(ContainerHealthResult
.UnderReplicatedHealthResult underRep) throws IOException {
Map<DatanodeDetails, SCMCommand<?>> cmds = replicationManager
.processUnderReplicatedContainer(underRep);
for (Map.Entry<DatanodeDetails, SCMCommand<?>> cmd : cmds.entrySet()) {
replicationManager.sendDatanodeCommand(cmd.getValue(),
underRep.getContainerInfo(), cmd.getKey());
}
@Override
protected ContainerHealthResult.UnderReplicatedHealthResult
dequeueHealthResultFromQueue(ReplicationManager replicationManager) {
return replicationManager.dequeueUnderReplicatedContainer();
}

@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
if (replicationManager.shouldRun()) {
processAll();
}
synchronized (this) {
if (!runImmediately) {
wait(intervalInMillis);
}
runImmediately = false;
}
}
} catch (InterruptedException e) {
LOG.warn("{} interrupted. Exiting...", Thread.currentThread().getName());
Thread.currentThread().interrupt();
}
protected void requeueHealthResultFromQueue(
ReplicationManager replicationManager,
ContainerHealthResult.UnderReplicatedHealthResult healthResult) {
replicationManager.requeueUnderReplicatedContainer(healthResult);
}

@VisibleForTesting
synchronized void runImmediately() {
runImmediately = true;
notify();
@Override
protected Map<DatanodeDetails, SCMCommand<?>> getDatanodeCommands(
ReplicationManager replicationManager,
ContainerHealthResult.UnderReplicatedHealthResult healthResult)
throws IOException {
return replicationManager.processUnderReplicatedContainer(healthResult);
}
}
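Both processors still implement Runnable and pace themselves with wait(intervalInMillis) between passes, so each one is meant to run on its own long-lived thread rather than under an external scheduler. A minimal wiring sketch under that assumption; the helper class and thread name are illustrative and not taken from this PR:

package org.apache.hadoop.hdds.scm.container.replication;

// Hypothetical helper, not part of the PR: starts a processor on a daemon
// thread. The processor itself waits intervalInMillis between passes, so no
// external scheduler is required.
public final class ReplicationProcessorStarter {

  private ReplicationProcessorStarter() {
  }

  public static Thread start(ReplicationManager rm, long intervalInMillis) {
    UnderReplicatedProcessor processor =
        new UnderReplicatedProcessor(rm, intervalInMillis);
    Thread thread = new Thread(processor, "UnderReplicatedProcessor");
    thread.setDaemon(true);   // do not block SCM shutdown
    thread.start();
    return thread;
  }
}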
UnhealthyReplicationProcessor.java
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm.container.replication;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;

/**
* Base class used to pick messages from the respective ReplicationManager
* unhealthy replication queue, calculate the corrective commands and assign
* them to the datanodes via the eventQueue.
*/
public abstract class UnhealthyReplicationProcessor<HealthResult extends
ContainerHealthResult> implements Runnable {
private static final Logger LOG = LoggerFactory
.getLogger(UnhealthyReplicationProcessor.class);
private final ReplicationManager replicationManager;
private volatile boolean runImmediately = false;
private final long intervalInMillis;

public UnhealthyReplicationProcessor(ReplicationManager replicationManager,
long intervalInMillis) {
this.replicationManager = replicationManager;
this.intervalInMillis = intervalInMillis;
}

/**
* Dequeue the next health result from the relevant ReplicationManager queue
* for processing.
* @param rm the ReplicationManager holding the queue
* @return next HealthResult from the queue, or null if the queue is empty
*/
protected abstract HealthResult dequeueHealthResultFromQueue(
ReplicationManager rm);

/**
* Requeue the health result to the ReplicationManager
* so that it can be reprocessed on a later pass.
* @param rm the ReplicationManager holding the queue
* @param healthResult the health result to requeue
*/
protected abstract void requeueHealthResultFromQueue(
ReplicationManager rm, HealthResult healthResult);

/**
* Read messages from the relevant ReplicationManager queue and form
* commands to correct the replication. The commands are added
* to the event queue and the PendingReplicaOps are adjusted.
*
* Note: this is a temporary implementation of this feature. A future
* version will need to limit the amount of messages assigned to each
* datanode, so they are not assigned too much work.
*/
public void processAll() {
int processed = 0;
int failed = 0;
Map<ContainerHealthResult.HealthState, Integer> healthStateCntMap =
Maps.newHashMap();
while (true) {
if (!replicationManager.shouldRun()) {
break;
}
HealthResult healthResult =
dequeueHealthResultFromQueue(replicationManager);
if (healthResult == null) {
break;
}
try {
processContainer(healthResult);
processed++;
healthStateCntMap.compute(healthResult.getHealthState(),
(healthState, cnt) -> cnt == null ? 1 : (cnt + 1));
} catch (Exception e) {
LOG.error("Error processing health result of class: {} for " +
"container {}", healthResult.getClass(),
healthResult.getContainerInfo(), e);
failed++;
requeueHealthResultFromQueue(replicationManager, healthResult);
}
}
LOG.info("Processed {} containers with health state counts {}, " +
"failed processing {}", processed, healthStateCntMap, failed);
}

/**
* Gets the commands to be run on datanodes to process the
* container health result.
* @param rm the ReplicationManager used to compute the commands
* @param healthResult the health result to be corrected
* @return commands to be run on datanodes
*/
protected abstract Map<DatanodeDetails, SCMCommand<?>> getDatanodeCommands(
ReplicationManager rm, HealthResult healthResult)
throws IOException;

private void processContainer(HealthResult healthResult) throws IOException {
Map<DatanodeDetails, SCMCommand<?>> cmds = getDatanodeCommands(
replicationManager, healthResult);
for (Map.Entry<DatanodeDetails, SCMCommand<?>> cmd : cmds.entrySet()) {
replicationManager.sendDatanodeCommand(cmd.getValue(),
healthResult.getContainerInfo(), cmd.getKey());
}
}

@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
if (replicationManager.shouldRun()) {
processAll();
}
synchronized (this) {
if (!runImmediately) {
wait(intervalInMillis);
}
runImmediately = false;
}
}
} catch (InterruptedException e) {
LOG.warn("{} interrupted. Exiting...", Thread.currentThread().getName());
Thread.currentThread().interrupt();
}
}

@VisibleForTesting
synchronized void runImmediately() {
runImmediately = true;
notify();
}
}
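With the shared loop, logging and retry handling in the base class above, adding a processor for another health state reduces to implementing the three hooks. The class below is purely an illustration of that extension point; it is hypothetical, and its method bodies are placeholders because this PR defines no further queues on ReplicationManager:

package org.apache.hadoop.hdds.scm.container.replication;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;

import java.io.IOException;
import java.util.Map;

// Hypothetical subclass for illustration only: processAll() and run() are
// inherited; a concrete processor supplies just these three hooks.
public class ExampleUnhealthyProcessor
    extends UnhealthyReplicationProcessor<ContainerHealthResult> {

  public ExampleUnhealthyProcessor(ReplicationManager replicationManager,
      long intervalInMillis) {
    super(replicationManager, intervalInMillis);
  }

  @Override
  protected ContainerHealthResult dequeueHealthResultFromQueue(
      ReplicationManager rm) {
    // A real implementation would dequeue from its ReplicationManager queue.
    throw new UnsupportedOperationException("illustrative placeholder");
  }

  @Override
  protected void requeueHealthResultFromQueue(ReplicationManager rm,
      ContainerHealthResult healthResult) {
    // A real implementation would requeue the result for a later retry.
    throw new UnsupportedOperationException("illustrative placeholder");
  }

  @Override
  protected Map<DatanodeDetails, SCMCommand<?>> getDatanodeCommands(
      ReplicationManager rm, ContainerHealthResult healthResult)
      throws IOException {
    // A real implementation would return the corrective datanode commands.
    throw new UnsupportedOperationException("illustrative placeholder");
  }
}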