Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,13 @@ public boolean handle(ContainerCheckRequest request) {
return false;
}

boolean forceClose = request.getContainerInfo().getReplicationConfig()
.getReplicationType() != HddsProtos.ReplicationType.RATIS;

for (ContainerReplica replica : request.getContainerReplicas()) {
if (replica.getState() != ContainerReplicaProto.State.UNHEALTHY) {
replicationManager.sendCloseContainerReplicaCommand(
containerInfo, replica.getDatanodeDetails(), false);
containerInfo, replica.getDatanodeDetails(), forceClose);
}
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
Expand All @@ -32,32 +33,42 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Stream;

import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSING;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;

/**
* Tests for {@link ClosingContainerHandler}.
*/
public class TestClosingContainerHandler {
private ReplicationManager replicationManager;
private ClosingContainerHandler closingContainerHandler;
private ECReplicationConfig ecReplicationConfig;
private RatisReplicationConfig ratisReplicationConfig;
private static final ECReplicationConfig EC_REPLICATION_CONFIG =
new ECReplicationConfig(3, 2);
private static final RatisReplicationConfig RATIS_REPLICATION_CONFIG =
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);

@BeforeEach
public void setup() {
ecReplicationConfig = new ECReplicationConfig(3, 2);
ratisReplicationConfig = RatisReplicationConfig.getInstance(
HddsProtos.ReplicationFactor.THREE);
replicationManager = Mockito.mock(ReplicationManager.class);
closingContainerHandler = new ClosingContainerHandler(replicationManager);
}

private static Stream<ReplicationConfig> replicationConfigs() {
return Stream.of(RATIS_REPLICATION_CONFIG, EC_REPLICATION_CONFIG);
}

/**
* If a container is not closing, it should not be handled by
* ClosingContainerHandler. It should return false so the request can be
Expand All @@ -66,7 +77,7 @@ public void setup() {
@Test
public void testNonClosingContainerReturnsFalse() {
ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
ecReplicationConfig, 1, CLOSED);
EC_REPLICATION_CONFIG, 1, CLOSED);
Set<ContainerReplica> containerReplicas = ReplicationTestUtil
.createReplicas(containerInfo.containerID(),
ContainerReplicaProto.State.CLOSING, 1, 2, 3, 4, 5);
Expand All @@ -84,7 +95,7 @@ public void testNonClosingContainerReturnsFalse() {
@Test
public void testNonClosingRatisContainerReturnsFalse() {
ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
ratisReplicationConfig, 1, CLOSED);
RATIS_REPLICATION_CONFIG, 1, CLOSED);
Set<ContainerReplica> containerReplicas = ReplicationTestUtil
.createReplicas(containerInfo.containerID(),
ContainerReplicaProto.State.CLOSING, 0, 0, 0);
Expand All @@ -107,7 +118,7 @@ public void testNonClosingRatisContainerReturnsFalse() {
@Test
public void testUnhealthyReplicaIsNotClosed() {
ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
ecReplicationConfig, 1, CLOSING);
EC_REPLICATION_CONFIG, 1, CLOSING);
Set<ContainerReplica> containerReplicas = ReplicationTestUtil
.createReplicas(containerInfo.containerID(),
ContainerReplicaProto.State.UNHEALTHY, 1, 2, 3, 4);
Expand All @@ -130,7 +141,7 @@ public void testUnhealthyReplicaIsNotClosed() {
@Test
public void testUnhealthyRatisReplicaIsNotClosed() {
ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
ratisReplicationConfig, 1, CLOSING);
RATIS_REPLICATION_CONFIG, 1, CLOSING);
Set<ContainerReplica> containerReplicas = ReplicationTestUtil
.createReplicas(containerInfo.containerID(),
ContainerReplicaProto.State.UNHEALTHY, 0, 0);
Expand All @@ -153,51 +164,55 @@ public void testUnhealthyRatisReplicaIsNotClosed() {
/**
* Close commands should be sent for Open or Closing replicas.
*/
@Test
public void testOpenOrClosingReplicasAreClosed() {
@ParameterizedTest
@MethodSource("replicationConfigs")
public void testOpenOrClosingReplicasAreClosed(ReplicationConfig repConfig) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parameterizing this is a good idea. If I understand correctly, this test will now run for both EC and RATIS replication configs. If so, we can delete the next test testOpenOrClosingRatisReplicasAreClosed().

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's covered by the parameterized test.
Thanks for pointing this out.

ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
ecReplicationConfig, 1, CLOSING);
Set<ContainerReplica> containerReplicas = ReplicationTestUtil
.createReplicas(containerInfo.containerID(),
ContainerReplicaProto.State.CLOSING, 1, 2);
containerReplicas.add(ReplicationTestUtil.createContainerReplica(
containerInfo.containerID(), 3,
HddsProtos.NodeOperationalState.IN_SERVICE,
ContainerReplicaProto.State.OPEN));
repConfig, 1, CLOSING);

