Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
a06b7a4
Add node state change notification for ReplicationManager
peterxcli Mar 2, 2025
27ccf55
Node state change in NodeManager should notify ReplicationManager
peterxcli Mar 2, 2025
caaa220
Notify ReplicationManager when a node becomes dead
peterxcli Mar 2, 2025
431c717
fix pmd
peterxcli Mar 2, 2025
bc7aa2f
fix tests
peterxcli Mar 2, 2025
3c9f838
Introduce ReplicationManagerEventHandler to decouple NM and RM
peterxcli Mar 8, 2025
c969709
Change notify call to RM to publish RM_NOTIFY event to queue
peterxcli Mar 8, 2025
f7363de
Some old code cleanup
peterxcli Mar 8, 2025
00e526f
Add ReplicationManagerEventHandler to SCM event queue
peterxcli Mar 8, 2025
6a53c3a
NPE in TestDeadNodeHandler
peterxcli Mar 10, 2025
32705a2
Notify RM if persisted op state changed
peterxcli Mar 11, 2025
bf17079
Add integration test for RM on RM being notify when node status chang…
peterxcli Mar 11, 2025
6092de1
Addressed comment: Only notify when leader ready and out of safemode
peterxcli Mar 16, 2025
b15af64
Addressed comment: don't notify when RM is running
peterxcli Mar 16, 2025
b2642bc
Addressed comment: Only notify when node is not IN_MAINTENANCE status…
peterxcli Mar 16, 2025
b6e71f3
Merge remote-tracking branch 'upstream/master' into hdds8660-Replicat…
peterxcli Apr 3, 2025
6c7affa
Merge remote-tracking branch 'upstream/master' into hdds8660-Replicat…
peterxcli Apr 6, 2025
fe73650
Merge remote-tracking branch 'upstream/master' into hdds8660-Replicat…
peterxcli Apr 8, 2025
ffd034d
Reuse cluster in TestReplicationManagerIntegration
peterxcli Apr 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public class ReplicationManager implements SCMService, ContainerReplicaPendingOp
/**
* SCMContext from StorageContainerManager.
*/
private final SCMContext scmContext;
private SCMContext scmContext;


/**
Expand Down Expand Up @@ -918,6 +918,10 @@ public ReplicationManagerReport getContainerReport() {
return containerReport;
}

/**
 * Reports whether the replication monitor thread is currently in the
 * {@link Thread.State#TIMED_WAITING} state.
 *
 * @return true if the monitor thread's state is TIMED_WAITING, false for
 *         any other thread state
 */
public boolean isThreadWaiting() {
  final Thread.State monitorState = replicationMonitor.getState();
  return Thread.State.TIMED_WAITING == monitorState;
}

/**
* ReplicationMonitor thread runnable. This wakes up at configured
* interval and processes all the containers in the system.
Expand Down Expand Up @@ -1410,6 +1414,11 @@ public boolean shouldRun() {
}
}

/**
 * Replaces the SCMContext held by this ReplicationManager. Exposed for
 * tests only.
 *
 * @param context the SCMContext to use from now on
 */
@VisibleForTesting
public void setScmContext(SCMContext context) {
  this.scmContext = context;
}

@Override
public String getServiceName() {
return ReplicationManager.class.getSimpleName();
Expand Down Expand Up @@ -1492,5 +1501,36 @@ public boolean hasHealthyPipeline(ContainerInfo container) {
return false;
}
}

/**
 * Tells the ReplicationManager that a node changed state, which may mean
 * containers need to be re-examined.
 *
 * <p>{@code notify()} is invoked on this object only when all of the
 * following hold: the manager is running, the service is not PAUSING, the
 * replication monitor thread reports TIMED_WAITING, and the replication
 * queue is empty. The empty-queue requirement avoids rebuilding the queue
 * repeatedly when several nodes change state in quick succession.
 *
 * @return true if {@code notify()} was called to wake the replication
 *         monitor, false if the call was skipped
 */
