-
Notifications
You must be signed in to change notification settings - Fork 588
HDDS-12566. Handle Over replication of Quasi Closed Stuck containers #8061
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
22b2ebb
b797414
011450a
974aad9
9831a5b
0e6717e
8f5c3ec
6e52321
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.hadoop.hdds.scm.container.replication; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.Comparator; | ||
| import java.util.List; | ||
| import java.util.Set; | ||
| import java.util.stream.Collectors; | ||
| import org.apache.hadoop.hdds.scm.container.ContainerInfo; | ||
| import org.apache.hadoop.hdds.scm.container.ContainerReplica; | ||
|
|
||
| /** | ||
| * Class to correct over replicated QuasiClosed Stuck Ratis containers. | ||
| */ | ||
| public class QuasiClosedStuckOverReplicationHandler implements UnhealthyReplicationHandler { | ||
|
|
||
| private static final org.slf4j.Logger LOG = | ||
| org.slf4j.LoggerFactory.getLogger(QuasiClosedStuckOverReplicationHandler.class); | ||
| private final ReplicationManager replicationManager; | ||
| private final ReplicationManagerMetrics metrics; | ||
|
|
||
| public QuasiClosedStuckOverReplicationHandler(final ReplicationManager replicationManager) { | ||
| this.replicationManager = replicationManager; | ||
| this.metrics = replicationManager.getMetrics(); | ||
| } | ||
|
|
||
| @Override | ||
| public int processAndSendCommands(Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps, | ||
| ContainerHealthResult result, int remainingMaintenanceRedundancy) | ||
| throws IOException { | ||
|
|
||
| ContainerInfo containerInfo = result.getContainerInfo(); | ||
| LOG.debug("Handling over replicated QuasiClosed Stuck Ratis container {}", containerInfo); | ||
|
|
||
| int pendingDelete = 0; | ||
| for (ContainerReplicaOp op : pendingOps) { | ||
| if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) { | ||
| pendingDelete++; | ||
| } | ||
| } | ||
|
|
||
| if (pendingDelete > 0) { | ||
| LOG.debug("Container {} has pending delete operations. No more over replication will be scheduled until they " + | ||
| "complete", containerInfo); | ||
| return 0; | ||
| } | ||
|
|
||
| QuasiClosedStuckReplicaCount replicaCount = | ||
| new QuasiClosedStuckReplicaCount(replicas, remainingMaintenanceRedundancy); | ||
|
|
||
| List<QuasiClosedStuckReplicaCount.MisReplicatedOrigin> misReplicatedOrigins | ||
| = replicaCount.getOverReplicatedOrigins(); | ||
|
|
||
| if (misReplicatedOrigins.isEmpty()) { | ||
| LOG.debug("Container {} is not over replicated", containerInfo); | ||
| return 0; | ||
| } | ||
|
|
||
| int totalCommandsSent = 0; | ||
| IOException firstException = null; | ||
| for (QuasiClosedStuckReplicaCount.MisReplicatedOrigin origin : misReplicatedOrigins) { | ||
| List<ContainerReplica> sortedReplicas = getSortedReplicas(origin.getSources()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's remove any replicas that are on stale Datanodes first and then check if the container is still over replicated?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good idea. I have made that change too. |
||
| for (int i = 0; i < origin.getReplicaDelta(); i++) { | ||
| try { | ||
| replicationManager.sendThrottledDeleteCommand( | ||
| containerInfo, 0, sortedReplicas.get(i).getDatanodeDetails(), true); | ||
| totalCommandsSent++; | ||
| } catch (CommandTargetOverloadedException e) { | ||
| LOG.debug("Unable to send delete command for container {} to {} as it has too many pending delete commands", | ||
| containerInfo, sortedReplicas.get(i).getDatanodeDetails()); | ||
| firstException = e; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if (firstException != null) { | ||
| // Some nodes were overloaded when attempting to send commands. | ||
| if (totalCommandsSent > 0) { | ||
| metrics.incrPartialReplicationTotal(); | ||
| } | ||
| throw firstException; | ||
| } | ||
| return totalCommandsSent; | ||
| } | ||
|
|
||
| private List<ContainerReplica> getSortedReplicas( | ||
| Set<ContainerReplica> replicas) { | ||
| // sort replicas so that they can be selected in a deterministic way | ||
| return replicas.stream() | ||
| .sorted(Comparator.comparingLong(ContainerReplica::hashCode)) | ||
| .collect(Collectors.toList()); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,168 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.hadoop.hdds.scm.container.replication; | ||
|
|
||
| import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE; | ||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
| import static org.junit.jupiter.api.Assertions.assertThrows; | ||
| import static org.mockito.ArgumentMatchers.any; | ||
| import static org.mockito.Mockito.mock; | ||
| import static org.mockito.Mockito.when; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.Collections; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Set; | ||
| import java.util.UUID; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import org.apache.commons.lang3.tuple.Pair; | ||
| import org.apache.hadoop.hdds.client.RatisReplicationConfig; | ||
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; | ||
| import org.apache.hadoop.hdds.protocol.DatanodeDetails; | ||
| import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; | ||
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos; | ||
| import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; | ||
| import org.apache.hadoop.hdds.scm.container.ContainerInfo; | ||
| import org.apache.hadoop.hdds.scm.container.ContainerReplica; | ||
| import org.apache.hadoop.hdds.scm.node.NodeStatus; | ||
| import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; | ||
| import org.apache.hadoop.ozone.protocol.commands.SCMCommand; | ||
| import org.apache.ratis.protocol.exceptions.NotLeaderException; | ||
| import org.junit.jupiter.api.BeforeEach; | ||
| import org.junit.jupiter.api.Test; | ||
|
|
||
| /** | ||
| * Test for QuasiClosedStuckOverReplicationHandler. | ||
| */ | ||
| public class TestQuasiClosedStuckOverReplicationHandler { | ||
|
|
||
| private static final RatisReplicationConfig RATIS_REPLICATION_CONFIG = RatisReplicationConfig.getInstance(THREE); | ||
| private ContainerInfo container; | ||
| private ReplicationManager replicationManager; | ||
| private ReplicationManagerMetrics metrics; | ||
| private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent; | ||
| private QuasiClosedStuckOverReplicationHandler handler; | ||
|
|
||
| @BeforeEach | ||
| void setup() throws NodeNotFoundException, | ||
| CommandTargetOverloadedException, NotLeaderException { | ||
| container = ReplicationTestUtil.createContainer( | ||
| HddsProtos.LifeCycleState.QUASI_CLOSED, RATIS_REPLICATION_CONFIG); | ||
|
|
||
| replicationManager = mock(ReplicationManager.class); | ||
| OzoneConfiguration ozoneConfiguration = new OzoneConfiguration(); | ||
| ozoneConfiguration.setBoolean("hdds.scm.replication.push", true); | ||
| when(replicationManager.getConfig()) | ||
| .thenReturn(ozoneConfiguration.getObject( | ||
| ReplicationManager.ReplicationManagerConfiguration.class)); | ||
| metrics = ReplicationManagerMetrics.create(replicationManager); | ||
| when(replicationManager.getMetrics()).thenReturn(metrics); | ||
|
|
||
| /* | ||
| Return NodeStatus with NodeOperationalState as specified in | ||
| DatanodeDetails, and NodeState as HEALTHY. | ||
| */ | ||
| when( | ||
| replicationManager.getNodeStatus(any(DatanodeDetails.class))) | ||
| .thenAnswer(invocationOnMock -> { | ||
| DatanodeDetails dn = invocationOnMock.getArgument(0); | ||
| return new NodeStatus(dn.getPersistedOpState(), | ||
| HddsProtos.NodeState.HEALTHY); | ||
| }); | ||
|
|
||
| commandsSent = new HashSet<>(); | ||
| ReplicationTestUtil.mockRMSendThrottledDeleteCommand( | ||
| replicationManager, commandsSent); | ||
| handler = new QuasiClosedStuckOverReplicationHandler(replicationManager); | ||
| } | ||
|
|
||
| @Test | ||
| public void testReturnsZeroIfNotOverReplicated() throws IOException { | ||
| UUID origin = UUID.randomUUID(); | ||
| Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(container.containerID(), | ||
| StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED, | ||
| Pair.of(origin, HddsProtos.NodeOperationalState.IN_SERVICE), | ||
| Pair.of(origin, HddsProtos.NodeOperationalState.IN_SERVICE), | ||
| Pair.of(origin, HddsProtos.NodeOperationalState.IN_SERVICE)); | ||
|
|
||
| int count = handler.processAndSendCommands(replicas, Collections.emptyList(), getOverReplicatedHealthResult(), 1); | ||
| assertEquals(0, count); | ||
| } | ||
|
|
||
| @Test | ||
| public void testNoCommandsScheduledIfPendingOps() throws IOException { | ||
| UUID origin = UUID.randomUUID(); | ||
| Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(container.containerID(), | ||
| StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED, | ||
| Pair.of(origin, HddsProtos.NodeOperationalState.IN_SERVICE), | ||
| Pair.of(origin, HddsProtos.NodeOperationalState.IN_SERVICE)); | ||
| List<ContainerReplicaOp> pendingOps = new ArrayList<>(); | ||
| pendingOps.add(ContainerReplicaOp.create( | ||
| ContainerReplicaOp.PendingOpType.DELETE, MockDatanodeDetails.randomDatanodeDetails(), 0)); | ||
|
|
||
| int count = handler.processAndSendCommands(replicas, pendingOps, getOverReplicatedHealthResult(), 1); | ||
| assertEquals(0, count); | ||
| } | ||
|
|
||
| @Test | ||
| public void testCommandScheduledForOverReplicatedContainer() throws IOException { | ||
| UUID origin = UUID.randomUUID(); | ||
| Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(container.containerID(), | ||
| StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED, | ||
| Pair.of(origin, HddsProtos.NodeOperationalState.IN_SERVICE), | ||
| Pair.of(origin, HddsProtos.NodeOperationalState.IN_SERVICE), | ||
| Pair.of(origin, HddsProtos.NodeOperationalState.IN_SERVICE), | ||
| Pair.of(origin, HddsProtos.NodeOperationalState.IN_SERVICE)); | ||
|
|
||
| int count = handler.processAndSendCommands(replicas, Collections.emptyList(), getOverReplicatedHealthResult(), 1); | ||
| assertEquals(1, count); | ||
| SCMCommand<?> command = commandsSent.iterator().next().getRight(); | ||
| assertEquals(StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteContainerCommand, command.getType()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suggest also verifying that the delete was sent for one of the origin1 replicas.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have enhanced the scenario tests to perform this sort of validation. I had avoided that before because a command could be sent to dn1 or dn2, and the framework didn't support that. However now you can give "dn1|dn2" in the scenario spec and it will check it goes to one of the listed DNs.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice idea, LGTM. |
||
| } | ||
|
|
||
| @Test | ||
| public void testOverloadedExceptionContinuesAndThrows() throws NotLeaderException, CommandTargetOverloadedException { | ||
| UUID origin1 = UUID.randomUUID(); | ||
| UUID origin2 = UUID.randomUUID(); | ||
| Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(container.containerID(), | ||
| StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED, | ||
| Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE), | ||
| Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE), | ||
| Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE), | ||
| Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE), | ||
| Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE), | ||
| Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE)); | ||
|
|
||
| ReplicationTestUtil.mockRMSendThrottledDeleteCommand(replicationManager, commandsSent, new AtomicBoolean(true)); | ||
|
|
||
| assertThrows(CommandTargetOverloadedException.class, () -> | ||
| handler.processAndSendCommands(replicas, Collections.emptyList(), getOverReplicatedHealthResult(), 1)); | ||
| assertEquals(1, commandsSent.size()); | ||
| } | ||
|
|
||
|
|
||
| private ContainerHealthResult.OverReplicatedHealthResult getOverReplicatedHealthResult() { | ||
| ContainerHealthResult.OverReplicatedHealthResult | ||
| healthResult = mock(ContainerHealthResult.OverReplicatedHealthResult.class); | ||
| when(healthResult.getContainerInfo()).thenReturn(container); | ||
| return healthResult; | ||
| } | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be
DELETE.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well spotted. I also fixed the test which should have caught this, but was also broken!