diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java index 4d87bb096cb6..6115f1653826 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java @@ -32,11 +32,9 @@ public abstract class SCMCommand implements IdentifiableEventPayload { private final long id; - // Under HA mode, holds term of underlying RaftServer iff current - // SCM is a leader, otherwise, holds term 0. - // Notes that, the first elected leader is from term 1, term 0, - // as the initial value of currentTerm, is never used under HA mode. - private long term = 0; + // If running upon Ratis, holds term of underlying RaftServer iff current + // SCM is a leader. If running without Ratis, holds SCMContext.INVALID_TERM. + private long term; SCMCommand() { this.id = HddsIdFactory.getLongId(); diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto index c7a22934509d..410970eb3713 100644 --- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto +++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto @@ -306,11 +306,9 @@ message SCMCommandProto { optional ClosePipelineCommandProto closePipelineCommandProto = 8; optional SetNodeOperationalStateCommandProto setNodeOperationalStateCommandProto = 9; - // Under HA mode, holds term of underlying RaftServer iff current - // SCM is a leader, otherwise, holds term 0. - // Notes that, the first elected leader is from term 1, term 0, - // as the initial value of currentTerm, is never used under HA mode. 
- optional uint64 term = 15; + // If running upon Ratis, holds term of underlying RaftServer iff current + // SCM is a leader. If running without Ratis, holds SCMContext.INVALID_TERM. + optional int64 term = 15; } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java index b2150265c963..fb5d5d521c49 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java @@ -113,7 +113,7 @@ public BlockManagerImpl(final ConfigurationSource conf, blockDeletingService = new SCMBlockDeletingService(deletedBlockLog, containerManager, scm.getScmNodeManager(), scm.getEventQueue(), scm.getScmContext(), - svcInterval, serviceTimeout, conf); + scm.getSCMServiceManager(), svcInterval, serviceTimeout, conf); } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java index c3028a40ab78..0a8e89745504 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java @@ -23,6 +23,8 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -32,6 +34,8 @@ import org.apache.hadoop.hdds.scm.container.ContainerManagerV2; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.ha.SCMService; +import 
org.apache.hadoop.hdds.scm.ha.SCMServiceManager; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -57,7 +61,8 @@ * SCM HB thread polls cached commands and sends them to datanode for physical * processing. */ -public class SCMBlockDeletingService extends BackgroundService { +public class SCMBlockDeletingService extends BackgroundService + implements SCMService { public static final Logger LOG = LoggerFactory.getLogger(SCMBlockDeletingService.class); @@ -71,11 +76,18 @@ public class SCMBlockDeletingService extends BackgroundService { private int blockDeleteLimitSize; + /** + * SCMService related variables. + */ + private final Lock serviceLock = new ReentrantLock(); + private ServiceStatus serviceStatus = ServiceStatus.PAUSING; + @SuppressWarnings("parameternumber") public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog, ContainerManagerV2 containerManager, NodeManager nodeManager, EventPublisher eventPublisher, SCMContext scmContext, - Duration interval, long serviceTimeout, ConfigurationSource conf) { + SCMServiceManager serviceManager, Duration interval, long serviceTimeout, + ConfigurationSource conf) { super("SCMBlockDeletingService", interval.toMillis(), TimeUnit.MILLISECONDS, BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout); this.deletedBlockLog = deletedBlockLog; @@ -88,6 +100,9 @@ public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog, conf.getObject(ScmConfig.class).getBlockDeletionLimit(); Preconditions.checkArgument(blockDeleteLimitSize > 0, "Block deletion limit should be " + "positive."); + + // register SCMBlockDeletingService to SCMServiceManager + serviceManager.register(this); } @Override @@ -121,6 +136,10 @@ public int getPriority() { @Override public EmptyTaskResult call() throws Exception { + if (!shouldRun()) { + return EmptyTaskResult.newResult(); + } + long startTime = Time.monotonicNow(); // Scan SCM DB in 
HB interval and collect a throttled list of // to delete blocks. @@ -153,7 +172,7 @@ public EmptyTaskResult call() throws Exception { // command is bigger than a limit, e.g 50. In case datanode goes // offline for sometime, the cached commands be flooded. SCMCommand command = new DeleteBlocksCommand(dnTXs); - command.setTerm(scmContext.getTerm()); + command.setTerm(scmContext.getTermOfLeader()); eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, new CommandForDatanode<>(dnId, command)); if (LOG.isDebugEnabled()) { @@ -200,4 +219,33 @@ public EmptyTaskResult call() throws Exception { public void setBlockDeleteTXNum(int numTXs) { blockDeleteLimitSize = numTXs; } + + @Override + public void notifyStatusChanged() { + serviceLock.lock(); + try { + if (scmContext.isLeader()) { + serviceStatus = ServiceStatus.RUNNING; + } else { + serviceStatus = ServiceStatus.PAUSING; + } + } finally { + serviceLock.unlock(); + } + } + + @Override + public boolean shouldRun() { + serviceLock.lock(); + try { + return serviceStatus == ServiceStatus.RUNNING; + } finally { + serviceLock.unlock(); + } + } + + @Override + public String getServiceName() { + return SCMBlockDeletingService.class.getSimpleName(); + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java index d71539d1af76..d8d31aed9d37 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java @@ -327,7 +327,7 @@ protected void deleteReplica(ContainerID containerID, DatanodeDetails dn, SCMCommand command = new DeleteContainerCommand( containerID.getId(), true); try { - command.setTerm(scmContext.getTerm()); + command.setTerm(scmContext.getTermOfLeader()); } catch 
(NotLeaderException nle) { logger.warn("Skip sending delete container command," + " since not leader SCM", nle); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java index 3320d900dbbe..449252caedac 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java @@ -81,7 +81,7 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) { if (container.getState() == LifeCycleState.CLOSING) { SCMCommand command = new CloseContainerCommand( containerID.getId(), container.getPipelineID()); - command.setTerm(scmContext.getTerm()); + command.setTerm(scmContext.getTermOfLeader()); getNodes(container).forEach(node -> publisher.fireEvent(DATANODE_COMMAND, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java index 0559c3c7e0b0..ed439d002499 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java @@ -32,13 +32,18 @@ import java.util.StringJoiner; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; +import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; import 
org.apache.hadoop.hdds.conf.ConfigType; +import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; @@ -48,11 +53,11 @@ import org.apache.hadoop.hdds.scm.PlacementPolicy; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.ha.SCMService; +import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; -import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus; -import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsInfo; @@ -84,8 +89,7 @@ * that the containers are properly replicated. Replication Manager deals only * with Quasi Closed / Closed container. */ -public class ReplicationManager - implements MetricsSource, EventHandler { +public class ReplicationManager implements MetricsSource, SCMService { public static final Logger LOG = LoggerFactory.getLogger(ReplicationManager.class); @@ -138,7 +142,7 @@ public class ReplicationManager /** * ReplicationManager specific configuration. */ - private final ReplicationManagerConfiguration conf; + private final ReplicationManagerConfiguration rmConf; /** * ReplicationMonitor thread is the one which wakes up at configured @@ -157,6 +161,16 @@ public class ReplicationManager */ private int minHealthyForMaintenance; + /** + * SCMService related variables. + * After leaving safe mode, replicationMonitor needs to wait for a while + * before really take effect. 
+ */ + private final Lock serviceLock = new ReentrantLock(); + private ServiceStatus serviceStatus = ServiceStatus.PAUSING; + private final long waitTimeInMillis; + private long lastTimeToBeReadyInMillis = 0; + /** * Constructs ReplicationManager instance with the given configuration. * @@ -165,11 +179,13 @@ public class ReplicationManager * @param containerPlacement PlacementPolicy * @param eventPublisher EventPublisher */ - public ReplicationManager(final ReplicationManagerConfiguration conf, + @SuppressWarnings("parameternumber") + public ReplicationManager(final ConfigurationSource conf, final ContainerManagerV2 containerManager, final PlacementPolicy containerPlacement, final EventPublisher eventPublisher, final SCMContext scmContext, + final SCMServiceManager serviceManager, final LockManager lockManager, final NodeManager nodeManager) { this.containerManager = containerManager; @@ -178,11 +194,22 @@ public ReplicationManager(final ReplicationManagerConfiguration conf, this.scmContext = scmContext; this.lockManager = lockManager; this.nodeManager = nodeManager; - this.conf = conf; + this.rmConf = conf.getObject(ReplicationManagerConfiguration.class); this.running = false; this.inflightReplication = new ConcurrentHashMap<>(); this.inflightDeletion = new ConcurrentHashMap<>(); - this.minHealthyForMaintenance = conf.getMaintenanceReplicaMinimum(); + this.minHealthyForMaintenance = rmConf.getMaintenanceReplicaMinimum(); + + this.waitTimeInMillis = conf.getTimeDuration( + HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, + HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT, + TimeUnit.MILLISECONDS); + + // register ReplicationManager to SCMServiceManager. + serviceManager.register(this); + + // start ReplicationManager. 
+ start(); } /** @@ -263,7 +290,7 @@ private synchronized void run() { " processing {} containers.", Time.monotonicNow() - start, containers.size()); - wait(conf.getInterval()); + wait(rmConf.getInterval()); } } catch (Throwable t) { // When we get runtime exception, we should terminate SCM. @@ -278,6 +305,10 @@ private synchronized void run() { * @param container ContainerInfo */ private void processContainer(ContainerInfo container) { + if (!shouldRun()) { + return; + } + final ContainerID id = container.containerID(); lockManager.lock(id); try { @@ -419,7 +450,7 @@ private void updateInflightAction(final ContainerInfo container, final Map> inflightActions, final Predicate filter) { final ContainerID id = container.containerID(); - final long deadline = Time.monotonicNow() - conf.getEventTimeout(); + final long deadline = Time.monotonicNow() - rmConf.getEventTimeout(); if (inflightActions.containsKey(id)) { final List actions = inflightActions.get(id); @@ -971,7 +1002,7 @@ private void sendCloseCommand(final ContainerInfo container, new CloseContainerCommand(container.getContainerID(), container.getPipelineID(), force); try { - closeContainerCommand.setTerm(scmContext.getTerm()); + closeContainerCommand.setTerm(scmContext.getTermOfLeader()); } catch (NotLeaderException nle) { LOG.warn("Skip sending close container command," + " since current SCM is not leader.", nle); @@ -1043,7 +1074,7 @@ private void sendAndTrackDatanodeCommand( final SCMCommand command, final Consumer tracker) { try { - command.setTerm(scmContext.getTerm()); + command.setTerm(scmContext.getTermOfLeader()); } catch (NotLeaderException nle) { LOG.warn("Skip sending datanode command," + " since current SCM is not leader.", nle); @@ -1120,14 +1151,6 @@ public void getMetrics(MetricsCollector collector, boolean all) { .endRecord(); } - @Override - public void onMessage(SafeModeStatus status, - EventPublisher publisher) { - if (!status.isInSafeMode() && !this.isRunning()) { - this.start(); - } - } - 
/** * Wrapper class to hold the InflightAction with its start time. */ @@ -1241,4 +1264,42 @@ public String toString() { .toString(); } } + + @Override + public void notifyStatusChanged() { + serviceLock.lock(); + try { + // 1) SCMContext#isLeader returns true. + // 2) not in safe mode. + if (scmContext.isLeader() && !scmContext.isInSafeMode()) { + // transition from PAUSING to RUNNING + if (serviceStatus != ServiceStatus.RUNNING) { + LOG.info("Service {} transitions to RUNNING.", getServiceName()); + lastTimeToBeReadyInMillis = Time.monotonicNow(); + serviceStatus = ServiceStatus.RUNNING; + } + } else { + serviceStatus = ServiceStatus.PAUSING; + } + } finally { + serviceLock.unlock(); + } + } + + @Override + public boolean shouldRun() { + serviceLock.lock(); + try { + // If safe mode is off, then this SCMService starts to run with a delay. + return serviceStatus == ServiceStatus.RUNNING && + Time.monotonicNow() - lastTimeToBeReadyInMillis >= waitTimeInMillis; + } finally { + serviceLock.unlock(); + } + } + + @Override + public String getServiceName() { + return ReplicationManager.class.getSimpleName(); + } } \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index 6f6cc54f3054..d01257bcb2ef 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -194,9 +194,6 @@ public final class SCMEvents { public static final TypedEvent SAFE_MODE_STATUS = new TypedEvent<>(SafeModeStatus.class, "Safe mode status"); - public static final TypedEvent DELAYED_SAFE_MODE_STATUS = - new TypedEvent<>(SafeModeStatus.class, "Delayed safe mode status"); - /** * Private Ctor. Never Constructed. 
*/ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java index 17dad7e07c71..2d4941f9c194 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java @@ -20,8 +20,6 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; -import org.apache.hadoop.hdds.server.events.EventHandler; -import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.ratis.protocol.exceptions.NotLeaderException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,14 +30,24 @@ /** * SCMContext is the single source of truth for some key information shared * across all components within SCM, including: - * - RaftServer related info, e.g., isLeader, term. - * - SafeMode related info, e.g., inSafeMode, preCheckComplete. + * 1) RaftServer related info, e.g., isLeader, term. + * 2) SafeMode related info, e.g., inSafeMode, preCheckComplete. + * + * If current SCM is not running upon Ratis, the {@link SCMContext#isLeader} + * check will always return true, and {@link SCMContext#getTermOfLeader} will + * return INVALID_TERM. */ -public class SCMContext implements EventHandler { +public final class SCMContext { private static final Logger LOG = LoggerFactory.getLogger(SCMContext.class); + /** + * The initial value of term in raft is 0, and term increases monotonically. + * term equals INVALID_TERM indicates current SCM is running without Ratis. 
+ */ + public static final long INVALID_TERM = -1; + private static final SCMContext EMPTY_CONTEXT - = new SCMContext(true, 0, new SafeModeStatus(false, true), null); + = new SCMContext.Builder().build(); /** * Used by non-HA mode SCM, Recon and Unit Tests. @@ -62,9 +70,8 @@ public static SCMContext emptyContext() { private final StorageContainerManager scm; private final ReadWriteLock lock = new ReentrantReadWriteLock(); - SCMContext(boolean isLeader, long term, - final SafeModeStatus safeModeStatus, - final StorageContainerManager scm) { + private SCMContext(boolean isLeader, long term, + final SafeModeStatus safeModeStatus, final StorageContainerManager scm) { this.isLeader = isLeader; this.term = term; this.safeModeStatus = safeModeStatus; @@ -72,25 +79,16 @@ public static SCMContext emptyContext() { } /** - * Creates SCMContext instance from StorageContainerManager. - */ - public SCMContext(final StorageContainerManager scm) { - this(false, 0, new SafeModeStatus(true, false), scm); - Preconditions.checkNotNull(scm, "scm is null"); - } - - /** - * - * @param newIsLeader : is leader or not - * @param newTerm : term if current SCM becomes leader + * @param leader : is leader or not + * @param newTerm : term if current SCM becomes leader */ - public void updateIsLeaderAndTerm(boolean newIsLeader, long newTerm) { + public void updateLeaderAndTerm(boolean leader, long newTerm) { lock.writeLock().lock(); try { LOG.info("update from <{},{}> to <{},{}>", - isLeader, term, newIsLeader, newTerm); + isLeader, term, leader, newTerm); - isLeader = newIsLeader; + isLeader = leader; term = newTerm; } finally { lock.writeLock().unlock(); @@ -105,6 +103,10 @@ public void updateIsLeaderAndTerm(boolean newIsLeader, long newTerm) { public boolean isLeader() { lock.readLock().lock(); try { + if (term == INVALID_TERM) { + return true; + } + return isLeader; } finally { lock.readLock().unlock(); @@ -117,9 +119,13 @@ public boolean isLeader() { * @return term * @throws 
NotLeaderException if isLeader is false */ - public long getTerm() throws NotLeaderException { + public long getTermOfLeader() throws NotLeaderException { lock.readLock().lock(); try { + if (term == INVALID_TERM) { + return term; + } + if (!isLeader) { LOG.warn("getTerm is invoked when not leader."); throw scm.getScmHAManager() @@ -132,9 +138,10 @@ public long getTerm() throws NotLeaderException { } } - @Override - public void onMessage(SafeModeStatus status, - EventPublisher publisher) { + /** + * @param status : update SCMContext with latest SafeModeStatus. + */ + public void updateSafeModeStatus(SafeModeStatus status) { lock.writeLock().lock(); try { LOG.info("Update SafeModeStatus from {} to {}.", safeModeStatus, status); @@ -161,4 +168,57 @@ public boolean isPreCheckComplete() { lock.readLock().unlock(); } } + + /** + * @return StorageContainerManager + */ + public StorageContainerManager getScm() { + Preconditions.checkNotNull(scm, "scm == null"); + return scm; + } + + public static class Builder { + /** + * The default context: + * running without Ratis, out of safe mode, and has completed preCheck. 
+ */ + private boolean isLeader = false; + private long term = INVALID_TERM; + private boolean isInSafeMode = false; + private boolean isPreCheckComplete = true; + private StorageContainerManager scm = null; + + public Builder setLeader(boolean leader) { + this.isLeader = leader; + return this; + } + + public Builder setTerm(long newTerm) { + this.term = newTerm; + return this; + } + + public Builder setIsInSafeMode(boolean inSafeMode) { + this.isInSafeMode = inSafeMode; + return this; + } + + public Builder setIsPreCheckComplete(boolean preCheckComplete) { + this.isPreCheckComplete = preCheckComplete; + return this; + } + + public Builder setSCM(StorageContainerManager storageContainerManager) { + this.scm = storageContainerManager; + return this; + } + + public SCMContext build() { + return new SCMContext( + isLeader, + term, + new SafeModeStatus(isInSafeMode, isPreCheckComplete), + scm); + } + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java new file mode 100644 index 000000000000..32194a6803d1 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.ha; + +/** + * Interface for background services in SCM, including ReplicationManager, + * SCMBlockDeletingService and BackgroundPipelineCreator. + * + * Provide a fine-grained method to manipulate the status of these background + * services. + */ +public interface SCMService { + /** + * Notify raft or safe mode related status changed. + */ + void notifyStatusChanged(); + + /** + * @param event latest triggered event. + */ + default void notifyEventTriggered(Event event) { + } + + /** + * @return true, if next iteration of Service should take effect, + * false, if next iteration of Service should be skipped. + */ + boolean shouldRun(); + + /** + * @return name of the Service. + */ + String getServiceName(); + + /** + * Status of Service. + */ + enum ServiceStatus { + RUNNING, + PAUSING + } + + /** + * One time event. + */ + enum Event { + PRE_CHECK_COMPLETED, + NEW_NODE_HANDLER_TRIGGERED, + UNHEALTHY_TO_HEALTHY_NODE_HANDLER_TRIGGERED + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java new file mode 100644 index 000000000000..52e3d26ec9db --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.ha; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hdds.scm.ha.SCMService.*; + +import java.util.ArrayList; +import java.util.List; + +/** + * Manipulate background services in SCM, including ReplicationManager, + * SCMBlockDeletingService and BackgroundPipelineCreator. + */ +public final class SCMServiceManager { + private static final Logger LOG = + LoggerFactory.getLogger(SCMServiceManager.class); + + private final List services = new ArrayList<>(); + + /** + * Register a SCMService to SCMServiceManager. + */ + public synchronized void register(SCMService service) { + Preconditions.checkNotNull(service); + LOG.info("Registering service {}.", service.getServiceName()); + services.add(service); + } + + /** + * Notify raft or safe mode related status changed. + */ + public synchronized void notifyStatusChanged() { + for (SCMService service : services) { + LOG.debug("Notify service:{}.", service.getServiceName()); + service.notifyStatusChanged(); + } + } + + /** + * Notify event triggered, which may affect SCMService. 
+ */ + public synchronized void notifyEventTriggered(Event event) { + for (SCMService service : services) { + LOG.debug("Notify service:{} with event:{}.", + service.getServiceName(), event); + service.notifyEventTriggered(event); + } + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java index aa366e1c96a9..a04f0d82b0d7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java @@ -133,7 +133,9 @@ private Message process(final SCMRatisRequest request) throws Exception { @Override public void notifyNotLeader(Collection pendingEntries) { LOG.info("current leader SCM steps down."); - scm.getScmContext().updateIsLeaderAndTerm(false, 0); + + scm.getScmContext().updateLeaderAndTerm(false, 0); + scm.getSCMServiceManager().notifyStatusChanged(); } @Override @@ -151,7 +153,9 @@ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, .getCurrentTerm(); LOG.info("current SCM becomes leader of term {}.", term); - scm.getScmContext().updateIsLeaderAndTerm(true, term); + + scm.getScmContext().updateLeaderAndTerm(true, term); + scm.getSCMServiceManager().notifyStatusChanged(); } @Override diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java index c511b55b0744..674cf2dfcc7f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java @@ -21,11 +21,12 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import 
org.apache.hadoop.hdds.scm.ha.SCMService.Event; +import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.ratis.protocol.exceptions.NotLeaderException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,32 +40,33 @@ public class NewNodeHandler implements EventHandler { private final PipelineManager pipelineManager; private final NodeDecommissionManager decommissionManager; private final ConfigurationSource conf; + private final SCMServiceManager serviceManager; public NewNodeHandler(PipelineManager pipelineManager, NodeDecommissionManager decommissionManager, - ConfigurationSource conf) { + ConfigurationSource conf, + SCMServiceManager serviceManager) { this.pipelineManager = pipelineManager; this.decommissionManager = decommissionManager; this.conf = conf; + this.serviceManager = serviceManager; } @Override public void onMessage(DatanodeDetails datanodeDetails, EventPublisher publisher) { try { - pipelineManager.triggerPipelineCreation(); + serviceManager.notifyEventTriggered(Event.NEW_NODE_HANDLER_TRIGGERED); + if (datanodeDetails.getPersistedOpState() != HddsProtos.NodeOperationalState.IN_SERVICE) { decommissionManager.continueAdminForNode(datanodeDetails); } - }catch (NodeNotFoundException e) { + } catch (NodeNotFoundException e) { // Should not happen, as the node has just registered to call this event // handler. 
LOG.warn("NodeNotFound when adding the node to the decommissionManager", e); - } catch (NotLeaderException nle) { - LOG.debug("Not the current leader SCM and cannot start pipeline" + - " creation."); } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java index 1cb6501e9cf2..d74f90f72856 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java @@ -20,10 +20,10 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.scm.ha.SCMService.Event; +import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.ratis.protocol.exceptions.NotLeaderException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,23 +35,19 @@ public class NonHealthyToHealthyNodeHandler private static final Logger LOG = LoggerFactory.getLogger(NonHealthyToHealthyNodeHandler.class); - private final PipelineManager pipelineManager; private final ConfigurationSource conf; + private final SCMServiceManager serviceManager; public NonHealthyToHealthyNodeHandler( - PipelineManager pipelineManager, OzoneConfiguration conf) { - this.pipelineManager = pipelineManager; + OzoneConfiguration conf, SCMServiceManager serviceManager) { this.conf = conf; + this.serviceManager = serviceManager; } @Override public void onMessage(DatanodeDetails datanodeDetails, EventPublisher publisher) { - try { - 
pipelineManager.triggerPipelineCreation(); - } catch (NotLeaderException ex) { - LOG.debug("Not the current leader SCM and cannot start pipeline" + - " creation."); - } + serviceManager.notifyEventTriggered( + Event.UNHEALTHY_TO_HEALTHY_NODE_HANDLER_TRIGGERED); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 4e6f53e987b7..e2e50e836255 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -433,7 +433,7 @@ private void updateDatanodeOpState(DatanodeDetails reportedDn) Time.monotonicNow(), scmStatus.getOperationalState(), scmStatus.getOpStateExpiryEpochSeconds()); - command.setTerm(scmContext.getTerm()); + command.setTerm(scmContext.getTermOfLeader()); addDatanodeCommand(reportedDn.getUuid(), command); } catch (NotLeaderException nle) { LOG.warn("Skip sending SetNodeOperationalStateCommand," diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreatorV2.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreatorV2.java new file mode 100644 index 000000000000..343444df5572 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreatorV2.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.pipeline; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.collections.iterators.LoopingIterator; +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.ha.SCMService; +import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.util.Time; +import org.apache.ratis.util.ExitUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.hadoop.hdds.scm.ha.SCMService.Event.UNHEALTHY_TO_HEALTHY_NODE_HANDLER_TRIGGERED; +import static org.apache.hadoop.hdds.scm.ha.SCMService.Event.NEW_NODE_HANDLER_TRIGGERED; +import static org.apache.hadoop.hdds.scm.ha.SCMService.Event.PRE_CHECK_COMPLETED; + +/** + * Implements api for running background pipeline creation jobs. 
+ */ +public class BackgroundPipelineCreatorV2 implements SCMService { + + private static final Logger LOG = + LoggerFactory.getLogger(BackgroundPipelineCreator.class); + + private final PipelineManager pipelineManager; + private final ConfigurationSource conf; + private final SCMContext scmContext; + + /** + * SCMService related variables. + * 1) after leaving safe mode, BackgroundPipelineCreator needs to + * wait for a while before really take effect. + * 2) NewNodeHandler, NonHealthyToHealthyNodeHandler, PreCheckComplete + * will trigger a one-shot run of BackgroundPipelineCreator, + * no matter in safe mode or not. + */ + private final Lock serviceLock = new ReentrantLock(); + private ServiceStatus serviceStatus = ServiceStatus.PAUSING; + private final boolean createPipelineInSafeMode; + private final long waitTimeInMillis; + private long lastTimeToBeReadyInMillis = 0; + private boolean oneShotRun = false; + + /** + * RatisPipelineUtilsThread is the one which wakes up at + * configured interval and tries to create pipelines. 
+ */ + private Thread thread; + private final Object monitor = new Object(); + private static final String THREAD_NAME = "RatisPipelineUtilsThread"; + private final AtomicBoolean running = new AtomicBoolean(false); + private final long intervalInMillis; + + + BackgroundPipelineCreatorV2(PipelineManager pipelineManager, + ConfigurationSource conf, + SCMServiceManager serviceManager, + SCMContext scmContext) { + this.pipelineManager = pipelineManager; + this.conf = conf; + this.scmContext = scmContext; + + this.createPipelineInSafeMode = conf.getBoolean( + HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, + HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT); + + this.waitTimeInMillis = conf.getTimeDuration( + HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, + HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT, + TimeUnit.MILLISECONDS); + + this.intervalInMillis = conf.getTimeDuration( + ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, + ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + + // register BackgroundPipelineCreator to SCMServiceManager + serviceManager.register(this); + + // start RatisPipelineUtilsThread + start(); + } + + /** + * Start RatisPipelineUtilsThread. + */ + public void start() { + if (!running.compareAndSet(false, true)) { + LOG.warn("{} is already started, just ignore.", THREAD_NAME); + return; + } + + LOG.info("Starting {}.", THREAD_NAME); + + thread = new ThreadFactoryBuilder() + .setDaemon(false) + .setNameFormat(THREAD_NAME + " - %d") + .setUncaughtExceptionHandler((Thread t, Throwable ex) -> { + // gracefully shutdown SCM. + scmContext.getScm().stop(); + + String message = "Terminate SCM, encounter uncaught exception" + + " in RatisPipelineUtilsThread"; + ExitUtils.terminate(1, message, ex, LOG); + }) + .build() + .newThread(this::run); + + thread.start(); + } + + /** + * Stop RatisPipelineUtilsThread. 
+ */ + public void stop() { + if (!running.compareAndSet(true, false)) { // fails only when not running + LOG.warn("{} is not running, just ignore.", THREAD_NAME); + return; + } + + LOG.info("Stopping {}.", THREAD_NAME); + + // in case RatisPipelineUtilsThread is sleeping + synchronized (monitor) { + monitor.notifyAll(); + } + + try { + thread.join(); + } catch (InterruptedException e) { + LOG.warn("Interrupted during join {}.", THREAD_NAME); + Thread.currentThread().interrupt(); + } + } + + private void run() { + while (running.get()) { + if (shouldRun()) { + createPipelines(); + } + + try { + synchronized (monitor) { + monitor.wait(intervalInMillis); + } + } catch (InterruptedException e) { + LOG.warn("{} is interrupted.", THREAD_NAME); + Thread.currentThread().interrupt(); + } + } + } + + private boolean skipCreation(HddsProtos.ReplicationFactor factor, + HddsProtos.ReplicationType type, + boolean autoCreate) { + if (type == HddsProtos.ReplicationType.RATIS) { + return factor == HddsProtos.ReplicationFactor.ONE && (!autoCreate); + } else { + // For STAND_ALONE Replication Type, Replication Factor 3 should not be + // used.
+ return factor == HddsProtos.ReplicationFactor.THREE; + } + } + + private void createPipelines() throws RuntimeException { + // TODO: #CLUTIL Different replication factor may need to be supported + HddsProtos.ReplicationType type = HddsProtos.ReplicationType.valueOf( + conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE, + OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT)); + boolean autoCreateFactorOne = conf.getBoolean( + ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, + ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE_DEFAULT); + + List list = + new ArrayList<>(); + for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor + .values()) { + if (skipCreation(factor, type, autoCreateFactorOne)) { + // Skip this iteration for creating pipeline + continue; + } + list.add(factor); + if (!pipelineManager.getSafeModeStatus()) { + try { + pipelineManager.scrubPipeline(type, factor); + } catch (IOException e) { + LOG.error("Error while scrubbing pipelines.", e); + } + } + } + + LoopingIterator it = new LoopingIterator(list); + while (it.hasNext()) { + HddsProtos.ReplicationFactor factor = + (HddsProtos.ReplicationFactor) it.next(); + + try { + pipelineManager.createPipeline(type, factor); + } catch (IOException ioe) { + it.remove(); + } catch (Throwable t) { + LOG.error("Error while creating pipelines", t); + it.remove(); + } + } + + LOG.debug("BackgroundPipelineCreator createPipelines finished."); + } + + @Override + public void notifyStatusChanged() { + serviceLock.lock(); + try { + // 1) SCMContext#isLeader returns true. 
+ // 2) not in safe mode or createPipelineInSafeMode is true + if (scmContext.isLeader() && + (!scmContext.isInSafeMode() || createPipelineInSafeMode)) { + // transition from PAUSING to RUNNING + if (serviceStatus != ServiceStatus.RUNNING) { + LOG.info("Service {} transitions to RUNNING.", getServiceName()); + lastTimeToBeReadyInMillis = Time.monotonicNow(); + serviceStatus = ServiceStatus.RUNNING; + } + } else { + serviceStatus = ServiceStatus.PAUSING; + } + } finally { + serviceLock.unlock(); + } + } + + @Override + public void notifyEventTriggered(Event event) { + if (!scmContext.isLeader()) { + LOG.info("ignore, not leader SCM."); + return; + } + if (event == NEW_NODE_HANDLER_TRIGGERED + || event == UNHEALTHY_TO_HEALTHY_NODE_HANDLER_TRIGGERED + || event == PRE_CHECK_COMPLETED) { + LOG.info("trigger a one-shot run on {}.", THREAD_NAME); + oneShotRun = true; // NOTE(review): not under serviceLock + + synchronized (monitor) { + monitor.notifyAll(); + } + } + } + + @Override + public boolean shouldRun() { + serviceLock.lock(); + try { + // check one-shot run + if (oneShotRun) { + oneShotRun = false; + return true; + } + + // If safe mode is off, then this SCMService starts to run with a delay.
+ return serviceStatus == ServiceStatus.RUNNING && ( + createPipelineInSafeMode || + Time.monotonicNow() - lastTimeToBeReadyInMillis >= waitTimeInMillis); + } finally { + serviceLock.unlock(); + } + } + + @Override + public String getServiceName() { + return BackgroundPipelineCreator.class.getSimpleName(); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java index 89d2833d32ee..e33f256a4476 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java @@ -94,7 +94,7 @@ private void processPipelineAction(final DatanodeDetails datanode, "firing close pipeline event.", action, pid); SCMCommand command = new ClosePipelineCommand(pid); try { - command.setTerm(scmContext.getTerm()); + command.setTerm(scmContext.getTermOfLeader()); } catch (NotLeaderException nle) { LOG.warn("Skip sending ClosePipelineCommand for pipeline {}," + " since not leader SCM.", pid); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 9f714da0a4d3..2078460133e2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -28,15 +28,12 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus; -import org.apache.hadoop.hdds.server.events.EventHandler; 
import org.apache.ratis.protocol.exceptions.NotLeaderException; /** * Interface which exposes the api for pipeline management. */ -public interface PipelineManager extends Closeable, PipelineManagerMXBean, - EventHandler { +public interface PipelineManager extends Closeable, PipelineManagerMXBean { Pipeline createPipeline(ReplicationType type, ReplicationFactor factor) throws IOException; @@ -85,7 +82,7 @@ void scrubPipeline(ReplicationType type, ReplicationFactor factor) void startPipelineCreator(); - void triggerPipelineCreation() throws NotLeaderException; + void triggerPipelineCreation(); void incNumBlocksAllocatedMetric(PipelineID id); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java index 3c881742764e..d0a4c96098d8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java @@ -30,10 +30,9 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.ha.SCMHAManager; +import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.hadoop.hdds.utils.Scheduler; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.util.Time; @@ -52,7 +51,6 @@ import java.util.NavigableSet; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -69,27 +67,24 @@ public final class 
PipelineManagerV2Impl implements PipelineManager { private final Lock lock; private PipelineFactory pipelineFactory; private StateManager stateManager; - private Scheduler scheduler; - private BackgroundPipelineCreator backgroundPipelineCreator; + private BackgroundPipelineCreatorV2 backgroundPipelineCreator; private final ConfigurationSource conf; private final EventPublisher eventPublisher; // Pipeline Manager MXBean private ObjectName pmInfoBean; private final SCMPipelineMetrics metrics; - private long pipelineWaitDefaultTimeout; - private final AtomicBoolean isInSafeMode; - private SCMHAManager scmhaManager; - private NodeManager nodeManager; - // Used to track if the safemode pre-checks have completed. This is designed - // to prevent pipelines being created until sufficient nodes have registered. - private final AtomicBoolean pipelineCreationAllowed; + private final long pipelineWaitDefaultTimeout; + private final SCMHAManager scmhaManager; + private final SCMContext scmContext; + private final NodeManager nodeManager; private PipelineManagerV2Impl(ConfigurationSource conf, SCMHAManager scmhaManager, NodeManager nodeManager, StateManager pipelineStateManager, PipelineFactory pipelineFactory, - EventPublisher eventPublisher) { + EventPublisher eventPublisher, + SCMContext scmContext) { this.lock = new ReentrantLock(); this.pipelineFactory = pipelineFactory; this.stateManager = pipelineStateManager; @@ -97,6 +92,7 @@ private PipelineManagerV2Impl(ConfigurationSource conf, this.scmhaManager = scmhaManager; this.nodeManager = nodeManager; this.eventPublisher = eventPublisher; + this.scmContext = scmContext; this.pmInfoBean = MBeans.register("SCMPipelineManager", "SCMPipelineManagerInfo", this); this.metrics = SCMPipelineMetrics.create(); @@ -104,12 +100,6 @@ private PipelineManagerV2Impl(ConfigurationSource conf, HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL, HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); - this.isInSafeMode = new 
AtomicBoolean(conf.getBoolean( - HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED, - HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED_DEFAULT)); - // Pipeline creation is only allowed after the safemode prechecks have - // passed, eg sufficient nodes have registered. - this.pipelineCreationAllowed = new AtomicBoolean(!this.isInSafeMode.get()); } public static PipelineManagerV2Impl newPipelineManager( @@ -118,7 +108,8 @@ public static PipelineManagerV2Impl newPipelineManager( NodeManager nodeManager, Table pipelineStore, EventPublisher eventPublisher, - SCMContext scmContext) throws IOException { + SCMContext scmContext, + SCMServiceManager serviceManager) throws IOException { // Create PipelineStateManager StateManager stateManager = PipelineStateManagerV2Impl .newBuilder().setPipelineStore(pipelineStore) @@ -130,18 +121,18 @@ public static PipelineManagerV2Impl newPipelineManager( // Create PipelineFactory PipelineFactory pipelineFactory = new PipelineFactory( nodeManager, stateManager, conf, eventPublisher, scmContext); + // Create PipelineManager PipelineManagerV2Impl pipelineManager = new PipelineManagerV2Impl(conf, scmhaManager, nodeManager, stateManager, pipelineFactory, - eventPublisher); + eventPublisher, scmContext); // Create background thread. 
- Scheduler scheduler = new Scheduler( - "RatisPipelineUtilsThread", false, 1); - BackgroundPipelineCreator backgroundPipelineCreator = - new BackgroundPipelineCreator(pipelineManager, scheduler, conf); + BackgroundPipelineCreatorV2 backgroundPipelineCreator = + new BackgroundPipelineCreatorV2( + pipelineManager, conf, serviceManager, scmContext); + pipelineManager.setBackgroundPipelineCreator(backgroundPipelineCreator); - pipelineManager.setScheduler(scheduler); return pipelineManager; } @@ -385,17 +376,15 @@ public void scrubPipeline(ReplicationType type, ReplicationFactor factor) */ @Override public void startPipelineCreator() { - backgroundPipelineCreator.startFixedIntervalPipelineCreator(); + throw new RuntimeException("Not supported in HA code."); } /** * Triggers pipeline creation after the specified time. */ @Override - public void triggerPipelineCreation() throws NotLeaderException { - // TODO add checkLeader once follower validates safemode - // before it becomes leader. - backgroundPipelineCreator.triggerPipelineCreation(); + public void triggerPipelineCreation() { + throw new RuntimeException("Not supported in HA code."); } @Override @@ -497,14 +486,13 @@ public Map getPipelineInfo() throws NotLeaderException { */ @Override public boolean getSafeModeStatus() { - return this.isInSafeMode.get(); + return scmContext.isInSafeMode(); } @Override public void close() throws IOException { - if (scheduler != null) { - scheduler.close(); - scheduler = null; + if (backgroundPipelineCreator != null) { + backgroundPipelineCreator.stop(); } if(pmInfoBean != null) { @@ -523,40 +511,9 @@ public void close() throws IOException { } } - @Override - public void onMessage(SCMSafeModeManager.SafeModeStatus status, - EventPublisher publisher) { - // TODO: #CLUTIL - handle safemode getting re-enabled - boolean currentAllowPipelines = - pipelineCreationAllowed.getAndSet(status.isPreCheckComplete()); - boolean currentlyInSafeMode = - 
isInSafeMode.getAndSet(status.isInSafeMode()); - - // Trigger pipeline creation only if the preCheck status has changed to - // complete. - - try { - if (isPipelineCreationAllowed() && !currentAllowPipelines) { - triggerPipelineCreation(); - } - // Start the pipeline creation thread only when safemode switches off - if (!getSafeModeStatus() && currentlyInSafeMode) { - startPipelineCreator(); - } - } catch (NotLeaderException ex) { - LOG.warn("Not leader SCM, cannot process pipeline creation."); - } - - } - @VisibleForTesting public boolean isPipelineCreationAllowed() { - return pipelineCreationAllowed.get(); - } - - @VisibleForTesting - public void allowPipelineCreation() { - this.pipelineCreationAllowed.set(true); + return scmContext.isLeader() && scmContext.isPreCheckComplete(); } @VisibleForTesting @@ -576,12 +533,13 @@ public SCMHAManager getScmhaManager() { } private void setBackgroundPipelineCreator( - BackgroundPipelineCreator backgroundPipelineCreator) { + BackgroundPipelineCreatorV2 backgroundPipelineCreator) { this.backgroundPipelineCreator = backgroundPipelineCreator; } - private void setScheduler(Scheduler scheduler) { - this.scheduler = scheduler; + @VisibleForTesting + public BackgroundPipelineCreatorV2 getBackgroundPipelineCreator() { + return this.backgroundPipelineCreator; } private void recordMetricsForPipeline(Pipeline pipeline) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java index ca514337fee0..8fc7f3eccbd9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java @@ -103,7 +103,7 @@ protected void processPipelineReport(PipelineReport report, pipeline = pipelineManager.getPipeline(pipelineID); } catch 
(PipelineNotFoundException e) { SCMCommand command = new ClosePipelineCommand(pipelineID); - command.setTerm(scmContext.getTerm()); + command.setTerm(scmContext.getTermOfLeader()); publisher.fireEvent(SCMEvents.DATANODE_COMMAND, new CommandForDatanode<>(dn.getUuid(), command)); return; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java index ede3b1e540dc..e485a286bc15 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -163,7 +163,7 @@ public synchronized Pipeline create(ReplicationFactor factor) new CreatePipelineCommand(pipeline.getId(), pipeline.getType(), factor, dns); - createCommand.setTerm(scmContext.getTerm()); + createCommand.setTerm(scmContext.getTermOfLeader()); dns.forEach(node -> { LOG.info("Sending CreatePipelineCommand for pipeline:{} to datanode:{}", @@ -201,7 +201,7 @@ public void shutdown() { public void close(Pipeline pipeline) throws NotLeaderException { final ClosePipelineCommand closeCommand = new ClosePipelineCommand(pipeline.getId()); - closeCommand.setTerm(scmContext.getTerm()); + closeCommand.setTerm(scmContext.getTermOfLeader()); pipeline.getNodes().forEach(node -> { final CommandForDatanode datanodeCommand = new CommandForDatanode<>(node.getUuid(), closeCommand); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index 892e3bcc8761..a1cef225b597 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java 
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus; +import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.utils.Scheduler; import org.apache.hadoop.hdds.utils.db.Table; @@ -64,7 +65,8 @@ * for pipelines must come via PipelineManager. It synchronises all write * and read operations via a ReadWriteLock. */ -public class SCMPipelineManager implements PipelineManager { +public class SCMPipelineManager implements + PipelineManager, EventHandler { private static final Logger LOG = LoggerFactory.getLogger(SCMPipelineManager.class); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java index 26fb80660a24..e4e069a15860 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.tuple.Pair; @@ -30,6 +29,9 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.ha.SCMService.Event; +import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.EventQueue; @@ -83,7 +85,6 @@ public class 
SCMSafeModeManager implements SafeModeManager { private static final Logger LOG = LoggerFactory.getLogger(SCMSafeModeManager.class); private final boolean isSafeModeEnabled; - private final long waitTime; private AtomicBoolean inSafeMode = new AtomicBoolean(true); private AtomicBoolean preCheckComplete = new AtomicBoolean(false); @@ -102,25 +103,24 @@ public class SCMSafeModeManager implements SafeModeManager { private final EventQueue eventPublisher; private final PipelineManager pipelineManager; + private final SCMServiceManager serviceManager; + private final SCMContext scmContext; private final SafeModeMetrics safeModeMetrics; public SCMSafeModeManager(ConfigurationSource conf, List allContainers, PipelineManager pipelineManager, - EventQueue eventQueue) { + EventQueue eventQueue, SCMServiceManager serviceManager, + SCMContext scmContext) { this.config = conf; this.pipelineManager = pipelineManager; this.eventPublisher = eventQueue; + this.serviceManager = serviceManager; + this.scmContext = scmContext; this.isSafeModeEnabled = conf.getBoolean( HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED, HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED_DEFAULT); - - this.waitTime = conf.getTimeDuration( - HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, - HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT, - TimeUnit.MILLISECONDS); - if (isSafeModeEnabled) { this.safeModeMetrics = SafeModeMetrics.create(); ContainerSafeModeRule containerSafeModeRule = @@ -147,13 +147,6 @@ public SCMSafeModeManager(ConfigurationSource conf, exitRules.put(ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE, oneReplicaPipelineSafeModeRule); } - boolean createPipelineInSafemode = conf.getBoolean( - HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, - HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT); - - if (createPipelineInSafemode) { - pipelineManager.startPipelineCreator(); - } } else { this.safeModeMetrics = null; exitSafeMode(eventQueue); @@ -177,28 +170,22 @@ public 
SafeModeMetrics getSafeModeMetrics() { public void emitSafeModeStatus() { SafeModeStatus safeModeStatus = new SafeModeStatus(getInSafeMode(), getPreCheckComplete()); + // TODO: remove eventPublisher, + // since there will no consumer of SAFE_MODE_STATUS in future. eventPublisher.fireEvent(SCMEvents.SAFE_MODE_STATUS, safeModeStatus); - // Only notify the delayed listeners if safemode remains on, as precheck - // may have completed. - if (safeModeStatus.isInSafeMode()) { - eventPublisher.fireEvent(SCMEvents.DELAYED_SAFE_MODE_STATUS, - safeModeStatus); - } else { + // update SCMContext + scmContext.updateSafeModeStatus(safeModeStatus); + + // notify SCMServiceManager + if (!safeModeStatus.isInSafeMode()) { // If safemode is off, then notify the delayed listeners with a delay. - final Thread safeModeExitThread = new Thread(() -> { - try { - Thread.sleep(waitTime); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - eventPublisher.fireEvent(SCMEvents.DELAYED_SAFE_MODE_STATUS, - safeModeStatus); - }); - - safeModeExitThread.setDaemon(true); - safeModeExitThread.start(); + serviceManager.notifyStatusChanged(); + } else if (safeModeStatus.isPreCheckComplete()) { + // Only notify the delayed listeners if safemode remains on, as precheck + // may have completed. 
+ serviceManager.notifyEventTriggered(Event.PRE_CHECK_COMPLETED); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 6782f5275bae..809dda9af057 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.ha.SCMHAManager; import org.apache.hadoop.hdds.scm.ha.SCMHAManagerImpl; +import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; import org.apache.hadoop.hdds.utils.HddsServerUtil; import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; @@ -70,7 +71,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerReportHandler; import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler; import org.apache.hadoop.hdds.scm.container.ReplicationManager; -import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration; import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory; import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics; import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat; @@ -174,6 +174,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl private SCMContext scmContext; private final EventQueue eventQueue; + private final SCMServiceManager serviceManager; + /* * HTTP endpoint for JMX access. 
*/ @@ -284,6 +286,8 @@ private StorageContainerManager(OzoneConfiguration conf, } eventQueue = new EventQueue(); + serviceManager = new SCMServiceManager(); + long watcherTimeout = conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT, HDDS_SCM_WATCHER_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); @@ -303,7 +307,7 @@ private StorageContainerManager(OzoneConfiguration conf, new CommandStatusReportHandler(); NewNodeHandler newNodeHandler = new NewNodeHandler(pipelineManager, - scmDecommissionManager, conf); + scmDecommissionManager, conf, serviceManager); StaleNodeHandler staleNodeHandler = new StaleNodeHandler(scmNodeManager, pipelineManager, conf); DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager, @@ -311,7 +315,7 @@ private StorageContainerManager(OzoneConfiguration conf, StartDatanodeAdminHandler datanodeStartAdminHandler = new StartDatanodeAdminHandler(scmNodeManager, pipelineManager); NonHealthyToHealthyNodeHandler nonHealthyToHealthyNodeHandler = - new NonHealthyToHealthyNodeHandler(pipelineManager, conf); + new NonHealthyToHealthyNodeHandler(conf, serviceManager); ContainerActionsHandler actionsHandler = new ContainerActionsHandler(); PendingDeleteHandler pendingDeleteHandler = new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService()); @@ -361,14 +365,6 @@ private StorageContainerManager(OzoneConfiguration conf, (DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog()); eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler); eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler); - eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS, scmContext); - // TODO: - // handle replicationManager and pipelineManager in ServiceManager - eventQueue - .addHandler(SCMEvents.DELAYED_SAFE_MODE_STATUS, replicationManager); - eventQueue - .addHandler(SCMEvents.DELAYED_SAFE_MODE_STATUS, pipelineManager); - // Emit initial safe mode status, as now handlers are registered. 
scmSafeModeManager.emitSafeModeStatus(); @@ -436,7 +432,14 @@ private void initializeSystemManagers(OzoneConfiguration conf, if (configurator.getScmContext() != null) { scmContext = configurator.getScmContext(); } else { - scmContext = new SCMContext(this); + // non-leader of term 0, in safe mode, preCheck not completed. + scmContext = new SCMContext.Builder() + .setLeader(false) + .setTerm(0) + .setIsInSafeMode(true) + .setIsPreCheckComplete(false) + .setSCM(this) + .build(); } if(configurator.getScmNodeManager() != null) { @@ -461,7 +464,8 @@ private void initializeSystemManagers(OzoneConfiguration conf, scmNodeManager, scmMetadataStore.getPipelineTable(), eventQueue, - scmContext); + scmContext, + serviceManager); } if (configurator.getContainerManager() != null) { @@ -481,11 +485,12 @@ private void initializeSystemManagers(OzoneConfiguration conf, replicationManager = configurator.getReplicationManager(); } else { replicationManager = new ReplicationManager( - conf.getObject(ReplicationManagerConfiguration.class), + conf, containerManager, containerPlacementPolicy, eventQueue, scmContext, + serviceManager, new LockManager<>(conf), scmNodeManager); } @@ -494,7 +499,7 @@ private void initializeSystemManagers(OzoneConfiguration conf, } else { scmSafeModeManager = new SCMSafeModeManager(conf, containerManager.getContainers(), - pipelineManager, eventQueue); + pipelineManager, eventQueue, serviceManager, scmContext); } scmDecommissionManager = new NodeDecommissionManager(conf, scmNodeManager, containerManager, eventQueue, replicationManager); @@ -1165,6 +1170,13 @@ public SCMContext getScmContext() { return scmContext; } + /** + * Returns SCMServiceManager. + */ + public SCMServiceManager getSCMServiceManager() { + return serviceManager; + } + /** * Force SCM out of safe mode. 
*/ diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java index c8c8243cae6d..a2a13e3f21c4 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -91,6 +92,7 @@ public class TestBlockManager { private static HddsProtos.ReplicationType type; private EventQueue eventQueue; private SCMContext scmContext; + private SCMServiceManager serviceManager; private int numContainerPerOwnerInPipeline; private OzoneConfiguration conf; @@ -121,6 +123,7 @@ public void setUp() throws Exception { eventQueue = new EventQueue(); scmContext = SCMContext.emptyContext(); + serviceManager = new SCMServiceManager(); scmMetadataStore = new SCMMetadataStoreImpl(conf); scmMetadataStore.start(conf); @@ -131,8 +134,8 @@ public void setUp() throws Exception { nodeManager, scmMetadataStore.getPipelineTable(), eventQueue, - scmContext); - pipelineManager.allowPipelineCreation(); + scmContext, + serviceManager); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(nodeManager, @@ -146,7 +149,7 @@ public void setUp() throws Exception { scmMetadataStore.getContainerTable()); SCMSafeModeManager safeModeManager = new SCMSafeModeManager(conf, containerManager.getContainers(), - pipelineManager, eventQueue) { + pipelineManager, eventQueue, serviceManager, 
scmContext) { @Override public void emitSafeModeStatus() { // skip @@ -173,8 +176,7 @@ public void emitSafeModeStatus() { factor = HddsProtos.ReplicationFactor.THREE; type = HddsProtos.ReplicationType.RATIS; - scm.getScmContext().onMessage( - new SafeModeStatus(false, true), null); + scm.getScmContext().updateSafeModeStatus(new SafeModeStatus(false, true)); } @After @@ -460,8 +462,8 @@ public void testAllocateOversizedBlock() throws Exception { @Test public void testAllocateBlockFailureInSafeMode() throws Exception { - scm.getScmContext().onMessage( - new SCMSafeModeManager.SafeModeStatus(true, true), null); + scm.getScmContext().updateSafeModeStatus( + new SCMSafeModeManager.SafeModeStatus(true, true)); // Test1: In safe mode expect an SCMException. thrown.expectMessage("SafeModePrecheck failed for " + "allocateBlock"); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java index 6d5fee200934..d42db5da901e 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java @@ -30,6 +30,8 @@ import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager; import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.ha.SCMService.Event; +import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl; import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider; @@ -80,6 +82,8 @@ public static void setUp() throws Exception { scmContext = SCMContext.emptyContext(); scmMetadataStore = new SCMMetadataStoreImpl(configuration); + SCMServiceManager 
serviceManager = new SCMServiceManager(); + pipelineManager = PipelineManagerV2Impl.newPipelineManager( configuration, @@ -87,9 +91,9 @@ public static void setUp() throws Exception { nodeManager, scmMetadataStore.getPipelineTable(), eventQueue, - scmContext); + scmContext, + serviceManager); - pipelineManager.allowPipelineCreation(); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(nodeManager, pipelineManager.getStateManager(), configuration, eventQueue); @@ -99,7 +103,10 @@ public static void setUp() throws Exception { MockSCMHAManager.getInstance(true), pipelineManager, scmMetadataStore.getContainerTable()); - pipelineManager.triggerPipelineCreation(); + + // trigger BackgroundPipelineCreator to take effect. + serviceManager.notifyEventTriggered(Event.PRE_CHECK_COMPLETED); + eventQueue.addHandler(CLOSE_CONTAINER, new CloseContainerEventHandler( pipelineManager, containerManager, scmContext)); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java index d926a024fc5a..6e2e3cef207e 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hdds.scm.container; import com.google.common.primitives.Longs; -import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; @@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.ha.SCMContext; +import 
org.apache.hadoop.hdds.scm.ha.SCMServiceManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.scm.node.SCMNodeManager; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; @@ -58,6 +59,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -88,13 +90,17 @@ public class TestReplicationManager { private DatanodeCommandHandler datanodeCommandHandler; private SimpleMockNodeManager nodeManager; private ContainerManagerV2 containerManager; - private ConfigurationSource conf; + private OzoneConfiguration conf; private SCMNodeManager scmNodeManager; @Before public void setup() throws IOException, InterruptedException, NodeNotFoundException { conf = new OzoneConfiguration(); + conf.setTimeDuration( + HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, + 0, TimeUnit.SECONDS); + containerManager = Mockito.mock(ContainerManagerV2.class); nodeManager = new SimpleMockNodeManager(); eventQueue = new EventQueue(); @@ -147,30 +153,43 @@ public void setup() Mockito.any(DatanodeDetails.class))) .thenReturn(NodeStatus.inServiceHealthy()); + SCMServiceManager serviceManager = new SCMServiceManager(); + replicationManager = new ReplicationManager( - new ReplicationManagerConfiguration(), + conf, containerManager, containerPlacementPolicy, eventQueue, SCMContext.emptyContext(), + serviceManager, new LockManager<>(conf), nodeManager); - replicationManager.start(); + + serviceManager.notifyStatusChanged(); Thread.sleep(100L); } private void createReplicationManager(ReplicationManagerConfiguration rmConf) throws InterruptedException { + OzoneConfiguration config = new OzoneConfiguration(); + config.setTimeDuration( + HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, + 0, TimeUnit.SECONDS); + config.setFromObject(rmConf); + + 
SCMServiceManager serviceManager = new SCMServiceManager(); + replicationManager = new ReplicationManager( - rmConf, + config, containerManager, containerPlacementPolicy, eventQueue, SCMContext.emptyContext(), - new LockManager(conf), + serviceManager, + new LockManager(config), nodeManager); - replicationManager.start(); + serviceManager.notifyStatusChanged(); Thread.sleep(100L); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java index ba0cba5fbae7..8f9bc5dce1f5 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager; import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -100,8 +101,8 @@ public static void setUp() throws Exception { nodeManager, scmMetadataStore.getPipelineTable(), new EventQueue(), - SCMContext.emptyContext()); - pipelineManager.allowPipelineCreation(); + SCMContext.emptyContext(), + new SCMServiceManager()); containerManager = new SCMContainerManager(conf, scmMetadataStore.getContainerTable(), scmMetadataStore.getStore(), diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMContext.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMContext.java index a8e4c00bbaa1..c8098804abbd 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMContext.java +++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMContext.java @@ -34,33 +34,43 @@ public class TestSCMContext { @Test public void testRaftOperations() { // start as follower - SCMContext scmContext = new SCMContext(false, 0, null, null); + SCMContext scmContext = + new SCMContext.Builder().setLeader(false).setTerm(0).build(); + assertFalse(scmContext.isLeader()); // become leader - scmContext.updateIsLeaderAndTerm(true, 10); + scmContext.updateLeaderAndTerm(true, 10); assertTrue(scmContext.isLeader()); try { - assertEquals(scmContext.getTerm(), 10); + assertEquals(scmContext.getTermOfLeader(), 10); } catch (NotLeaderException e) { fail("Should not throw nle."); } // step down - scmContext.updateIsLeaderAndTerm(false, 0); + scmContext.updateLeaderAndTerm(false, 0); assertFalse(scmContext.isLeader()); } @Test public void testSafeModeOperations() { // in safe mode - SCMContext scmContext = new SCMContext( - true, 0, new SafeModeStatus(true, false), null); + SCMContext scmContext = new SCMContext.Builder() + .setIsInSafeMode(true) + .setIsPreCheckComplete(false) + .build(); + assertTrue(scmContext.isInSafeMode()); assertFalse(scmContext.isPreCheckComplete()); + // in safe mode, pass preCheck + scmContext.updateSafeModeStatus(new SafeModeStatus(true, true)); + assertTrue(scmContext.isInSafeMode()); + assertTrue(scmContext.isPreCheckComplete()); + // out of safe mode - scmContext.onMessage(new SafeModeStatus(false, true), null); + scmContext.updateSafeModeStatus(new SafeModeStatus(false, true)); assertFalse(scmContext.isInSafeMode()); assertTrue(scmContext.isPreCheckComplete()); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMServiceManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMServiceManager.java new file mode 100644 index 000000000000..47b453709958 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMServiceManager.java 
@@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.ha; + +import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestSCMServiceManager { + @Test + public void testServiceRunWhenLeader() { + SCMContext scmContext = new SCMContext.Builder() + .setLeader(false) + .setTerm(1) + .setIsInSafeMode(true) + .setIsPreCheckComplete(false) + .build(); + + // A service runs when it is leader. + SCMService serviceRunWhenLeader = new SCMService() { + private ServiceStatus serviceStatus = ServiceStatus.PAUSING; + + @Override + public void notifyStatusChanged() { + if (scmContext.isLeader()) { + serviceStatus = ServiceStatus.RUNNING; + } else { + serviceStatus = ServiceStatus.PAUSING; + } + } + + @Override + public boolean shouldRun() { + return serviceStatus == ServiceStatus.RUNNING; + } + + @Override + public String getServiceName() { + return "serviceRunWhenLeader"; + } + }; + + SCMServiceManager serviceManager = new SCMServiceManager(); + serviceManager.register(serviceRunWhenLeader); + + // PAUSING at the beginning. + assertFalse(serviceRunWhenLeader.shouldRun()); + + // PAUSING when out of safe mode. + scmContext.updateSafeModeStatus( + new SCMSafeModeManager.SafeModeStatus(false, true)); + serviceManager.notifyStatusChanged(); + assertFalse(serviceRunWhenLeader.shouldRun()); + + // RUNNING when becoming leader. + scmContext.updateLeaderAndTerm(true, 2); + serviceManager.notifyStatusChanged(); + assertTrue(serviceRunWhenLeader.shouldRun()); + + // RUNNING when in safe mode. 
+ scmContext.updateSafeModeStatus( + new SCMSafeModeManager.SafeModeStatus(true, false)); + serviceManager.notifyStatusChanged(); + assertTrue(serviceRunWhenLeader.shouldRun()); + + // PAUSING when stepping down. + scmContext.updateLeaderAndTerm(false, 3); + serviceManager.notifyStatusChanged(); + assertFalse(serviceRunWhenLeader.shouldRun()); + } + + @Test + public void setServiceRunWhenLeaderAndOutOfSafeMode() { + SCMContext scmContext = new SCMContext.Builder() + .setLeader(false) + .setTerm(1) + .setIsInSafeMode(true) + .setIsPreCheckComplete(false) + .build(); + + // A service runs when it is leader and out of safe mode. + SCMService serviceRunWhenLeaderAndOutOfSafeMode = new SCMService() { + private ServiceStatus serviceStatus = ServiceStatus.PAUSING; + + @Override + public void notifyStatusChanged() { + if (scmContext.isLeader() && !scmContext.isInSafeMode()) { + serviceStatus = ServiceStatus.RUNNING; + } else { + serviceStatus = ServiceStatus.PAUSING; + } + } + + @Override + public boolean shouldRun() { + return serviceStatus == ServiceStatus.RUNNING; + } + + @Override + public String getServiceName() { + return "serviceRunWhenLeaderAndOutOfSafeMode"; + } + }; + + SCMServiceManager serviceManager = new SCMServiceManager(); + serviceManager.register(serviceRunWhenLeaderAndOutOfSafeMode); + + // PAUSING at the beginning. + assertFalse(serviceRunWhenLeaderAndOutOfSafeMode.shouldRun()); + + // PAUSING when out of safe mode. + scmContext.updateSafeModeStatus( + new SCMSafeModeManager.SafeModeStatus(false, true)); + serviceManager.notifyStatusChanged(); + assertFalse(serviceRunWhenLeaderAndOutOfSafeMode.shouldRun()); + + // RUNNING when becoming leader. + scmContext.updateLeaderAndTerm(true, 2); + serviceManager.notifyStatusChanged(); + assertTrue(serviceRunWhenLeaderAndOutOfSafeMode.shouldRun()); + + // PAUSING when in safe mode. 
+ scmContext.updateSafeModeStatus( + new SCMSafeModeManager.SafeModeStatus(true, false)); + serviceManager.notifyStatusChanged(); + assertFalse(serviceRunWhenLeaderAndOutOfSafeMode.shouldRun()); + + // PAUSING when stepping down. + scmContext.updateLeaderAndTerm(false, 3); + serviceManager.notifyStatusChanged(); + assertFalse(serviceRunWhenLeaderAndOutOfSafeMode.shouldRun()); + } +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java index 5bcdf4bcbc78..eb76e9ff0a7c 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager; import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; @@ -123,7 +124,8 @@ SCMContainerManager createContainerManager(ConfigurationSource config, scmNodeManager, scmMetadataStore.getPipelineTable(), eventQueue, - SCMContext.emptyContext()); + SCMContext.emptyContext(), + new SCMServiceManager()); return new SCMContainerManager(config, scmMetadataStore.getContainerTable(), scmMetadataStore.getStore(), diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java index 78e771828783..23b79504fd5f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java +++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java @@ -162,7 +162,6 @@ public void testOnMessage() throws Exception { LambdaTestUtils.await(120000, 1000, () -> { - pipelineManager.triggerPipelineCreation(); System.out.println(pipelineManager.getPipelines(RATIS, THREE).size()); System.out.println(pipelineManager.getPipelines(RATIS, ONE).size()); return pipelineManager.getPipelines(RATIS, THREE).size() > 3; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java index 947cd378b93f..1759245e4ccd 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java @@ -22,8 +22,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; -import org.apache.hadoop.hdds.server.events.EventPublisher; import java.io.IOException; import java.util.Collection; @@ -228,10 +226,4 @@ public void close() throws IOException { public Map getPipelineInfo() { return null; } - - @Override - public void onMessage(final SCMSafeModeManager.SafeModeStatus safeModeStatus, - final EventPublisher publisher) { - - } } \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java index 654fd9c9714b..862e19be5c1a 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java +++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.scm.ha.MockDBTransactionBuffer; import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager; import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition; import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher; @@ -71,6 +72,8 @@ public class TestPipelineManagerImpl { private DBStore dbStore; private static MockNodeManager nodeManager; private static int maxPipelineCount; + private SCMContext scmContext; + private SCMServiceManager serviceManager; @Before public void init() throws Exception { @@ -86,6 +89,8 @@ public void init() throws Exception { conf.getInt(OZONE_DATANODE_PIPELINE_LIMIT, OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT) / HddsProtos.ReplicationFactor.THREE.getNumber(); + scmContext = SCMContext.emptyContext(); + serviceManager = new SCMServiceManager(); } @After @@ -103,7 +108,8 @@ private PipelineManagerV2Impl createPipelineManager(boolean isLeader) new MockNodeManager(true, 20), SCMDBDefinition.PIPELINES.getTable(dbStore), new EventQueue(), - SCMContext.emptyContext()); + scmContext, + serviceManager); } private PipelineManagerV2Impl createPipelineManager( @@ -113,7 +119,8 @@ private PipelineManagerV2Impl createPipelineManager( new MockNodeManager(true, 20), SCMDBDefinition.PIPELINES.getTable(dbStore), new EventQueue(), - SCMContext.emptyContext()); + SCMContext.emptyContext(), + serviceManager); } @Test @@ -122,7 +129,6 @@ public void testCreatePipeline() throws Exception { PipelineManagerV2Impl pipelineManager = createPipelineManager(true, buffer1); Assert.assertTrue(pipelineManager.getPipelines().isEmpty()); - pipelineManager.allowPipelineCreation(); Pipeline pipeline1 = pipelineManager.createPipeline( 
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); Assert.assertEquals(1, pipelineManager.getPipelines().size()); @@ -141,7 +147,6 @@ public void testCreatePipeline() throws Exception { // Should be able to load previous pipelines. Assert.assertFalse(pipelineManager2.getPipelines().isEmpty()); Assert.assertEquals(2, pipelineManager.getPipelines().size()); - pipelineManager2.allowPipelineCreation(); Pipeline pipeline3 = pipelineManager2.createPipeline( HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); buffer2.close(); @@ -155,7 +160,6 @@ public void testCreatePipeline() throws Exception { public void testCreatePipelineShouldFailOnFollower() throws Exception { PipelineManagerV2Impl pipelineManager = createPipelineManager(false); Assert.assertTrue(pipelineManager.getPipelines().isEmpty()); - pipelineManager.allowPipelineCreation(); try { pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); @@ -174,7 +178,6 @@ public void testUpdatePipelineStates() throws Exception { createPipelineManager(true, buffer); Table pipelineStore = SCMDBDefinition.PIPELINES.getTable(dbStore); - pipelineManager.allowPipelineCreation(); Pipeline pipeline = pipelineManager.createPipeline( HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); Assert.assertEquals(1, pipelineManager.getPipelines().size()); @@ -218,7 +221,6 @@ public void testUpdatePipelineStates() throws Exception { @Test public void testOpenPipelineShouldFailOnFollower() throws Exception { PipelineManagerV2Impl pipelineManager = createPipelineManager(true); - pipelineManager.allowPipelineCreation(); Pipeline pipeline = pipelineManager.createPipeline( HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); Assert.assertEquals(1, pipelineManager.getPipelines().size()); @@ -240,7 +242,6 @@ public void testOpenPipelineShouldFailOnFollower() throws Exception { @Test public void 
testActivatePipelineShouldFailOnFollower() throws Exception { PipelineManagerV2Impl pipelineManager = createPipelineManager(true); - pipelineManager.allowPipelineCreation(); Pipeline pipeline = pipelineManager.createPipeline( HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); Assert.assertEquals(1, pipelineManager.getPipelines().size()); @@ -262,7 +263,6 @@ public void testActivatePipelineShouldFailOnFollower() throws Exception { @Test public void testDeactivatePipelineShouldFailOnFollower() throws Exception { PipelineManagerV2Impl pipelineManager = createPipelineManager(true); - pipelineManager.allowPipelineCreation(); Pipeline pipeline = pipelineManager.createPipeline( HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); Assert.assertEquals(1, pipelineManager.getPipelines().size()); @@ -284,7 +284,6 @@ public void testDeactivatePipelineShouldFailOnFollower() throws Exception { @Test public void testRemovePipeline() throws Exception { PipelineManagerV2Impl pipelineManager = createPipelineManager(true); - pipelineManager.allowPipelineCreation(); // Create a pipeline Pipeline pipeline = pipelineManager.createPipeline( HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); @@ -327,7 +326,6 @@ public void testRemovePipeline() throws Exception { @Test public void testClosePipelineShouldFailOnFollower() throws Exception { PipelineManagerV2Impl pipelineManager = createPipelineManager(true); - pipelineManager.allowPipelineCreation(); Pipeline pipeline = pipelineManager.createPipeline( HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); Assert.assertEquals(1, pipelineManager.getPipelines().size()); @@ -349,10 +347,9 @@ public void testClosePipelineShouldFailOnFollower() throws Exception { @Test public void testPipelineReport() throws Exception { PipelineManagerV2Impl pipelineManager = createPipelineManager(true); - pipelineManager.allowPipelineCreation(); SCMSafeModeManager scmSafeModeManager = new 
SCMSafeModeManager(conf, new ArrayList<>(), pipelineManager, - new EventQueue()); + new EventQueue(), serviceManager, scmContext); Pipeline pipeline = pipelineManager .createPipeline(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); @@ -400,7 +397,6 @@ public void testPipelineReport() throws Exception { @Test public void testPipelineCreationFailedMetric() throws Exception { PipelineManagerV2Impl pipelineManager = createPipelineManager(true); - pipelineManager.allowPipelineCreation(); // No pipeline at start MetricsRecordBuilder metrics = getMetrics( @@ -457,10 +453,7 @@ public void testPipelineOpenOnlyWhenLeaderReported() throws Exception { DBTransactionBuffer buffer1 = new MockDBTransactionBuffer(dbStore); PipelineManagerV2Impl pipelineManager = createPipelineManager(true, buffer1); - pipelineManager.allowPipelineCreation(); - pipelineManager.onMessage( - new SCMSafeModeManager.SafeModeStatus(true, true), null); Pipeline pipeline = pipelineManager .createPipeline(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); @@ -473,8 +466,8 @@ public void testPipelineOpenOnlyWhenLeaderReported() throws Exception { pipelineManager.getPipeline(pipeline.getId()).getPipelineState()); SCMSafeModeManager scmSafeModeManager = - new SCMSafeModeManager(new OzoneConfiguration(), - new ArrayList<>(), pipelineManager, new EventQueue()); + new SCMSafeModeManager(new OzoneConfiguration(), new ArrayList<>(), + pipelineManager, new EventQueue(), serviceManager, scmContext); PipelineReportHandler pipelineReportHandler = new PipelineReportHandler(scmSafeModeManager, pipelineManager, SCMContext.emptyContext(), conf); @@ -508,7 +501,6 @@ public void testScrubPipeline() throws Exception { TimeUnit.MILLISECONDS); PipelineManagerV2Impl pipelineManager = createPipelineManager(true); - pipelineManager.allowPipelineCreation(); Pipeline pipeline = pipelineManager .createPipeline(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); @@ -541,7 
+533,6 @@ public void testScrubPipelineShouldFailOnFollower() throws Exception { TimeUnit.MILLISECONDS); PipelineManagerV2Impl pipelineManager = createPipelineManager(true); - pipelineManager.allowPipelineCreation(); Pipeline pipeline = pipelineManager .createPipeline(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); @@ -577,6 +568,9 @@ public void testPipelineNotCreatedUntilSafeModePrecheck() throws Exception { OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1, TimeUnit.MILLISECONDS); + scmContext.updateSafeModeStatus( + new SCMSafeModeManager.SafeModeStatus(true, false)); + PipelineManagerV2Impl pipelineManager = createPipelineManager(true); try { pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS, @@ -597,8 +591,8 @@ public void testPipelineNotCreatedUntilSafeModePrecheck() throws Exception { HddsProtos.ReplicationFactor.ONE).contains(pipeline)); // Simulate safemode check exiting. - pipelineManager.onMessage( - new SCMSafeModeManager.SafeModeStatus(true, true), null); + scmContext.updateSafeModeStatus( + new SCMSafeModeManager.SafeModeStatus(true, true)); GenericTestUtils.waitFor(new Supplier() { @Override public Boolean get() { @@ -616,17 +610,22 @@ public void testSafeModeUpdatedOnSafemodeExit() throws Exception { TimeUnit.MILLISECONDS); PipelineManagerV2Impl pipelineManager = createPipelineManager(true); + + scmContext.updateSafeModeStatus( + new SCMSafeModeManager.SafeModeStatus(true, false)); Assert.assertTrue(pipelineManager.getSafeModeStatus()); Assert.assertFalse(pipelineManager.isPipelineCreationAllowed()); + // First pass pre-check as true, but safemode still on - pipelineManager.onMessage( - new SCMSafeModeManager.SafeModeStatus(true, true), null); + // Simulate safemode check exiting. 
+ scmContext.updateSafeModeStatus( + new SCMSafeModeManager.SafeModeStatus(true, true)); Assert.assertTrue(pipelineManager.getSafeModeStatus()); Assert.assertTrue(pipelineManager.isPipelineCreationAllowed()); // Then also turn safemode off - pipelineManager.onMessage( - new SCMSafeModeManager.SafeModeStatus(false, true), null); + scmContext.updateSafeModeStatus( + new SCMSafeModeManager.SafeModeStatus(false, true)); Assert.assertFalse(pipelineManager.getSafeModeStatus()); Assert.assertTrue(pipelineManager.isPipelineCreationAllowed()); pipelineManager.close(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java index 2226a43f002c..24cb4b59a4a9 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.scm.container.MockNodeManager; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; import org.apache.hadoop.hdds.scm.metadata.PipelineIDCodec; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl; @@ -221,7 +222,8 @@ public void testPipelineReport() throws IOException { SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(conf, new ArrayList<>(), pipelineManager, - eventQueue); + eventQueue, new SCMServiceManager(), + SCMContext.emptyContext()); // create a pipeline in allocated state with no dns yet reported Pipeline pipeline = pipelineManager @@ -494,8 +496,10 @@ public void testPipelineOpenOnlyWhenLeaderReported() throws Exception { pipelineManager.getPipeline(pipeline.getId()).getPipelineState()); 
SCMSafeModeManager scmSafeModeManager = - new SCMSafeModeManager(new OzoneConfiguration(), - new ArrayList<>(), pipelineManager, eventQueue); + new SCMSafeModeManager(new OzoneConfiguration(), new ArrayList<>(), + pipelineManager, eventQueue, + new SCMServiceManager(), + SCMContext.emptyContext()); PipelineReportHandler pipelineReportHandler = new PipelineReportHandler(scmSafeModeManager, pipelineManager, SCMContext.emptyContext(), conf); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java index 19f1f308bde9..e7dbeebc4db0 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager; import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl; import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider; @@ -55,6 +56,8 @@ public class TestHealthyPipelineSafeModeRule { public void testHealthyPipelineSafeModeRuleWithNoPipelines() throws Exception { EventQueue eventQueue = new EventQueue(); + SCMServiceManager serviceManager = new SCMServiceManager(); + SCMContext scmContext = SCMContext.emptyContext(); List containers = new ArrayList<>(HddsTestUtils.getContainerInfo(1)); @@ -79,14 +82,16 @@ public void testHealthyPipelineSafeModeRuleWithNoPipelines() nodeManager, scmMetadataStore.getPipelineTable(), eventQueue, - SCMContext.emptyContext()); + scmContext, + serviceManager); PipelineProvider mockRatisProvider = new 
MockRatisPipelineProvider(nodeManager, pipelineManager.getStateManager(), config); pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS, mockRatisProvider); SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager( - config, containers, pipelineManager, eventQueue); + config, containers, pipelineManager, eventQueue, + serviceManager, scmContext); HealthyPipelineSafeModeRule healthyPipelineSafeModeRule = scmSafeModeManager.getHealthyPipelineSafeModeRule(); @@ -105,6 +110,8 @@ public void testHealthyPipelineSafeModeRuleWithPipelines() throws Exception { TestHealthyPipelineSafeModeRule.class.getName() + UUID.randomUUID()); EventQueue eventQueue = new EventQueue(); + SCMServiceManager serviceManager = new SCMServiceManager(); + SCMContext scmContext = SCMContext.emptyContext(); List containers = new ArrayList<>(HddsTestUtils.getContainerInfo(1)); @@ -129,8 +136,8 @@ public void testHealthyPipelineSafeModeRuleWithPipelines() throws Exception { nodeManager, scmMetadataStore.getPipelineTable(), eventQueue, - SCMContext.emptyContext()); - pipelineManager.allowPipelineCreation(); + scmContext, + serviceManager); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(nodeManager, @@ -163,7 +170,8 @@ public void testHealthyPipelineSafeModeRuleWithPipelines() throws Exception { MockRatisPipelineProvider.markPipelineHealthy(pipeline3); SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager( - config, containers, pipelineManager, eventQueue); + config, containers, pipelineManager, eventQueue, + serviceManager, scmContext); HealthyPipelineSafeModeRule healthyPipelineSafeModeRule = scmSafeModeManager.getHealthyPipelineSafeModeRule(); @@ -199,6 +207,8 @@ public void testHealthyPipelineSafeModeRuleWithMixedPipelines() TestHealthyPipelineSafeModeRule.class.getName() + UUID.randomUUID()); EventQueue eventQueue = new EventQueue(); + SCMServiceManager serviceManager = new SCMServiceManager(); + SCMContext scmContext = SCMContext.emptyContext(); 
List containers = new ArrayList<>(HddsTestUtils.getContainerInfo(1)); @@ -224,9 +234,9 @@ public void testHealthyPipelineSafeModeRuleWithMixedPipelines() nodeManager, scmMetadataStore.getPipelineTable(), eventQueue, - SCMContext.emptyContext()); + scmContext, + serviceManager); - pipelineManager.allowPipelineCreation(); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(nodeManager, pipelineManager.getStateManager(), config); @@ -258,7 +268,8 @@ public void testHealthyPipelineSafeModeRuleWithMixedPipelines() MockRatisPipelineProvider.markPipelineHealthy(pipeline3); SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager( - config, containers, pipelineManager, eventQueue); + config, containers, pipelineManager, eventQueue, + serviceManager, scmContext); HealthyPipelineSafeModeRule healthyPipelineSafeModeRule = scmSafeModeManager.getHealthyPipelineSafeModeRule(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java index b915899ce39b..6de81dcf22e8 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager; import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl; import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider; @@ -62,6 +63,8 @@ public class TestOneReplicaPipelineSafeModeRule { private OneReplicaPipelineSafeModeRule rule; private PipelineManagerV2Impl 
pipelineManager; private EventQueue eventQueue; + private SCMServiceManager serviceManager; + private SCMContext scmContext; private MockNodeManager mockNodeManager; private void setup(int nodes, int pipelineFactorThreeCount, @@ -79,6 +82,8 @@ private void setup(int nodes, int pipelineFactorThreeCount, mockNodeManager = new MockNodeManager(true, nodes); eventQueue = new EventQueue(); + serviceManager = new SCMServiceManager(); + scmContext = SCMContext.emptyContext(); SCMMetadataStore scmMetadataStore = new SCMMetadataStoreImpl(ozoneConfiguration); @@ -89,8 +94,8 @@ private void setup(int nodes, int pipelineFactorThreeCount, mockNodeManager, scmMetadataStore.getPipelineTable(), eventQueue, - SCMContext.emptyContext()); - pipelineManager.allowPipelineCreation(); + scmContext, + serviceManager); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(mockNodeManager, @@ -105,7 +110,7 @@ private void setup(int nodes, int pipelineFactorThreeCount, SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(ozoneConfiguration, containers, - pipelineManager, eventQueue); + pipelineManager, eventQueue, serviceManager, scmContext); rule = scmSafeModeManager.getOneReplicaPipelineSafeModeRule(); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java index e8dbc2e450cc..523f9647f103 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java @@ -23,7 +23,6 @@ import java.util.Collections; import java.util.List; import java.util.UUID; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -40,6 +39,7 @@ import 
org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager; import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl; import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider; @@ -47,7 +47,6 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider; import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl; -import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -73,6 +72,7 @@ public class TestSCMSafeModeManager { private static EventQueue queue; private SCMContext scmContext; + private SCMServiceManager serviceManager; private SCMSafeModeManager scmSafeModeManager; private static OzoneConfiguration config; private List containers = Collections.emptyList(); @@ -88,7 +88,8 @@ public class TestSCMSafeModeManager { @Before public void setUp() { queue = new EventQueue(); - scmContext = SCMContext.emptyContext(); + scmContext = new SCMContext.Builder().build(); + serviceManager = new SCMServiceManager(); config = new OzoneConfiguration(); config.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false); @@ -120,7 +121,7 @@ public void testSafeModeState() throws Exception { @Test public void testSafeModeStateWithNullContainers() { new SCMSafeModeManager(config, Collections.emptyList(), - null, queue); + null, queue, serviceManager, scmContext); } private void testSafeMode(int numContainers) throws Exception { @@ -132,7 +133,7 @@ private void testSafeMode(int numContainers) throws Exception { container.setState(HddsProtos.LifeCycleState.CLOSED); } 
scmSafeModeManager = new SCMSafeModeManager( - config, containers, null, queue); + config, containers, null, queue, serviceManager, scmContext); assertTrue(scmSafeModeManager.getInSafeMode()); queue.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT, @@ -154,59 +155,6 @@ private void testSafeMode(int numContainers) throws Exception { } - @Test - public void testDelayedEventNotification() throws Exception { - - List delayedSafeModeEvents = new ArrayList<>(); - List safeModeEvents = new ArrayList<>(); - - //given - EventQueue eventQueue = new EventQueue(); - eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS, - (safeModeStatus, publisher) -> safeModeEvents.add(safeModeStatus)); - eventQueue.addHandler(SCMEvents.DELAYED_SAFE_MODE_STATUS, - (safeModeStatus, publisher) -> delayedSafeModeEvents - .add(safeModeStatus)); - - OzoneConfiguration ozoneConfiguration = new OzoneConfiguration(); - ozoneConfiguration - .setTimeDuration(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, - 3, TimeUnit.SECONDS); - ozoneConfiguration - .setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false); - - scmSafeModeManager = new SCMSafeModeManager( - ozoneConfiguration, containers, null, eventQueue); - - //when - scmSafeModeManager.setInSafeMode(true); - scmSafeModeManager.setPreCheckComplete(true); - - scmSafeModeManager.emitSafeModeStatus(); - eventQueue.processAll(1000L); - - //then - Assert.assertEquals(1, delayedSafeModeEvents.size()); - Assert.assertEquals(1, safeModeEvents.size()); - - //when - scmSafeModeManager.setInSafeMode(false); - scmSafeModeManager.setPreCheckComplete(true); - - scmSafeModeManager.emitSafeModeStatus(); - eventQueue.processAll(1000L); - - //then - Assert.assertEquals(2, safeModeEvents.size()); - //delayed messages are not yet sent (unless JVM is paused for 3 seconds) - Assert.assertEquals(1, delayedSafeModeEvents.size()); - - //event will be triggered after 3 seconds (see previous config) - GenericTestUtils.waitFor(() -> 
delayedSafeModeEvents.size() == 2, - 300, - 6000); - - } @Test public void testSafeModeExitRule() throws Exception { containers = new ArrayList<>(); @@ -218,7 +166,7 @@ public void testSafeModeExitRule() throws Exception { container.setState(HddsProtos.LifeCycleState.CLOSED); } scmSafeModeManager = new SCMSafeModeManager( - config, containers, null, queue); + config, containers, null, queue, serviceManager, scmContext); long cutOff = (long) Math.ceil(numContainers * config.getDouble( HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT, @@ -311,9 +259,10 @@ public void testFailWithIncorrectValueForHealthyPipelinePercent() mockNodeManager, scmMetadataStore.getPipelineTable(), queue, - scmContext); + scmContext, + serviceManager); scmSafeModeManager = new SCMSafeModeManager( - conf, containers, pipelineManager, queue); + conf, containers, pipelineManager, queue, serviceManager, scmContext); fail("testFailWithIncorrectValueForHealthyPipelinePercent"); } catch (IllegalArgumentException ex) { GenericTestUtils.assertExceptionContains("value should be >= 0.0 and <=" + @@ -335,9 +284,10 @@ public void testFailWithIncorrectValueForOneReplicaPipelinePercent() mockNodeManager, scmMetadataStore.getPipelineTable(), queue, - scmContext); + scmContext, + serviceManager); scmSafeModeManager = new SCMSafeModeManager( - conf, containers, pipelineManager, queue); + conf, containers, pipelineManager, queue, serviceManager, scmContext); fail("testFailWithIncorrectValueForOneReplicaPipelinePercent"); } catch (IllegalArgumentException ex) { GenericTestUtils.assertExceptionContains("value should be >= 0.0 and <=" + @@ -358,9 +308,10 @@ public void testFailWithIncorrectValueForSafeModePercent() throws Exception { mockNodeManager, scmMetadataStore.getPipelineTable(), queue, - scmContext); + scmContext, + serviceManager); scmSafeModeManager = new SCMSafeModeManager( - conf, containers, pipelineManager, queue); + conf, containers, pipelineManager, queue, serviceManager, scmContext); 
fail("testFailWithIncorrectValueForSafeModePercent"); } catch (IllegalArgumentException ex) { GenericTestUtils.assertExceptionContains("value should be >= 0.0 and <=" + @@ -388,13 +339,14 @@ public void testSafeModeExitRuleWithPipelineAvailabilityCheck( mockNodeManager, scmMetadataStore.getPipelineTable(), queue, - scmContext); + scmContext, + serviceManager); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(mockNodeManager, pipelineManager.getStateManager(), config); pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS, mockRatisProvider); - pipelineManager.allowPipelineCreation(); + pipelineManager.getBackgroundPipelineCreator().stop(); for (int i = 0; i < pipelineCount; i++) { // Create pipeline @@ -412,8 +364,8 @@ public void testSafeModeExitRuleWithPipelineAvailabilityCheck( container.setState(HddsProtos.LifeCycleState.CLOSED); } - scmSafeModeManager = new SCMSafeModeManager(conf, containers, - pipelineManager, queue); + scmSafeModeManager = new SCMSafeModeManager( + conf, containers, pipelineManager, queue, serviceManager, scmContext); assertTrue(scmSafeModeManager.getInSafeMode()); testContainerThreshold(containers, 1.0); @@ -523,9 +475,8 @@ public void testDisableSafeMode() throws IOException { OzoneConfiguration conf = new OzoneConfiguration(config); conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED, false); PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); - Mockito.doNothing().when(pipelineManager).startPipelineCreator(); - scmSafeModeManager = - new SCMSafeModeManager(conf, containers, pipelineManager, queue); + scmSafeModeManager = new SCMSafeModeManager( + conf, containers, pipelineManager, queue, serviceManager, scmContext); assertFalse(scmSafeModeManager.getInSafeMode()); } @@ -557,7 +508,7 @@ public void testContainerSafeModeRule() throws Exception { } scmSafeModeManager = new SCMSafeModeManager( - config, containers, null, queue); + config, containers, null, queue, serviceManager, 
scmContext); assertTrue(scmSafeModeManager.getInSafeMode()); @@ -581,7 +532,7 @@ private void testSafeModeDataNodes(int numOfDns) throws Exception { OzoneConfiguration conf = new OzoneConfiguration(config); conf.setInt(HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_DATANODE, numOfDns); scmSafeModeManager = new SCMSafeModeManager( - conf, containers, null, queue); + conf, containers, null, queue, serviceManager, scmContext); // Assert SCM is in Safe mode. assertTrue(scmSafeModeManager.getInSafeMode()); @@ -639,14 +590,14 @@ public void testSafeModePipelineExitRule() throws Exception { nodeManager, scmMetadataStore.getPipelineTable(), queue, - scmContext); + scmContext, + serviceManager); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(nodeManager, pipelineManager.getStateManager(), config); pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS, mockRatisProvider); - pipelineManager.allowPipelineCreation(); Pipeline pipeline = pipelineManager.createPipeline( HddsProtos.ReplicationType.RATIS, @@ -656,7 +607,8 @@ public void testSafeModePipelineExitRule() throws Exception { MockRatisPipelineProvider.markPipelineHealthy(pipeline); scmSafeModeManager = new SCMSafeModeManager( - config, containers, pipelineManager, queue); + config, containers, pipelineManager, queue, serviceManager, + scmContext); queue.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT, HddsTestUtils.createNodeRegistrationContainerReport(containers)); @@ -703,7 +655,8 @@ public void testPipelinesNotCreatedUntilPreCheckPasses() nodeManager, scmMetadataStore.getPipelineTable(), queue, - scmContext); + scmContext, + serviceManager); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(nodeManager, @@ -714,7 +667,7 @@ public void testPipelinesNotCreatedUntilPreCheckPasses() SafeModeEventHandler smHandler = new SafeModeEventHandler(); queue.addHandler(SCMEvents.SAFE_MODE_STATUS, smHandler); scmSafeModeManager = new SCMSafeModeManager( - config, containers, 
pipelineManager, queue); + config, containers, pipelineManager, queue, serviceManager, scmContext); // Assert SCM is in Safe mode. assertTrue(scmSafeModeManager.getInSafeMode()); @@ -739,9 +692,6 @@ public void testPipelinesNotCreatedUntilPreCheckPasses() Assert.assertEquals(true, smHandler.getPreCheckComplete()); Assert.assertEquals(true, smHandler.getIsInSafeMode()); - // Create a pipeline and ensure safemode is exited. - pipelineManager.allowPipelineCreation(); - /* There is a race condition where the background pipeline creation * task creates the pipeline before the following create call. * So wrapping it with try..catch. diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java index 26852f0e7e1c..657f5db7c62d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.ha.SCMService.Event; import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.ozone.HddsDatanodeService; @@ -160,7 +161,9 @@ public void testPipelineCreationOnNodeRestart() throws Exception { .getScmNodeManager().getNodeCount(NodeStatus.inServiceHealthy()) >= HddsProtos.ReplicationFactor.THREE.getNumber()) { // make sure pipelines is created after node start - pipelineManager.triggerPipelineCreation(); + cluster.getStorageContainerManager() + .getSCMServiceManager() + 
.notifyEventTriggered(Event.PRE_CHECK_COMPLETED); waitForPipelines(1); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java index 2a468f560605..635fe30a413e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java @@ -73,6 +73,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.node.DatanodeInfo; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; @@ -647,10 +648,16 @@ public void testCloseContainerCommandOnRestart() throws Exception { dnUuid, closeContainerCommand); GenericTestUtils.waitFor(() -> { - return replicationManager.isRunning(); + SCMContext scmContext + = cluster.getStorageContainerManager().getScmContext(); + return !scmContext.isInSafeMode() && scmContext.isLeader(); }, 1000, 25000); + // After safe mode is off, ReplicationManager starts to run with a delay. + Thread.sleep(5000); // Give ReplicationManager some time to process the containers. 
+ cluster.getStorageContainerManager() + .getReplicationManager().processContainersNow(); Thread.sleep(5000); verify(publisher).fireEvent(eq(SCMEvents.DATANODE_COMMAND), argThat(new diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java index 5e08f2dcf64a..69f07df4e2af 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java @@ -141,7 +141,7 @@ public void testContainerReplication() throws Exception { SCMCommand command = new ReplicateContainerCommand(containerId, sourcePipelines.getNodes()); command.setTerm( - cluster.getStorageContainerManager().getScmContext().getTerm()); + cluster.getStorageContainerManager().getScmContext().getTermOfLeader()); cluster.getStorageContainerManager().getScmNodeManager() .addDatanodeCommand(destinationDatanode.getDatanodeDetails().getUuid(), command); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java index 00a338bf337c..b45603a9f605 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java @@ -147,7 +147,7 @@ public void testIfCloseContainerCommandHandlerIsInvoked() throws Exception { SCMCommand command = new CloseContainerCommand( containerID, pipeline.getId()); command.setTerm( 
- cluster.getStorageContainerManager().getScmContext().getTerm()); + cluster.getStorageContainerManager().getScmContext().getTermOfLeader()); cluster.getStorageContainerManager().getScmNodeManager() .addDatanodeCommand(datanodeDetails.getUuid(), command); GenericTestUtils @@ -198,7 +198,7 @@ public void testCloseContainerViaStandAlone() SCMCommand command = new CloseContainerCommand( containerID, pipeline.getId()); command.setTerm( - cluster.getStorageContainerManager().getScmContext().getTerm()); + cluster.getStorageContainerManager().getScmContext().getTermOfLeader()); cluster.getStorageContainerManager().getScmNodeManager() .addDatanodeCommand(datanodeDetails.getUuid(), command); @@ -251,8 +251,8 @@ public void testCloseContainerViaRatis() throws IOException, //send the order to close the container SCMCommand command = new CloseContainerCommand( containerID, pipeline.getId()); - command.setTerm( - cluster.getStorageContainerManager().getScmContext().getTerm()); + command.setTerm(cluster.getStorageContainerManager() + .getScmContext().getTermOfLeader()); cluster.getStorageContainerManager().getScmNodeManager() .addDatanodeCommand(details.getUuid(), command); int index = cluster.getHddsDatanodeIndex(details); @@ -332,7 +332,7 @@ public void testQuasiCloseTransitionViaRatis() SCMCommand command = new CloseContainerCommand( containerID, pipeline.getId(), true); command.setTerm( - cluster.getStorageContainerManager().getScmContext().getTerm()); + cluster.getStorageContainerManager().getScmContext().getTermOfLeader()); cluster.getStorageContainerManager().getScmNodeManager() .addDatanodeCommand(datanodeDetails.getUuid(), command); GenericTestUtils diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java index 
870486a7f54c..c165f921887d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java @@ -123,7 +123,7 @@ public void test() throws Exception { SCMCommand command = new CloseContainerCommand( containerId.getId(), pipeline.getId()); command.setTerm( - cluster.getStorageContainerManager().getScmContext().getTerm()); + cluster.getStorageContainerManager().getScmContext().getTermOfLeader()); cluster.getStorageContainerManager().getScmNodeManager() .addDatanodeCommand(datanodeDetails.getUuid(), command); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java index 40998e1868c2..ab20505d400c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java @@ -136,7 +136,7 @@ public void testDeleteContainerRequestHandlerOnClosedContainer() SCMCommand command = new CloseContainerCommand( containerId.getId(), pipeline.getId()); command.setTerm( - cluster.getStorageContainerManager().getScmContext().getTerm()); + cluster.getStorageContainerManager().getScmContext().getTermOfLeader()); nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command); GenericTestUtils.waitFor(() -> @@ -154,7 +154,7 @@ public void testDeleteContainerRequestHandlerOnClosedContainer() // send delete container to the datanode command = new 
DeleteContainerCommand(containerId.getId(), false); command.setTerm( - cluster.getStorageContainerManager().getScmContext().getTerm()); + cluster.getStorageContainerManager().getScmContext().getTermOfLeader()); nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command); GenericTestUtils.waitFor(() -> @@ -192,7 +192,7 @@ public void testDeleteContainerRequestHandlerOnOpenContainer() SCMCommand command = new DeleteContainerCommand( containerId.getId(), false); command.setTerm( - cluster.getStorageContainerManager().getScmContext().getTerm()); + cluster.getStorageContainerManager().getScmContext().getTermOfLeader()); nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command); // Here it should not delete it, and the container should exist in the @@ -216,7 +216,7 @@ public void testDeleteContainerRequestHandlerOnOpenContainer() // container command = new DeleteContainerCommand(containerId.getId(), true); command.setTerm( - cluster.getStorageContainerManager().getScmContext().getTerm()); + cluster.getStorageContainerManager().getScmContext().getTermOfLeader()); nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command); GenericTestUtils.waitFor(() ->