public synchronized boolean notifyNodeStateChange() {
  final boolean active = running && serviceStatus != ServiceStatus.PAUSING;
  if (!active) {
    return false;
  }

  final boolean monitorIdle = isThreadWaiting();
  if (!monitorIdle) {
    LOG.debug("Replication monitor is running, not need to wake it up");
    return false;
  }

  // Pending work in the queue means a wake-up now would only cause the
  // queue to be recreated, so leave the monitor alone.
  final boolean nothingQueued = getQueue().isEmpty();
  if (!nothingQueued) {
    LOG.debug("Replication queue is not empty, not waking up replication monitor");
    return false;
  }

  LOG.debug("Waking up replication monitor due to node state change");
  // Wake a thread waiting on this object's monitor.
  notify();
  return true;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm.container.replication;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Event handler that relays datanode events to the
 * {@link ReplicationManager} by calling
 * {@link ReplicationManager#notifyNodeStateChange()}.
 */
public class ReplicationManagerEventHandler implements EventHandler<DatanodeDetails> {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationManagerEventHandler.class);

  private final ReplicationManager rm;
  private final SCMContext context;

  /**
   * Creates a handler bound to the given manager and context.
   *
   * @param replicationManager the manager to notify when an event arrives
   * @param scmContext used to check leader readiness and safe mode
   */
  public ReplicationManagerEventHandler(ReplicationManager replicationManager, SCMContext scmContext) {
    rm = replicationManager;
    context = scmContext;
  }

  /**
   * Notifies the ReplicationManager about the event, but only when this SCM
   * is leader-ready and not in safe mode; otherwise the event is dropped.
   *
   * @param datanodeDetails the datanode the event refers to (used for logging)
   * @param eventPublisher unused by this handler
   */
  @Override
  public void onMessage(DatanodeDetails datanodeDetails, EventPublisher eventPublisher) {
    // Per the original author's note, ReplicationManager applies the same
    // leader-ready / safe-mode condition.
    final boolean eligible = context.isLeaderReady() && !context.isInSafeMode();
    if (eligible) {
      LOG.debug("ReplicationManagerEventHandler received event for datanode: {}", datanodeDetails);
      rm.notifyNodeStateChange();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,8 @@ public int overReplicatedQueueSize() {
return overRepQueue.size();
}

/**
 * Checks whether both the under-replicated and over-replicated queues are
 * empty.
 *
 * @return true only if neither queue holds any entry
 */
public boolean isEmpty() {
  if (!underRepQueue.isEmpty()) {
    return false;
  }
  return overRepQueue.isEmpty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ public final class SCMEvents {
new TypedEvent<>(CommandStatusReportHandler.DeleteBlockStatus.class,
"Delete_Block_Status");

/**
 * Event carrying the {@link DatanodeDetails} of a datanode, published to
 * request that the ReplicationManager be notified. Registered under the
 * name "Replication_Manager_Notify".
 */
public static final TypedEvent<DatanodeDetails>
REPLICATION_MANAGER_NOTIFY =
new TypedEvent<>(DatanodeDetails.class, "Replication_Manager_Notify");

/**
* Private Ctor. Never Constructed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
Expand Down Expand Up @@ -91,12 +92,21 @@ public void onMessage(final DatanodeDetails datanodeDetails,
closeContainers(datanodeDetails, publisher);
destroyPipelines(datanodeDetails);

boolean isNodeInMaintenance = nodeManager.getNodeStatus(datanodeDetails).isInMaintenance();

// Remove the container replicas associated with the dead node unless it
// is IN_MAINTENANCE
if (!nodeManager.getNodeStatus(datanodeDetails).isInMaintenance()) {
if (!isNodeInMaintenance) {
removeContainerReplicas(datanodeDetails);
}


// Notify ReplicationManager
if (!isNodeInMaintenance) {
LOG.debug("Notifying ReplicationManager about dead node: {}",
datanodeDetails);
publisher.fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanodeDetails);
}

// remove commands in command queue for the DN
final List<SCMCommand<?>> cmdList = nodeManager.getCommandQueue(
datanodeDetails.getUuid());
Expand All @@ -105,8 +115,7 @@ public void onMessage(final DatanodeDetails datanodeDetails,

// remove DeleteBlocksCommand associated with the dead node unless it
// is IN_MAINTENANCE
if (deletedBlockLog != null &&
!nodeManager.getNodeStatus(datanodeDetails).isInMaintenance()) {
if (deletedBlockLog != null && !isNodeInMaintenance) {
deletedBlockLog.onDatanodeDead(datanodeDetails.getUuid());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -636,9 +636,34 @@ protected void updateDatanodeOpState(DatanodeDetails reportedDn)
}
}
DatanodeDetails scmDnd = nodeStateManager.getNode(reportedDn);
NodeOperationalState oldPersistedOpState = scmDnd.getPersistedOpState();
NodeOperationalState newPersistedOpState = reportedDn.getPersistedOpState();

scmDnd.setPersistedOpStateExpiryEpochSec(
reportedDn.getPersistedOpStateExpiryEpochSec());
scmDnd.setPersistedOpState(reportedDn.getPersistedOpState());
scmDnd.setPersistedOpState(newPersistedOpState);

maybeNotifyReplicationManager(reportedDn, oldPersistedOpState, newPersistedOpState);
}

/**
 * Fires a REPLICATION_MANAGER_NOTIFY event for the datanode when its
 * persisted operational state changed to ENTERING_MAINTENANCE,
 * DECOMMISSIONING or IN_SERVICE. Nothing is fired when this SCM is not the
 * leader, when the state is unchanged, or when the new state is any other
 * value.
 *
 * @param datanode the datanode whose state was reported
 * @param oldState the persisted operational state before the update
 * @param newState the persisted operational state after the update
 */
private void maybeNotifyReplicationManager(
    DatanodeDetails datanode,
    NodeOperationalState oldState,
    NodeOperationalState newState) {
  if (!scmContext.isLeader()) {
    return;
  }
  if (oldState == newState) {
    return;
  }

  // Only transitions into maintenance, into decommissioning, or back to
  // service are of interest to the ReplicationManager.
  final boolean relevantTarget =
      newState == NodeOperationalState.ENTERING_MAINTENANCE
          || newState == NodeOperationalState.DECOMMISSIONING
          || newState == NodeOperationalState.IN_SERVICE;
  if (!relevantTarget) {
    return;
  }

  LOG.info("Notifying ReplicationManager of node state change for {}: {} -> {}",
      datanode, oldState, newState);
  scmNodeEventPublisher.fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import org.apache.hadoop.hdds.scm.container.replication.DatanodeCommandCountUpdatedHandler;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManagerEventHandler;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
Expand Down Expand Up @@ -495,11 +496,16 @@ private void initializeEventHandlers() {
PipelineActionHandler pipelineActionHandler =
new PipelineActionHandler(pipelineManager, scmContext);

ReplicationManagerEventHandler replicationManagerEventHandler =
new ReplicationManagerEventHandler(replicationManager, scmContext);

eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND_COUNT_UPDATED,
new DatanodeCommandCountUpdatedHandler(replicationManager));
eventQueue.addHandler(SCMEvents.REPLICATION_MANAGER_NOTIFY,
replicationManagerEventHandler);

// Use the same executor for both ICR and FCR.
// The Executor maps the event to a thread for DN.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
Expand Down Expand Up @@ -97,13 +99,15 @@
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Lists;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.TestClock;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;

/**
Expand Down Expand Up @@ -1648,6 +1652,62 @@ public void testPendingOpExpiry() throws ContainerNotFoundException {
assertNotEquals(commandDeadline, sentCommand.getRight().getDeadline());
}

/**
 * Verifies that {@code notifyNodeStateChange()} wakes the replication
 * monitor (triggering another {@code processAll()} run) only when the
 * replication queue is empty, and that its return value reflects whether
 * the wake-up happened.
 *
 * @param queueIsEmpty value the mocked queue reports from {@code isEmpty()}
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testNotifyNodeStateChangeWakesUpThread(boolean queueIsEmpty)
    throws IOException, InterruptedException, ReflectiveOperationException, TimeoutException {

  AtomicBoolean processAllCalled = new AtomicBoolean(false);
  ReplicationQueue queue = mock(ReplicationQueue.class);
  when(queue.isEmpty()).thenReturn(queueIsEmpty);
  final ReplicationManager customRM = new ReplicationManager(
      configuration,
      containerManager,
      ratisPlacementPolicy,
      ecPlacementPolicy,
      eventPublisher,
      scmContext,
      nodeManager,
      clock,
      containerReplicaPendingOps) {
    @Override
    public ReplicationQueue getQueue() {
      return queue;
    }

    @Override
    public synchronized void processAll() {
      processAllCalled.set(true);
    }
  };

  customRM.notifyStatusChanged();
  customRM.start();

  // Always stop the manager, even when an assertion or waitFor fails, so
  // the monitor thread does not leak into later tests.
  try {
    // wait for the thread become TIMED_WAITING
    GenericTestUtils.waitFor(
        () -> customRM.isThreadWaiting(),
        100,
        1000);

    // The processAll method will be called when the ReplicationManager's run
    // method is executed by the replicationMonitor thread.
    assertTrue(processAllCalled.get());
    processAllCalled.set(false);

    assertThat(customRM.notifyNodeStateChange()).isEqualTo(queueIsEmpty);

    if (queueIsEmpty) {
      // The monitor thread may still report TIMED_WAITING for a moment
      // after being notified, so wait on the observable effect itself
      // rather than on the thread state alone.
      GenericTestUtils.waitFor(
          () -> processAllCalled.get(),
          100,
          1000);
    }

    GenericTestUtils.waitFor(
        () -> customRM.isThreadWaiting(),
        100,
        1000);

    // If the queue is empty, the processAll method should have been called
    assertEquals(queueIsEmpty, processAllCalled.get());
  } finally {
    customRM.stop();
  }
}

@SafeVarargs
private final Set<ContainerReplica> addReplicas(ContainerInfo container,
ContainerReplicaProto.State replicaState,
Expand Down
Loading