diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java
index 77fe841bf9fa..bfc68c7341f9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java
@@ -24,8 +24,6 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
-import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.ozone.common.BlockGroup;
 
 /**
@@ -33,8 +31,7 @@
  * Block APIs.
  * Container is transparent to these APIs.
  */
-public interface BlockManager extends Closeable,
-    EventHandler<SafeModeStatus> {
+public interface BlockManager extends Closeable {
   /**
    * Allocates a new block for a given size.
    * @param size - Block Size
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index ceca7bd3b6ff..b2150265c963 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -32,10 +32,8 @@
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
 import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.ScmUtils;
 import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -46,10 +44,7 @@
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
-import org.apache.hadoop.hdds.scm.safemode.SafeModePrecheck;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.utils.UniqueId;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.ozone.common.BlockGroup;
@@ -72,6 +67,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
   // Currently only user of the block service is Ozone, CBlock manages blocks
   // by itself and does not rely on the Block service offered by SCM.
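+  // Handle to the SCM instance; used to reach SCMContext for the
+  // safe mode checks that replace the removed SafeModePrecheck.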
+  private final StorageContainerManager scm;
   private final PipelineManager pipelineManager;
   private final ContainerManagerV2 containerManager;
@@ -81,7 +77,6 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
   private final SCMBlockDeletingService blockDeletingService;
 
   private ObjectName mxBean;
-  private SafeModePrecheck safeModePrecheck;
   private PipelineChoosePolicy pipelineChoosePolicy;
 
   /**
@@ -94,6 +89,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
   public BlockManagerImpl(final ConfigurationSource conf,
       final StorageContainerManager scm) {
     Objects.requireNonNull(scm, "SCM cannot be null");
+    this.scm = scm;
     this.pipelineManager = scm.getPipelineManager();
     this.containerManager = scm.getContainerManager();
     this.pipelineChoosePolicy = scm.getPipelineChoosePolicy();
@@ -116,9 +112,8 @@ public BlockManagerImpl(final ConfigurationSource conf,
         TimeUnit.MILLISECONDS);
     blockDeletingService =
         new SCMBlockDeletingService(deletedBlockLog, containerManager,
-            scm.getScmNodeManager(), scm.getEventQueue(), svcInterval,
-            serviceTimeout, conf);
-    safeModePrecheck = new SafeModePrecheck(conf);
+            scm.getScmNodeManager(), scm.getEventQueue(), scm.getScmContext(),
+            svcInterval, serviceTimeout, conf);
   }
 
   /**
@@ -158,7 +153,10 @@ public AllocatedBlock allocateBlock(final long size, ReplicationType type,
     if (LOG.isTraceEnabled()) {
       LOG.trace("Size : {} , type : {}, factor : {} ", size, type, factor);
     }
-    ScmUtils.preCheck(ScmOps.allocateBlock, safeModePrecheck);
+    if (scm.getScmContext().isInSafeMode()) {
+      throw new SCMException("SafeModePrecheck failed for allocateBlock",
+          SCMException.ResultCodes.SAFE_MODE_EXCEPTION);
+    }
     if (size < 0 || size > containerSize) {
       LOG.warn("Invalid block size requested : {}", size);
       throw new SCMException("Unsupported block size: " + size,
@@ -294,8 +292,10 @@ private AllocatedBlock newBlock(ContainerInfo containerInfo)
   @Override
   public void deleteBlocks(List<BlockGroup> keyBlocksInfoList)
       throws IOException {
-    ScmUtils.preCheck(ScmOps.deleteBlock, safeModePrecheck);
-
+    if (scm.getScmContext().isInSafeMode()) {
+      throw new SCMException("SafeModePrecheck failed for deleteBlocks",
+          SCMException.ResultCodes.SAFE_MODE_EXCEPTION);
+    }
     Map<Long, List<Long>> containerBlocks = new HashMap<>();
     // TODO: track the block size info so that we can reclaim the container
     // TODO: used space when the block is deleted.
@@ -365,13 +365,6 @@ public SCMBlockDeletingService getSCMBlockDeletingService() {
     return this.blockDeletingService;
   }
 
-  /**
-   * Returns status of scm safe mode determined by SAFE_MODE_STATUS event.
-   * */
-  public boolean isScmInSafeMode() {
-    return this.safeModePrecheck.isInSafeMode();
-  }
-
   /**
   * Get class logger.
   * */
@@ -379,12 +372,6 @@ public static Logger getLogger() {
     return LOG;
   }
 
-  @Override
-  public void onMessage(SafeModeStatus status,
-      EventPublisher publisher) {
-    this.safeModePrecheck.setInSafeMode(status.isInSafeMode());
-  }
-
   /**
   * This class uses system current time milliseconds to generate unique id.
  */
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index 908b4f121765..c3028a40ab78 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -40,10 +41,12 @@
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,19 +67,22 @@ public class SCMBlockDeletingService extends BackgroundService {
   private final ContainerManagerV2 containerManager;
   private final NodeManager nodeManager;
   private final EventPublisher eventPublisher;
+  private final SCMContext scmContext;
 
   private int blockDeleteLimitSize;
 
+  @SuppressWarnings("parameternumber")
   public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
       ContainerManagerV2 containerManager, NodeManager nodeManager,
-      EventPublisher eventPublisher, Duration interval, long serviceTimeout,
-      ConfigurationSource conf) {
+      EventPublisher eventPublisher, SCMContext scmContext,
+      Duration interval, long serviceTimeout, ConfigurationSource conf) {
     super("SCMBlockDeletingService", interval.toMillis(),
         TimeUnit.MILLISECONDS, BLOCK_DELETING_SERVICE_CORE_POOL_SIZE,
         serviceTimeout);
     this.deletedBlockLog = deletedBlockLog;
     this.containerManager = containerManager;
     this.nodeManager = nodeManager;
     this.eventPublisher = eventPublisher;
+    this.scmContext = scmContext;
 
     blockDeleteLimitSize =
         conf.getObject(ScmConfig.class).getBlockDeletionLimit();
@@ -146,9 +152,10 @@ public EmptyTaskResult call() throws Exception {
           // We should stop caching new commands if num of un-processed
          // command is bigger than a limit, e.g 50. In case datanode goes
          // offline for sometime, the cached commands be flooded.
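+          // Stamp the command with the current raft term; a datanode can use
+          // the term to recognize and drop commands issued by a stale leader.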
+          SCMCommand command = new DeleteBlocksCommand(dnTXs);
+          command.setTerm(scmContext.getTerm());
           eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
-              new CommandForDatanode<>(dnId,
-                  new DeleteBlocksCommand(dnTXs)));
+              new CommandForDatanode<>(dnId, command));
           if (LOG.isDebugEnabled()) {
             LOG.debug(
                 "Added delete block command for datanode {} in the queue,"
@@ -170,6 +177,9 @@ public EmptyTaskResult call() throws Exception {
             transactions.getBlocksDeleted(),
             transactions.getDatanodeTransactionMap().size(),
             Time.monotonicNow() - startTime);
+      } catch (NotLeaderException nle) {
+        LOG.warn("Skip current run, since current SCM is no longer"
+            + " the leader.", nle);
+        return EmptyTaskResult.newResult();
       } catch (IOException e) {
         // We may tolerate a number of failures for sometime
         // but if it continues to fail, at some point we need to raise
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
index 1aa34cfcd8bd..d71539d1af76 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
@@ -27,10 +27,13 @@
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.slf4j.Logger;
 
 import java.io.IOException;
@@ -46,6 +49,7 @@ public class AbstractContainerReportHandler {
 
   private final ContainerManagerV2 containerManager;
+  private final SCMContext scmContext;
   private final Logger logger;
 
   /**
@@ -56,10 +60,13 @@ public class AbstractContainerReportHandler {
    * @param logger Logger to be used for logging
    */
   AbstractContainerReportHandler(final ContainerManagerV2 containerManager,
+                                 final SCMContext scmContext,
                                  final Logger logger) {
     Preconditions.checkNotNull(containerManager);
+    Preconditions.checkNotNull(scmContext);
     Preconditions.checkNotNull(logger);
     this.containerManager = containerManager;
+    this.scmContext = scmContext;
     this.logger = logger;
   }
 
@@ -317,11 +324,17 @@ protected ContainerManagerV2 getContainerManager() {
 
   protected void deleteReplica(ContainerID containerID, DatanodeDetails dn,
       EventPublisher publisher, String reason) {
-    final DeleteContainerCommand deleteCommand =
-        new DeleteContainerCommand(containerID.getId(), true);
-    final CommandForDatanode datanodeCommand = new CommandForDatanode<>(
-        dn.getUuid(), deleteCommand);
-    publisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+    SCMCommand command = new DeleteContainerCommand(
+        containerID.getId(), true);
+    try {
+      command.setTerm(scmContext.getTerm());
+    } catch (NotLeaderException nle) {
+      logger.warn("Skip sending delete container command," +
+          " since not leader SCM", nle);
+      return;
+    }
+    publisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+        new CommandForDatanode<>(dn.getUuid(), command));
     logger.info("Sending delete container command for " + reason +
         " container {} to datanode {}", containerID.getId(), dn);
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index 45a0e1f07feb..3320d900dbbe 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -30,6 +31,7 @@
 import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,11 +53,14 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
 
   private final PipelineManager pipelineManager;
   private final ContainerManagerV2 containerManager;
+  private final SCMContext scmContext;
 
   public CloseContainerEventHandler(final PipelineManager pipelineManager,
-      final ContainerManagerV2 containerManager) {
+      final ContainerManagerV2 containerManager,
+      final SCMContext scmContext) {
     this.pipelineManager = pipelineManager;
     this.containerManager = containerManager;
+    this.scmContext = scmContext;
   }
 
   @Override
@@ -74,19 +79,21 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) {
           .getContainer(containerID);
       // Send close command to datanodes, if the container is in CLOSING state
       if (container.getState() == LifeCycleState.CLOSING) {
+        SCMCommand command = new CloseContainerCommand(
+            containerID.getId(), container.getPipelineID());
+        command.setTerm(scmContext.getTerm());
 
-        final CloseContainerCommand closeContainerCommand =
-            new CloseContainerCommand(
-                containerID.getId(), container.getPipelineID());
-
-        getNodes(container).forEach(node -> publisher.fireEvent(
-            DATANODE_COMMAND,
-            new CommandForDatanode<>(node.getUuid(), closeContainerCommand)));
+        getNodes(container).forEach(node ->
+            publisher.fireEvent(DATANODE_COMMAND,
+                new CommandForDatanode<>(node.getUuid(), command)));
       } else {
         LOG.warn("Cannot close container {}, which is in {} state.",
             containerID, container.getState());
       }
 
+    } catch (NotLeaderException nle) {
+      LOG.warn("Skip sending close container command," +
+          " since current SCM is not leader.", nle);
     } catch (IOException | InvalidStateTransitionException ex) {
       LOG.error("Failed to close the container {}.", containerID, ex);
     }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
index 365750c7e9ac..48603c001df5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
@@ -73,8 +74,9 @@ public class ContainerReportHandler extends AbstractContainerReportHandler
    */
   public ContainerReportHandler(final NodeManager nodeManager,
       final ContainerManagerV2 containerManager,
+      final SCMContext scmContext,
       OzoneConfiguration conf) {
-    super(containerManager, LOG);
+    super(containerManager, scmContext, LOG);
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
 
@@ -88,7 +90,7 @@ public ContainerReportHandler(final NodeManager nodeManager,
 
   public ContainerReportHandler(final NodeManager nodeManager,
       final ContainerManagerV2 containerManager) {
-    this(nodeManager, containerManager, null);
+    this(nodeManager, containerManager, SCMContext.emptyContext(), null);
   }
 
   /**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
index 0f0f0f1dc653..1ad507dd1b10 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos
     .ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
@@ -47,8 +48,9 @@ public class IncrementalContainerReportHandler extends
 
   public IncrementalContainerReportHandler(
       final NodeManager nodeManager,
-      final ContainerManagerV2 containerManager) {
-    super(containerManager, LOG);
+      final ContainerManagerV2 containerManager,
+      final SCMContext scmContext) {
+    super(containerManager, scmContext, LOG);
     this.nodeManager = nodeManager;
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 91e5f723698a..0559c3c7e0b0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -72,6 +73,8 @@
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
 import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
+
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -105,6 +108,11 @@ public class ReplicationManager
    */
   private final EventPublisher eventPublisher;
 
+  /**
+   * SCMContext from StorageContainerManager.
+   */
+  private final SCMContext scmContext;
+
   /**
    * Used for locking a container using its ID while processing it.
    */
@@ -161,11 +169,13 @@ public ReplicationManager(final ReplicationManagerConfiguration conf,
       final ContainerManagerV2 containerManager,
       final PlacementPolicy containerPlacement,
       final EventPublisher eventPublisher,
+      final SCMContext scmContext,
       final LockManager<ContainerID> lockManager,
       final NodeManager nodeManager) {
     this.containerManager = containerManager;
     this.containerPlacement = containerPlacement;
     this.eventPublisher = eventPublisher;
+    this.scmContext = scmContext;
     this.lockManager = lockManager;
     this.nodeManager = nodeManager;
     this.conf = conf;
@@ -957,10 +967,16 @@ private void sendCloseCommand(final ContainerInfo container,
     LOG.info("Sending close container command for container {}" +
         " to datanode {}.", container.containerID(), datanode);
-
     CloseContainerCommand closeContainerCommand =
         new CloseContainerCommand(container.getContainerID(),
             container.getPipelineID(), force);
+    try {
+      closeContainerCommand.setTerm(scmContext.getTerm());
+    } catch (NotLeaderException nle) {
+      LOG.warn("Skip sending close container command," +
+          " since current SCM is not leader.", nle);
+      return;
+    }
     eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
         new CommandForDatanode<>(datanode.getUuid(), closeContainerCommand));
   }
@@ -1026,6 +1042,13 @@ private void sendAndTrackDatanodeCommand(
       final DatanodeDetails datanode,
       final SCMCommand command,
       final Consumer tracker) {
+    try {
+      command.setTerm(scmContext.getTerm());
+    } catch (NotLeaderException nle) {
+      LOG.warn("Skip sending datanode command," +
+          " since current SCM is not leader.", nle);
+      return;
+    }
     final CommandForDatanode datanodeCommand =
         new CommandForDatanode<>(datanode.getUuid(), command);
     eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
index 08fd2b2a65d5..bf25ad53601e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
@@ -24,7 +24,6 @@
 import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
@@ -65,11 +64,10 @@ public void start() throws IOException {
   }
 
   /**
-   * {@inheritDoc}
+   * Informs MockRatisServer whether to behave as a leader or follower SCM.
    */
-  @Override
-  public Optional<Long> isLeader() {
-    return isLeader ? Optional.of((long)0) : Optional.empty();
+  boolean isLeader() {
+    return isLeader;
   }
 
   public void setIsLeader(boolean isLeader) {
@@ -113,7 +111,7 @@ public SCMRatisResponse submitRequest(final SCMRatisRequest request)
     final RaftGroupMemberId raftId = RaftGroupMemberId.valueOf(
         RaftPeerId.valueOf("peer"), RaftGroupId.randomId());
     RaftClientReply reply;
-    if (isLeader().isPresent()) {
+    if (isLeader()) {
       try {
         final Message result = process(request);
         reply = RaftClientReply.newBuilder()
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java
new file mode 100644
index 000000000000..17dad7e07c71
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * SCMContext is the single source of truth for some key information shared
+ * across all components within SCM, including:
+ * - RaftServer related info, e.g., isLeader, term.
+ * - SafeMode related info, e.g., inSafeMode, preCheckComplete.
+ */
+public class SCMContext implements EventHandler<SafeModeStatus> {
+  private static final Logger LOG = LoggerFactory.getLogger(SCMContext.class);
+
+  private static final SCMContext EMPTY_CONTEXT
+      = new SCMContext(true, 0, new SafeModeStatus(false, true), null);
+
+  /**
+   * Used by non-HA mode SCM, Recon and Unit Tests.
+   */
+  public static SCMContext emptyContext() {
+    return EMPTY_CONTEXT;
+  }
+
+  /**
+   * Raft related info.
+   */
+  private boolean isLeader;
+  private long term;
+
+  /**
+   * Safe mode related info.
+   */
+  private SafeModeStatus safeModeStatus;
+
+  private final StorageContainerManager scm;
+  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  SCMContext(boolean isLeader, long term,
+      final SafeModeStatus safeModeStatus,
+      final StorageContainerManager scm) {
+    this.isLeader = isLeader;
+    this.term = term;
+    this.safeModeStatus = safeModeStatus;
+    this.scm = scm;
+  }
+
+  /**
+   * Creates SCMContext instance from StorageContainerManager.
+   */
+  public SCMContext(final StorageContainerManager scm) {
+    this(false, 0, new SafeModeStatus(true, false), scm);
+    Preconditions.checkNotNull(scm, "scm is null");
+  }
+
+  /**
+   * Update the latest leadership and term.
+   *
+   * @param newIsLeader : is leader or not
+   * @param newTerm     : term if current SCM becomes leader
+   */
+  public void updateIsLeaderAndTerm(boolean newIsLeader, long newTerm) {
+    lock.writeLock().lock();
+    try {
+      LOG.info("update <isLeader,term> from <{},{}> to <{},{}>",
+          isLeader, term, newIsLeader, newTerm);
+
+      isLeader = newIsLeader;
+      term = newTerm;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Check whether current SCM is leader or not.
+   *
+   * @return isLeader
+   */
+  public boolean isLeader() {
+    lock.readLock().lock();
+    try {
+      return isLeader;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Get term of current leader SCM.
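+   * The term is stamped onto outgoing datanode commands, so that commands
+   * issued by a stale leader SCM can be recognized and ignored.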
+   *
+   * @return term
+   * @throws NotLeaderException if isLeader is false
+   */
+  public long getTerm() throws NotLeaderException {
+    lock.readLock().lock();
+    try {
+      if (!isLeader) {
+        LOG.warn("getTerm is invoked when not leader.");
+        throw scm.getScmHAManager()
+            .getRatisServer()
+            .triggerNotLeaderException();
+      }
+      return term;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public void onMessage(SafeModeStatus status,
+                        EventPublisher publisher) {
+    lock.writeLock().lock();
+    try {
+      LOG.info("Update SafeModeStatus from {} to {}.", safeModeStatus, status);
+      safeModeStatus = status;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  public boolean isInSafeMode() {
+    lock.readLock().lock();
+    try {
+      return safeModeStatus.isInSafeMode();
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  public boolean isPreCheckComplete() {
+    lock.readLock().lock();
+    try {
+      return safeModeStatus.isPreCheckComplete();
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
index 59410b19c2df..8fe6d7fced00 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdds.scm.ha;
 
 import java.io.IOException;
-import java.util.Optional;
 
 /**
  * SCMHAManager provides HA service for SCM.
@@ -30,15 +29,6 @@ public interface SCMHAManager {
    */
   void start() throws IOException;
 
-  /**
-   * For HA mode, return an Optional that holds term of the
-   * underlying RaftServer iff current SCM is in leader role.
-   * Otherwise, return an empty optional.
-   *
-   * For non-HA mode, return an Optional that holds term 0.
-   */
-  Optional<Long> isLeader();
-
   /**
    * Returns RatisServer instance associated with the SCM instance.
    */
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index bac6336a4e93..db5e9373aa9e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -18,12 +18,11 @@
 package org.apache.hadoop.hdds.scm.ha;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.ratis.proto.RaftProtos;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Optional;
 
 /**
  * SCMHAManagerImpl uses Apache Ratis for HA implementation. We will have 2N+1
@@ -44,10 +43,12 @@ public class SCMHAManagerImpl implements SCMHAManager {
 
   /**
   * Creates SCMHAManager instance.
   */
-  public SCMHAManagerImpl(final ConfigurationSource conf) throws IOException {
+  public SCMHAManagerImpl(final ConfigurationSource conf,
+                          final StorageContainerManager scm)
+      throws IOException {
     this.conf = conf;
     this.ratisServer = new SCMRatisServerImpl(
-        conf.getObject(SCMHAConfiguration.class), conf);
+        conf.getObject(SCMHAConfiguration.class), conf, scm);
   }
 
   /**
@@ -58,27 +59,6 @@ public void start() throws IOException {
     ratisServer.start();
   }
 
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public Optional<Long> isLeader() {
-    if (!SCMHAUtils.isSCMHAEnabled(conf)) {
-      // When SCM HA is not enabled, the current SCM is always the leader.
-      return Optional.of((long)0);
-    }
-    RaftProtos.RoleInfoProto roleInfoProto
-        = ratisServer.getDivision().getInfo().getRoleInfoProto();
-
-    return roleInfoProto.hasLeaderInfo()
-        ? Optional.of(roleInfoProto.getLeaderInfo().getTerm())
-        : Optional.empty();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-
   @Override
   public SCMRatisServer getRatisServer() {
     return ratisServer;
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
index e9594461e7aa..3a453e31c9d2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftClientReply;
@@ -55,6 +56,7 @@ public class SCMRatisServerImpl implements SCMRatisServer {
       LoggerFactory.getLogger(SCMRatisServerImpl.class);
 
   private final RaftServer.Division division;
+  private final StorageContainerManager scm;
   private final InetSocketAddress address;
   private final ClientId clientId = ClientId.randomId();
   private final AtomicLong callId = new AtomicLong();
@@ -62,8 +64,10 @@ public class SCMRatisServerImpl implements SCMRatisServer {
   // TODO: Refactor and remove ConfigurationSource and use only
   // SCMHAConfiguration.
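+  // The SCM reference is handed to SCMStateMachine below, which updates
+  // SCMContext whenever Ratis reports a leadership change.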
   SCMRatisServerImpl(final SCMHAConfiguration haConf,
-      final ConfigurationSource conf)
+      final ConfigurationSource conf,
+      final StorageContainerManager scm)
       throws IOException {
+    this.scm = scm;
     this.address = haConf.getRatisBindAddress();
 
     SCMHAGroupBuilder haGrpBuilder = new SCMHAGroupBuilder(haConf, conf);
@@ -75,7 +79,7 @@ public class SCMRatisServerImpl implements SCMRatisServer {
         .setServerId(haGrpBuilder.getPeerId())
         .setGroup(haGrpBuilder.getRaftGroup())
         .setProperties(serverProperties)
-        .setStateMachine(new SCMStateMachine())
+        .setStateMachine(new SCMStateMachine(scm, this))
         .build();
 
     this.division = server.getDivision(haGrpBuilder.getRaftGroupId());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index ee26e58ee392..052bf4b86587 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -20,26 +20,40 @@
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * TODO.
  */
 public class SCMStateMachine extends BaseStateMachine {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMStateMachine.class);
 
+  private final StorageContainerManager scm;
+  private final SCMRatisServer ratisServer;
   private final Map<RequestType, Object> handlers;
 
-  public SCMStateMachine() {
+
+  public SCMStateMachine(final StorageContainerManager scm,
+                         final SCMRatisServer ratisServer) {
+    this.scm = scm;
+    this.ratisServer = ratisServer;
     this.handlers = new EnumMap<>(RequestType.class);
   }
 
@@ -89,4 +103,27 @@ private Message process(final SCMRatisRequest request)
     }
   }
 
+  @Override
+  public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
+    LOG.info("Current leader SCM steps down.");
+    scm.getScmContext().updateIsLeaderAndTerm(false, 0);
+  }
+
+  @Override
+  public void notifyLeaderChanged(RaftGroupMemberId groupMemberId,
+                                  RaftPeerId newLeaderId) {
+    if (!groupMemberId.getPeerId().equals(newLeaderId)) {
+      LOG.info("Leader changed, yet current SCM is still a follower.");
+      return;
+    }
+
+    long term = scm.getScmHAManager()
+        .getRatisServer()
+        .getDivision()
+        .getInfo()
+        .getCurrentTerm();
+
+    LOG.info("Current SCM becomes leader of term {}.", term);
+    scm.getScmContext().updateIsLeaderAndTerm(true, term);
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index da51bea7cfdf..4e6f53e987b7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -25,7 +25,6 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.Collections;
@@ -49,7 +48,7 @@
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
-import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -75,6 +74,7 @@
 import com.google.common.base.Strings;
 import org.apache.hadoop.util.Time;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -111,16 +111,14 @@ public class SCMNodeManager implements NodeManager {
       new ConcurrentHashMap<>();
   private final int numPipelinesPerMetadataVolume;
   private final int heavyNodeCriteria;
-  private final SCMHAManager scmhaManager;
+  private final SCMContext scmContext;
 
   /**
   * Constructs SCM machine Manager.
   */
   public SCMNodeManager(OzoneConfiguration conf,
-      SCMStorageConfig scmStorageConfig,
-      EventPublisher eventPublisher,
-      NetworkTopology networkTopology,
-      SCMHAManager scmhaManager) {
+      SCMStorageConfig scmStorageConfig, EventPublisher eventPublisher,
+      NetworkTopology networkTopology, SCMContext scmContext) {
     this.nodeStateManager = new NodeStateManager(conf, eventPublisher);
     this.version = VersionInfo.getLatestVersion();
     this.commandQueue = new CommandQueue();
@@ -146,14 +144,7 @@ public SCMNodeManager(OzoneConfiguration conf,
         ScmConfigKeys.OZONE_SCM_PIPELINE_PER_METADATA_VOLUME_DEFAULT);
     String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
     this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
-    this.scmhaManager = scmhaManager;
-  }
-
-  public SCMNodeManager(OzoneConfiguration conf,
-      SCMStorageConfig scmStorageConfig,
-      EventPublisher eventPublisher,
-      NetworkTopology networkTopology) {
-    this(conf, scmStorageConfig, eventPublisher, networkTopology, null);
+    this.scmContext = scmContext;
   }
 
   private void registerMXBean() {
@@ -436,10 +427,19 @@ private void updateDatanodeOpState(DatanodeDetails reportedDn)
         reportedDn.getPersistedOpStateExpiryEpochSec(),
         scmStatus.getOperationalState(),
         scmStatus.getOpStateExpiryEpochSeconds());
-    addDatanodeCommand(reportedDn.getUuid(),
-        new SetNodeOperationalStateCommand(
-            Time.monotonicNow(), scmStatus.getOperationalState(),
-            scmStatus.getOpStateExpiryEpochSeconds()));
+
+    try {
+      SCMCommand command = new SetNodeOperationalStateCommand(
+          Time.monotonicNow(),
+          scmStatus.getOperationalState(),
+          scmStatus.getOpStateExpiryEpochSeconds());
+      command.setTerm(scmContext.getTerm());
+      addDatanodeCommand(reportedDn.getUuid(), command);
+    } catch (NotLeaderException nle) {
+      LOG.warn("Skip sending SetNodeOperationalStateCommand," +
+          " since current SCM is not leader.", nle);
+      return;
+    }
     }
     DatanodeDetails scmDnd = nodeStateManager.getNode(reportedDn);
     scmDnd.setPersistedOpStateExpiryEpochSec(
@@ -802,18 +802,6 @@ public Set<ContainerID> getContainers(DatanodeDetails datanodeDetails)
   // Refactor and remove all the usage of this method and delete this method.
   @Override
   public void addDatanodeCommand(UUID dnId, SCMCommand command) {
-    if (scmhaManager != null && command.getTerm() == 0) {
-      Optional<Long> termOpt = scmhaManager.isLeader();
-
-      if (!termOpt.isPresent()) {
-        LOG.warn("Not leader, drop SCMCommand {}.", command);
-        return;
-      }
-
-      LOG.warn("Help set term {} for SCMCommand {}. It is not an accurate " +
-          "way to set term of SCMCommand.", termOpt.get(), command);
-      command.setTerm(termOpt.get());
-    }
     this.commandQueue.addCommand(dnId, command);
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
index e719adbf057b..89d2833d32ee 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
@@ -23,12 +23,15 @@
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,11 +47,13 @@ public class PipelineActionHandler
       LoggerFactory.getLogger(PipelineActionHandler.class);
 
   private final PipelineManager pipelineManager;
+  private final SCMContext scmContext;
   private final ConfigurationSource ozoneConf;
 
   public PipelineActionHandler(PipelineManager pipelineManager,
-      OzoneConfiguration conf) {
+      SCMContext scmContext, OzoneConfiguration conf) {
     this.pipelineManager = pipelineManager;
+    this.scmContext = scmContext;
     this.ozoneConf = conf;
   }
 
@@ -87,9 +92,16 @@ private void processPipelineAction(final DatanodeDetails datanode,
     } catch (PipelineNotFoundException e) {
       LOG.warn("Pipeline action {} received for unknown pipeline {}, " +
           "firing close pipeline event.", action, pid);
+      SCMCommand command = new ClosePipelineCommand(pid);
+      try {
+        command.setTerm(scmContext.getTerm());
+      } catch (NotLeaderException nle) {
+        LOG.warn("Skip sending ClosePipelineCommand for pipeline {}," +
+            " since not leader SCM.", pid);
+        return;
+      }
       publisher.fireEvent(SCMEvents.DATANODE_COMMAND,
-          new CommandForDatanode<>(datanode.getUuid(),
-              new ClosePipelineCommand(pid)));
+          new CommandForDatanode<>(datanode.getUuid(), command));
     } catch (IOException ioe) {
       LOG.error("Could not execute pipeline action={} pipeline={}",
           action, pid, ioe);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index 6bf1d4e9bcd4..ed73a64dd2ac 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -40,14 +41,15 @@ public class PipelineFactory {
   private Map<ReplicationType, PipelineProvider> providers;
 
   PipelineFactory(NodeManager nodeManager, StateManager stateManager,
-      ConfigurationSource conf, EventPublisher eventPublisher) {
+      ConfigurationSource conf, EventPublisher eventPublisher,
+      SCMContext scmContext) {
     providers = new HashMap<>();
     providers.put(ReplicationType.STAND_ALONE,
        new SimplePipelineProvider(nodeManager, stateManager));
     providers.put(ReplicationType.RATIS,
        new RatisPipelineProvider(nodeManager, stateManager, conf,
-           eventPublisher));
+           eventPublisher, scmContext));
   }
 
   protected PipelineFactory() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index 7d6c88a2a7a2..a89ae785381a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
@@ -112,9 +113,12 @@ private PipelineManagerV2Impl(ConfigurationSource conf,
   }
 
   public static PipelineManagerV2Impl newPipelineManager(
-      ConfigurationSource conf, SCMHAManager scmhaManager,
-      NodeManager nodeManager, Table<PipelineID, Pipeline> pipelineStore,
-      EventPublisher eventPublisher) throws IOException {
+      ConfigurationSource conf,
+      SCMHAManager scmhaManager,
+      NodeManager nodeManager,
+      Table<PipelineID, Pipeline> pipelineStore,
+      EventPublisher eventPublisher,
+      SCMContext scmContext) throws IOException {
     // Create PipelineStateManager
     StateManager stateManager = PipelineStateManagerV2Impl
         .newBuilder().setPipelineStore(pipelineStore)
@@ -124,7 +128,7 @@ public static PipelineManagerV2Impl newPipelineManager(
 
     // Create PipelineFactory
     PipelineFactory pipelineFactory = new PipelineFactory(
-        nodeManager, stateManager, conf, eventPublisher);
+        nodeManager, stateManager, conf, eventPublisher, scmContext);
 
     // Create PipelineManager
     PipelineManagerV2Impl pipelineManager = new PipelineManagerV2Impl(conf,
         scmhaManager, nodeManager, stateManager, pipelineFactory,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
index ac6a4ad32630..ca514337fee0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.safemode.SafeModeManager;
 import org.apache.hadoop.hdds.scm.server
     .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
@@ -36,6 +37,7 @@
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,14 +54,18 @@ public class PipelineReportHandler implements
   private final PipelineManager pipelineManager;
   private final ConfigurationSource conf;
   private final SafeModeManager scmSafeModeManager;
+  private final SCMContext scmContext;
   private final boolean pipelineAvailabilityCheck;
   private final SCMPipelineMetrics metrics;
 
   public PipelineReportHandler(SafeModeManager scmSafeModeManager,
-      PipelineManager pipelineManager, ConfigurationSource conf) {
+      PipelineManager pipelineManager,
+      SCMContext scmContext,
+      ConfigurationSource conf) {
     Preconditions.checkNotNull(pipelineManager);
     this.scmSafeModeManager = scmSafeModeManager;
     this.pipelineManager = pipelineManager;
+    this.scmContext = scmContext;
     this.conf = conf;
     this.metrics = SCMPipelineMetrics.create();
     this.pipelineAvailabilityCheck = conf.getBoolean(
@@ -96,11 +102,10 @@ protected void processPipelineReport(PipelineReport report,
     try {
       pipeline = pipelineManager.getPipeline(pipelineID);
     } catch (PipelineNotFoundException e) {
-      final ClosePipelineCommand closeCommand =
-          new ClosePipelineCommand(pipelineID);
-      final CommandForDatanode datanodeCommand =
-          new CommandForDatanode<>(dn.getUuid(), closeCommand);
-      publisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+      SCMCommand command = new ClosePipelineCommand(pipelineID);
+      command.setTerm(scmContext.getTerm());
+      publisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+          new CommandForDatanode<>(dn.getUuid(), command));
       return;
     }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index c4f47a363b58..ede3b1e540dc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
@@ -38,6 +39,7 @@
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,14 +58,18 @@ public class RatisPipelineProvider extends PipelineProvider {
   private int pipelineNumberLimit;
   private int maxPipelinePerDatanode;
   private final LeaderChoosePolicy leaderChoosePolicy;
+  private final SCMContext scmContext;
 
   @VisibleForTesting
   public RatisPipelineProvider(NodeManager nodeManager,
-      StateManager stateManager, ConfigurationSource conf,
-      EventPublisher eventPublisher) {
+      StateManager stateManager,
+      ConfigurationSource conf,
+      EventPublisher eventPublisher,
+      SCMContext scmContext) {
     super(nodeManager, stateManager);
     this.conf = conf;
     this.eventPublisher = eventPublisher;
+    this.scmContext = scmContext;
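+    // scmContext supplies the current term for the create/close pipeline
+    // commands sent to datanodes below.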
     this.placementPolicy =
         new PipelinePlacementPolicy(nodeManager, stateManager, conf);
     this.pipelineNumberLimit = conf.getInt(
@@ -157,6 +163,8 @@ public synchronized Pipeline create(ReplicationFactor factor)
         new CreatePipelineCommand(pipeline.getId(), pipeline.getType(),
             factor, dns);
 
+    createCommand.setTerm(scmContext.getTerm());
+
     dns.forEach(node -> {
       LOG.info("Sending CreatePipelineCommand for pipeline:{} to datanode:{}",
           pipeline.getId(), node.getUuidString());
@@ -187,14 +195,15 @@ public void shutdown() {
    * Removes pipeline from SCM. Sends command to destroy pipeline on all
    * the datanodes.
    *
-   * @param pipeline - Pipeline to be destroyed
-   * @throws IOException
+   * @param pipeline - Pipeline to be destroyed
+   * @throws NotLeaderException - if the current SCM is not the leader
    */
-  public void close(Pipeline pipeline) {
+  public void close(Pipeline pipeline) throws NotLeaderException {
     final ClosePipelineCommand closeCommand =
         new ClosePipelineCommand(pipeline.getId());
-    pipeline.getNodes().stream().forEach(node -> {
-      final CommandForDatanode datanodeCommand =
+    closeCommand.setTerm(scmContext.getTerm());
+    pipeline.getNodes().forEach(node -> {
+      final CommandForDatanode datanodeCommand =
           new CommandForDatanode<>(node.getUuid(), closeCommand);
       LOG.info("Send pipeline:{} close command to datanode {}",
           pipeline.getId(), datanodeCommand.getDatanodeId());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 1e5d5053d8b3..0a6dec60f496 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -97,7 +98,7 @@ public SCMPipelineManager(ConfigurationSource conf,
     this(conf, nodeManager, pipelineStore, eventPublisher, null, null);
     this.stateManager = new PipelineStateManager();
     this.pipelineFactory = new PipelineFactory(nodeManager,
-        stateManager, conf, eventPublisher);
+        stateManager, conf, eventPublisher, SCMContext.emptyContext());
     this.pipelineStore = pipelineStore;
     initializePipelineState();
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
index ea6a148a95c2..ff68239297bc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -42,7 +42,7 @@
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
-import org.apache.hadoop.hdds.scm.server.SCMBlockProtocolServer;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
@@ -64,6 +64,7 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
     implements ScmBlockLocationProtocolPB {
 
   private final ScmBlockLocationProtocol impl;
+  private final StorageContainerManager scm;
 
   private static final Logger LOG = LoggerFactory
       .getLogger(ScmBlockLocationProtocolServerSideTranslatorPB.class);
@@ -79,9 +80,11 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
    */
   public ScmBlockLocationProtocolServerSideTranslatorPB(
       ScmBlockLocationProtocol impl,
+      StorageContainerManager scm,
       ProtocolMessageMetrics metrics) throws IOException {
     this.impl = impl;
+    this.scm = scm;
     dispatcher = new OzoneProtocolMessageDispatcher<>(
         "BlockLocationProtocol", metrics, LOG);
   }
@@ -95,19 +98,13 @@ private SCMBlockLocationResponse.Builder createSCMBlockResponse(
         .setTraceID(traceID);
   }
 
-  private boolean isLeader() throws ServiceException {
-    if (!(impl instanceof SCMBlockProtocolServer)) {
-      throw new ServiceException("Should be SCMBlockProtocolServer");
-    } else {
-      return ((SCMBlockProtocolServer) impl).getScm().checkLeader();
-    }
-  }
-
   @Override
   public SCMBlockLocationResponse send(RpcController controller,
       SCMBlockLocationRequest request) throws ServiceException {
-    if (!isLeader()) {
-      throw new ServiceException(new IOException("SCM IS NOT LEADER"));
+    if (!scm.getScmContext().isLeader()) {
+      throw new ServiceException(scm.getScmHAManager()
+          .getRatisServer()
+          .triggerNotLeaderException());
     }
     return dispatcher.processRequest(
         request,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 1d964d2de274..d515d379d3ea 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -79,7 +79,7 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
-import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
 import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
 
@@ -105,6 +105,7 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
           StorageContainerLocationProtocolServerSideTranslatorPB.class);
 
   private final StorageContainerLocationProtocol impl;
+  private final StorageContainerManager scm;
 
   private OzoneProtocolMessageDispatcher
@@ -119,27 +120,23 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
    */
   public StorageContainerLocationProtocolServerSideTranslatorPB(
       StorageContainerLocationProtocol impl,
+      StorageContainerManager scm,
       ProtocolMessageMetrics protocolMetrics) throws IOException {
     this.impl = impl;
+    this.scm = scm;
     this.dispatcher =
         new OzoneProtocolMessageDispatcher<>("ScmContainerLocation",
             protocolMetrics, LOG);
   }
 
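+  // Leadership is now checked against SCMContext in submitRequest below,
+  // instead of casting impl down to the concrete protocol server.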
-  private boolean isLeader() throws ServiceException {
-    if (!(impl instanceof SCMClientProtocolServer)) {
-      throw new ServiceException("Should be SCMClientProtocolServer");
-    } else {
-      return ((SCMClientProtocolServer) impl).getScm().checkLeader();
-    }
-  }
-
   @Override
   public ScmContainerLocationResponse submitRequest(RpcController controller,
       ScmContainerLocationRequest request) throws ServiceException {
-    if (!isLeader()) {
-      throw new ServiceException(new IOException("SCM IS NOT LEADER"));
+    if (!scm.getScmContext().isLeader()) {
+      throw new ServiceException(scm.getScmHAManager()
+          .getRatisServer()
+          .triggerNotLeaderException());
     }
     return dispatcher
         .processRequest(request, this::processRequest, request.getCmdType(),
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
index 67441945c1a4..26fb80660a24 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
@@ -330,8 +330,8 @@ public OneReplicaPipelineSafeModeRule getOneReplicaPipelineSafeModeRule() {
    */
   public static class SafeModeStatus {
 
-    private boolean safeModeStatus;
-    private boolean preCheckPassed;
+    private final boolean safeModeStatus;
+    private final boolean preCheckPassed;
 
     public SafeModeStatus(boolean safeModeState, boolean preCheckPassed) {
       this.safeModeStatus = safeModeState;
@@ -345,6 +345,14 @@ public boolean isInSafeMode() {
     public boolean isPreCheckComplete() {
       return preCheckPassed;
     }
+
+    @Override
+    public String toString() {
+      return "SafeModeStatus{" +
+          "safeModeStatus=" + safeModeStatus +
+          ", preCheckPassed=" + preCheckPassed +
+          '}';
+    }
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index 170e0ee22637..d17675c4649e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -115,7 +115,7 @@ public SCMBlockProtocolServer(OzoneConfiguration conf,
     BlockingService blockProtoPbService =
         ScmBlockLocationProtocolProtos.ScmBlockLocationProtocolService
             .newReflectiveBlockingService(
-                new ScmBlockLocationProtocolServerSideTranslatorPB(this,
+                new ScmBlockLocationProtocolServerSideTranslatorPB(this, scm,
                     protocolMessageMetrics));
 
     final InetSocketAddress scmBlockAddress = HddsServerUtil
@@ -293,10 +293,6 @@ public ScmInfo getScmInfo() throws IOException {
     }
   }
 
-  public StorageContainerManager getScm() {
-    return scm;
-  }
-
   @Override
   public List sortDatanodes(List nodes,
       String clientMachine) throws IOException {
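Both translators now gate every RPC on the same idiom: consult SCMContext for leadership and, when this SCM is not the leader, surface Ratis' NotLeaderException (which carries the suggested leader) instead of the old bare "SCM IS NOT LEADER" IOException, so clients can fail over. A sketch of the shared check; the helper itself is hypothetical, since the patch inlines it in send() and submitRequest():

  // Hypothetical helper equivalent to the inlined leader gates above.
  static void checkLeader(StorageContainerManager scm) throws ServiceException {
    // SCMContext is the single source of truth for raft state, so the
    // translators no longer downcast `impl` to reach the SCM instance.
    if (!scm.getScmContext().isLeader()) {
      // NotLeaderException tells the client who the current leader is,
      // letting it retry against that leader instead of failing opaquely.
      throw new ServiceException(scm.getScmHAManager()
          .getRatisServer()
          .triggerNotLeaderException());
    }
  }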
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index d30370527256..9df843051bf6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -39,15 +39,12 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos;
 import org.apache.hadoop.hdds.scm.ScmInfo;
-import org.apache.hadoop.hdds.scm.ScmUtils;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.scm.safemode.SafeModePrecheck;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
@@ -61,9 +58,6 @@
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
 import org.apache.hadoop.io.IOUtils;
@@ -93,8 +87,7 @@
  * The RPC server that listens to requests from clients.
  */
 public class SCMClientProtocolServer implements
-    StorageContainerLocationProtocol, Auditor,
-    EventHandler {
+    StorageContainerLocationProtocol, Auditor {
   private static final Logger LOG =
       LoggerFactory.getLogger(SCMClientProtocolServer.class);
   private static final AuditLogger AUDIT =
@@ -103,14 +96,12 @@ public class SCMClientProtocolServer implements
   private final InetSocketAddress clientRpcAddress;
   private final StorageContainerManager scm;
   private final OzoneConfiguration conf;
-  private SafeModePrecheck safeModePrecheck;
   private final ProtocolMessageMetrics protocolMetrics;
 
   public SCMClientProtocolServer(OzoneConfiguration conf,
       StorageContainerManager scm) throws IOException {
     this.scm = scm;
     this.conf = conf;
-    safeModePrecheck = new SafeModePrecheck(conf);
     final int handlerCount =
         conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY,
             OZONE_SCM_HANDLER_COUNT_DEFAULT);
@@ -126,6 +117,7 @@ public SCMClientProtocolServer(OzoneConfiguration conf,
     BlockingService storageProtoPbService =
         newReflectiveBlockingService(
             new StorageContainerLocationProtocolServerSideTranslatorPB(this,
+                scm,
                 protocolMetrics));
 
     final InetSocketAddress scmAddress = HddsServerUtil
@@ -187,7 +179,10 @@ public String getRpcRemoteUsername() {
   public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType
       replicationType, HddsProtos.ReplicationFactor factor,
       String owner) throws IOException {
-    ScmUtils.preCheck(ScmOps.allocateContainer, safeModePrecheck);
+    if (scm.getScmContext().isInSafeMode()) {
+      throw new SCMException("SafeModePrecheck failed for allocateContainer",
+          ResultCodes.SAFE_MODE_EXCEPTION);
+    }
     getScm().checkAdminAccess(getRpcRemoteUsername());
 
     final ContainerInfo container = scm.getContainerManager()
@@ -229,7 +224,7 @@ private ContainerWithPipeline getContainerWithPipelineCommon(
     final ContainerInfo container = scm.getContainerManager()
         .getContainer(cid);
 
-    if (safeModePrecheck.isInSafeMode()) {
+    if (scm.getScmContext().isInSafeMode()) {
       if (container.isOpen()) {
        if (!hasRequiredReplicas(container)) {
          throw new SCMException("Open container " + containerID + " doesn't"
@@ -635,7 +630,7 @@ public StorageContainerManager getScm() {
 
   /**
    * Set safe mode status based on .
    */
   public boolean getSafeModeStatus() {
-    return safeModePrecheck.isInSafeMode();
+    return scm.getScmContext().isInSafeMode();
   }
 
@@ -688,10 +683,4 @@ public AuditMessage buildAuditMessageForFailure(AuditAction op, Map
 
-  @Override
-  public void onMessage(SafeModeStatus status,
-      EventPublisher publisher) {
-    safeModePrecheck.setInSafeMode(status.isInSafeMode());
-  }
-
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
         new LockManager<>(conf), scmNodeManager);
   }
@@ -1004,6 +1018,13 @@ public NodeDecommissionManager getScmDecommissionManager() {
     return scmDecommissionManager;
   }
 
+  /**
+   * Returns SCMHAManager.
+   */
+  public SCMHAManager getScmHAManager() {
+    return scmHAManager;
+  }
+
   /**
    * Returns SCM container manager.
    */
@@ -1052,7 +1073,7 @@ public ReplicationManager getReplicationManager() {
    * @return - if the current scm is the leader.
    */
   public boolean checkLeader() {
-    return scmHAManager.isLeader().isPresent();
+    return scmContext.isLeader();
   }
 
   public void checkAdminAccess(String remoteUser) throws IOException {
@@ -1139,6 +1160,13 @@ public EventPublisher getEventQueue() {
     return eventQueue;
   }
 
+  /**
+   * Returns SCMContext.
+   */
+  public SCMContext getScmContext() {
+    return scmContext;
+  }
+
   /**
    * Force SCM out of safe mode.
    */
@@ -1208,8 +1236,4 @@ public String getScmId() {
   public String getClusterId() {
     return getScmStorageConfig().getClusterID();
   }
-
-  public SCMHAManager getScmHAManager() {
-    return scmHAManager;
-  }
 }
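Together with the BlockManagerImpl changes, these hunks retire the event-driven SafeModePrecheck flag: call sites now query SCMContext directly. The inlined checks are equivalent to this hypothetical helper (name and placement assumed):

  // Hypothetical helper equivalent to the checks the patch inlines.
  static void preCheckSafeMode(StorageContainerManager scm, String op)
      throws SCMException {
    // SCMContext replaces the per-component SafeModePrecheck flags: callers
    // now read safe-mode state from one shared, always-current object.
    if (scm.getScmContext().isInSafeMode()) {
      throw new SCMException("SafeModePrecheck failed for " + op,
          SCMException.ResultCodes.SAFE_MODE_EXCEPTION);
    }
  }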
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 4bac431b82c5..5bfa11ae3ac4 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -493,6 +494,7 @@ public static StorageContainerManager getScm(OzoneConfiguration conf)
       throws IOException, AuthenticationException {
     SCMConfigurator configurator = new SCMConfigurator();
     configurator.setSCMHAManager(MockSCMHAManager.getInstance(true));
+    configurator.setScmContext(SCMContext.emptyContext());
     return getScm(conf, configurator);
   }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index afb9eba61b23..c8c8243cae6d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
@@ -53,6 +54,7 @@
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
 import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -88,6 +90,7 @@ public class TestBlockManager {
   private static HddsProtos.ReplicationFactor factor;
   private static HddsProtos.ReplicationType type;
   private EventQueue eventQueue;
+  private SCMContext scmContext;
   private int numContainerPerOwnerInPipeline;
   private OzoneConfiguration conf;
@@ -117,6 +120,7 @@ public void setUp() throws Exception {
     scmHAManager = MockSCMHAManager.getInstance(true);
 
     eventQueue = new EventQueue();
+    scmContext = SCMContext.emptyContext();
     scmMetadataStore = new SCMMetadataStoreImpl(conf);
     scmMetadataStore.start(conf);
@@ -126,7 +130,8 @@ public void setUp() throws Exception {
         scmHAManager,
         nodeManager,
         scmMetadataStore.getPipelineTable(),
-        eventQueue);
+        eventQueue,
+        scmContext);
     pipelineManager.allowPipelineCreation();
 
     PipelineProvider mockRatisProvider =
@@ -154,6 +159,7 @@ public void emitSafeModeStatus() {
     configurator.setScmSafeModeManager(safeModeManager);
     configurator.setMetadataStore(scmMetadataStore);
     configurator.setSCMHAManager(scmHAManager);
+    configurator.setScmContext(scmContext);
     scm = TestUtils.getScm(conf, configurator);
 
     // Initialize these fields so that the tests can pass.
@@ -162,14 +168,13 @@
     DatanodeCommandHandler handler = new DatanodeCommandHandler();
     eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, handler);
     CloseContainerEventHandler closeContainerHandler =
-        new CloseContainerEventHandler(pipelineManager, mapping);
+        new CloseContainerEventHandler(pipelineManager, mapping, scmContext);
     eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
 
     factor = HddsProtos.ReplicationFactor.THREE;
     type = HddsProtos.ReplicationType.RATIS;
-
-    blockManager.onMessage(
-        new SCMSafeModeManager.SafeModeStatus(false, false), null);
+    scm.getScmContext().onMessage(
+        new SafeModeStatus(false, true), null);
   }
 
   @After
@@ -455,7 +460,7 @@ public void testAllocateOversizedBlock() throws Exception {
 
   @Test
   public void testAllocateBlockFailureInSafeMode() throws Exception {
-    blockManager.onMessage(
+    scm.getScmContext().onMessage(
         new SCMSafeModeManager.SafeModeStatus(true, true), null);
     // Test1: In safe mode expect an SCMException.
thrown.expectMessage("SafeModePrecheck failed for " diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java index a29dc16907c9..6d5fee200934 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager; +import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl; import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider; @@ -61,6 +62,7 @@ public class TestCloseContainerEventHandler { private static long size; private static File testDir; private static EventQueue eventQueue; + private static SCMContext scmContext; private static SCMMetadataStore scmMetadataStore; @BeforeClass @@ -75,6 +77,7 @@ public static void setUp() throws Exception { configuration.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 16); nodeManager = new MockNodeManager(true, 10); eventQueue = new EventQueue(); + scmContext = SCMContext.emptyContext(); scmMetadataStore = new SCMMetadataStoreImpl(configuration); pipelineManager = @@ -83,7 +86,8 @@ public static void setUp() throws Exception { MockSCMHAManager.getInstance(true), nodeManager, scmMetadataStore.getPipelineTable(), - eventQueue); + eventQueue, + scmContext); pipelineManager.allowPipelineCreation(); PipelineProvider mockRatisProvider = @@ -98,8 +102,7 @@ public static void setUp() throws Exception { pipelineManager.triggerPipelineCreation(); eventQueue.addHandler(CLOSE_CONTAINER, new CloseContainerEventHandler( - pipelineManager, - containerManager)); + pipelineManager, containerManager, scmContext)); eventQueue.addHandler(DATANODE_COMMAND, nodeManager); // Move all pipelines created by background from ALLOCATED to OPEN state Thread.sleep(2000); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java index 10627a3849bb..7c0c1eccd4c7 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java @@ -26,6 +26,7 @@ .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto; +import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.net.NetworkTopology; import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; import org.apache.hadoop.hdds.scm.node.NodeManager; @@ -62,6 +63,7 @@ public class TestIncrementalContainerReportHandler { private ContainerManagerV2 containerManager; private ContainerStateManager containerStateManager; private EventPublisher publisher; + private SCMContext scmContext = SCMContext.emptyContext(); @Before public void setup() throws IOException, InvalidStateTransitionException { @@ 
@@ -74,13 +76,12 @@ public void setup() throws IOException, InvalidStateTransitionException {
     NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
     EventQueue eventQueue = new EventQueue();
     SCMStorageConfig storageConfig = new SCMStorageConfig(conf);
-    this.nodeManager =
-        new SCMNodeManager(conf, storageConfig, eventQueue, clusterMap);
+    this.nodeManager = new SCMNodeManager(
+        conf, storageConfig, eventQueue, clusterMap, scmContext);
     this.containerStateManager = new ContainerStateManager(conf);
     this.publisher = Mockito.mock(EventPublisher.class);
 
-
     Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
         .thenAnswer(invocation -> containerStateManager
             .getContainer((ContainerID)invocation.getArguments()[0]));
@@ -119,7 +120,8 @@ public void tearDown() throws IOException {
 
   @Test
   public void testClosingToClosed() throws IOException {
     final IncrementalContainerReportHandler reportHandler =
-        new IncrementalContainerReportHandler(nodeManager, containerManager);
+        new IncrementalContainerReportHandler(
+            nodeManager, containerManager, scmContext);
     final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
     final DatanodeDetails datanodeOne = randomDatanodeDetails();
     final DatanodeDetails datanodeTwo = randomDatanodeDetails();
@@ -156,7 +158,8 @@ public void testClosingToClosed() throws IOException {
 
   @Test
   public void testClosingToQuasiClosed() throws IOException {
     final IncrementalContainerReportHandler reportHandler =
-        new IncrementalContainerReportHandler(nodeManager, containerManager);
+        new IncrementalContainerReportHandler(
+            nodeManager, containerManager, scmContext);
     final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
     final DatanodeDetails datanodeOne = randomDatanodeDetails();
     final DatanodeDetails datanodeTwo = randomDatanodeDetails();
@@ -194,7 +197,8 @@ public void testClosingToQuasiClosed() throws IOException {
 
   @Test
   public void testQuasiClosedToClosed() throws IOException {
     final IncrementalContainerReportHandler reportHandler =
-        new IncrementalContainerReportHandler(nodeManager, containerManager);
+        new IncrementalContainerReportHandler(
+            nodeManager, containerManager, scmContext);
     final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
     final DatanodeDetails datanodeOne = randomDatanodeDetails();
     final DatanodeDetails datanodeTwo = randomDatanodeDetails();
@@ -235,7 +239,8 @@ public void testQuasiClosedToClosed() throws IOException {
 
   @Test
   public void testDeleteContainer() throws IOException {
     final IncrementalContainerReportHandler reportHandler =
-        new IncrementalContainerReportHandler(nodeManager, containerManager);
+        new IncrementalContainerReportHandler(
+            nodeManager, containerManager, scmContext);
     final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
     final DatanodeDetails datanodeOne = randomDatanodeDetails();
     final DatanodeDetails datanodeTwo = randomDatanodeDetails();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index 7b5d1fab5381..d926a024fc5a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -151,6 +152,7 @@ public void setup()
         containerManager,
         containerPlacementPolicy,
         eventQueue,
+        SCMContext.emptyContext(),
         new LockManager<>(conf),
         nodeManager);
     replicationManager.start();
@@ -164,6 +166,7 @@ private void createReplicationManager(ReplicationManagerConfiguration rmConf)
         containerManager,
         containerPlacementPolicy,
         eventQueue,
+        SCMContext.emptyContext(),
         new LockManager<>(conf),
         nodeManager);
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
index b45f9c10c2bf..ba0cba5fbae7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -98,7 +99,8 @@ public static void setUp() throws Exception {
         MockSCMHAManager.getInstance(true),
         nodeManager,
         scmMetadataStore.getPipelineTable(),
-        new EventQueue());
+        new EventQueue(),
+        SCMContext.emptyContext());
     pipelineManager.allowPipelineCreation();
     containerManager = new SCMContainerManager(conf,
         scmMetadataStore.getContainerTable(),
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestUnknownContainerReport.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestUnknownContainerReport.java
index 9d2f5906504f..1e9d830e80f8 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestUnknownContainerReport.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestUnknownContainerReport.java
@@ -33,6 +33,7 @@
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.server
@@ -103,7 +104,7 @@ public void testUnknownContainerDeleted() throws IOException {
    */
   private void sendContainerReport(OzoneConfiguration conf) {
     ContainerReportHandler reportHandler = new ContainerReportHandler(
-        nodeManager, containerManager, conf);
+        nodeManager, containerManager, SCMContext.emptyContext(), conf);
     ContainerInfo container = getContainer(LifeCycleState.CLOSED);
 
     Iterator nodeIterator = nodeManager
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMContext.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMContext.java
new file mode 100644
index 000000000000..a8e4c00bbaa1
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMContext.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+/**
+ * Test for SCMContext.
+ */
+public class TestSCMContext {
+  @Test
+  public void testRaftOperations() {
+    // start as follower
+    SCMContext scmContext = new SCMContext(false, 0, null, null);
+    assertFalse(scmContext.isLeader());
+
+    // become leader
+    scmContext.updateIsLeaderAndTerm(true, 10);
+    assertTrue(scmContext.isLeader());
+    try {
+      assertEquals(scmContext.getTerm(), 10);
+    } catch (NotLeaderException e) {
+      fail("Should not throw nle.");
+    }
+
+    // step down
+    scmContext.updateIsLeaderAndTerm(false, 0);
+    assertFalse(scmContext.isLeader());
+  }
+
+  @Test
+  public void testSafeModeOperations() {
+    // in safe mode
+    SCMContext scmContext = new SCMContext(
+        true, 0, new SafeModeStatus(true, false), null);
+    assertTrue(scmContext.isInSafeMode());
+    assertFalse(scmContext.isPreCheckComplete());
+
+    // out of safe mode
+    scmContext.onMessage(new SafeModeStatus(false, true), null);
+    assertFalse(scmContext.isInSafeMode());
+    assertTrue(scmContext.isPreCheckComplete());
+  }
+}
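The new test fixes the public surface of SCMContext: raft state maintained through updateIsLeaderAndTerm, getTerm answerable only on the leader, and safe-mode state fed in through the same SafeModeStatus event that BlockManagerImpl and SCMClientProtocolServer stopped handling themselves. The production class is not included in this excerpt; the following minimal sketch merely satisfies the test, with field layout, synchronization, and the emptyContext() defaults as assumptions (emptyContext() must behave as an always-leader, out-of-safe-mode context, since other tests call getTerm() and the safe-mode getters on it freely):

  package org.apache.hadoop.hdds.scm.ha;

  import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
  import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
  import org.apache.hadoop.hdds.server.events.EventHandler;
  import org.apache.hadoop.hdds.server.events.EventPublisher;
  import org.apache.ratis.protocol.exceptions.NotLeaderException;

  /** Sketch only: a minimal SCMContext that satisfies TestSCMContext. */
  public final class SCMContext implements EventHandler<SafeModeStatus> {
    private boolean isLeader;
    private long term;
    private SafeModeStatus safeModeStatus;
    private final StorageContainerManager scm;

    public SCMContext(boolean isLeader, long term,
        SafeModeStatus safeModeStatus, StorageContainerManager scm) {
      this.isLeader = isLeader;
      this.term = term;
      this.safeModeStatus = safeModeStatus;
      this.scm = scm;
    }

    /** Assumed defaults: leader, pre-check done, out of safe mode. */
    public static SCMContext emptyContext() {
      return new SCMContext(true, 0, new SafeModeStatus(false, true), null);
    }

    /** Invoked on raft role or term changes. */
    public synchronized void updateIsLeaderAndTerm(boolean leader, long newTerm) {
      this.isLeader = leader;
      this.term = newTerm;
    }

    public synchronized boolean isLeader() {
      return isLeader;
    }

    /** The term is only trustworthy while this SCM is the leader. */
    public synchronized long getTerm() throws NotLeaderException {
      if (!isLeader) {
        // Simplified; the real code would raise this via the Ratis server.
        throw new NotLeaderException(null, null, null);
      }
      return term;
    }

    /** Replaces the SafeModePrecheck handlers removed elsewhere in the patch. */
    @Override
    public synchronized void onMessage(SafeModeStatus status,
        EventPublisher publisher) {
      this.safeModeStatus = status;
    }

    public synchronized boolean isInSafeMode() {
      return safeModeStatus.isInSafeMode();
    }

    public synchronized boolean isPreCheckComplete() {
      return safeModeStatus.isPreCheckComplete();
    }
  }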
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 68ec266ff895..5bcdf4bcbc78 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -107,7 +108,7 @@ SCMNodeManager createNodeManager(OzoneConfiguration config)
     Mockito.when(storageConfig.getClusterID()).thenReturn("cluster1");
 
     SCMNodeManager nodeManager = new SCMNodeManager(config,
-        storageConfig, eventQueue, null);
+        storageConfig, eventQueue, null, SCMContext.emptyContext());
     return nodeManager;
   }
 
@@ -121,7 +122,8 @@ SCMContainerManager createContainerManager(ConfigurationSource config,
         MockSCMHAManager.getInstance(true),
         scmNodeManager,
         scmMetadataStore.getPipelineTable(),
-        eventQueue);
+        eventQueue,
+        SCMContext.emptyContext());
     return new SCMContainerManager(config,
         scmMetadataStore.getContainerTable(),
         scmMetadataStore.getStore(),
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java
index 69b031c552f1..78da06670092 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode;
@@ -58,8 +59,8 @@ public void resetEventCollector() throws IOException {
     SCMStorageConfig storageConfig = Mockito.mock(SCMStorageConfig.class);
     Mockito.when(storageConfig.getClusterID()).thenReturn("cluster1");
     NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
-    nodeManager =
-        new SCMNodeManager(conf, storageConfig, new EventQueue(), clusterMap);
+    nodeManager = new SCMNodeManager(conf, storageConfig,
+        new EventQueue(), clusterMap, SCMContext.emptyContext());
     nodeReportHandler = new NodeReportHandler(nodeManager);
   }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index 04d140367077..0e34ae5d6026 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -40,7 +41,7 @@ public MockRatisPipelineProvider(
       ConfigurationSource conf, EventPublisher eventPublisher,
       boolean autoOpen) {
     super(nodeManager, stateManager,
-        conf, eventPublisher);
+        conf, eventPublisher, SCMContext.emptyContext());
     autoOpenPipeline = autoOpen;
   }
 
@@ -48,14 +49,14 @@ public MockRatisPipelineProvider(NodeManager nodeManager,
       StateManager stateManager,
       ConfigurationSource conf) {
     super(nodeManager, stateManager,
-        conf, new EventQueue());
+        conf, new EventQueue(), SCMContext.emptyContext());
   }
 
   public MockRatisPipelineProvider(
       NodeManager nodeManager, StateManager stateManager,
       ConfigurationSource conf, EventPublisher eventPublisher) {
     super(nodeManager, stateManager,
-        conf, eventPublisher);
+        conf, eventPublisher, SCMContext.emptyContext());
     autoOpenPipeline = true;
   }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
index 4517b896d416..3578718e431b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@@ -46,7 +47,7 @@ public void testCloseActionForMissingPipeline()
         .thenThrow(new PipelineNotFoundException());
 
     final PipelineActionHandler actionHandler =
-        new PipelineActionHandler(manager, null);
+        new PipelineActionHandler(manager, SCMContext.emptyContext(), null);
 
     final PipelineActionsProto actionsProto = PipelineActionsProto.newBuilder()
         .addPipelineActions(PipelineAction.newBuilder()
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index 2415b2baba13..c62e76961391 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
@@ -98,7 +99,8 @@ private PipelineManagerV2Impl createPipelineManager(boolean isLeader)
         MockSCMHAManager.getInstance(isLeader),
         new MockNodeManager(true, 20),
         SCMDBDefinition.PIPELINES.getTable(dbStore),
-        new EventQueue());
+        new EventQueue(),
+        SCMContext.emptyContext());
   }
 
   @Test
@@ -329,7 +331,8 @@ public void testPipelineReport() throws Exception {
         pipelineManager.getPipeline(pipeline.getId()).isHealthy());
     // get pipeline report from each dn in the pipeline
     PipelineReportHandler pipelineReportHandler =
-        new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
+        new PipelineReportHandler(scmSafeModeManager, pipelineManager,
+            SCMContext.emptyContext(), conf);
     nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
         pipelineReportHandler, false));
     sendPipelineReport(nodes.get(nodes.size() - 1), pipeline,
@@ -438,7 +441,8 @@ public void testPipelineOpenOnlyWhenLeaderReported() throws Exception {
         new SCMSafeModeManager(new OzoneConfiguration(), new ArrayList<>(),
             pipelineManager, new EventQueue());
     PipelineReportHandler pipelineReportHandler =
-        new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
+        new PipelineReportHandler(scmSafeModeManager, pipelineManager,
+            SCMContext.emptyContext(), conf);
 
     // Report pipelines with leaders
     List nodes = pipeline.getNodes();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 1e264cb672ad..2226a43f002c 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.metadata.PipelineIDCodec;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
@@ -238,7 +239,8 @@ public void testPipelineReport() throws IOException {
         pipelineManager.getPipeline(pipeline.getId()).isHealthy());
     // get pipeline report from each dn in the pipeline
     PipelineReportHandler pipelineReportHandler =
-        new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
+        new PipelineReportHandler(scmSafeModeManager, pipelineManager,
+            SCMContext.emptyContext(), conf);
     nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
         pipelineReportHandler, false, eventQueue));
     sendPipelineReport(nodes.get(nodes.size() - 1), pipeline,
@@ -495,7 +497,8 @@ public void testPipelineOpenOnlyWhenLeaderReported() throws Exception {
         new SCMSafeModeManager(new OzoneConfiguration(), new ArrayList<>(),
             pipelineManager, eventQueue);
     PipelineReportHandler pipelineReportHandler =
-        new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
+        new PipelineReportHandler(scmSafeModeManager, pipelineManager,
+            SCMContext.emptyContext(), conf);
 
     // Report pipelines with leaders
     List nodes = pipeline.getNodes();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java
index 53905e7f45de..13b7d990dc76 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
 import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineProvider;
@@ -51,7 +52,8 @@ public void testDefaultPolicy() {
         mock(NodeManager.class),
         mock(PipelineStateManager.class),
         conf,
-        mock(EventPublisher.class));
+        mock(EventPublisher.class),
+        SCMContext.emptyContext());
     Assert.assertSame(
         ratisPipelineProvider.getLeaderChoosePolicy().getClass(),
         DefaultLeaderChoosePolicy.class);
@@ -67,7 +69,8 @@ public void testClassNotImplemented() {
         mock(NodeManager.class),
         mock(PipelineStateManager.class),
         conf,
-        mock(EventPublisher.class));
+        mock(EventPublisher.class),
+        SCMContext.emptyContext());
 
     // expecting exception
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
index ee1f06cbe446..19f1f308bde9 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
@@ -77,7 +78,8 @@ public void testHealthyPipelineSafeModeRuleWithNoPipelines()
         MockSCMHAManager.getInstance(true),
         nodeManager,
         scmMetadataStore.getPipelineTable(),
-        eventQueue);
+        eventQueue,
+        SCMContext.emptyContext());
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), config);
@@ -126,7 +128,8 @@ public void testHealthyPipelineSafeModeRuleWithPipelines() throws Exception {
         MockSCMHAManager.getInstance(true),
         nodeManager,
         scmMetadataStore.getPipelineTable(),
-        eventQueue);
+        eventQueue,
+        SCMContext.emptyContext());
     pipelineManager.allowPipelineCreation();
 
     PipelineProvider mockRatisProvider =
@@ -220,7 +223,8 @@ public void testHealthyPipelineSafeModeRuleWithMixedPipelines()
         MockSCMHAManager.getInstance(true),
         nodeManager,
         scmMetadataStore.getPipelineTable(),
-        eventQueue);
+        eventQueue,
+        SCMContext.emptyContext());
     pipelineManager.allowPipelineCreation();
 
     PipelineProvider mockRatisProvider =
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
index 5e41289fe60f..b915899ce39b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
@@ -87,7 +88,8 @@ private void setup(int nodes, int pipelineFactorThreeCount,
         MockSCMHAManager.getInstance(true),
         mockNodeManager,
         scmMetadataStore.getPipelineTable(),
-        eventQueue);
+        eventQueue,
+        SCMContext.emptyContext());
     pipelineManager.allowPipelineCreation();
 
     PipelineProvider mockRatisProvider =
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index 0734eea18ee2..e8dbc2e450cc 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
@@ -71,6 +72,7 @@ public class TestSCMSafeModeManager {
 
   private static EventQueue queue;
+  private SCMContext scmContext;
   private SCMSafeModeManager scmSafeModeManager;
   private static OzoneConfiguration config;
   private List containers = Collections.emptyList();
@@ -86,6 +88,7 @@ public class TestSCMSafeModeManager {
   @Before
   public void setUp() {
     queue = new EventQueue();
+    scmContext = SCMContext.emptyContext();
     config = new OzoneConfiguration();
     config.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
         false);
@@ -307,7 +310,8 @@ public void testFailWithIncorrectValueForHealthyPipelinePercent()
           MockSCMHAManager.getInstance(true),
           mockNodeManager,
           scmMetadataStore.getPipelineTable(),
-          queue);
+          queue,
+          scmContext);
       scmSafeModeManager = new SCMSafeModeManager(
           conf, containers, pipelineManager, queue);
       fail("testFailWithIncorrectValueForHealthyPipelinePercent");
@@ -330,7 +334,8 @@ public void testFailWithIncorrectValueForOneReplicaPipelinePercent()
          MockSCMHAManager.getInstance(true),
          mockNodeManager,
          scmMetadataStore.getPipelineTable(),
-          queue);
+          queue,
+          scmContext);
      scmSafeModeManager = new SCMSafeModeManager(
          conf, containers, pipelineManager, queue);
      fail("testFailWithIncorrectValueForOneReplicaPipelinePercent");
@@ -352,7 +357,8 @@ public void testFailWithIncorrectValueForSafeModePercent() throws Exception {
          MockSCMHAManager.getInstance(true),
          mockNodeManager,
          scmMetadataStore.getPipelineTable(),
-          queue);
+          queue,
+          scmContext);
      scmSafeModeManager = new SCMSafeModeManager(
          conf, containers, pipelineManager, queue);
      fail("testFailWithIncorrectValueForSafeModePercent");
@@ -381,7 +387,8 @@ public void testSafeModeExitRuleWithPipelineAvailabilityCheck(
          MockSCMHAManager.getInstance(true),
          mockNodeManager,
          scmMetadataStore.getPipelineTable(),
-          queue);
+          queue,
+          scmContext);
      PipelineProvider mockRatisProvider =
        new MockRatisPipelineProvider(mockNodeManager,
            pipelineManager.getStateManager(), config);
@@ -631,7 +638,8 @@ public void testSafeModePipelineExitRule() throws Exception {
          MockSCMHAManager.getInstance(true),
          nodeManager,
          scmMetadataStore.getPipelineTable(),
-          queue);
+          queue,
+          scmContext);
      PipelineProvider mockRatisProvider =
        new MockRatisPipelineProvider(nodeManager,
@@ -694,7 +702,8 @@ public void testPipelinesNotCreatedUntilPreCheckPasses()
          MockSCMHAManager.getInstance(true),
          nodeManager,
          scmMetadataStore.getPipelineTable(),
-          queue);
+          queue,
+          scmContext);
      PipelineProvider mockRatisProvider =
        new MockRatisPipelineProvider(nodeManager,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
index a87dde9b0019..b2201e52d1b7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB;
@@ -60,6 +61,7 @@ public void setUp() throws Exception {
     config.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.toString());
     SCMConfigurator configurator = new SCMConfigurator();
     configurator.setSCMHAManager(MockSCMHAManager.getInstance(true));
+    configurator.setScmContext(SCMContext.emptyContext());
     scm = TestUtils.getScm(config, configurator);
     scm.start();
     scm.exitSafeMode();
@@ -70,7 +72,7 @@ public void setUp() throws Exception {
     }
     server = scm.getBlockProtocolServer();
-    service = new ScmBlockLocationProtocolServerSideTranslatorPB(server,
+    service = new ScmBlockLocationProtocolServerSideTranslatorPB(server, scm,
         Mockito.mock(ProtocolMessageMetrics.class));
   }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
index fd652837755a..ab57b298da67 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
 import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
 import org.apache.hadoop.hdds.scm.node.SCMNodeMetrics;
@@ -63,7 +64,7 @@ public static void setup() throws Exception {
     SCMStorageConfig config =
         new SCMStorageConfig(NodeType.DATANODE, new File("/tmp"), "storage");
     nodeManager = new SCMNodeManager(source, config, publisher,
-        new NetworkTopologyImpl(source));
+        new NetworkTopologyImpl(source), SCMContext.emptyContext());
 
     registeredDatanode = DatanodeDetails.newBuilder()
         .setHostName("localhost")
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index 7721ac3dba02..efbf993081d9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -176,7 +177,8 @@ public void testPipelineCloseWithPipelineAction() throws Exception {
         ratisContainer.getPipeline().getId());
     // send closing action for pipeline
     PipelineActionHandler pipelineActionHandler =
-        new PipelineActionHandler(pipelineManager, conf);
+        new PipelineActionHandler(
+            pipelineManager, SCMContext.emptyContext(), conf);
     pipelineActionHandler
         .onMessage(pipelineActionsFromDatanode, new EventQueue());
     Thread.sleep(5000);
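The hunks above thread SCMContext into report and action handlers (PipelineActionHandler, PipelineReportHandler, ContainerReportHandler, IncrementalContainerReportHandler), but only the constructor plumbing is visible in this patch. A plausible, assumed shape of such a handler, leader-gated through the context (class name and gating placement are hypothetical):

  // Assumed shape only: the patch does not show the handler bodies.
  public class LeaderGatedPipelineActionHandler
      implements EventHandler<PipelineActionsFromDatanode> {

    private final SCMContext scmContext;

    public LeaderGatedPipelineActionHandler(SCMContext scmContext) {
      this.scmContext = scmContext;
    }

    @Override
    public void onMessage(PipelineActionsFromDatanode actions,
        EventPublisher publisher) {
      if (!scmContext.isLeader()) {
        // Follower SCMs receive the same datanode events but must not act.
        return;
      }
      // ... leader-only handling: close pipelines, emit term-stamped commands.
    }
  }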
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
index d5768aa62850..5e08f2dcf64a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
@@ -46,6 +46,7 @@
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.test.GenericTestUtils;
 
 import static org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer
@@ -137,10 +138,13 @@ public void testContainerReplication() throws Exception {
     Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
 
     //WHEN: send the order to replicate the container
+    SCMCommand command = new ReplicateContainerCommand(containerId,
+        sourcePipelines.getNodes());
+    command.setTerm(
+        cluster.getStorageContainerManager().getScmContext().getTerm());
     cluster.getStorageContainerManager().getScmNodeManager()
         .addDatanodeCommand(destinationDatanode.getDatanodeDetails().getUuid(),
-            new ReplicateContainerCommand(containerId,
-                sourcePipelines.getNodes()));
+            command);
 
     DatanodeStateMachine destinationDatanodeDatanodeStateMachine =
         destinationDatanode.getDatanodeStateMachine();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
index 853f2cd71a71..00a338bf337c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 import org.junit.AfterClass;
@@ -143,9 +144,12 @@ public void testIfCloseContainerCommandHandlerIsInvoked() throws Exception {
         .getCloseContainerHandler();
     int lastInvocationCount = closeContainerHandler.getInvocationCount();
     //send the order to close the container
+    SCMCommand command = new CloseContainerCommand(
+        containerID, pipeline.getId());
+    command.setTerm(
+        cluster.getStorageContainerManager().getScmContext().getTerm());
     cluster.getStorageContainerManager().getScmNodeManager()
-        .addDatanodeCommand(datanodeDetails.getUuid(),
-            new CloseContainerCommand(containerID, pipeline.getId()));
+        .addDatanodeCommand(datanodeDetails.getUuid(), command);
     GenericTestUtils
         .waitFor(() -> isContainerClosed(cluster, containerID, datanodeDetails),
             500, 5 * 1000);
@@ -191,9 +195,12 @@ public void testCloseContainerViaStandAlone()
 
     // Send the order to close the container, give random pipeline id so that
    // the container will not be closed via RATIS
+    SCMCommand command = new CloseContainerCommand(
+        containerID, pipeline.getId());
+    command.setTerm(
+        cluster.getStorageContainerManager().getScmContext().getTerm());
     cluster.getStorageContainerManager().getScmNodeManager()
-        .addDatanodeCommand(datanodeDetails.getUuid(),
-            new CloseContainerCommand(containerID, pipeline.getId()));
+        .addDatanodeCommand(datanodeDetails.getUuid(), command);
 
     //double check if it's really closed (waitFor also throws an exception)
     // TODO: change the below line after implementing QUASI_CLOSED to CLOSED
@@ -242,9 +249,12 @@ public void testCloseContainerViaRatis() throws IOException,
     for (DatanodeDetails details : datanodes) {
       Assert.assertFalse(isContainerClosed(cluster, containerID, details));
       //send the order to close the container
+      SCMCommand command = new CloseContainerCommand(
+          containerID, pipeline.getId());
+      command.setTerm(
+          cluster.getStorageContainerManager().getScmContext().getTerm());
       cluster.getStorageContainerManager().getScmNodeManager()
-          .addDatanodeCommand(details.getUuid(),
-              new CloseContainerCommand(containerID, pipeline.getId()));
+          .addDatanodeCommand(details.getUuid(), command);
       int index = cluster.getHddsDatanodeIndex(details);
       Container dnContainer = cluster.getHddsDatanodes().get(index)
           .getDatanodeStateMachine().getContainer().getContainerSet()
@@ -319,9 +329,12 @@ public void testQuasiCloseTransitionViaRatis()
 
     // Send close container command from SCM to datanode with forced flag as
     // true
+    SCMCommand command = new CloseContainerCommand(
+        containerID, pipeline.getId(), true);
+    command.setTerm(
+        cluster.getStorageContainerManager().getScmContext().getTerm());
     cluster.getStorageContainerManager().getScmNodeManager()
-        .addDatanodeCommand(datanodeDetails.getUuid(),
-            new CloseContainerCommand(containerID, pipeline.getId(), true));
+        .addDatanodeCommand(datanodeDetails.getUuid(), command);
     GenericTestUtils
         .waitFor(() -> isContainerClosed(
             cluster, containerID, datanodeDetails), 500, 5 * 1000);
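The surrounding integration-test hunks all repeat the same three steps: build the SCMCommand, stamp it with the leader's current term from SCMContext, then enqueue it for a datanode. A hypothetical helper capturing that pattern (the tests inline it; name and signature are assumptions):

  // Hypothetical helper: every datanode command is stamped with the leader's
  // current term so a datanode can discard commands from a deposed SCM leader.
  private static void sendTermStampedCommand(MiniOzoneCluster cluster,
      UUID datanodeUuid, SCMCommand command) throws IOException {
    StorageContainerManager scm = cluster.getStorageContainerManager();
    // getTerm() throws NotLeaderException (an IOException) on a non-leader.
    command.setTerm(scm.getScmContext().getTerm());
    scm.getScmNodeManager().addDatanodeCommand(datanodeUuid, command);
  }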
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
index 8bd054bfe51a..870486a7f54c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.test.GenericTestUtils;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE;
@@ -119,9 +120,12 @@ public void test() throws Exception {
     DatanodeDetails datanodeDetails =
         cluster.getHddsDatanodes().get(0).getDatanodeDetails();
     //send the order to close the container
+    SCMCommand command = new CloseContainerCommand(
+        containerId.getId(), pipeline.getId());
+    command.setTerm(
+        cluster.getStorageContainerManager().getScmContext().getTerm());
     cluster.getStorageContainerManager().getScmNodeManager()
-        .addDatanodeCommand(datanodeDetails.getUuid(),
-            new CloseContainerCommand(containerId.getId(), pipeline.getId()));
+        .addDatanodeCommand(datanodeDetails.getUuid(), command);
 
     GenericTestUtils.waitFor(() ->
             isContainerClosed(cluster, containerId.getId()),
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
index 61c33696c865..40998e1868c2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -132,8 +133,11 @@ public void testDeleteContainerRequestHandlerOnClosedContainer()
         cluster.getStorageContainerManager().getScmNodeManager();
 
     //send the order to close the container
-    nodeManager.addDatanodeCommand(datanodeDetails.getUuid(),
-        new CloseContainerCommand(containerId.getId(), pipeline.getId()));
+    SCMCommand command = new CloseContainerCommand(
+        containerId.getId(), pipeline.getId());
+    command.setTerm(
+        cluster.getStorageContainerManager().getScmContext().getTerm());
+    nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command);
 
     GenericTestUtils.waitFor(() ->
             isContainerClosed(hddsDatanodeService, containerId.getId()),
@@ -148,8 +152,10 @@
         containerId.getId()));
 
     // send delete container to the datanode
-    nodeManager.addDatanodeCommand(datanodeDetails.getUuid(),
-        new DeleteContainerCommand(containerId.getId(), false));
+    command = new DeleteContainerCommand(containerId.getId(), false);
+    command.setTerm(
+        cluster.getStorageContainerManager().getScmContext().getTerm());
+    nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command);
 
     GenericTestUtils.waitFor(() ->
             isContainerDeleted(hddsDatanodeService, containerId.getId()),
@@ -183,8 +189,11 @@ public void testDeleteContainerRequestHandlerOnOpenContainer()
         cluster.getStorageContainerManager().getScmNodeManager();
 
     // Send delete container command with force flag set to false.
-    nodeManager.addDatanodeCommand(datanodeDetails.getUuid(),
-        new DeleteContainerCommand(containerId.getId(), false));
+    SCMCommand command = new DeleteContainerCommand(
+        containerId.getId(), false);
+    command.setTerm(
+        cluster.getStorageContainerManager().getScmContext().getTerm());
+    nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command);
 
     // Here it should not delete it, and the container should exist in the
     // containerset
@@ -205,9 +214,10 @@
     // Now delete container with force flag set to true. now it should delete
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index 1bb0a0b95070..5f9cc2c2604a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -50,6 +50,7 @@
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
 import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
 import org.apache.hadoop.hdds.scm.net.NodeSchema;
@@ -172,6 +173,7 @@ public static void setUp() throws Exception {
     configurator.setScmNodeManager(nodeManager);
     configurator.setNetworkTopology(clusterMap);
     configurator.setSCMHAManager(MockSCMHAManager.getInstance(true));
+    configurator.setScmContext(SCMContext.emptyContext());
     scm = TestUtils.getScm(conf, configurator);
     scm.start();
     scm.exitSafeMode();
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
index 77e5a387971f..0cc4926bdf35 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
@@ -44,8 +45,8 @@ public class ReconIncrementalContainerReportHandler
       ReconIncrementalContainerReportHandler.class);
 
   public ReconIncrementalContainerReportHandler(NodeManager nodeManager,
-      ContainerManagerV2 containerManager) {
-    super(nodeManager, containerManager);
+      ContainerManagerV2 containerManager, SCMContext scmContext) {
+    super(nodeManager, containerManager, scmContext);
   }
 
   @Override
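The TestKeyManagerImpl hunk shows the test-side cost of the new dependency: an SCM assembled through a configurator now needs an explicit context. A minimal sketch of that setup, assuming the `SCMConfigurator`/`TestUtils` helpers this test already uses (everything outside the hunk itself is illustrative):

```java
// Assembling a test SCM with a default context, per the hunk above.
// SCMContext.emptyContext() stands in for the term/safe-mode state that a
// real SCM derives from its HA machinery.
SCMConfigurator configurator = new SCMConfigurator();
configurator.setSCMHAManager(MockSCMHAManager.getInstance(true));
configurator.setScmContext(SCMContext.emptyContext());
StorageContainerManager scm = TestUtils.getScm(conf, configurator);
scm.start();
scm.exitSafeMode();  // the tests drive the SCM out of safe mode explicitly
```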
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
index d7a6104cf8b0..624d7820f59a 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
@@ -66,7 +67,8 @@ public ReconNodeManager(OzoneConfiguration conf,
       EventPublisher eventPublisher,
       NetworkTopology networkTopology,
       Table nodeDB) {
-    super(conf, scmStorageConfig, eventPublisher, networkTopology);
+    super(conf, scmStorageConfig, eventPublisher, networkTopology,
+        SCMContext.emptyContext());
     this.nodeDB = nodeDB;
     loadExistingNodes();
   }
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineReportHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineReportHandler.java
index 246d9baf74ee..589de00269f7 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineReportHandler.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineReportHandler.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -46,9 +47,10 @@ public class ReconPipelineReportHandler extends PipelineReportHandler {
 
   public ReconPipelineReportHandler(SafeModeManager scmSafeModeManager,
       PipelineManager pipelineManager,
+      SCMContext scmContext,
       ConfigurationSource conf,
       StorageContainerServiceProvider scmServiceProvider) {
-    super(scmSafeModeManager, pipelineManager, conf);
+    super(scmSafeModeManager, pipelineManager, scmContext, conf);
     this.scmServiceProvider = scmServiceProvider;
   }
 
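Recon reuses SCMNodeManager but runs outside any SCM Raft ring, so it has no leader term of its own; `SCMContext.emptyContext()` serves as a null object for the new constructor parameter. A self-contained sketch of the same move in a hypothetical non-HA subclass (the class name is illustrative):

```java
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.server.events.EventPublisher;

/**
 * Hypothetical subclass illustrating how non-HA consumers satisfy the
 * new SCMContext constructor parameter of SCMNodeManager.
 */
class StandaloneNodeManager extends SCMNodeManager {
  StandaloneNodeManager(OzoneConfiguration conf,
      SCMStorageConfig storageConfig, EventPublisher publisher,
      NetworkTopology topology) {
    // No Raft ring here, so a default context stands in for leader state.
    super(conf, storageConfig, publisher, topology,
        SCMContext.emptyContext());
  }
}
```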
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index 14afa2fba111..a406779187cc 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
 import org.apache.hadoop.hdds.scm.node.DeadNodeHandler;
@@ -78,6 +79,7 @@ public class ReconStorageContainerManagerFacade
   private final OzoneConfiguration ozoneConfiguration;
   private final ReconDatanodeProtocolServer datanodeProtocolServer;
   private final EventQueue eventQueue;
+  private final SCMContext scmContext;
   private final SCMStorageConfig scmStorageConfig;
   private final DBStore dbStore;
 
@@ -98,6 +100,7 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf,
       throws IOException {
     this.eventQueue = new EventQueue();
     eventQueue.setSilent(true);
+    this.scmContext = SCMContext.emptyContext();
     this.ozoneConfiguration = getReconScmConfiguration(conf);
     this.scmStorageConfig = new ReconStorageConfig(conf);
     this.clusterMap = new NetworkTopologyImpl(conf);
@@ -131,11 +134,11 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf,
     SafeModeManager safeModeManager = new ReconSafeModeManager();
     ReconPipelineReportHandler pipelineReportHandler =
-        new ReconPipelineReportHandler(
-            safeModeManager, pipelineManager, conf, scmServiceProvider);
+        new ReconPipelineReportHandler(safeModeManager,
+            pipelineManager, scmContext, conf, scmServiceProvider);
 
     PipelineActionHandler pipelineActionHandler =
-        new PipelineActionHandler(pipelineManager, conf);
+        new PipelineActionHandler(pipelineManager, scmContext, conf);
 
     StaleNodeHandler staleNodeHandler =
         new StaleNodeHandler(nodeManager, pipelineManager, conf);
@@ -146,10 +149,11 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf,
         new ReconContainerReportHandler(nodeManager, containerManager);
 
     IncrementalContainerReportHandler icrHandler =
-        new ReconIncrementalContainerReportHandler(nodeManager,
-            containerManager);
+        new ReconIncrementalContainerReportHandler(nodeManager,
+            containerManager, scmContext);
     CloseContainerEventHandler closeContainerHandler =
-        new CloseContainerEventHandler(pipelineManager, containerManager);
+        new CloseContainerEventHandler(
+            pipelineManager, containerManager, scmContext);
     ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
     ReconNewNodeHandler newNodeHandler = new ReconNewNodeHandler(nodeManager);
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
index 15d16cb86dcc..f3592bcb8119 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -76,8 +77,8 @@ public void setUp() throws Exception {
     scmStorageConfig = new ReconStorageConfig(conf);
     NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
     EventQueue eventQueue = new EventQueue();
-    NodeManager nodeManager =
-        new SCMNodeManager(conf, scmStorageConfig, eventQueue, clusterMap);
+    NodeManager nodeManager = new SCMNodeManager(conf, scmStorageConfig,
+        eventQueue, clusterMap, SCMContext.emptyContext());
     pipelineManager = new ReconPipelineManager(conf, nodeManager,
         ReconSCMDBDefinition.PIPELINES.getTable(store), eventQueue);
     containerManager = new ReconContainerManager(
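The facade change is the wiring counterpart: a single SCMContext is created once in the constructor and threaded into every handler the facade builds, so all of them observe the same term and safe-mode state. A pared-down fragment of that wiring, keeping only the context-consuming constructors from the hunk above:

```java
// One context instance, created once, handed to each handler.
// Constructor signatures are as introduced by this patch.
SCMContext scmContext = SCMContext.emptyContext();

PipelineActionHandler pipelineActionHandler =
    new PipelineActionHandler(pipelineManager, scmContext, conf);
IncrementalContainerReportHandler icrHandler =
    new ReconIncrementalContainerReportHandler(nodeManager,
        containerManager, scmContext);
CloseContainerEventHandler closeContainerHandler =
    new CloseContainerEventHandler(pipelineManager, containerManager,
        scmContext);
```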
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java
index 064c85e2f461..0b0bce5ffa0c 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -83,14 +84,14 @@ public void testProcessICR() throws IOException, NodeNotFoundException {
     NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
     EventQueue eventQueue = new EventQueue();
     SCMStorageConfig storageConfig = new SCMStorageConfig(conf);
-    NodeManager nodeManager =
-        new SCMNodeManager(conf, storageConfig, eventQueue, clusterMap);
+    NodeManager nodeManager = new SCMNodeManager(conf, storageConfig,
+        eventQueue, clusterMap, SCMContext.emptyContext());
     nodeManager.register(datanodeDetails, null, null);
     ReconContainerManager containerManager = getContainerManager();
     ReconIncrementalContainerReportHandler reconIcr =
         new ReconIncrementalContainerReportHandler(nodeManager,
-            null);
+            null, SCMContext.emptyContext());
     EventPublisher eventPublisherMock = mock(EventPublisher.class);
 
     reconIcr.onMessage(reportMock, eventPublisherMock);
@@ -133,7 +134,7 @@ public void testProcessICRStateMismatch() throws IOException {
     when(reportMock.getReport()).thenReturn(containerReport);
     ReconIncrementalContainerReportHandler reconIcr =
         new ReconIncrementalContainerReportHandler(nodeManagerMock,
-            null);
+            null, SCMContext.emptyContext());
 
     reconIcr.onMessage(reportMock, mock(EventPublisher.class));
     assertTrue(containerManager.containerExist(containerID));
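For unit tests that construct handlers directly, a fresh `emptyContext()` at the call site is sufficient, since nothing in these tests consults term or safe-mode state. An illustrative Mockito fragment of that pattern (the real test also stubs the report's `getReport()`; that setup is elided here, and the class name is hypothetical):

```java
import static org.mockito.Mockito.mock;

import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.recon.scm.ReconIncrementalContainerReportHandler;

class ReconIcrExample {
  void drive(NodeManager nodeManager) {
    // Mocks stand in for the report and publisher; emptyContext() satisfies
    // the handler's new constructor argument.
    ReconIncrementalContainerReportHandler handler =
        new ReconIncrementalContainerReportHandler(nodeManager,
            null, SCMContext.emptyContext());
    IncrementalContainerReportFromDatanode report =
        mock(IncrementalContainerReportFromDatanode.class);
    handler.onMessage(report, mock(EventPublisher.class));
  }
}
```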
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java
index b190810db460..2c4226fe53b0 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -109,8 +110,8 @@ public void testInitialize() throws IOException {
 
     NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
     EventQueue eventQueue = new EventQueue();
-    NodeManager nodeManager =
-        new SCMNodeManager(conf, scmStorageConfig, eventQueue, clusterMap);
+    NodeManager nodeManager = new SCMNodeManager(conf, scmStorageConfig,
+        eventQueue, clusterMap, SCMContext.emptyContext());
 
     try (ReconPipelineManager reconPipelineManager =
         new ReconPipelineManager(conf, nodeManager,
@@ -145,8 +146,8 @@ public void testAddPipeline() throws IOException {
     Pipeline pipeline = getRandomPipeline();
     NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
     EventQueue eventQueue = new EventQueue();
-    NodeManager nodeManager =
-        new SCMNodeManager(conf, scmStorageConfig, eventQueue, clusterMap);
+    NodeManager nodeManager = new SCMNodeManager(conf, scmStorageConfig,
+        eventQueue, clusterMap, SCMContext.emptyContext());
 
     ReconPipelineManager reconPipelineManager =
         new ReconPipelineManager(conf, nodeManager,
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineReportHandler.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineReportHandler.java
index e72df36cce73..fcf67a4721fa 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineReportHandler.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineReportHandler.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -62,7 +63,8 @@ public void testProcessPipelineReport() throws IOException {
 
     ReconPipelineReportHandler handler =
         new ReconPipelineReportHandler(new ReconSafeModeManager(),
-            reconPipelineManagerMock, configuration, scmServiceProviderMock);
+            reconPipelineManagerMock, SCMContext.emptyContext(),
+            configuration, scmServiceProviderMock);
 
     EventPublisher eventPublisherMock = mock(EventPublisher.class);
     PipelineReport report = mock(PipelineReport.class);