Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,9 @@ public abstract class SCMCommand<T extends GeneratedMessage> implements
IdentifiableEventPayload {
private final long id;

// Under HA mode, holds term of underlying RaftServer iff current
// SCM is a leader, otherwise, holds term 0.
// Notes that, the first elected leader is from term 1, term 0,
// as the initial value of currentTerm, is never used under HA mode.
private long term = 0;
// If running upon Ratis, holds term of underlying RaftServer iff current
// SCM is a leader. If running without Ratis, holds SCMContext.INVALID_TERM.
private long term;

SCMCommand() {
this.id = HddsIdFactory.getLongId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,11 +306,9 @@ message SCMCommandProto {
optional ClosePipelineCommandProto closePipelineCommandProto = 8;
optional SetNodeOperationalStateCommandProto setNodeOperationalStateCommandProto = 9;

// Under HA mode, holds term of underlying RaftServer iff current
// SCM is a leader, otherwise, holds term 0.
// Notes that, the first elected leader is from term 1, term 0,
// as the initial value of currentTerm, is never used under HA mode.
optional uint64 term = 15;
// If running upon Ratis, holds term of underlying RaftServer iff current
// SCM is a leader. If running without Ratis, holds SCMContext.INVALID_TERM.
optional int64 term = 15;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public BlockManagerImpl(final ConfigurationSource conf,
blockDeletingService =
new SCMBlockDeletingService(deletedBlockLog, containerManager,
scm.getScmNodeManager(), scm.getEventQueue(), scm.getScmContext(),
svcInterval, serviceTimeout, conf);
scm.getSCMServiceManager(), svcInterval, serviceTimeout, conf);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
Expand All @@ -32,6 +34,8 @@
import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMService;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.server.events.EventPublisher;
Expand All @@ -57,7 +61,8 @@
* SCM HB thread polls cached commands and sends them to datanode for physical
* processing.
*/
public class SCMBlockDeletingService extends BackgroundService {
public class SCMBlockDeletingService extends BackgroundService
implements SCMService {

public static final Logger LOG =
LoggerFactory.getLogger(SCMBlockDeletingService.class);
Expand All @@ -71,11 +76,18 @@ public class SCMBlockDeletingService extends BackgroundService {

private int blockDeleteLimitSize;

/**
* SCMService related variables.
*/
private final Lock serviceLock = new ReentrantLock();
private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of setting default value as ServiceStatus.PAUSING, will it make sense to initialize this variable as RUNNING during construction?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually not. BlockDeletingService should not run on follower SCMs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

O yes thanks!


@SuppressWarnings("parameternumber")
public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
ContainerManagerV2 containerManager, NodeManager nodeManager,
EventPublisher eventPublisher, SCMContext scmContext,
Duration interval, long serviceTimeout, ConfigurationSource conf) {
SCMServiceManager serviceManager, Duration interval, long serviceTimeout,
ConfigurationSource conf) {
super("SCMBlockDeletingService", interval.toMillis(), TimeUnit.MILLISECONDS,
BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
this.deletedBlockLog = deletedBlockLog;
Expand All @@ -88,6 +100,9 @@ public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
conf.getObject(ScmConfig.class).getBlockDeletionLimit();
Preconditions.checkArgument(blockDeleteLimitSize > 0,
"Block deletion limit should be " + "positive.");

// register SCMBlockDeletingService to SCMServiceManager
serviceManager.register(this);
}

@Override
Expand Down Expand Up @@ -121,6 +136,10 @@ public int getPriority() {

@Override
public EmptyTaskResult call() throws Exception {
if (!shouldRun()) {
return EmptyTaskResult.newResult();
}

long startTime = Time.monotonicNow();
// Scan SCM DB in HB interval and collect a throttled list of
// to delete blocks.
Expand Down Expand Up @@ -153,7 +172,7 @@ public EmptyTaskResult call() throws Exception {
// command is bigger than a limit, e.g 50. In case datanode goes
// offline for sometime, the cached commands be flooded.
SCMCommand<?> command = new DeleteBlocksCommand(dnTXs);
command.setTerm(scmContext.getTerm());
command.setTerm(scmContext.getTermOfLeader());
eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
new CommandForDatanode<>(dnId, command));
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -200,4 +219,33 @@ public EmptyTaskResult call() throws Exception {
public void setBlockDeleteTXNum(int numTXs) {
blockDeleteLimitSize = numTXs;
}

@Override
public void notifyStatusChanged() {
serviceLock.lock();
try {
if (scmContext.isLeader()) {
serviceStatus = ServiceStatus.RUNNING;
} else {
serviceStatus = ServiceStatus.PAUSING;
}
} finally {
serviceLock.unlock();
}
}

@Override
public boolean shouldRun() {
serviceLock.lock();
try {
return serviceStatus == ServiceStatus.RUNNING;
} finally {
serviceLock.unlock();
}
}

@Override
public String getServiceName() {
return SCMBlockDeletingService.class.getSimpleName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ protected void deleteReplica(ContainerID containerID, DatanodeDetails dn,
SCMCommand<?> command = new DeleteContainerCommand(
containerID.getId(), true);
try {
command.setTerm(scmContext.getTerm());
command.setTerm(scmContext.getTermOfLeader());
} catch (NotLeaderException nle) {
logger.warn("Skip sending delete container command," +
" since not leader SCM", nle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) {
if (container.getState() == LifeCycleState.CLOSING) {
SCMCommand<?> command = new CloseContainerCommand(
containerID.getId(), container.getPipelineID());
command.setTerm(scmContext.getTerm());
command.setTerm(scmContext.getTermOfLeader());

getNodes(container).forEach(node ->
publisher.fireEvent(DATANODE_COMMAND,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,18 @@
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigType;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
Expand All @@ -48,11 +53,11 @@
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMService;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
Expand Down Expand Up @@ -84,8 +89,7 @@
* that the containers are properly replicated. Replication Manager deals only
* with Quasi Closed / Closed container.
*/
public class ReplicationManager
implements MetricsSource, EventHandler<SafeModeStatus> {
public class ReplicationManager implements MetricsSource, SCMService {

public static final Logger LOG =
LoggerFactory.getLogger(ReplicationManager.class);
Expand Down Expand Up @@ -138,7 +142,7 @@ public class ReplicationManager
/**
* ReplicationManager specific configuration.
*/
private final ReplicationManagerConfiguration conf;
private final ReplicationManagerConfiguration rmConf;

/**
* ReplicationMonitor thread is the one which wakes up at configured
Expand All @@ -157,6 +161,16 @@ public class ReplicationManager
*/
private int minHealthyForMaintenance;

/**
* SCMService related variables.
* After leaving safe mode, replicationMonitor needs to wait for a while
* before really take effect.
*/
private final Lock serviceLock = new ReentrantLock();
private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
private final long waitTimeInMillis;
private long lastTimeToBeReadyInMillis = 0;

/**
* Constructs ReplicationManager instance with the given configuration.
*
Expand All @@ -165,11 +179,13 @@ public class ReplicationManager
* @param containerPlacement PlacementPolicy
* @param eventPublisher EventPublisher
*/
public ReplicationManager(final ReplicationManagerConfiguration conf,
@SuppressWarnings("parameternumber")
public ReplicationManager(final ConfigurationSource conf,
final ContainerManagerV2 containerManager,
final PlacementPolicy containerPlacement,
final EventPublisher eventPublisher,
final SCMContext scmContext,
final SCMServiceManager serviceManager,
final LockManager<ContainerID> lockManager,
final NodeManager nodeManager) {
this.containerManager = containerManager;
Expand All @@ -178,11 +194,22 @@ public ReplicationManager(final ReplicationManagerConfiguration conf,
this.scmContext = scmContext;
this.lockManager = lockManager;
this.nodeManager = nodeManager;
this.conf = conf;
this.rmConf = conf.getObject(ReplicationManagerConfiguration.class);
this.running = false;
this.inflightReplication = new ConcurrentHashMap<>();
this.inflightDeletion = new ConcurrentHashMap<>();
this.minHealthyForMaintenance = conf.getMaintenanceReplicaMinimum();
this.minHealthyForMaintenance = rmConf.getMaintenanceReplicaMinimum();

this.waitTimeInMillis = conf.getTimeDuration(
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
TimeUnit.MILLISECONDS);

// register ReplicationManager to SCMServiceManager.
serviceManager.register(this);

// start ReplicationManager.
start();
}

/**
Expand Down Expand Up @@ -263,7 +290,7 @@ private synchronized void run() {
" processing {} containers.", Time.monotonicNow() - start,
containers.size());

wait(conf.getInterval());
wait(rmConf.getInterval());
}
} catch (Throwable t) {
// When we get runtime exception, we should terminate SCM.
Expand All @@ -278,6 +305,10 @@ private synchronized void run() {
* @param container ContainerInfo
*/
private void processContainer(ContainerInfo container) {
if (!shouldRun()) {
return;
}

final ContainerID id = container.containerID();
lockManager.lock(id);
try {
Expand Down Expand Up @@ -419,7 +450,7 @@ private void updateInflightAction(final ContainerInfo container,
final Map<ContainerID, List<InflightAction>> inflightActions,
final Predicate<InflightAction> filter) {
final ContainerID id = container.containerID();
final long deadline = Time.monotonicNow() - conf.getEventTimeout();
final long deadline = Time.monotonicNow() - rmConf.getEventTimeout();
if (inflightActions.containsKey(id)) {
final List<InflightAction> actions = inflightActions.get(id);

Expand Down Expand Up @@ -971,7 +1002,7 @@ private void sendCloseCommand(final ContainerInfo container,
new CloseContainerCommand(container.getContainerID(),
container.getPipelineID(), force);
try {
closeContainerCommand.setTerm(scmContext.getTerm());
closeContainerCommand.setTerm(scmContext.getTermOfLeader());
} catch (NotLeaderException nle) {
LOG.warn("Skip sending close container command,"
+ " since current SCM is not leader.", nle);
Expand Down Expand Up @@ -1043,7 +1074,7 @@ private <T extends GeneratedMessage> void sendAndTrackDatanodeCommand(
final SCMCommand<T> command,
final Consumer<InflightAction> tracker) {
try {
command.setTerm(scmContext.getTerm());
command.setTerm(scmContext.getTermOfLeader());
} catch (NotLeaderException nle) {
LOG.warn("Skip sending datanode command,"
+ " since current SCM is not leader.", nle);
Expand Down Expand Up @@ -1120,14 +1151,6 @@ public void getMetrics(MetricsCollector collector, boolean all) {
.endRecord();
}

@Override
public void onMessage(SafeModeStatus status,
EventPublisher publisher) {
if (!status.isInSafeMode() && !this.isRunning()) {
this.start();
}
}

/**
* Wrapper class to hold the InflightAction with its start time.
*/
Expand Down Expand Up @@ -1241,4 +1264,42 @@ public String toString() {
.toString();
}
}

@Override
public void notifyStatusChanged() {
serviceLock.lock();
try {
// 1) SCMContext#isLeader returns true.
// 2) not in safe mode.
if (scmContext.isLeader() && !scmContext.isInSafeMode()) {
// transition from PAUSING to RUNNING
if (serviceStatus != ServiceStatus.RUNNING) {
LOG.info("Service {} transitions to RUNNING.", getServiceName());
lastTimeToBeReadyInMillis = Time.monotonicNow();
serviceStatus = ServiceStatus.RUNNING;
}
} else {
serviceStatus = ServiceStatus.PAUSING;
}
} finally {
serviceLock.unlock();
}
}

@Override
public boolean shouldRun() {
serviceLock.lock();
try {
// If safe mode is off, then this SCMService starts to run with a delay.
return serviceStatus == ServiceStatus.RUNNING &&
Time.monotonicNow() - lastTimeToBeReadyInMillis >= waitTimeInMillis;
} finally {
serviceLock.unlock();
}
}

@Override
public String getServiceName() {
return ReplicationManager.class.getSimpleName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,6 @@ public final class SCMEvents {
public static final TypedEvent<SafeModeStatus> SAFE_MODE_STATUS =
new TypedEvent<>(SafeModeStatus.class, "Safe mode status");

public static final TypedEvent<SafeModeStatus> DELAYED_SAFE_MODE_STATUS =
new TypedEvent<>(SafeModeStatus.class, "Delayed safe mode status");

/**
* Private Ctor. Never Constructed.
*/
Expand Down
Loading