Merged
@@ -30,7 +30,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -41,6 +40,7 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.HddsConfigKeys;
@@ -57,6 +57,7 @@
StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManagerMetrics;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMService;
@@ -66,10 +67,6 @@
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@@ -92,7 +89,7 @@
* that the containers are properly replicated. Replication Manager deals only
* with Quasi Closed / Closed containers.
*/
public class ReplicationManager implements MetricsSource, SCMService {
public class ReplicationManager implements SCMService {

public static final Logger LOG =
LoggerFactory.getLogger(ReplicationManager.class);
@@ -230,6 +227,11 @@ enum MoveResult {
private long lastTimeToBeReadyInMillis = 0;
private final Clock clock;

/**
* Replication progress related metrics.
*/
private ReplicationManagerMetrics metrics;

/**
* Constructs ReplicationManager instance with the given configuration.
*
@@ -265,6 +267,7 @@ public ReplicationManager(final ConfigurationSource conf,
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
TimeUnit.MILLISECONDS);
this.metrics = null;

// register ReplicationManager to SCMServiceManager.
serviceManager.register(this);
@@ -279,10 +282,7 @@ public ReplicationManager(final ConfigurationSource conf,
@Override
public synchronized void start() {
if (!isRunning()) {
DefaultMetricsSystem.instance().register(METRICS_SOURCE_NAME,
"SCM Replication manager (closed container replication) related "
+ "metrics",
this);
metrics = ReplicationManagerMetrics.create(this);
LOG.info("Starting Replication Monitor Thread.");
running = true;
replicationMonitor = new Thread(this::run);
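Editor's note: the ReplicationManagerMetrics class registered in start() replaces the inline MetricsSource implementation removed further down, and its source is not part of this file's diff. The following is only a sketch of what the calls made in this file (create, unRegister, incrNum*) imply, assuming the usual Hadoop metrics2 annotation style; the real class in org.apache.hadoop.hdds.scm.container.replication may differ.

    import org.apache.hadoop.metrics2.annotation.Metric;
    import org.apache.hadoop.metrics2.annotation.Metrics;
    import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
    import org.apache.hadoop.metrics2.lib.MutableCounterLong;

    // Sketch only: shows the replication counters; the deletion and bytes
    // counters used elsewhere in this diff would follow the same pattern.
    @Metrics(about = "SCM Replication manager (closed container replication) "
        + "related metrics", context = "SCM")
    public final class ReplicationManagerMetrics {

      public static final String METRICS_SOURCE_NAME =
          ReplicationManagerMetrics.class.getSimpleName();

      @Metric("Number of container replication commands sent.")
      private MutableCounterLong numReplicationCmdsSent;

      @Metric("Number of container replication commands completed.")
      private MutableCounterLong numReplicationCmdsCompleted;

      @Metric("Number of container replication commands that timed out.")
      private MutableCounterLong numReplicationCmdsTimeout;

      // The ReplicationManager reference passed to create() would presumably be
      // used to publish the inflight replication/deletion/move map sizes as
      // gauges (the gauges the removed getMetrics() below used to emit);
      // omitted here for brevity.
      public static ReplicationManagerMetrics create(ReplicationManager manager) {
        return DefaultMetricsSystem.instance().register(METRICS_SOURCE_NAME,
            "SCM Replication manager (closed container replication) related "
                + "metrics", new ReplicationManagerMetrics());
      }

      public void unRegister() {
        DefaultMetricsSystem.instance().unregisterSource(METRICS_SOURCE_NAME);
      }

      public void incrNumReplicationCmdsSent() {
        numReplicationCmdsSent.incr();
      }

      public void incrNumReplicationCmdsCompleted() {
        numReplicationCmdsCompleted.incr();
      }

      public void incrNumReplicationCmdsTimeout() {
        numReplicationCmdsTimeout.incr();
      }
    }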
@@ -321,7 +321,7 @@ public synchronized void stop() {
inflightMove.clear();
inflightMoveFuture.clear();
running = false;
DefaultMetricsSystem.instance().unregisterSource(METRICS_SOURCE_NAME);
metrics.unRegister();
notifyAll();
} else {
LOG.info("Replication Monitor Thread is not running.");
@@ -420,12 +420,20 @@ private void processContainer(ContainerInfo container) {
*/
updateInflightAction(container, inflightReplication,
action -> replicas.stream()
.anyMatch(r -> r.getDatanodeDetails().equals(action.datanode)));
.anyMatch(r -> r.getDatanodeDetails().equals(action.datanode)),
() -> metrics.incrNumReplicationCmdsTimeout(),
() -> {
metrics.incrNumReplicationCmdsCompleted();
metrics.incrNumReplicationBytesCompleted(
container.getUsedBytes());
});

updateInflightAction(container, inflightDeletion,
action -> replicas.stream()
.noneMatch(r ->
r.getDatanodeDetails().equals(action.datanode)));
r.getDatanodeDetails().equals(action.datanode)),
() -> metrics.incrNumDeletionCmdsTimeout(),
() -> metrics.incrNumDeletionCmdsCompleted());

/*
* If container is under deleting and all its replicas are deleted,
@@ -507,10 +515,14 @@ private void processContainer(ContainerInfo container) {
* @param container Container to update
* @param inflightActions inflightReplication (or) inflightDeletion
* @param filter filter to check if the operation is completed
* @param timeoutCounter update timeout metrics
* @param completedCounter update completed metrics
*/
private void updateInflightAction(final ContainerInfo container,
final Map<ContainerID, List<InflightAction>> inflightActions,
final Predicate<InflightAction> filter) {
final Predicate<InflightAction> filter,
final Runnable timeoutCounter,
final Runnable completedCounter) {
final ContainerID id = container.containerID();
final long deadline = clock.millis() - rmConf.getEventTimeout();
if (inflightActions.containsKey(id)) {
@@ -528,6 +540,13 @@ private void updateInflightAction(final ContainerInfo container,
NodeOperationalState.IN_SERVICE;
if (isCompleted || isUnhealthy || isTimeout || isNotInService) {
iter.remove();

if (isTimeout) {
timeoutCounter.run();
} else if (isCompleted) {
completedCounter.run();
}

updateMoveIfNeeded(isUnhealthy, isCompleted, isTimeout,
isNotInService, container, a.datanode, inflightActions);
}
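Editor's note: the isCompleted, isUnhealthy, isTimeout and isNotInService flags tested above are computed in a part of updateInflightAction that this diff view collapses. Based on the deadline computed earlier in the hunk (clock.millis() - rmConf.getEventTimeout()) and the filter argument, the checks are presumably along these lines; this is a hedged reconstruction, not the exact collapsed code.

    // Hypothetical reconstruction of the collapsed flag computation inside the
    // iteration loop; NodeNotFoundException handling from getNodeStatus() is
    // omitted, and accessor names are assumptions.
    final NodeStatus status = nodeManager.getNodeStatus(a.datanode);
    final boolean isUnhealthy = status.getHealth() != NodeState.HEALTHY;
    final boolean isCompleted = filter.test(a);   // replica set now matches
    final boolean isTimeout = a.time < deadline;  // command outlived event timeout
    final boolean isNotInService =
        status.getOperationalState() != NodeOperationalState.IN_SERVICE;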
@@ -1439,6 +1458,9 @@ private void sendReplicateCommand(final ContainerInfo container,
inflightReplication.computeIfAbsent(id, k -> new ArrayList<>());
sendAndTrackDatanodeCommand(datanode, replicateCommand,
action -> inflightReplication.get(id).add(action));

metrics.incrNumReplicationCmdsSent();
metrics.incrNumReplicationBytesTotal(container.getUsedBytes());
}

/**
@@ -1462,6 +1484,8 @@ private void sendDeleteCommand(final ContainerInfo container,
inflightDeletion.computeIfAbsent(id, k -> new ArrayList<>());
sendAndTrackDatanodeCommand(datanode, deleteCommand,
action -> inflightDeletion.get(id).add(action));

metrics.incrNumDeletionCmdsSent();
}

/**
@@ -1547,22 +1571,10 @@ private boolean isOpenContainerHealthy(
.allMatch(r -> ReplicationManager.compareState(state, r.getState()));
}

@Override
public void getMetrics(MetricsCollector collector, boolean all) {
collector.addRecord(ReplicationManager.class.getSimpleName())
.addGauge(ReplicationManagerMetrics.INFLIGHT_REPLICATION,
inflightReplication.size())
.addGauge(ReplicationManagerMetrics.INFLIGHT_DELETION,
inflightDeletion.size())
.addGauge(ReplicationManagerMetrics.INFLIGHT_MOVE,
inflightMove.size())
.endRecord();
}

/**
* Wrapper class to hold the InflightAction with its start time.
*/
private static final class InflightAction {
static final class InflightAction {

private final DatanodeDetails datanode;
private final long time;
@@ -1572,6 +1584,11 @@ private InflightAction(final DatanodeDetails datanode,
this.datanode = datanode;
this.time = time;
}

@VisibleForTesting
public DatanodeDetails getDatanode() {
return datanode;
}
}

/**
@@ -1645,35 +1662,6 @@ public int getMaintenanceReplicaMinimum() {
}
}

/**
* Metric name definitions for Replication manager.
*/
public enum ReplicationManagerMetrics implements MetricsInfo {

INFLIGHT_REPLICATION("Tracked inflight container replication requests."),
INFLIGHT_DELETION("Tracked inflight container deletion requests."),
INFLIGHT_MOVE("Tracked inflight container move requests.");

private final String desc;

ReplicationManagerMetrics(String desc) {
this.desc = desc;
}

@Override
public String description() {
return desc;
}

@Override
public String toString() {
return new StringJoiner(", ", this.getClass().getSimpleName() + "{", "}")
.add("name=" + name())
.add("description=" + desc)
.toString();
}
}

@Override
public void notifyStatusChanged() {
serviceLock.lock();
@@ -1711,4 +1699,21 @@ public boolean shouldRun() {
public String getServiceName() {
return ReplicationManager.class.getSimpleName();
}

public ReplicationManagerMetrics getMetrics() {
return this.metrics;
}

public Map<ContainerID, List<InflightAction>> getInflightReplication() {
return inflightReplication;
}

public Map<ContainerID, List<InflightAction>> getInflightDeletion() {
return inflightDeletion;
}

public Map<ContainerID,
Pair<DatanodeDetails, DatanodeDetails>> getInflightMove() {
return inflightMove;
}
}
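Editor's note: InflightAction becomes package-private, gains a @VisibleForTesting getDatanode() accessor, and the manager exposes its metrics and inflight maps through the getters added above, all so tests can observe internal state directly. A hypothetical JUnit fragment using them; "rm" (ReplicationManager), "container" (ContainerInfo) and "expectedTarget" (DatanodeDetails) are assumed to be prepared by a test harness in the same package and are not part of this diff.

    // Hypothetical test; fixture setup and the surrounding test class omitted.
    @Test
    public void replicationCommandIsTrackedPerDatanode() {
      List<ReplicationManager.InflightAction> pending =
          rm.getInflightReplication().get(container.containerID());

      // getDatanode() (added above with @VisibleForTesting) lets the test
      // assert which datanode each inflight replication command targets.
      Assert.assertEquals(1, pending.size());
      Assert.assertEquals(expectedTarget, pending.get(0).getDatanode());

      // The new metrics object is reachable the same way.
      Assert.assertNotNull(rm.getMetrics());
    }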