final int replicas = repConfig.getRequiredNodes();
final int closing = replicas / 2;
final boolean force = repConfig.getReplicationType() != RATIS;

Set<ContainerReplica> containerReplicas = new HashSet<>();

// Add CLOSING container replicas.
// For EC, replica index will be in [1, closing].
for (int i = 1; i <= closing; i++) {
containerReplicas.add(ReplicationTestUtil.createContainerReplica(
containerInfo.containerID(),
repConfig.getReplicationType() == EC ? i : 0,
HddsProtos.NodeOperationalState.IN_SERVICE,
ContainerReplicaProto.State.CLOSING));
}

// Add OPEN container replicas.
// For EC, replica index will be in [closing + 1, replicas].
for (int i = closing + 1; i <= replicas; i++) {
containerReplicas.add(ReplicationTestUtil.createContainerReplica(
containerInfo.containerID(),
repConfig.getReplicationType() == EC ? i : 0,
HddsProtos.NodeOperationalState.IN_SERVICE,
ContainerReplicaProto.State.OPEN));
}

ContainerCheckRequest request = new ContainerCheckRequest.Builder()
.setPendingOps(Collections.EMPTY_LIST)
.setPendingOps(Collections.emptyList())
.setReport(new ReplicationManagerReport())
.setContainerInfo(containerInfo)
.setContainerReplicas(containerReplicas)
.build();

assertAndVerify(request, true, 3);
}

@Test
public void testOpenOrClosingRatisReplicasAreClosed() {
ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
ratisReplicationConfig, 1, CLOSING);
Set<ContainerReplica> containerReplicas = ReplicationTestUtil
.createReplicas(containerInfo.containerID(),
ContainerReplicaProto.State.CLOSING, 0, 0);
containerReplicas.add(ReplicationTestUtil.createContainerReplica(
containerInfo.containerID(), 0,
HddsProtos.NodeOperationalState.IN_SERVICE,
ContainerReplicaProto.State.OPEN));

ContainerCheckRequest request = new ContainerCheckRequest.Builder()
.setPendingOps(Collections.EMPTY_LIST)
.setReport(new ReplicationManagerReport())
.setContainerInfo(containerInfo)
.setContainerReplicas(containerReplicas)
.build();

assertAndVerify(request, true, 3);
ArgumentCaptor<Boolean> forceCaptor =
ArgumentCaptor.forClass(Boolean.class);
Assertions.assertTrue(closingContainerHandler.handle(request));
Mockito.verify(replicationManager, Mockito.times(replicas))
.sendCloseContainerReplicaCommand(Mockito.any(ContainerInfo.class),
Mockito.any(DatanodeDetails.class), forceCaptor.capture());
forceCaptor.getAllValues()
.forEach(f -> Assertions.assertEquals(force, f));
}


private void assertAndVerify(ContainerCheckRequest request,
boolean assertion, int times) {
Assertions.assertEquals(assertion, closingContainerHandler.handle(request));
Expand Down