diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java index 59be36b0d2b9..56a51d70be81 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java @@ -63,7 +63,7 @@ public class AbstractContainerReportHandler { * * @throws IOException In case of any Exception while processing the report */ - void processContainerReplica(final DatanodeDetails datanodeDetails, + protected void processContainerReplica(final DatanodeDetails datanodeDetails, final ContainerReplicaProto replicaProto) throws IOException { final ContainerID containerId = ContainerID diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java index f9488e222eb9..973026de487f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java @@ -74,6 +74,8 @@ public interface ContainerManager extends Closeable { ContainerInfo getContainer(ContainerID containerID) throws ContainerNotFoundException; + boolean exists(ContainerID containerID); + /** * Returns containers under certain conditions. * Search container IDs from start ID(exclusive), diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index cefc185c58ca..526e4b37f3ac 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -301,15 +301,22 @@ ContainerInfo allocateContainer( .setReplicationFactor(pipeline.getFactor()) .setReplicationType(pipeline.getType()) .build(); + addContainerInfo(containerID, containerInfo, pipelineManager, pipeline); + if (LOG.isTraceEnabled()) { + LOG.trace("New container allocated: {}", containerInfo); + } + return containerInfo; + } + + public void addContainerInfo(long containerID, + ContainerInfo containerInfo, + PipelineManager pipelineManager, + Pipeline pipeline) throws IOException { Preconditions.checkNotNull(containerInfo); containers.addContainer(containerInfo); pipelineManager.addContainerToPipeline(pipeline.getId(), ContainerID.valueof(containerID)); containerStateCount.incrementAndGet(containerInfo.getState()); - if (LOG.isTraceEnabled()) { - LOG.trace("New container allocated: {}", containerInfo); - } - return containerInfo; } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java index b58100066a3d..c3fd94faef44 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java @@ -90,4 +90,7 @@ public void onMessage(final IncrementalContainerReportFromDatanode report, } + protected NodeManager getNodeManager() { + return this.nodeManager; + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java index e08dd8ce81a6..9624ce0f6c20 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java @@ -194,6 +194,18 @@ public ContainerInfo getContainer(final ContainerID containerID) return containerStateManager.getContainer(containerID); } + @Override + public boolean exists(ContainerID containerID) { + lock.lock(); + try { + return (containerStateManager.getContainer(containerID) != null); + } catch (ContainerNotFoundException e) { + return false; + } finally { + lock.unlock(); + } + } + /** * {@inheritDoc} */ @@ -441,7 +453,7 @@ public ContainerInfo getMatchingContainer(final long sizeRequired, * @param containerInfo * @throws IOException */ - private void addContainerToDB(ContainerInfo containerInfo) + protected void addContainerToDB(ContainerInfo containerInfo) throws IOException { try { final byte[] containerIDBytes = Longs.toByteArray( @@ -583,4 +595,12 @@ protected File getContainerDBPath(Configuration conf) { File metaDir = ServerUtils.getScmDbDir(conf); return new File(metaDir, SCM_CONTAINER_DB); } + + protected PipelineManager getPipelineManager() { + return pipelineManager; + } + + public Lock getLock() { + return lock; + } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java index af2ae60c1387..a751da84b70f 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java @@ -24,7 +24,10 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.SCMContainerManager; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.ozone.recon.ReconUtils; @@ -55,4 +58,29 @@ protected File getContainerDBPath(Configuration conf) { File metaDir = ReconUtils.getReconScmDbDir(conf); return new File(metaDir, RECON_SCM_CONTAINER_DB); } + + /** + * Adds a new container to Recon's container manager. + * @param containerId id + * @param containerWithPipeline containerInfo with pipeline info + * @throws IOException on Error. + */ + public void addNewContainer(long containerId, + ContainerWithPipeline containerWithPipeline) + throws IOException { + ContainerInfo containerInfo = containerWithPipeline.getContainerInfo(); + getLock().lock(); + try { + getContainerStateManager().addContainerInfo(containerId, containerInfo, + getPipelineManager(), containerWithPipeline.getPipeline()); + addContainerToDB(containerInfo); + } catch (IOException ex) { + getPipelineManager().removeContainerFromPipeline( + containerInfo.getPipelineID(), + new ContainerID(containerInfo.getContainerID())); + throw ex; + } finally { + getLock().unlock(); + } + } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java new file mode 100644 index 000000000000..d50156e9abe5 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.scm; + +import java.io.IOException; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Recon ICR handler. + */ +public class ReconIncrementalContainerReportHandler + extends IncrementalContainerReportHandler { + + private static final Logger LOG = LoggerFactory.getLogger( + ReconIncrementalContainerReportHandler.class); + + private StorageContainerServiceProvider scmClient; + + public ReconIncrementalContainerReportHandler(NodeManager nodeManager, + ContainerManager containerManager, + StorageContainerServiceProvider scmClient) { + super(nodeManager, containerManager); + this.scmClient = scmClient; + } + + @Override + public void onMessage(final IncrementalContainerReportFromDatanode report, + final EventPublisher publisher) { + if (LOG.isDebugEnabled()) { + LOG.debug("Processing incremental container report from data node {}", + report.getDatanodeDetails()); + } + + ReconContainerManager containerManager = + (ReconContainerManager) getContainerManager(); + boolean success = true; + for (ContainerReplicaProto replicaProto : + report.getReport().getReportList()) { + try { + final DatanodeDetails dd = report.getDatanodeDetails(); + final ContainerID id = ContainerID.valueof( + replicaProto.getContainerID()); + if (!getContainerManager().exists(id)) { + LOG.info("New container {} got from {}.", id, + report.getDatanodeDetails()); + try { + ContainerWithPipeline containerWithPipeline = + scmClient.getContainerWithPipeline(id.getId()); + containerManager.addNewContainer(id.getId(), containerWithPipeline); + } catch (IOException ioEx) { + LOG.error("Exception while getting new container info from SCM", + ioEx); + return; + } + } + getNodeManager().addContainer(dd, id); + processContainerReplica(dd, replicaProto); + } catch (ContainerNotFoundException e) { + success = false; + LOG.warn("Container {} not found!", replicaProto.getContainerID()); + } catch (NodeNotFoundException ex) { + success = false; + LOG.error("Received ICR from unknown datanode {} {}", + report.getDatanodeDetails(), ex); + } catch (IOException e) { + success = false; + LOG.error("Exception while processing ICR for container {}", + replicaProto.getContainerID()); + } + } + getContainerManager().notifyContainerReportProcessing(false, success); + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java index db57e997099d..cf5756f54af2 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java @@ -114,7 +114,8 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf, ContainerReportHandler containerReportHandler = new ContainerReportHandler(scmNodeManager, containerManager); IncrementalContainerReportHandler icrHandler = - new IncrementalContainerReportHandler(scmNodeManager, containerManager); + new ReconIncrementalContainerReportHandler(scmNodeManager, + containerManager, scmServiceProvider); CloseContainerEventHandler closeContainerHandler = new CloseContainerEventHandler(pipelineManager, containerManager); ContainerActionsHandler actionsHandler = new ContainerActionsHandler(); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java index 23db57996ea0..925f46ea4c73 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; /** @@ -43,4 +44,13 @@ public interface StorageContainerServiceProvider { * @throws IOException in case of exception */ Pipeline getPipeline(HddsProtos.PipelineID pipelineID) throws IOException; + + /** + * Requests SCM for a container given ID. + * @param containerId containerId + * @return ContainerInfo + Pipeline info + * @throws IOException in case of any exception. + */ + ContainerWithPipeline getContainerWithPipeline(long containerId) + throws IOException; } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java index 77cdb4bb8187..6ee131858a90 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java @@ -24,6 +24,7 @@ import javax.inject.Inject; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider; @@ -54,4 +55,9 @@ public Pipeline getPipeline(HddsProtos.PipelineID pipelineID) return scmClient.getPipeline(pipelineID); } + @Override + public ContainerWithPipeline getContainerWithPipeline(long containerId) + throws IOException { + return scmClient.getContainerWithPipeline(containerId); + } } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java new file mode 100644 index 000000000000..d7745c4df113 --- /dev/null +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.scm; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS; + +import java.io.IOException; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.net.NetworkTopology; +import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.SCMNodeManager; +import org.apache.hadoop.hdds.scm.server.SCMStorageConfig; +import org.apache.hadoop.hdds.server.events.EventQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +/** + * Abstract class for Recon Container Manager related tests. + */ +public class AbstractReconContainerManagerTest { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private OzoneConfiguration conf; + private SCMStorageConfig scmStorageConfig; + private ReconPipelineManager pipelineManager; + private ReconContainerManager containerManager; + + @Before + public void setUp() throws Exception { + conf = new OzoneConfiguration(); + conf.set(OZONE_METADATA_DIRS, + temporaryFolder.newFolder().getAbsolutePath()); + conf.set(OZONE_SCM_NAMES, "localhost"); + scmStorageConfig = new ReconStorageConfig(conf); + NetworkTopology clusterMap = new NetworkTopologyImpl(conf); + EventQueue eventQueue = new EventQueue(); + NodeManager nodeManager = + new SCMNodeManager(conf, scmStorageConfig, eventQueue, clusterMap); + pipelineManager = new ReconPipelineManager(conf, nodeManager, eventQueue); + containerManager = new ReconContainerManager(conf, pipelineManager); + } + + @After + public void tearDown() throws IOException { + containerManager.close(); + pipelineManager.close(); + } + + protected OzoneConfiguration getConf() { + return conf; + } + + protected ReconPipelineManager getPipelineManager() { + return pipelineManager; + } + + protected ReconContainerManager getContainerManager() { + return containerManager; + } + +} diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java new file mode 100644 index 000000000000..742a2581d3ac --- /dev/null +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.scm;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.STAND_ALONE;
+import static org.apache.hadoop.ozone.recon.AbstractOMMetadataManagerTest.getRandomPipeline;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.junit.Test;
+
+/**
+ * Test Recon Container Manager.
+ */
+public class TestReconContainerManager
+ extends AbstractReconContainerManagerTest {
+
+ @Test
+ public void testAddNewContainer() throws IOException {
+ ContainerID containerID = new ContainerID(100L);
+ Pipeline pipeline = getRandomPipeline();
+ ReconPipelineManager pipelineManager = getPipelineManager();
+ pipelineManager.addPipeline(pipeline);
+ ContainerInfo containerInfo =
+ new ContainerInfo.Builder()
+ .setContainerID(containerID.getId())
+ .setNumberOfKeys(10)
+ .setPipelineID(pipeline.getId())
+ .setReplicationFactor(ONE)
+ .setOwner("test")
+ .setState(OPEN)
+ .setReplicationType(STAND_ALONE)
+ .build();
+ ContainerWithPipeline containerWithPipeline =
+ new ContainerWithPipeline(containerInfo, pipeline);
+
+ ReconContainerManager containerManager = getContainerManager();
+ assertFalse(containerManager.exists(containerID));
+
+ containerManager.addNewContainer(
+ containerID.getId(), containerWithPipeline);
+
+ assertTrue(containerManager.exists(containerID));
+
+ List
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.scm;
+
+import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.STAND_ALONE;
+import static org.apache.hadoop.ozone.recon.AbstractOMMetadataManagerTest.getRandomPipeline;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
+import org.junit.Test;
+
+/**
+ * Test Recon ICR handler.
+ */
+public class TestReconIncrementalContainerReportHandler
+ extends AbstractReconContainerManagerTest {
+
+ @Test
+ public void testProcessICR() throws IOException, NodeNotFoundException {
+
+ Pipeline pipeline = getRandomPipeline();
+ ReconPipelineManager pipelineManager = getPipelineManager();
+ pipelineManager.addPipeline(pipeline);
+
+ ContainerID containerID = new ContainerID(100L);
+ ContainerInfo containerInfo =
+ new ContainerInfo.Builder()
+ .setContainerID(containerID.getId())
+ .setNumberOfKeys(10)
+ .setPipelineID(pipeline.getId())
+ .setReplicationFactor(ONE)
+ .setOwner("test")
+ .setState(OPEN)
+ .setReplicationType(STAND_ALONE)
+ .build();
+ ContainerWithPipeline containerWithPipeline =
+ new ContainerWithPipeline(containerInfo, pipeline);
+
+ StorageContainerServiceProvider scmServiceProviderMock = mock(
+ StorageContainerServiceProvider.class);
+ when(scmServiceProviderMock.getContainerWithPipeline(100L))
+ .thenReturn(containerWithPipeline);
+
+ DatanodeDetails datanodeDetails = randomDatanodeDetails();
+ IncrementalContainerReportFromDatanode reportMock =
+ mock(IncrementalContainerReportFromDatanode.class);
+ when(reportMock.getDatanodeDetails()).thenReturn(datanodeDetails);
+ IncrementalContainerReportProto containerReport =
+ getIncrementalContainerReportProto(containerID,
+ State.OPEN,
+ datanodeDetails.getUuidString());
+ when(reportMock.getReport()).thenReturn(containerReport);
+
+ NodeManager nodeManagerMock = mock(NodeManager.class);
+
+ ReconContainerManager containerManager = getContainerManager();
+ ReconIncrementalContainerReportHandler recconIcr =
+ new ReconIncrementalContainerReportHandler(nodeManagerMock,
+ containerManager, scmServiceProviderMock);
+ EventPublisher eventPublisherMock = mock(EventPublisher.class);
+
+ recconIcr.onMessage(reportMock, eventPublisherMock);
+ verify(nodeManagerMock, times(1))
+ .addContainer(datanodeDetails, containerID);
+ assertTrue(containerManager.exists(containerID));
+ assertEquals(1, containerManager.getContainerReplicas(containerID).size());
+ }
+
+ private static IncrementalContainerReportProto
+ getIncrementalContainerReportProto(final ContainerID containerId,
+ final State state,
+ final String originNodeId) {
+ final IncrementalContainerReportProto.Builder crBuilder =
+ IncrementalContainerReportProto.newBuilder();
+ final ContainerReplicaProto replicaProto =
+ ContainerReplicaProto.newBuilder()
+ .setContainerID(containerId.getId())
+ .setState(state)
+ .setOriginNodeId(originNodeId)
+ .build();
+ return crBuilder.addReport(replicaProto).build();
+ }
+}
\ No newline at end of file