diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/MoveDataNodePair.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/MoveDataNodePair.java new file mode 100644 index 000000000000..578134e64cb3 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/MoveDataNodePair.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + + +package org.apache.hadoop.hdds.scm.container.common.helpers; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.ratis.util.Preconditions; + +import java.io.IOException; + +/** + * MoveDataNodePair encapsulates the source and target + * datanodes of a move operation. + */ +public class MoveDataNodePair { + /** + * source datanode of the current move operation. + */ + private final DatanodeDetails src; + + /** + * target datanode of the current move operation.
+ */ + private final DatanodeDetails tgt; + + public MoveDataNodePair(DatanodeDetails src, DatanodeDetails tgt) { + this.src = src; + this.tgt = tgt; + } + + public DatanodeDetails getTgt() { + return tgt; + } + + public DatanodeDetails getSrc() { + return src; + } + + public HddsProtos.MoveDataNodePairProto getProtobufMessage(int clientVersion) + throws IOException { + HddsProtos.MoveDataNodePairProto.Builder builder = + HddsProtos.MoveDataNodePairProto.newBuilder() + .setSrc(src.toProto(clientVersion)) + .setTgt(tgt.toProto(clientVersion)); + return builder.build(); + } + + public static MoveDataNodePair getFromProtobuf( + HddsProtos.MoveDataNodePairProto mdnpp) { + Preconditions.assertNotNull(mdnpp, "MoveDataNodePair is null"); + DatanodeDetails src = DatanodeDetails.getFromProtoBuf(mdnpp.getSrc()); + DatanodeDetails tgt = DatanodeDetails.getFromProtoBuf(mdnpp.getTgt()); + return new MoveDataNodePair(src, tgt); + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java index 95dc4776e8fe..a63d90d0f4fd 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair; import org.apache.hadoop.hdds.security.x509.certificate.CertInfo; import org.apache.hadoop.hdds.utils.DBStoreHAManager; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -147,4 +148,9 @@ public interface SCMMetadataStore extends DBStoreHAManager { * Table that maintains sequence id information. */ Table getSequenceIdTable(); + + /** + * Table that maintains move information. + */ + Table getMoveTable(); } diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto index f115d0c59790..ee5035429fcd 100644 --- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto +++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto @@ -62,6 +62,11 @@ message ExtendedDatanodeDetailsProto { optional string buildDate = 5; } +message MoveDataNodePairProto { + required DatanodeDetailsProto src = 1; + required DatanodeDetailsProto tgt = 2; +} + /** Proto message encapsulating information required to uniquely identify a OzoneManager. 
diff --git a/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto index 8066c8b6070b..8e4237ee59f6 100644 --- a/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto +++ b/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto @@ -26,6 +26,7 @@ enum RequestType { BLOCK = 3; SEQUENCE_ID = 4; CERT_STORE = 5; + MOVE = 6; } message Method { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java index 6a009f34e58b..1673f30e7b3a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java @@ -32,10 +32,8 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.container.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.ContainerManagerV2; import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.exceptions.SCMException; @@ -69,7 +67,6 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { private final StorageContainerManager scm; private final PipelineManager pipelineManager; - private final ContainerManagerV2 containerManager; private final WritableContainerFactory writableContainerFactory; private final long containerSize; @@ -78,7 +75,6 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { private final SCMBlockDeletingService blockDeletingService; private ObjectName mxBean; - private final PipelineChoosePolicy pipelineChoosePolicy; private final SequenceIdGenerator sequenceIdGen; private ScmBlockDeletingServiceMetrics metrics; /** @@ -94,8 +90,6 @@ public BlockManagerImpl(final ConfigurationSource conf, Objects.requireNonNull(scm, "SCM cannot be null"); this.scm = scm; this.pipelineManager = scm.getPipelineManager(); - this.containerManager = scm.getContainerManager(); - this.pipelineChoosePolicy = scm.getPipelineChoosePolicy(); this.sequenceIdGen = scm.getSequenceIdGen(); this.containerSize = (long)conf.getStorageSize( ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, @@ -123,7 +117,7 @@ public BlockManagerImpl(final ConfigurationSource conf, OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); blockDeletingService = - new SCMBlockDeletingService(deletedBlockLog, containerManager, + new SCMBlockDeletingService(deletedBlockLog, scm.getScmNodeManager(), scm.getEventQueue(), scm.getScmContext(), scm.getSCMServiceManager(), svcInterval, serviceTimeout, conf, metrics); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java index 7776c56cb1a2..cd50e7d18d3e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java @@ -29,7 +29,6 @@ import 
org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import org.apache.hadoop.hdds.scm.ScmConfig; -import org.apache.hadoop.hdds.scm.container.ContainerManagerV2; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.ha.SCMService; @@ -67,7 +66,6 @@ public class SCMBlockDeletingService extends BackgroundService private static final int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 1; private final DeletedBlockLog deletedBlockLog; - private final ContainerManagerV2 containerManager; private final NodeManager nodeManager; private final EventPublisher eventPublisher; private final SCMContext scmContext; @@ -83,14 +81,13 @@ public class SCMBlockDeletingService extends BackgroundService @SuppressWarnings("parameternumber") public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog, - ContainerManagerV2 containerManager, NodeManager nodeManager, + NodeManager nodeManager, EventPublisher eventPublisher, SCMContext scmContext, SCMServiceManager serviceManager, Duration interval, long serviceTimeout, ConfigurationSource conf, ScmBlockDeletingServiceMetrics metrics) { super("SCMBlockDeletingService", interval.toMillis(), TimeUnit.MILLISECONDS, BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout); this.deletedBlockLog = deletedBlockLog; - this.containerManager = containerManager; this.nodeManager = nodeManager; this.eventPublisher = eventPublisher; this.scmContext = scmContext; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java index bf8c3b933b22..26368b46e463 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.container; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; + import java.util.Set; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java index 97fda610e13d..009c85026048 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.container; import java.io.IOException; +import java.lang.reflect.Proxy; import java.time.Clock; import java.time.Duration; import java.util.ArrayList; @@ -27,6 +28,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -41,8 +43,6 @@ import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; @@ -55,18 +55,27 @@ import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState; import org.apache.hadoop.hdds.protocol.proto. StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; +import static org.apache.hadoop.hdds.protocol.proto. + SCMRatisProtocol.RequestType.MOVE; import org.apache.hadoop.hdds.scm.ContainerPlacementStatus; import org.apache.hadoop.hdds.scm.PlacementPolicy; import org.apache.hadoop.hdds.scm.container.replication.ReplicationManagerMetrics; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler; +import org.apache.hadoop.hdds.scm.ha.SCMHAManager; +import org.apache.hadoop.hdds.scm.ha.SCMRatisServer; import org.apache.hadoop.hdds.scm.ha.SCMService; import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; +import org.apache.hadoop.hdds.scm.metadata.Replicate; +import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair; +import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; @@ -74,6 +83,8 @@ import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.hdds.utils.db.Table; +import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION; import com.google.protobuf.GeneratedMessage; import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE; @@ -134,12 +145,6 @@ public class ReplicationManager implements SCMService { */ private final Map> inflightDeletion; - /** - * This is used for tracking container move commands - * which are not yet complete. - */ - private final Map> inflightMove; /** * This is used for indicating the result of move option and @@ -150,7 +155,9 @@ public enum MoveResult { // both replication and deletion are completed COMPLETED, // RM is not running - RM_NOT_RUNNING, + FAIL_NOT_RUNNING, + // RM is not the Ratis leader + FAIL_NOT_LEADER, // replication fail because the container does not exist in src REPLICATION_FAIL_NOT_EXIST_IN_SOURCE, // replication fail because the container exists in target @@ -184,7 +191,9 @@ public enum MoveResult { //unexpected action, remove src at inflightReplication UNEXPECTED_REMOVE_SOURCE_AT_INFLIGHT_REPLICATION, //unexpected action, remove target at inflightDeletion - UNEXPECTED_REMOVE_TARGET_AT_INFLIGHT_DELETION + UNEXPECTED_REMOVE_TARGET_AT_INFLIGHT_DELETION, + //write DB error + FAIL_CAN_NOT_RECORD_TO_DB } /** @@ -232,6 +241,11 @@ public enum MoveResult { */ private ReplicationManagerMetrics metrics; + /** + * scheduler for move operations. + */ + private final MoveScheduler moveScheduler; + /** * Constructs ReplicationManager instance with the given configuration.
* @@ -242,13 +256,16 @@ public enum MoveResult { */ @SuppressWarnings("parameternumber") public ReplicationManager(final ConfigurationSource conf, - final ContainerManagerV2 containerManager, - final PlacementPolicy containerPlacement, - final EventPublisher eventPublisher, - final SCMContext scmContext, - final SCMServiceManager serviceManager, - final NodeManager nodeManager, - final java.time.Clock clock) { + final ContainerManagerV2 containerManager, + final PlacementPolicy containerPlacement, + final EventPublisher eventPublisher, + final SCMContext scmContext, + final SCMServiceManager serviceManager, + final NodeManager nodeManager, + final java.time.Clock clock, + final SCMHAManager scmhaManager, + final Table moveTable) + throws IOException { this.containerManager = containerManager; this.containerPlacement = containerPlacement; this.eventPublisher = eventPublisher; @@ -258,7 +275,6 @@ public ReplicationManager(final ConfigurationSource conf, this.running = false; this.inflightReplication = new ConcurrentHashMap<>(); this.inflightDeletion = new ConcurrentHashMap<>(); - this.inflightMove = new ConcurrentHashMap<>(); this.inflightMoveFuture = new ConcurrentHashMap<>(); this.minHealthyForMaintenance = rmConf.getMaintenanceReplicaMinimum(); this.clock = clock; @@ -269,6 +285,11 @@ public ReplicationManager(final ConfigurationSource conf, TimeUnit.MILLISECONDS); this.metrics = null; + moveScheduler = new MoveSchedulerImpl.Builder() + .setDBTransactionBuffer(scmhaManager.getDBTransactionBuffer()) + .setRatisServer(scmhaManager.getRatisServer()) + .setMoveTable(moveTable).build(); + // register ReplicationManager to SCMServiceManager. serviceManager.register(this); @@ -317,9 +338,6 @@ public synchronized void stop() { LOG.info("Stopping Replication Monitor Thread."); inflightReplication.clear(); inflightDeletion.clear(); - //TODO: replicate inflight move through ratis - inflightMove.clear(); - inflightMoveFuture.clear(); running = false; metrics.unRegister(); notifyAll(); @@ -581,15 +599,15 @@ private void updateMoveIfNeeded(final boolean isUnhealthy, throws ContainerNotFoundException { // make sure inflightMove contains the container ContainerID id = container.containerID(); - if (!inflightMove.containsKey(id)) { - return; - } // make sure the datanode , which is removed from inflightActions, // is source or target datanode. - Pair kv = inflightMove.get(id); - final boolean isSource = kv.getKey().equals(dn); - final boolean isTarget = kv.getValue().equals(dn); + MoveDataNodePair kv = moveScheduler.getMoveDataNodePair(id); + if (kv == null) { + return; + } + final boolean isSource = kv.getSrc().equals(dn); + final boolean isTarget = kv.getTgt().equals(dn); if (!isSource && !isTarget) { return; } @@ -610,50 +628,50 @@ private void updateMoveIfNeeded(final boolean isUnhealthy, */ if (isSource && isInflightReplication) { - inflightMoveFuture.get(id).complete( + //if RM is reinitialized, inflightMove will be restored, + //but inflightMoveFuture will not. so there can be a case where + //a container is in inflightMove, but not in inflightMoveFuture.
+ completeMoveFutureWithResult(id, MoveResult.UNEXPECTED_REMOVE_SOURCE_AT_INFLIGHT_REPLICATION); - inflightMove.remove(id); - inflightMoveFuture.remove(id); + moveScheduler.completeMove(id.getProtobuf()); return; } if (isTarget && !isInflightReplication) { - inflightMoveFuture.get(id).complete( + completeMoveFutureWithResult(id, MoveResult.UNEXPECTED_REMOVE_TARGET_AT_INFLIGHT_DELETION); - inflightMove.remove(id); - inflightMoveFuture.remove(id); + moveScheduler.completeMove(id.getProtobuf()); return; } if (!(isInflightReplication && isCompleted)) { if (isInflightReplication) { if (isUnhealthy) { - inflightMoveFuture.get(id).complete( + completeMoveFutureWithResult(id, MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY); } else if (isNotInService) { - inflightMoveFuture.get(id).complete( + completeMoveFutureWithResult(id, MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE); } else { - inflightMoveFuture.get(id).complete( + completeMoveFutureWithResult(id, MoveResult.REPLICATION_FAIL_TIME_OUT); } } else { if (isUnhealthy) { - inflightMoveFuture.get(id).complete( + completeMoveFutureWithResult(id, MoveResult.DELETION_FAIL_NODE_UNHEALTHY); } else if (isTimeout) { - inflightMoveFuture.get(id).complete( + completeMoveFutureWithResult(id, MoveResult.DELETION_FAIL_TIME_OUT); } else if (isNotInService) { - inflightMoveFuture.get(id).complete( + completeMoveFutureWithResult(id, MoveResult.DELETION_FAIL_NODE_NOT_IN_SERVICE); } else { - inflightMoveFuture.get(id).complete( + completeMoveFutureWithResult(id, MoveResult.COMPLETED); } } - inflightMove.remove(id); - inflightMoveFuture.remove(id); + moveScheduler.completeMove(id.getProtobuf()); } else { deleteSrcDnForMove(container, containerManager.getContainerReplicas(id)); @@ -664,15 +682,32 @@ private void updateMoveIfNeeded(final boolean isUnhealthy, * add a move action for a given container. * * @param cid Container to move - * @param srcDn datanode to move from - * @param targetDn datanode to move to + * @param src source datanode + * @param tgt target datanode + */ + public CompletableFuture move(ContainerID cid, + DatanodeDetails src, DatanodeDetails tgt) + throws ContainerNotFoundException, NodeNotFoundException { + return move(cid, new MoveDataNodePair(src, tgt)); + } + + /** + * add a move action for a given container.
+ * + * @param cid Container to move + * @param mp MoveDataNodePair which contains source and target datanodes */ public CompletableFuture move(ContainerID cid, - DatanodeDetails srcDn, DatanodeDetails targetDn) + MoveDataNodePair mp) throws ContainerNotFoundException, NodeNotFoundException { CompletableFuture ret = new CompletableFuture<>(); if (!isRunning()) { - ret.complete(MoveResult.RM_NOT_RUNNING); + ret.complete(MoveResult.FAIL_NOT_RUNNING); + return ret; + } + + if (!scmContext.isLeader()) { + ret.complete(MoveResult.FAIL_NOT_LEADER); return ret; } @@ -693,6 +728,8 @@ public CompletableFuture move(ContainerID cid, * of deletion always depends on placement policy */ + DatanodeDetails srcDn = mp.getSrc(); + DatanodeDetails targetDn = mp.getTgt(); NodeStatus currentNodeStat = nodeManager.getNodeStatus(srcDn); NodeState healthStat = currentNodeStat.getHealth(); NodeOperationalState operationalState = @@ -777,7 +814,15 @@ public CompletableFuture move(ContainerID cid, return ret; } - inflightMove.putIfAbsent(cid, new ImmutablePair<>(srcDn, targetDn)); + try { + moveScheduler.startMove(cid.getProtobuf(), + mp.getProtobufMessage(CURRENT_VERSION)); + } catch (IOException e) { + LOG.warn("Exception while starting move {}", cid); + ret.complete(MoveResult.FAIL_CAN_NOT_RECORD_TO_DB); + return ret; + } + inflightMoveFuture.putIfAbsent(cid, ret); sendReplicateCommand(cif, targetDn, Collections.singletonList(srcDn)); } @@ -1213,47 +1258,45 @@ private void handleOverReplicatedContainer(final ContainerInfo container, private void deleteSrcDnForMove(final ContainerInfo cif, final Set replicaSet) { final ContainerID cid = cif.containerID(); - if (inflightMove.containsKey(cid)) { - Pair movePair = - inflightMove.get(cid); - final DatanodeDetails srcDn = movePair.getKey(); - ContainerReplicaCount replicaCount = - getContainerReplicaCount(cif, replicaSet); - - if(!replicaSet.stream() - .anyMatch(r -> r.getDatanodeDetails().equals(srcDn))){ - // if the target is present but source disappears somehow, - // we can consider move is successful. - inflightMoveFuture.get(cid).complete(MoveResult.COMPLETED); - inflightMove.remove(cid); - inflightMoveFuture.remove(cid); - return; - } + MoveDataNodePair movePair = moveScheduler.getMoveDataNodePair(cid); + if (movePair == null) { + return; + } + final DatanodeDetails srcDn = movePair.getSrc(); + ContainerReplicaCount replicaCount = + getContainerReplicaCount(cif, replicaSet); + + if(!replicaSet.stream() + .anyMatch(r -> r.getDatanodeDetails().equals(srcDn))){ + // if the target is present but source disappears somehow, + // we can consider the move successful. + completeMoveFutureWithResult(cid, MoveResult.COMPLETED); + moveScheduler.completeMove(cid.getProtobuf()); + return; + } - int replicationFactor = - cif.getReplicationConfig().getRequiredNodes(); - ContainerPlacementStatus currentCPS = - getPlacementStatus(replicaSet, replicationFactor); - Set newReplicaSet = replicaSet.
- stream().collect(Collectors.toSet()); - newReplicaSet.removeIf(r -> r.getDatanodeDetails().equals(srcDn)); - ContainerPlacementStatus newCPS = - getPlacementStatus(newReplicaSet, replicationFactor); - - if (replicaCount.isOverReplicated() && - isPlacementStatusActuallyEqual(currentCPS, newCPS)) { - sendDeleteCommand(cif, srcDn, true); - } else { - // if source and target datanode are both in the replicaset, - // but we can not delete source datanode for now (e.g., - // there is only 3 replicas or not policy-statisfied , etc.), - // we just complete the future without sending a delete command. - LOG.info("can not remove source replica after successfully " + - "replicated to target datanode"); - inflightMoveFuture.get(cid).complete(MoveResult.DELETE_FAIL_POLICY); - inflightMove.remove(cid); - inflightMoveFuture.remove(cid); - } + int replicationFactor = + cif.getReplicationConfig().getRequiredNodes(); + ContainerPlacementStatus currentCPS = + getPlacementStatus(replicaSet, replicationFactor); + Set newReplicaSet = replicaSet. + stream().collect(Collectors.toSet()); + newReplicaSet.removeIf(r -> r.getDatanodeDetails().equals(srcDn)); + ContainerPlacementStatus newCPS = + getPlacementStatus(newReplicaSet, replicationFactor); + + if (replicaCount.isOverReplicated() && + isPlacementStatusActuallyEqual(currentCPS, newCPS)) { + sendDeleteCommand(cif, srcDn, true); + } else { + // if source and target datanode are both in the replica set, + // but we cannot delete the source datanode for now (e.g., + // there are only 3 replicas, or the policy is not satisfied), + // we just complete the future without sending a delete command. + LOG.info("cannot remove source replica after successfully " + + "replicating to target datanode"); + completeMoveFutureWithResult(cid, MoveResult.DELETE_FAIL_POLICY); + moveScheduler.completeMove(cid.getProtobuf()); } } @@ -1680,6 +1723,9 @@ public void notifyStatusChanged() { lastTimeToBeReadyInMillis = clock.millis(); serviceStatus = ServiceStatus.RUNNING; } + //now, as the current SCM is leader and its state is up-to-date, + //we need to take some actions for the replicated inflight move operations. + onLeaderReadyAndOutOfSafeMode(); } else { serviceStatus = ServiceStatus.PAUSING; } @@ -1717,8 +1763,244 @@ public Map> getInflightDeletion() { return inflightDeletion; } - public Map> getInflightMove() { - return inflightMove; + public Map> getInflightMove() { + return inflightMoveFuture; } -} \ No newline at end of file + + /** + * make move operations HA aware. + */ + public interface MoveScheduler { + /** + * complete the move action for a given container. + * + * @param containerIDProto Container for which the move operation is finished + */ + @Replicate + void completeMove(HddsProtos.ContainerID containerIDProto); + + /** + * start a move action for a given container. + * + * @param containerIDProto Container to move + * @param mp encapsulates the source and target datanode infos + */ + @Replicate + void startMove(HddsProtos.ContainerID containerIDProto, + HddsProtos.MoveDataNodePairProto mp) throws IOException; + + /** + * get the MoveDataNodePair of the given container. + * + * @param cid Container to move + * @return null if cid is not found in MoveScheduler, + * or the corresponding MoveDataNodePair + */ + MoveDataNodePair getMoveDataNodePair(ContainerID cid); + + /** + * Reinitialize the MoveScheduler from the DB when becoming leader. + */ + void reinitialize(Table moveTable) throws IOException; + + /** + * get all the inflight move info.
+ */ + Map getInflightMove(); + } + + /** + * @return the moveScheduler of RM + */ + public MoveScheduler getMoveScheduler() { + return moveScheduler; + } + + /** + * Ratis based MoveScheduler, DB operations are stored in + * DBTransactionBuffer until a snapshot is taken. + */ + public static final class MoveSchedulerImpl implements MoveScheduler { + private Table moveTable; + private final DBTransactionBuffer transactionBuffer; + /** + * This is used for tracking container move commands + * which are not yet complete. + */ + private final Map inflightMove; + + private MoveSchedulerImpl(Table moveTable, + DBTransactionBuffer transactionBuffer) throws IOException { + this.moveTable = moveTable; + this.transactionBuffer = transactionBuffer; + this.inflightMove = new ConcurrentHashMap<>(); + initialize(); + } + + @Override + public void completeMove(HddsProtos.ContainerID containerIDProto) { + ContainerID cid = null; + try { + cid = ContainerID.getFromProtobuf(containerIDProto); + transactionBuffer.removeFromBuffer(moveTable, cid); + } catch (IOException e) { + LOG.warn("Exception while completing move {}", cid); + } + inflightMove.remove(cid); + } + + @Override + public void startMove(HddsProtos.ContainerID containerIDProto, + HddsProtos.MoveDataNodePairProto mdnpp) + throws IOException { + ContainerID cid = null; + MoveDataNodePair mp = null; + try { + cid = ContainerID.getFromProtobuf(containerIDProto); + mp = MoveDataNodePair.getFromProtobuf(mdnpp); + if(!inflightMove.containsKey(cid)) { + transactionBuffer.addToBuffer(moveTable, cid, mp); + inflightMove.putIfAbsent(cid, mp); + } + } catch (IOException e) { + LOG.warn("Exception while starting move {}", cid); + } + } + + @Override + public MoveDataNodePair getMoveDataNodePair(ContainerID cid) { + return inflightMove.get(cid); + } + + @Override + public void reinitialize(Table mt) throws IOException { + moveTable = mt; + inflightMove.clear(); + initialize(); + } + + private void initialize() throws IOException { + TableIterator> + iterator = moveTable.iterator(); + + while (iterator.hasNext()) { + Table.KeyValue kv = iterator.next(); + final ContainerID cid = kv.getKey(); + final MoveDataNodePair mp = kv.getValue(); + Preconditions.assertNotNull(cid, + "moved container id should not be null"); + Preconditions.assertNotNull(mp, + "MoveDataNodePair should not be null"); + inflightMove.put(cid, mp); + } + } + + @Override + public Map getInflightMove() { + return inflightMove; + } + + /** + * Builder for the Ratis based MoveScheduler.
+ */ + public static class Builder { + private Table moveTable; + private DBTransactionBuffer transactionBuffer; + private SCMRatisServer ratisServer; + + public Builder setRatisServer(final SCMRatisServer scmRatisServer) { + ratisServer = scmRatisServer; + return this; + } + + public Builder setMoveTable( + final Table mt) { + moveTable = mt; + return this; + } + + public Builder setDBTransactionBuffer(DBTransactionBuffer trxBuffer) { + transactionBuffer = trxBuffer; + return this; + } + + public MoveScheduler build() throws IOException { + Preconditions.assertNotNull(moveTable, "moveTable is null"); + Preconditions.assertNotNull(transactionBuffer, + "transactionBuffer is null"); + + final MoveScheduler impl = + new MoveSchedulerImpl(moveTable, transactionBuffer); + + final SCMHAInvocationHandler invocationHandler + = new SCMHAInvocationHandler(MOVE, impl, ratisServer); + + return (MoveScheduler) Proxy.newProxyInstance( + SCMHAInvocationHandler.class.getClassLoader(), + new Class[]{MoveScheduler.class}, + invocationHandler); + } + } + } + + /** + * when the SCM becomes leader-ready and out of safe mode, some actions + * should be taken. for now, this is only used to handle the replicated + * inflight moves. + */ + private void onLeaderReadyAndOutOfSafeMode() { + List needToRemove = new LinkedList<>(); + moveScheduler.getInflightMove().forEach((k, v) -> { + Set replicas; + ContainerInfo cif; + try { + replicas = containerManager.getContainerReplicas(k); + cif = containerManager.getContainer(k); + } catch (ContainerNotFoundException e) { + needToRemove.add(k.getProtobuf()); + LOG.error("cannot find container {} " + + "while processing replicated move", k); + return; + } + boolean isSrcExist = replicas.stream() + .anyMatch(r -> r.getDatanodeDetails().equals(v.getSrc())); + boolean isTgtExist = replicas.stream() + .anyMatch(r -> r.getDatanodeDetails().equals(v.getTgt())); + + if(isSrcExist) { + if(isTgtExist) { + //the former SCM leader may or may not have sent the deletion command + //before re-election. here, we just try to send the command again. + deleteSrcDnForMove(cif, replicas); + } else { + // resending the replication command is ok, no matter whether there + // is an on-going replication + sendReplicateCommand(cif, v.getTgt(), + Collections.singletonList(v.getSrc())); + } + } else { + // if the container does not exist in the src datanode, then no matter + // whether it exists in the target datanode, we cannot take further + // action on this operation, so just remove it through Ratis + needToRemove.add(k.getProtobuf()); + } + }); + + needToRemove.forEach(moveScheduler::completeMove); + } + + /** + * complete the CompletableFuture of the container in the given Map with + * a given MoveResult.
*/ + private void completeMoveFutureWithResult(ContainerID cid, MoveResult mr) { + if(inflightMoveFuture.containsKey(cid)) { + inflightMoveFuture.get(cid).complete(mr); + inflightMoveFuture.remove(cid); + } + } +} + diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java index de33120b2ff8..382157545908 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java @@ -361,6 +361,8 @@ public void startServices() throws IOException { scm.getContainerManager().reinitialize(metadataStore.getContainerTable()); scm.getScmBlockManager().getDeletedBlockLog().reinitialize( metadataStore.getDeletedBlocksTXTable()); + scm.getReplicationManager().getMoveScheduler() + .reinitialize(metadataStore.getMoveTable()); if (OzoneSecurityUtil.isSecurityEnabled(conf)) { if (scm.getRootCertificateServer() != null) { scm.getRootCertificateServer().reinitialize(metadataStore); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/MoveDataNodePairCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/MoveDataNodePairCodec.java new file mode 100644 index 000000000000..24601a551072 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/MoveDataNodePairCodec.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *      http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.hadoop.hdds.scm.metadata; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.MoveDataNodePairProto; +import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair; +import org.apache.hadoop.hdds.utils.db.Codec; + +import java.io.IOException; + +import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION; + +/** + * Codec to serialize / deserialize MoveDataNodePair.
+ */ + +public class MoveDataNodePairCodec implements Codec { + @Override + public byte[] toPersistedFormat(MoveDataNodePair mdnp) + throws IOException { + return mdnp.getProtobufMessage(CURRENT_VERSION).toByteArray(); + } + + @Override + public MoveDataNodePair fromPersistedFormat(byte[] rawData) + throws IOException { + MoveDataNodePairProto.Builder builder = + MoveDataNodePairProto.newBuilder( + MoveDataNodePairProto.PARSER.parseFrom(rawData)); + return MoveDataNodePair.getFromProtobuf(builder.build()); + } + + @Override + public MoveDataNodePair copyObject(MoveDataNodePair object) { + throw new UnsupportedOperationException(); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java index 7da36d10a9cf..b3e861b40ccb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair; import org.apache.hadoop.hdds.security.x509.certificate.CertInfo; import org.apache.hadoop.hdds.security.x509.crl.CRLInfoCodec; import org.apache.hadoop.hdds.utils.TransactionInfo; @@ -143,6 +144,16 @@ public class SCMDBDefinition implements DBDefinition { Long.class, new LongCodec()); + public static final DBColumnFamilyDefinition + MOVE = + new DBColumnFamilyDefinition<>( + "move", + ContainerID.class, + new ContainerIDCodec(), + MoveDataNodePair.class, + new MoveDataNodePairCodec()); + @Override public String getName() { return "scm.db"; @@ -157,6 +168,6 @@ public String getLocationConfigKey() { public DBColumnFamilyDefinition[] getColumnFamilies() { return new DBColumnFamilyDefinition[] {DELETED_BLOCKS, VALID_CERTS, VALID_SCM_CERTS, REVOKED_CERTS, REVOKED_CERTS_V2, PIPELINES, CONTAINERS, - TRANSACTIONINFO, CRLS, CRL_SEQUENCE_ID, SEQUENCE_ID}; + TRANSACTIONINFO, CRLS, CRL_SEQUENCE_ID, SEQUENCE_ID, MOVE}; } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java index 170fddeeffe3..799e1282027b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair; import org.apache.hadoop.hdds.security.x509.certificate.CertInfo; import org.apache.hadoop.hdds.utils.HAUtils; import org.apache.hadoop.hdds.utils.TransactionInfo; @@ -43,6 +44,7 @@ import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.CRLS; import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.CRL_SEQUENCE_ID; import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.DELETED_BLOCKS; +import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.MOVE; 
import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.PIPELINES; import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.REVOKED_CERTS; import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.REVOKED_CERTS_V2; @@ -84,6 +86,8 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore { private Table sequenceIdTable; + private Table moveTable; + private static final Logger LOG = LoggerFactory.getLogger(SCMMetadataStoreImpl.class); private DBStore store; @@ -166,6 +170,10 @@ public void start(OzoneConfiguration config) sequenceIdTable = SEQUENCE_ID.getTable(store); checkTableStatus(sequenceIdTable, SEQUENCE_ID.getName()); + + moveTable = MOVE.getTable(store); + + checkTableStatus(moveTable, MOVE.getName()); } } @@ -266,6 +274,11 @@ public Table getSequenceIdTable() { return sequenceIdTable; } + @Override + public Table getMoveTable() { + return moveTable; + } + private void checkTableStatus(Table table, String name) throws IOException { String logMessage = "Unable to get a reference to %s table. Cannot " + "continue."; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index d268f2c5f089..e9d439efedc6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -606,7 +606,9 @@ private void initializeSystemManagers(OzoneConfiguration conf, scmContext, serviceManager, scmNodeManager, - new MonotonicClock(ZoneOffset.UTC)); + new MonotonicClock(ZoneOffset.UTC), + scmHAManager, + getScmMetadataStore().getMoveTable()); } if(configurator.getScmSafeModeManager() != null) { scmSafeModeManager = configurator.getScmSafeModeManager(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java index d7fcde778146..631ab2b6475e 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.container; import com.google.common.primitives.Longs; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -31,18 +32,24 @@ import org.apache.hadoop.hdds.scm.container.ReplicationManager .ReplicationManagerConfiguration; import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair; import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault; import org.apache.hadoop.hdds.scm.container.ReplicationManager.MoveResult; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager; import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.ha.SCMHAManager; import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; +import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition; +import 
org.apache.hadoop.hdds.scm.metadata.SCMDBTransactionBufferImpl; +import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.hdds.scm.node.NodeStatus; -import org.apache.hadoop.hdds.scm.node.SCMNodeManager; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.EventQueue; +import org.apache.hadoop.hdds.utils.db.DBStoreBuilder; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.ozone.test.GenericTestUtils; import org.apache.ozone.test.TestClock; @@ -52,6 +59,7 @@ import org.junit.Test; import org.mockito.Mockito; +import java.io.File; import java.io.IOException; import java.time.Instant; import java.time.ZoneId; @@ -95,24 +103,27 @@ public class TestReplicationManager { private DatanodeCommandHandler datanodeCommandHandler; private SimpleMockNodeManager nodeManager; private ContainerManagerV2 containerManager; - private OzoneConfiguration conf; - private SCMNodeManager scmNodeManager; private GenericTestUtils.LogCapturer scmLogs; + private SCMServiceManager serviceManager; private TestClock clock; + private File testDir; + private DBStore dbStore; @Before public void setup() throws IOException, InterruptedException, NodeNotFoundException { - conf = new OzoneConfiguration(); + OzoneConfiguration conf = new OzoneConfiguration(); conf.setTimeDuration( HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, 0, TimeUnit.SECONDS); - scmLogs = GenericTestUtils.LogCapturer.captureLogs(ReplicationManager.LOG); + scmLogs = GenericTestUtils.LogCapturer + .captureLogs(ReplicationManager.LOG); containerManager = Mockito.mock(ContainerManagerV2.class); nodeManager = new SimpleMockNodeManager(); eventQueue = new EventQueue(); containerStateManager = new ContainerStateManager(conf); + serviceManager = new SCMServiceManager(); datanodeCommandHandler = new DatanodeCommandHandler(); eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, datanodeCommandHandler); @@ -152,28 +163,29 @@ public void setup() Mockito.when(containerPlacementPolicy.validateContainerPlacement( Mockito.any(), Mockito.anyInt() - )).thenAnswer(invocation -> { - return new ContainerPlacementStatusDefault(2, 2, 3); - }); - - scmNodeManager = Mockito.mock(SCMNodeManager.class); - Mockito.when(scmNodeManager.getNodeStatus( - Mockito.any(DatanodeDetails.class))) - .thenReturn(NodeStatus.inServiceHealthy()); - + )).thenAnswer(invocation -> + new ContainerPlacementStatusDefault(2, 2, 3)); clock = new TestClock(Instant.now(), ZoneId.of("UTC")); createReplicationManager(new ReplicationManagerConfiguration()); } private void createReplicationManager(ReplicationManagerConfiguration rmConf) - throws InterruptedException { + throws InterruptedException, IOException { OzoneConfiguration config = new OzoneConfiguration(); + testDir = GenericTestUtils + .getTestDir(TestSCMContainerManager.class.getSimpleName()); + config.set(HddsConfigKeys.OZONE_METADATA_DIRS, + testDir.getAbsolutePath()); config.setTimeDuration( HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, 0, TimeUnit.SECONDS); config.setFromObject(rmConf); - SCMServiceManager serviceManager = new SCMServiceManager(); + SCMHAManager scmHAManager = MockSCMHAManager + .getInstance(true, new SCMDBTransactionBufferImpl()); + dbStore = DBStoreBuilder.createDBStore( + config, new SCMDBDefinition()); + replicationManager = new ReplicationManager( config, containerManager, @@ -182,7 
+194,9 @@ private void createReplicationManager(ReplicationManagerConfiguration rmConf) SCMContext.emptyContext(), serviceManager, nodeManager, - clock); + clock, + scmHAManager, + SCMDBDefinition.MOVE.getTable(dbStore)); serviceManager.notifyStatusChanged(); scmLogs.clearOutput(); @@ -998,7 +1012,7 @@ public void testOverReplicatedAndPolicySatisfied() throws @Test public void testOverReplicatedAndPolicyUnSatisfiedAndDeleted() throws - SCMException, ContainerNotFoundException, InterruptedException { + SCMException { final ContainerInfo container = getContainer(LifeCycleState.CLOSED); final ContainerID id = container.containerID(); final UUID originNodeId = UUID.randomUUID(); @@ -1046,7 +1060,7 @@ public void testOverReplicatedAndPolicyUnSatisfiedAndDeleted() throws */ @Test public void testUnderReplicatedDueToDecommission() throws - SCMException, ContainerNotFoundException, InterruptedException { + SCMException { final ContainerInfo container = createContainer(LifeCycleState.CLOSED); addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED); @@ -1060,7 +1074,7 @@ public void testUnderReplicatedDueToDecommission() throws */ @Test public void testUnderReplicatedDueToAllDecommission() throws - SCMException, ContainerNotFoundException, InterruptedException { + SCMException { final ContainerInfo container = createContainer(LifeCycleState.CLOSED); addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED); addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED); @@ -1074,7 +1088,7 @@ public void testUnderReplicatedDueToAllDecommission() throws */ @Test public void testCorrectlyReplicatedWithDecommission() throws - SCMException, ContainerNotFoundException, InterruptedException { + SCMException { final ContainerInfo container = createContainer(LifeCycleState.CLOSED); addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); @@ -1089,7 +1103,7 @@ public void testCorrectlyReplicatedWithDecommission() throws */ @Test public void testUnderReplicatedDueToMaintenance() throws - SCMException, ContainerNotFoundException, InterruptedException { + SCMException { final ContainerInfo container = createContainer(LifeCycleState.CLOSED); addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED); @@ -1102,12 +1116,13 @@ public void testUnderReplicatedDueToMaintenance() throws * min replica for maintenance is 1 and another replica is available. */ @Test - public void testNotUnderReplicatedDueToMaintenanceMinRepOne() throws - SCMException, ContainerNotFoundException, InterruptedException { + public void testNotUnderReplicatedDueToMaintenanceMinRepOne() + throws Exception { replicationManager.stop(); ReplicationManagerConfiguration newConf = new ReplicationManagerConfiguration(); newConf.setMaintenanceReplicaMinimum(1); + dbStore.close(); createReplicationManager(newConf); final ContainerInfo container = createContainer(LifeCycleState.CLOSED); addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); @@ -1121,12 +1136,13 @@ public void testNotUnderReplicatedDueToMaintenanceMinRepOne() throws * are going off line and min rep is 1. 
*/ @Test - public void testUnderReplicatedDueToMaintenanceMinRepOne() throws - SCMException, ContainerNotFoundException, InterruptedException { + public void testUnderReplicatedDueToMaintenanceMinRepOne() + throws Exception { replicationManager.stop(); ReplicationManagerConfiguration newConf = new ReplicationManagerConfiguration(); newConf.setMaintenanceReplicaMinimum(1); + dbStore.close(); createReplicationManager(newConf); final ContainerInfo container = createContainer(LifeCycleState.CLOSED); addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED); @@ -1141,7 +1157,7 @@ public void testUnderReplicatedDueToMaintenanceMinRepOne() throws */ @Test public void testUnderReplicatedDueToAllMaintenance() throws - SCMException, ContainerNotFoundException, InterruptedException { + SCMException { final ContainerInfo container = createContainer(LifeCycleState.CLOSED); addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED); addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED); @@ -1155,7 +1171,7 @@ public void testUnderReplicatedDueToAllMaintenance() throws */ @Test public void testCorrectlyReplicatedWithMaintenance() throws - SCMException, ContainerNotFoundException, InterruptedException { + SCMException { final ContainerInfo container = createContainer(LifeCycleState.CLOSED); addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); @@ -1170,7 +1186,7 @@ public void testCorrectlyReplicatedWithMaintenance() throws */ @Test public void testUnderReplicatedWithDecommissionAndMaintenance() throws - SCMException, ContainerNotFoundException, InterruptedException { + SCMException { final ContainerInfo container = createContainer(LifeCycleState.CLOSED); addReplica(container, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED); addReplica(container, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED); @@ -1187,7 +1203,7 @@ public void testUnderReplicatedWithDecommissionAndMaintenance() throws */ @Test public void testOverReplicatedClosedContainerWithDecomAndMaint() - throws SCMException, ContainerNotFoundException, InterruptedException { + throws SCMException { final ContainerInfo container = createContainer(LifeCycleState.CLOSED); addReplica(container, NodeStatus.inServiceHealthy(), CLOSED); addReplica(container, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED); @@ -1231,7 +1247,7 @@ public void testOverReplicatedClosedContainerWithDecomAndMaint() */ @Test public void testUnderReplicatedNotHealthySource() - throws SCMException, ContainerNotFoundException, InterruptedException { + throws SCMException { final ContainerInfo container = createContainer(LifeCycleState.CLOSED); addReplica(container, NodeStatus.inServiceStale(), CLOSED); addReplica(container, new NodeStatus(DECOMMISSIONED, STALE), CLOSED); @@ -1257,7 +1273,8 @@ public void testMove() throws SCMException, NodeNotFoundException, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY)); CompletableFuture cf = - replicationManager.move(id, dn1.getDatanodeDetails(), dn3); + replicationManager.move(id, + new MoveDataNodePair(dn1.getDatanodeDetails(), dn3)); Assert.assertTrue(scmLogs.getOutput().contains( "receive a move request about container")); Thread.sleep(100L); @@ -1283,6 +1300,100 @@ Assert.assertTrue(cf.isDone() && cf.get() == MoveResult.COMPLETED); } + /** + * if a crash happens and SCM restarts, the move operation
should work as expected. + */ + @Test + public void testMoveCrashAndRestart() throws IOException, + NodeNotFoundException, InterruptedException { + final ContainerInfo container = createContainer(LifeCycleState.CLOSED); + ContainerID id = container.containerID(); + ContainerReplica dn1 = addReplica(container, + new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); + addReplica(container, + new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); + addReplica(container, + new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); + DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY)); + replicationManager.move(id, + new MoveDataNodePair(dn1.getDatanodeDetails(), dn3)); + Assert.assertTrue(scmLogs.getOutput().contains( + "receive a move request about container")); + Thread.sleep(100L); + Assert.assertTrue(datanodeCommandHandler.received( + SCMCommandProto.Type.replicateContainerCommand, dn3)); + Assert.assertEquals(1, datanodeCommandHandler.getInvocationCount( + SCMCommandProto.Type.replicateContainerCommand)); + + //crash happens, restart SCM. + //clear current inflight actions and reload inflightMove from DBStore. + resetReplicationManager(); + replicationManager.getMoveScheduler() + .reinitialize(SCMDBDefinition.MOVE.getTable(dbStore)); + Assert.assertTrue(replicationManager.getMoveScheduler() + .getInflightMove().containsKey(id)); + MoveDataNodePair kv = replicationManager.getMoveScheduler() + .getInflightMove().get(id); + Assert.assertEquals(kv.getSrc(), dn1.getDatanodeDetails()); + Assert.assertEquals(kv.getTgt(), dn3); + serviceManager.notifyStatusChanged(); + + Thread.sleep(100L); + // now, the container is not over-replicated, + // so no deleteContainerCommand will be sent + Assert.assertFalse(datanodeCommandHandler.received( + SCMCommandProto.Type.deleteContainerCommand, dn1.getDatanodeDetails())); + //replica does not exist in target datanode, so a replicateContainerCommand + //will be sent again at notifyStatusChanged#onLeaderReadyAndOutOfSafeMode + Assert.assertEquals(2, datanodeCommandHandler.getInvocationCount( + SCMCommandProto.Type.replicateContainerCommand)); + + + //replicate container to dn3, now, over-replicated + addReplicaToDn(container, dn3, CLOSED); + replicationManager.processAll(); + eventQueue.processAll(1000); + + //deleteContainerCommand is sent, but the src replica is not deleted now + Assert.assertEquals(1, datanodeCommandHandler.getInvocationCount( + SCMCommandProto.Type.deleteContainerCommand)); + + //crash happens, restart SCM. + //clear current inflight actions and reload inflightMove from DBStore.
+ resetReplicationManager(); + replicationManager.getMoveScheduler() + .reinitialize(SCMDBDefinition.MOVE.getTable(dbStore)); + Assert.assertTrue(replicationManager.getMoveScheduler() + .getInflightMove().containsKey(id)); + kv = replicationManager.getMoveScheduler() + .getInflightMove().get(id); + Assert.assertEquals(kv.getSrc(), dn1.getDatanodeDetails()); + Assert.assertEquals(kv.getTgt(), dn3); + serviceManager.notifyStatusChanged(); + + //after the restart the container is over-replicated, so + //deleteContainerCommand will be sent again + Assert.assertEquals(2, datanodeCommandHandler.getInvocationCount( + SCMCommandProto.Type.deleteContainerCommand)); + containerStateManager.removeContainerReplica(id, dn1); + + //replica in src datanode is deleted now + containerStateManager.removeContainerReplica(id, dn1); + replicationManager.processAll(); + eventQueue.processAll(1000); + + //since the move is complete, after an SCM crash and restart + //inflightMove should not contain the container again + resetReplicationManager(); + replicationManager.getMoveScheduler() + .reinitialize(SCMDBDefinition.MOVE.getTable(dbStore)); + Assert.assertFalse(replicationManager.getMoveScheduler() + .getInflightMove().containsKey(id)); + + //the CompletableFuture is not stored in the DB, so after an SCM crash and + //restart, the CompletableFuture is missing + } + /** * make sure RM does not delete replica if placement policy is not satisfied. */ @@ -1300,7 +1411,8 @@ public void testMoveNotDeleteSrcIfPolicyNotSatisfied() new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); DatanodeDetails dn4 = addNode(new NodeStatus(IN_SERVICE, HEALTHY)); CompletableFuture cf = - replicationManager.move(id, dn1.getDatanodeDetails(), dn4); + replicationManager.move(id, + new MoveDataNodePair(dn1.getDatanodeDetails(), dn4)); Assert.assertTrue(scmLogs.getOutput().contains( "receive a move request about container")); Thread.sleep(100L); @@ -1341,7 +1453,8 @@ public void testDnBecameUnhealthyWhenMoving() throws SCMException, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY)); CompletableFuture cf = - replicationManager.move(id, dn1.getDatanodeDetails(), dn3); + replicationManager.move(id, + new MoveDataNodePair(dn1.getDatanodeDetails(), dn3)); Assert.assertTrue(scmLogs.getOutput().contains( "receive a move request about container")); @@ -1353,7 +1466,8 @@ public void testDnBecameUnhealthyWhenMoving() throws SCMException, MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY); nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY)); - cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3); + cf = replicationManager.move(id, + new MoveDataNodePair(dn1.getDatanodeDetails(), dn3)); addReplicaToDn(container, dn3, CLOSED); replicationManager.processAll(); eventQueue.processAll(1000); @@ -1384,13 +1498,18 @@ public void testMovePrerequisites() DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY)); ContainerReplica dn4 = addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); - + CompletableFuture cf; + //the above move is executed successfully, so there may be some item in + //inflightReplication or inflightDeletion. Here we stop replication manager + //to clear these states, which may impact the tests below.
+ //we don't need a running replicationManager now replicationManager.stop(); Thread.sleep(100L); - cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3); + cf = replicationManager.move(id, + new MoveDataNodePair(dn1.getDatanodeDetails(), dn3)); Assert.assertTrue(cf.isDone() && cf.get() == - MoveResult.RM_NOT_RUNNING); + MoveResult.FAIL_NOT_RUNNING); replicationManager.start(); Thread.sleep(100L); @@ -1398,7 +1517,8 @@ public void testMovePrerequisites() for (LifeCycleState state : LifeCycleState.values()) { if (state != LifeCycleState.CLOSED) { container.setState(state); - cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3); + cf = replicationManager.move(id, + new MoveDataNodePair(dn1.getDatanodeDetails(), dn3)); Assert.assertTrue(cf.isDone() && cf.get() == MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED); } @@ -1410,10 +1530,12 @@ public void testMovePrerequisites() if (state != HEALTHY) { nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, state)); - cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3); + cf = replicationManager.move(id, + new MoveDataNodePair(dn1.getDatanodeDetails(), dn3)); Assert.assertTrue(cf.isDone() && cf.get() == MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY); - cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails()); + cf = replicationManager.move(id, + new MoveDataNodePair(dn3, dn1.getDatanodeDetails())); Assert.assertTrue(cf.isDone() && cf.get() == MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY); } @@ -1426,10 +1548,12 @@ public void testMovePrerequisites() if (state != IN_SERVICE) { nodeManager.setNodeStatus(dn3, new NodeStatus(state, HEALTHY)); - cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3); + cf = replicationManager.move(id, + new MoveDataNodePair(dn1.getDatanodeDetails(), dn3)); Assert.assertTrue(cf.isDone() && cf.get() == MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE); - cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails()); + cf = replicationManager.move(id, + new MoveDataNodePair(dn3, dn1.getDatanodeDetails())); Assert.assertTrue(cf.isDone() && cf.get() == MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE); } @@ -1437,13 +1561,14 @@ public void testMovePrerequisites() nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY)); //container exists in target datanode - cf = replicationManager.move(id, dn1.getDatanodeDetails(), - dn2.getDatanodeDetails()); + cf = replicationManager.move(id, + new MoveDataNodePair(dn1.getDatanodeDetails(), + dn2.getDatanodeDetails())); Assert.assertTrue(cf.isDone() && cf.get() == MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET); //container does not exist in source datanode - cf = replicationManager.move(id, dn3, dn3); + cf = replicationManager.move(id, new MoveDataNodePair(dn3, dn3)); Assert.assertTrue(cf.isDone() && cf.get() == MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE); @@ -1454,7 +1579,8 @@ public void testMovePrerequisites() replicationManager.processAll(); //waiting for inflightDeletion generation eventQueue.processAll(1000); - cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3); + cf = replicationManager.move(id, + new MoveDataNodePair(dn1.getDatanodeDetails(), dn3)); Assert.assertTrue(cf.isDone() && cf.get() == MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION); resetReplicationManager(); @@ -1467,7 +1593,8 @@ public void testMovePrerequisites() replicationManager.processAll(); //waiting for inflightReplication generation eventQueue.processAll(1000); - cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3); + cf =
replicationManager.move(id, + new MoveDataNodePair(dn1.getDatanodeDetails(), dn3)); Assert.assertTrue(cf.isDone() && cf.get() == MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION); } @@ -1561,7 +1688,7 @@ private ContainerReplica addReplicaToDn(ContainerInfo container, return replica; } - private void assertReplicaScheduled(int delta) throws InterruptedException { + private void assertReplicaScheduled(int delta) { final int currentReplicateCommandCount = datanodeCommandHandler .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand); @@ -1588,9 +1715,11 @@ private void assertDeleteScheduled(int delta) throws InterruptedException { } @After - public void teardown() throws IOException { + public void teardown() throws Exception { containerStateManager.close(); replicationManager.stop(); + dbStore.close(); + FileUtils.deleteDirectory(testDir); } private static class DatanodeCommandHandler implements diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java index af762d647d41..80b72409295a 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.scm.container.*; +import org.apache.hadoop.hdds.scm.container.ReplicationManager; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.server.events.EventHandler; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java index c666ef8cdbcb..fb13d05af104 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair; import org.apache.hadoop.hdds.security.x509.certificate.CertInfo; import org.apache.hadoop.hdds.utils.TransactionInfo; import org.apache.hadoop.hdds.scm.metadata.PipelineCodec; @@ -149,6 +150,11 @@ public Table getContainerTable() { return null; } + @Override + public Table getMoveTable() { + return null; + } + /** * Test SCM DB Definition for the above class. 
*/ diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java index 26afe7e3d3a2..03bc8ba0e81f 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java @@ -750,10 +750,9 @@ public void testCloseContainerCommandOnRestart() throws Exception { new TestStorageContainerManagerHelper(cluster, conf); helper.createKeys(10, 4096); - GenericTestUtils.waitFor(() -> { - return cluster.getStorageContainerManager().getContainerManager(). - getContainers() != null; - }, 1000, 10000); + GenericTestUtils.waitFor(() -> + cluster.getStorageContainerManager().getContainerManager() + .getContainers() != null, 1000, 10000); StorageContainerManager scm = cluster.getStorageContainerManager(); List containers = cluster.getStorageContainerManager() diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java index b8ad7d511643..1607d927036c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java @@ -17,7 +17,10 @@ package org.apache.hadoop.ozone.scm; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair; import org.apache.hadoop.hdds.scm.ha.SCMHAConfiguration; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; @@ -36,10 +39,15 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.slf4j.event.Level; +import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION; import java.io.IOException; +import java.util.Map; import java.util.UUID; +import static org.apache.hadoop.hdds.scm.TestUtils.getContainer; +import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails; + /** * Tests failover with SCM HA setup. */ @@ -136,6 +144,89 @@ public void testFailover() throws Exception { .contains("Performing failover to suggested leader")); } + @Test + public void testMoveFailover() throws Exception { + SCMClientConfig scmClientConfig = + conf.getObject(SCMClientConfig.class); + scmClientConfig.setRetryCount(1); + scmClientConfig.setRetryInterval(100); + scmClientConfig.setMaxRetryTimeout(1500); + Assert.assertEquals(scmClientConfig.getRetryCount(), 15); + conf.setFromObject(scmClientConfig); + StorageContainerManager scm = getLeader(cluster); + Assert.assertNotNull(scm); + + final ContainerID id = + getContainer(HddsProtos.LifeCycleState.CLOSED).containerID(); + DatanodeDetails dn1 = randomDatanodeDetails(); + DatanodeDetails dn2 = randomDatanodeDetails(); + + //here we just want to test whether the new leader will get the same + //inflight move after failover, so there is no need to create a real + //container or datanodes; we just mock them, bypassing all the pre-checks.
+ scm.getReplicationManager().getMoveScheduler().startMove(id.getProtobuf(), + (new MoveDataNodePair(dn1, dn2)).getProtobufMessage(CURRENT_VERSION)); + + SCMBlockLocationFailoverProxyProvider failoverProxyProvider = + new SCMBlockLocationFailoverProxyProvider(conf); + failoverProxyProvider.changeCurrentProxy(scm.getSCMNodeId()); + ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient = + new ScmBlockLocationProtocolClientSideTranslatorPB( + failoverProxyProvider); + GenericTestUtils + .setLogLevel(SCMBlockLocationFailoverProxyProvider.LOG, Level.DEBUG); + GenericTestUtils.LogCapturer logCapture = GenericTestUtils.LogCapturer + .captureLogs(SCMBlockLocationFailoverProxyProvider.LOG); + ScmBlockLocationProtocol scmBlockLocationProtocol = TracingUtil + .createProxy(scmBlockLocationClient, ScmBlockLocationProtocol.class, + conf); + scmBlockLocationProtocol.getScmInfo(); + Assert.assertTrue(logCapture.getOutput() + .contains("Performing failover to suggested leader")); + scm = getLeader(cluster); + Assert.assertNotNull(scm); + + //switched to the new leader successfully; the new leader should + //get the same inflightMove + Map inflightMove = + scm.getReplicationManager().getMoveScheduler().getInflightMove(); + Assert.assertTrue(inflightMove.containsKey(id)); + MoveDataNodePair mp = inflightMove.get(id); + Assert.assertEquals(dn2, mp.getTgt()); + Assert.assertEquals(dn1, mp.getSrc()); + + //complete move in the new leader + scm.getReplicationManager().getMoveScheduler() + .completeMove(id.getProtobuf()); + + SCMContainerLocationFailoverProxyProvider proxyProvider = + new SCMContainerLocationFailoverProxyProvider(conf, null); + GenericTestUtils.setLogLevel(SCMContainerLocationFailoverProxyProvider.LOG, + Level.DEBUG); + logCapture = GenericTestUtils.LogCapturer + .captureLogs(SCMContainerLocationFailoverProxyProvider.LOG); + proxyProvider.changeCurrentProxy(scm.getSCMNodeId()); + StorageContainerLocationProtocol scmContainerClient = + TracingUtil.createProxy( + new StorageContainerLocationProtocolClientSideTranslatorPB( + proxyProvider), StorageContainerLocationProtocol.class, conf); + + scmContainerClient.allocateContainer(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.ONE, "ozone"); + Assert.assertTrue(logCapture.getOutput() + .contains("Performing failover to suggested leader")); + + //switched to the new leader successfully; the new leader should + //get the same inflightMove, which should not contain + //this container.
+ scm = getLeader(cluster); + Assert.assertNotNull(scm); + inflightMove = scm.getReplicationManager() + .getMoveScheduler().getInflightMove(); + Assert.assertFalse(inflightMove.containsKey(id)); + } + static StorageContainerManager getLeader(MiniOzoneHAClusterImpl impl) { for (StorageContainerManager scm : impl.getStorageContainerManagers()) { if (scm.checkLeader()) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java index 8dd9b66c5f08..7b4694150d3a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java @@ -514,7 +514,8 @@ public void testSCMHandlesRestartForMaintenanceNode() waitForContainerReplicas(newContainer, 3); ContainerReplicaCount counts = - scm.getReplicationManager().getContainerReplicaCount(newContainer); + scm.getReplicationManager() + .getContainerReplicaCount(newContainer.containerID()); assertEquals(1, counts.getMaintenanceCount()); assertTrue(counts.isSufficientlyReplicated()); @@ -541,7 +542,8 @@ public void testSCMHandlesRestartForMaintenanceNode() waitForContainerReplicas(nextContainer, 3); // There should be no IN_MAINTENANCE node: assertEquals(0, nm.getNodeCount(IN_MAINTENANCE, null)); - counts = scm.getReplicationManager().getContainerReplicaCount(newContainer); + counts = scm.getReplicationManager() + .getContainerReplicaCount(newContainer.containerID()); assertEquals(0, counts.getMaintenanceCount()); assertTrue(counts.isSufficientlyReplicated()); }
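Reviewer note on the new tests: testMoveCrashAndRestart and testMoveFailover both exercise the same persistence contract of the move scheduler — startMove is made durable in the MOVE table before the move becomes visible as inflight, completeMove erases the durable record, and reinitialize rebuilds the in-memory view solely from that table, while the client-facing CompletableFuture is deliberately not persisted and is therefore lost across a crash or failover. Below is a minimal, self-contained sketch of that contract, under stated assumptions: the nested MoveTable and MoveScheduler classes, the String keys, and the HashMap standing in for the RocksDB-backed table are illustrative stand-ins, not the actual SCM APIs.

import java.util.HashMap;
import java.util.Map;

// Illustrative model of the move scheduler's persistence contract.
// All names and types here are simplified stand-ins, not real Ozone classes.
public final class MoveSchedulerContractSketch {

  // Stand-in for the RocksDB-backed MOVE table; only this survives a "crash".
  static final class MoveTable {
    private final Map<String, String> rows = new HashMap<>();
    void put(String id, String pair) { rows.put(id, pair); }
    void delete(String id) { rows.remove(id); }
    Map<String, String> loadAll() { return new HashMap<>(rows); }
  }

  static final class MoveScheduler {
    private final Map<String, String> inflightMove = new HashMap<>();
    private MoveTable table;

    MoveScheduler(MoveTable table) { this.table = table; }

    // Persist first, then update memory, so a crash never loses a scheduled move.
    void startMove(String id, String srcTgtPair) {
      table.put(id, srcTgtPair);
      inflightMove.put(id, srcTgtPair);
    }

    // Erase the durable record once the move is finished.
    void completeMove(String id) {
      table.delete(id);
      inflightMove.remove(id);
    }

    // After a restart or failover, rebuild state purely from the persisted table.
    void reinitialize(MoveTable persisted) {
      this.table = persisted;
      inflightMove.clear();
      inflightMove.putAll(persisted.loadAll());
    }

    Map<String, String> getInflightMove() { return inflightMove; }
  }

  public static void main(String[] args) {
    MoveTable persisted = new MoveTable();
    MoveScheduler scheduler = new MoveScheduler(persisted);
    scheduler.startMove("container-1", "dn1->dn3");

    // Simulate a crash: discard the scheduler, keep only the persisted table.
    scheduler = new MoveScheduler(new MoveTable());
    scheduler.reinitialize(persisted);
    // Run with -ea to enable assertions.
    assert scheduler.getInflightMove().containsKey("container-1");

    // Completing the move removes the durable record, so a later
    // restart or failover no longer reports it as inflight.
    scheduler.completeMove("container-1");
    scheduler = new MoveScheduler(new MoveTable());
    scheduler.reinitialize(persisted);
    assert !scheduler.getInflightMove().containsKey("container-1");
  }
}

Under this contract a restarted SCM or a newly elected leader either resumes a still-inflight move (re-issuing the replicate or delete command, as the tests assert) or forgets a completed one; only the per-request CompletableFuture, which lives outside the durable state, is lost.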