getContainerReplicas(final ContainerID id) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
index 25a4a80f233..83791c3257d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
@@ -41,12 +41,14 @@
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -132,6 +134,62 @@ void testUpdateContainerState() throws Exception {
assertEquals(LifeCycleState.CLOSED, containerManager.getContainer(cid).getState());
}
+ @Test
+ void testTransitionDeletingToClosedState() throws IOException, InvalidStateTransitionException {
+ // allocate OPEN Ratis and Ec containers, and do a series of state changes to transition them to DELETING
+ final ContainerInfo container = containerManager.allocateContainer(
+ RatisReplicationConfig.getInstance(
+ ReplicationFactor.THREE), "admin");
+ ContainerInfo ecContainer = containerManager.allocateContainer(new ECReplicationConfig(3, 2), "admin");
+ final ContainerID cid = container.containerID();
+ final ContainerID ecCid = ecContainer.containerID();
+ assertEquals(LifeCycleState.OPEN, containerManager.getContainer(cid).getState());
+ assertEquals(LifeCycleState.OPEN, containerManager.getContainer(ecCid).getState());
+
+ // OPEN -> CLOSING
+ containerManager.updateContainerState(cid,
+ HddsProtos.LifeCycleEvent.FINALIZE);
+ containerManager.updateContainerState(ecCid, HddsProtos.LifeCycleEvent.FINALIZE);
+ assertEquals(LifeCycleState.CLOSING, containerManager.getContainer(cid).getState());
+ assertEquals(LifeCycleState.CLOSING, containerManager.getContainer(ecCid).getState());
+
+ // CLOSING -> CLOSED
+ containerManager.updateContainerState(cid, HddsProtos.LifeCycleEvent.CLOSE);
+ containerManager.updateContainerState(ecCid, HddsProtos.LifeCycleEvent.CLOSE);
+ assertEquals(LifeCycleState.CLOSED, containerManager.getContainer(cid).getState());
+ assertEquals(LifeCycleState.CLOSED, containerManager.getContainer(ecCid).getState());
+
+ // CLOSED -> DELETING
+ containerManager.updateContainerState(cid, HddsProtos.LifeCycleEvent.DELETE);
+ containerManager.updateContainerState(ecCid, HddsProtos.LifeCycleEvent.DELETE);
+ assertEquals(LifeCycleState.DELETING, containerManager.getContainer(cid).getState());
+ assertEquals(LifeCycleState.DELETING, containerManager.getContainer(ecCid).getState());
+
+ // DELETING -> CLOSED
+ containerManager.transitionDeletingToClosedState(cid);
+ containerManager.transitionDeletingToClosedState(ecCid);
+ // the containers should be back in CLOSED state now
+ assertEquals(LifeCycleState.CLOSED, containerManager.getContainer(cid).getState());
+ assertEquals(LifeCycleState.CLOSED, containerManager.getContainer(ecCid).getState());
+ }
+
+ @Test
+ void testTransitionDeletingToClosedStateAllowsOnlyDeletingContainers() throws IOException {
+ // test for RATIS container
+ final ContainerInfo container = containerManager.allocateContainer(
+ RatisReplicationConfig.getInstance(
+ ReplicationFactor.THREE), "admin");
+ final ContainerID cid = container.containerID();
+ assertEquals(LifeCycleState.OPEN, containerManager.getContainer(cid).getState());
+ assertThrows(IOException.class, () -> containerManager.transitionDeletingToClosedState(cid));
+
+ // test for EC container
+ final ContainerInfo ecContainer = containerManager.allocateContainer(new ECReplicationConfig(3, 2), "admin");
+ final ContainerID ecCid = ecContainer.containerID();
+ assertEquals(LifeCycleState.OPEN, containerManager.getContainer(ecCid).getState());
+ assertThrows(IOException.class, () -> containerManager.transitionDeletingToClosedState(ecCid));
+ }
+
@Test
void testGetContainers() throws Exception {
assertEquals(emptyList(), containerManager.getContainers());
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
index 695c88d11a3..7c3666ad617 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -66,6 +66,7 @@
import java.util.stream.Stream;
import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
+import static org.apache.hadoop.hdds.scm.HddsTestUtils.getContainerReports;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doAnswer;
@@ -100,7 +101,6 @@ void setup() throws IOException, InvalidStateTransitionException {
dbStore = DBStoreBuilder.createDBStore(
conf, new SCMDBDefinition());
scmhaManager = SCMHAManagerStub.getInstance(true);
- nodeManager = new MockNodeManager(true, 10);
pipelineManager =
new MockPipelineManager(dbStore, scmhaManager, nodeManager);
containerStateManager = ContainerStateManagerImpl.newBuilder()
@@ -151,6 +151,10 @@ void setup() throws IOException, InvalidStateTransitionException {
}).when(containerManager).removeContainerReplica(
any(ContainerID.class), any(ContainerReplica.class));
+ doAnswer(invocation -> {
+ containerStateManager.transitionDeletingToClosedState(((ContainerID) invocation.getArgument(0)).getProtobuf());
+ return null;
+ }).when(containerManager).transitionDeletingToClosedState(any(ContainerID.class));
}
@AfterEach
@@ -442,6 +446,107 @@ public void testClosingToClosedForECContainer()
assertEquals(LifeCycleState.CLOSED, containerManager.getContainer(container2.containerID()).getState());
}
+ /**
+ * Tests that a DELETING RATIS container transitions to CLOSED if a non-empty CLOSED replica is reported. It does not
+ * transition if a non-empty CLOSING replica is reported.
+ */
+ @Test
+ public void ratisContainerShouldTransitionFromDeletingToClosedWhenNonEmptyClosedReplica() throws IOException {
+ ContainerInfo container = getContainer(LifeCycleState.DELETING);
+ containerStateManager.addContainer(container.getProtobuf());
+
+ // set up a non-empty CLOSED replica
+ DatanodeDetails dnWithClosedReplica = nodeManager.getNodes(NodeStatus.inServiceHealthy()).get(0);
+ ContainerReplicaProto.Builder builder = ContainerReplicaProto.newBuilder();
+ ContainerReplicaProto closedReplica = builder.setContainerID(container.getContainerID())
+ .setIsEmpty(false)
+ .setState(ContainerReplicaProto.State.CLOSED)
+ .setKeyCount(0)
+ .setBlockCommitSequenceId(123)
+ .setOriginNodeId(dnWithClosedReplica.getUuidString()).build();
+
+ // set up a non-empty CLOSING replica
+ DatanodeDetails dnWithClosingReplica = nodeManager.getNodes(NodeStatus.inServiceHealthy()).get(1);
+ ContainerReplicaProto closingReplica = builder.setState(ContainerReplicaProto.State.CLOSING)
+ .setOriginNodeId(dnWithClosingReplica.getUuidString()).build();
+
+ // should not transition on processing the CLOSING replica's report
+ ContainerReportHandler containerReportHandler = new ContainerReportHandler(nodeManager, containerManager);
+ ContainerReportsProto closingContainerReport = getContainerReports(closingReplica);
+ containerReportHandler
+ .onMessage(new ContainerReportFromDatanode(dnWithClosingReplica, closingContainerReport), publisher);
+
+ assertEquals(LifeCycleState.DELETING, containerStateManager.getContainer(container.containerID()).getState());
+
+ // should transition on processing the CLOSED replica's report
+ ContainerReportsProto closedContainerReport = getContainerReports(closedReplica);
+ containerReportHandler
+ .onMessage(new ContainerReportFromDatanode(dnWithClosedReplica, closedContainerReport), publisher);
+ assertEquals(LifeCycleState.CLOSED, containerStateManager.getContainer(container.containerID()).getState());
+ }
+
+ @Test
+ public void ratisContainerShouldNotTransitionFromDeletingToClosedWhenEmptyClosedReplica() throws IOException {
+ ContainerInfo container = getContainer(LifeCycleState.DELETING);
+ containerStateManager.addContainer(container.getProtobuf());
+
+ // set up an empty CLOSED replica
+ DatanodeDetails dnWithClosedReplica = nodeManager.getNodes(NodeStatus.inServiceHealthy()).get(0);
+ ContainerReplicaProto.Builder builder = ContainerReplicaProto.newBuilder();
+ ContainerReplicaProto closedReplica = builder.setContainerID(container.getContainerID())
+ .setIsEmpty(true)
+ .setState(ContainerReplicaProto.State.CLOSED)
+ .setKeyCount(0)
+ .setBlockCommitSequenceId(123)
+ .setOriginNodeId(dnWithClosedReplica.getUuidString()).build();
+
+ ContainerReportHandler containerReportHandler = new ContainerReportHandler(nodeManager, containerManager);
+ ContainerReportsProto closedContainerReport = getContainerReports(closedReplica);
+ containerReportHandler
+ .onMessage(new ContainerReportFromDatanode(dnWithClosedReplica, closedContainerReport), publisher);
+ assertEquals(LifeCycleState.DELETING, containerStateManager.getContainer(container.containerID()).getState());
+ }
+
+ /**
+ * Tests that a DELETING EC container transitions to CLOSED if a non-empty CLOSED replica is reported. It does not
+ * transition if a non-empty CLOSING (or any other state) replica is reported.
+ */
+ @Test
+ public void ecContainerShouldTransitionFromDeletingToClosedWhenNonEmptyClosedReplica() throws IOException {
+ ContainerInfo container = getECContainer(LifeCycleState.DELETING, PipelineID.randomId(),
+ new ECReplicationConfig(6, 3));
+ containerStateManager.addContainer(container.getProtobuf());
+
+ // set up a non-empty CLOSED replica
+ DatanodeDetails dnWithClosedReplica = nodeManager.getNodes(NodeStatus.inServiceHealthy()).get(0);
+ ContainerReplicaProto.Builder builder = ContainerReplicaProto.newBuilder();
+ ContainerReplicaProto closedReplica = builder.setContainerID(container.getContainerID())
+ .setIsEmpty(false)
+ .setState(ContainerReplicaProto.State.CLOSED)
+ .setKeyCount(0)
+ .setBlockCommitSequenceId(0)
+ .setReplicaIndex(1)
+ .setOriginNodeId(dnWithClosedReplica.getUuidString()).build();
+
+ // set up a non-empty CLOSING replica
+ DatanodeDetails dnWithClosingReplica = nodeManager.getNodes(NodeStatus.inServiceHealthy()).get(1);
+ ContainerReplicaProto closingReplica = builder.setState(ContainerReplicaProto.State.CLOSING).setReplicaIndex(2)
+ .setOriginNodeId(dnWithClosingReplica.getUuidString()).build();
+
+ // should not transition on processing the CLOSING replica's report
+ ContainerReportHandler containerReportHandler = new ContainerReportHandler(nodeManager, containerManager);
+ ContainerReportsProto closingContainerReport = getContainerReports(closingReplica);
+ containerReportHandler
+ .onMessage(new ContainerReportFromDatanode(dnWithClosingReplica, closingContainerReport), publisher);
+ assertEquals(LifeCycleState.DELETING, containerStateManager.getContainer(container.containerID()).getState());
+
+ // should transition on processing the CLOSED replica's report
+ ContainerReportsProto closedContainerReport = getContainerReports(closedReplica);
+ containerReportHandler
+ .onMessage(new ContainerReportFromDatanode(dnWithClosedReplica, closedContainerReport), publisher);
+ assertEquals(LifeCycleState.CLOSED, containerStateManager.getContainer(container.containerID()).getState());
+ }
+
/**
* Creates the required number of DNs that will hold a replica each for the
* specified EC container. Registers these DNs with the NodeManager, adds this
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
index 27505c6dd3b..a7043d02642 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers.InvalidContainerStateException;
import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
@@ -50,6 +51,8 @@
import org.junit.jupiter.api.io.TempDir;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.when;
@@ -148,6 +151,47 @@ public void checkReplicationStateMissingReplica()
assertEquals(3, c1.getReplicationConfig().getRequiredNodes());
}
+ @Test
+ public void testTransitionDeletingToClosedState() throws IOException {
+ HddsProtos.ContainerInfoProto.Builder builder = HddsProtos.ContainerInfoProto.newBuilder();
+ builder.setContainerID(1)
+ .setState(HddsProtos.LifeCycleState.DELETING)
+ .setUsedBytes(0)
+ .setNumberOfKeys(0)
+ .setOwner("root")
+ .setReplicationType(HddsProtos.ReplicationType.RATIS)
+ .setReplicationFactor(ReplicationFactor.THREE);
+
+ HddsProtos.ContainerInfoProto container = builder.build();
+ HddsProtos.ContainerID cid = HddsProtos.ContainerID.newBuilder().setId(container.getContainerID()).build();
+ containerStateManager.addContainer(container);
+ containerStateManager.transitionDeletingToClosedState(cid);
+ assertEquals(HddsProtos.LifeCycleState.CLOSED, containerStateManager.getContainer(ContainerID.getFromProtobuf(cid))
+ .getState());
+ }
+
+ @Test
+ public void testTransitionDeletingToClosedStateAllowsOnlyDeletingContainer() throws IOException {
+ HddsProtos.ContainerInfoProto.Builder builder = HddsProtos.ContainerInfoProto.newBuilder();
+ builder.setContainerID(1)
+ .setState(HddsProtos.LifeCycleState.QUASI_CLOSED)
+ .setUsedBytes(0)
+ .setNumberOfKeys(0)
+ .setOwner("root")
+ .setReplicationType(HddsProtos.ReplicationType.RATIS)
+ .setReplicationFactor(ReplicationFactor.THREE);
+
+ HddsProtos.ContainerInfoProto container = builder.build();
+ HddsProtos.ContainerID cid = HddsProtos.ContainerID.newBuilder().setId(container.getContainerID()).build();
+ containerStateManager.addContainer(container);
+ try {
+ containerStateManager.transitionDeletingToClosedState(cid);
+ fail("Was expecting an Exception, but did not catch any.");
+ } catch (IOException e) {
+ assertInstanceOf(InvalidContainerStateException.class, e.getCause().getCause());
+ }
+ }
+
private void addReplica(ContainerInfo cont, DatanodeDetails node) {
ContainerReplica replica = ContainerReplica.newBuilder()
.setContainerID(cont.containerID())
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReportHandling.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReportHandling.java
new file mode 100644
index 00000000000..c7d142ac2f4
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReportHandling.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.emptyMap;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.ozone.container.TestHelper.waitForContainerClose;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Tests for container report handling.
+ */
+public class TestContainerReportHandling {
+ private static final String VOLUME = "vol1";
+ private static final String BUCKET = "bucket1";
+ private static final String KEY = "key1";
+
+ /**
+ * Tests that a DELETING container moves to the CLOSED state if a non-empty CLOSED replica is reported. To do this,
+ * the test first creates a key and closes its corresponding container. Then it moves that container to the
+ * DELETING state using ContainerManager. Then it restarts a Datanode hosting that container, making it send a full
+ * container report. Then the test waits for the container to move from DELETING to CLOSED.
+ */
+ @Test
+ void testDeletingContainerTransitionsToClosedWhenNonEmptyReplicaIsReported() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+ conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, TimeUnit.SECONDS);
+
+ Path clusterPath = null;
+ try (MiniOzoneCluster cluster = newCluster(conf)) {
+ cluster.waitForClusterToBeReady();
+ clusterPath = Paths.get(cluster.getBaseDir());
+
+ try (OzoneClient client = cluster.newClient()) {
+ // create a container and close it
+ createTestData(client);
+ List keyLocations = lookupKey(cluster);
+ assertThat(keyLocations).isNotEmpty();
+ OmKeyLocationInfo keyLocation = keyLocations.get(0);
+ ContainerID containerID = ContainerID.valueOf(keyLocation.getContainerID());
+ waitForContainerClose(cluster, containerID.getId());
+
+ // move the container to DELETING
+ ContainerManager containerManager = cluster.getStorageContainerManager().getContainerManager();
+ containerManager.updateContainerState(containerID, HddsProtos.LifeCycleEvent.DELETE);
+ assertEquals(HddsProtos.LifeCycleState.DELETING, containerManager.getContainer(containerID).getState());
+
+ // restart a DN and wait for the container to get CLOSED.
+ HddsDatanodeService dn = cluster.getHddsDatanode(keyLocation.getPipeline().getFirstNode());
+ cluster.restartHddsDatanode(dn.getDatanodeDetails(), false);
+ GenericTestUtils.waitFor(() -> {
+ try {
+ return containerManager.getContainer(containerID).getState() == HddsProtos.LifeCycleState.CLOSED;
+ } catch (ContainerNotFoundException e) {
+ fail(e);
+ }
+ return false;
+ }, 2000, 20000);
+
+ assertEquals(HddsProtos.LifeCycleState.CLOSED, containerManager.getContainer(containerID).getState());
+ }
+ } finally {
+ if (clusterPath != null) {
+ System.out.println("Deleting path " + clusterPath);
+ boolean deleted = FileUtil.fullyDelete(clusterPath.toFile());
+ assertTrue(deleted);
+ }
+ }
+ }
+
+ private static MiniOzoneCluster newCluster(OzoneConfiguration conf)
+ throws IOException {
+ return MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(3)
+ .build();
+ }
+
+ private static List lookupKey(MiniOzoneCluster cluster)
+ throws IOException {
+ OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+ .setVolumeName(VOLUME)
+ .setBucketName(BUCKET)
+ .setKeyName(KEY)
+ .build();
+ OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+ OmKeyLocationInfoGroup locations = keyInfo.getLatestVersionLocations();
+ assertNotNull(locations);
+ return locations.getLocationList();
+ }
+
+ private void createTestData(OzoneClient client) throws IOException {
+ ObjectStore objectStore = client.getObjectStore();
+ objectStore.createVolume(VOLUME);
+ OzoneVolume volume = objectStore.getVolume(VOLUME);
+ volume.createBucket(BUCKET);
+
+ OzoneBucket bucket = volume.getBucket(BUCKET);
+
+ try (OutputStream out = bucket.createKey(KEY, 0,
+ RatisReplicationConfig.getInstance(THREE), emptyMap())) {
+ out.write("Hello".getBytes(UTF_8));
+ }
+ }
+
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReportHandlingWithHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReportHandlingWithHA.java
new file mode 100644
index 00000000000..82b89961f96
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReportHandlingWithHA.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.emptyMap;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.ozone.container.TestHelper.waitForContainerClose;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Tests for container report handling with SCM High Availability.
+ */
+public class TestContainerReportHandlingWithHA {
+ private static final String VOLUME = "vol1";
+ private static final String BUCKET = "bucket1";
+ private static final String KEY = "key1";
+
+ /**
+ * Tests that a DELETING container moves to the CLOSED state if a non-empty CLOSED replica is reported. To do this,
+ * the test first creates a key and closes its corresponding container. Then it moves that container to the
+ * DELETING state using ContainerManager. Then it restarts a Datanode hosting that container, making it send a full
+ * container report. Then the test waits for the container to move from DELETING to CLOSED in all SCMs.
+ */
+ @Test
+ void testDeletingContainerTransitionsToClosedWhenNonEmptyReplicaIsReportedWithScmHA() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+ conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, TimeUnit.SECONDS);
+
+ int numSCM = 3;
+ Path clusterPath = null;
+ try (MiniOzoneHAClusterImpl cluster = newHACluster(conf, numSCM)) {
+ cluster.waitForClusterToBeReady();
+ clusterPath = Paths.get(cluster.getBaseDir());
+
+ try (OzoneClient client = cluster.newClient()) {
+ // create a container and close it
+ createTestData(client);
+ List keyLocations = lookupKey(cluster);
+ assertThat(keyLocations).isNotEmpty();
+ OmKeyLocationInfo keyLocation = keyLocations.get(0);
+ ContainerID containerID = ContainerID.valueOf(keyLocation.getContainerID());
+ waitForContainerClose(cluster, containerID.getId());
+
+ // move the container to DELETING
+ ContainerManager containerManager = cluster.getScmLeader().getContainerManager();
+ containerManager.updateContainerState(containerID, HddsProtos.LifeCycleEvent.DELETE);
+ assertEquals(HddsProtos.LifeCycleState.DELETING, containerManager.getContainer(containerID).getState());
+
+ // restart a DN and wait for the container to get CLOSED in all SCMs
+ HddsDatanodeService dn = cluster.getHddsDatanode(keyLocation.getPipeline().getFirstNode());
+ cluster.restartHddsDatanode(dn.getDatanodeDetails(), false);
+ ContainerManager[] array = new ContainerManager[numSCM];
+ for (int i = 0; i < numSCM; i++) {
+ array[i] = cluster.getStorageContainerManager(i).getContainerManager();
+ }
+ GenericTestUtils.waitFor(() -> {
+ try {
+ for (ContainerManager manager : array) {
+ if (manager.getContainer(containerID).getState() != HddsProtos.LifeCycleState.CLOSED) {
+ return false;
+ }
+ }
+ return true;
+ } catch (ContainerNotFoundException e) {
+ fail(e);
+ }
+ return false;
+ }, 2000, 20000);
+
+ assertEquals(HddsProtos.LifeCycleState.CLOSED, containerManager.getContainer(containerID).getState());
+ }
+ } finally {
+ if (clusterPath != null) {
+ boolean deleted = FileUtil.fullyDelete(clusterPath.toFile());
+ assertTrue(deleted);
+ }
+ }
+ }
+
+ private static MiniOzoneHAClusterImpl newHACluster(OzoneConfiguration conf, int numSCM) throws IOException {
+ return MiniOzoneCluster.newHABuilder(conf)
+ .setOMServiceId("om-service")
+ .setSCMServiceId("scm-service")
+ .setNumOfOzoneManagers(1)
+ .setNumOfStorageContainerManagers(numSCM)
+ .build();
+ }
+
+ private static List lookupKey(MiniOzoneCluster cluster)
+ throws IOException {
+ OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+ .setVolumeName(VOLUME)
+ .setBucketName(BUCKET)
+ .setKeyName(KEY)
+ .build();
+ OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+ OmKeyLocationInfoGroup locations = keyInfo.getLatestVersionLocations();
+ assertNotNull(locations);
+ return locations.getLocationList();
+ }
+
+ private void createTestData(OzoneClient client) throws IOException {
+ ObjectStore objectStore = client.getObjectStore();
+ objectStore.createVolume(VOLUME);
+ OzoneVolume volume = objectStore.getVolume(VOLUME);
+ volume.createBucket(BUCKET);
+
+ OzoneBucket bucket = volume.getBucket(BUCKET);
+
+ try (OutputStream out = bucket.createKey(KEY, 0,
+ RatisReplicationConfig.getInstance(THREE), emptyMap())) {
+ out.write("Hello".getBytes(UTF_8));
+ }
+ }
+
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
index 2a33ddc5677..cb8173b2f07 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
@@ -41,8 +41,10 @@
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.io.BlockDataStreamOutputEntry;
@@ -329,14 +331,17 @@ public static void waitForContainerClose(MiniOzoneCluster cluster,
Long... containerIdList)
throws ContainerNotFoundException, PipelineNotFoundException,
TimeoutException, InterruptedException {
+ StorageContainerManager scm;
+ if (cluster instanceof MiniOzoneHAClusterImpl) {
+ MiniOzoneHAClusterImpl haCluster = (MiniOzoneHAClusterImpl) cluster;
+ scm = haCluster.getScmLeader();
+ } else {
+ scm = cluster.getStorageContainerManager();
+ }
List pipelineList = new ArrayList<>();
for (long containerID : containerIdList) {
- ContainerInfo container =
- cluster.getStorageContainerManager().getContainerManager()
- .getContainer(ContainerID.valueOf(containerID));
- Pipeline pipeline =
- cluster.getStorageContainerManager().getPipelineManager()
- .getPipeline(container.getPipelineID());
+ ContainerInfo container = scm.getContainerManager().getContainer(ContainerID.valueOf(containerID));
+ Pipeline pipeline = scm.getPipelineManager().getPipeline(container.getPipelineID());
pipelineList.add(pipeline);
List datanodes = pipeline.getNodes();
@@ -352,9 +357,7 @@ public static void waitForContainerClose(MiniOzoneCluster cluster,
// make sure the container gets created first
assertFalse(isContainerClosed(cluster, containerID, details));
// send the order to close the container
- cluster.getStorageContainerManager().getEventQueue()
- .fireEvent(SCMEvents.CLOSE_CONTAINER,
- ContainerID.valueOf(containerID));
+ scm.getEventQueue().fireEvent(SCMEvents.CLOSE_CONTAINER, ContainerID.valueOf(containerID));
}
}
int index = 0;