diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java index 48603c001df5..ff37283bdd9b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java @@ -114,31 +114,36 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode, } final ContainerReportsProto containerReport = reportFromDatanode.getReport(); - try { - final List<ContainerReplicaProto> replicas = - containerReport.getReportsList(); - final Set<ContainerID> containersInSCM = - nodeManager.getContainers(datanodeDetails); - - final Set<ContainerID> containersInDn = replicas.parallelStream() - .map(ContainerReplicaProto::getContainerID) - .map(ContainerID::valueOf).collect(Collectors.toSet()); - - final Set<ContainerID> missingReplicas = new HashSet<>(containersInSCM); - missingReplicas.removeAll(containersInDn); - - processContainerReplicas(datanodeDetails, replicas, publisher); - processMissingReplicas(datanodeDetails, missingReplicas); - updateDeleteTransaction(datanodeDetails, replicas, publisher); - - /* - * Update the latest set of containers for this datanode in - * NodeManager - */ - nodeManager.setContainers(datanodeDetails, containersInDn); - - containerManager.notifyContainerReportProcessing(true, true); + // HDDS-5249 - we must ensure that an ICR and FCR for the same datanode + // do not run at the same time or it can result in a data consistency + // issue between the container list in NodeManager and the replicas in + // ContainerManager. 
+ synchronized (datanodeDetails) { + final List<ContainerReplicaProto> replicas = + containerReport.getReportsList(); + final Set<ContainerID> containersInSCM = + nodeManager.getContainers(datanodeDetails); + + final Set<ContainerID> containersInDn = replicas.parallelStream() + .map(ContainerReplicaProto::getContainerID) + .map(ContainerID::valueOf).collect(Collectors.toSet()); + + final Set<ContainerID> missingReplicas = new HashSet<>(containersInSCM); + missingReplicas.removeAll(containersInDn); + + processContainerReplicas(datanodeDetails, replicas, publisher); + processMissingReplicas(datanodeDetails, missingReplicas); + updateDeleteTransaction(datanodeDetails, replicas, publisher); + + /* + * Update the latest set of containers for this datanode in + * NodeManager + */ + nodeManager.setContainers(datanodeDetails, containersInDn); + + containerManager.notifyContainerReportProcessing(true, true); + } } catch (NodeNotFoundException ex) { containerManager.notifyContainerReportProcessing(true, false); LOG.error("Received container report from unknown datanode {}.", diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java index 1ad507dd1b10..3b54dffb303f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java @@ -71,31 +71,37 @@ public void onMessage(final IncrementalContainerReportFromDatanode report, } boolean success = true; - for (ContainerReplicaProto replicaProto : - report.getReport().getReportList()) { - try { - final ContainerID id = ContainerID.valueOf( - replicaProto.getContainerID()); - if (!replicaProto.getState().equals( - ContainerReplicaProto.State.DELETED)) { - nodeManager.addContainer(dd, id); + // HDDS-5249 - we must ensure that an ICR and 
FCR for the same datanode + // do not run at the same time or it can result in a data consistency + // issue between the container list in NodeManager and the replicas in + // ContainerManager. + synchronized (dd) { + for (ContainerReplicaProto replicaProto : + report.getReport().getReportList()) { + try { + final ContainerID id = ContainerID.valueOf( + replicaProto.getContainerID()); + if (!replicaProto.getState().equals( + ContainerReplicaProto.State.DELETED)) { + nodeManager.addContainer(dd, id); + } + processContainerReplica(dd, replicaProto, publisher); + } catch (ContainerNotFoundException e) { + success = false; + LOG.warn("Container {} not found!", replicaProto.getContainerID()); + } catch (NodeNotFoundException ex) { + success = false; + LOG.error("Received ICR from unknown datanode {}", + report.getDatanodeDetails(), ex); + } catch (ContainerReplicaNotFoundException e) { + success = false; + LOG.warn("Container {} replica not found!", + replicaProto.getContainerID()); + } catch (IOException | InvalidStateTransitionException e) { + success = false; + LOG.error("Exception while processing ICR for container {}", + replicaProto.getContainerID(), e); } - processContainerReplica(dd, replicaProto, publisher); - } catch (ContainerNotFoundException e) { - success = false; - LOG.warn("Container {} not found!", replicaProto.getContainerID()); - } catch (NodeNotFoundException ex) { - success = false; - LOG.error("Received ICR from unknown datanode {}", - report.getDatanodeDetails(), ex); - } catch (ContainerReplicaNotFoundException e){ - success = false; - LOG.warn("Container {} replica not found!", - replicaProto.getContainerID()); - } catch (IOException | InvalidStateTransitionException e) { - success = false; - LOG.error("Exception while processing ICR for container {}", - replicaProto.getContainerID(), e); } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java index b02d51846685..a72ab61fb429 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java @@ -663,14 +663,14 @@ private ContainerReportFromDatanode getContainerReportFromDatanode( return new ContainerReportFromDatanode(dn, containerReport); } - private static ContainerReportsProto getContainerReportsProto( + protected static ContainerReportsProto getContainerReportsProto( final ContainerID containerId, final ContainerReplicaProto.State state, final String originNodeId) { return getContainerReportsProto(containerId, state, originNodeId, 2000000000L, 100000000L); } - private static ContainerReportsProto getContainerReportsProto( + protected static ContainerReportsProto getContainerReportsProto( final ContainerID containerId, final ContainerReplicaProto.State state, final String originNodeId, final long usedBytes, final long keyCount) { final ContainerReportsProto.Builder crBuilder = diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java index 7c0c1eccd4c7..cde7f3d75a2a 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.protocol.proto @@ -31,6 +32,8 @@ import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.SCMNodeManager; +import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .IncrementalContainerReportFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMStorageConfig; @@ -49,8 +52,13 @@ import java.nio.file.Paths; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED; import static org.apache.hadoop.hdds.scm.TestUtils.getContainer; import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas; @@ -109,6 +117,15 @@ public void setup() throws IOException, InvalidStateTransitionException { Mockito.any(ContainerID.class), Mockito.any(HddsProtos.LifeCycleEvent.class)); + Mockito.doAnswer(invocation -> { + containerStateManager + .updateContainerReplica((ContainerID)invocation.getArguments()[0], + (ContainerReplica) invocation.getArguments()[1]); + return null; + }).when(containerManager).updateContainerReplica( + Mockito.any(ContainerID.class), + Mockito.any(ContainerReplica.class)); + } @After @@ -227,7 +244,7 @@ public void testQuasiClosedToClosed() throws IOException { final 
IncrementalContainerReportProto containerReport = getIncrementalContainerReportProto(container.containerID(), - ContainerReplicaProto.State.CLOSED, + CLOSED, datanodeThree.getUuidString()); final IncrementalContainerReportFromDatanode icr = new IncrementalContainerReportFromDatanode( @@ -250,7 +267,7 @@ public void testDeleteContainer() throws IOException { nodeManager.register(datanodeThree, null, null); final Set containerReplicas = getReplicas( container.containerID(), - ContainerReplicaProto.State.CLOSED, + CLOSED, datanodeOne, datanodeTwo, datanodeThree); containerStateManager.loadContainer(container); @@ -276,6 +293,81 @@ public void testDeleteContainer() throws IOException { .getContainerReplicas(container.containerID()).size()); } + @Test + // HDDS-5249 - This test reproduces the race condition mentioned in the Jira + // until the code was changed to fix the race condition. + public void testICRFCRRace() throws IOException, NodeNotFoundException, + ExecutionException, InterruptedException { + final IncrementalContainerReportHandler reportHandler = + new IncrementalContainerReportHandler( + nodeManager, containerManager, scmContext); + final ContainerReportHandler fullReportHandler = + new ContainerReportHandler(nodeManager, containerManager); + + final ContainerInfo container = getContainer(LifeCycleState.CLOSED); + final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED); + final DatanodeDetails datanode = randomDatanodeDetails(); + nodeManager.register(datanode, null, null); + + containerStateManager.loadContainer(container); + containerStateManager.loadContainer(containerTwo); + + Assert.assertEquals(0, nodeManager.getContainers(datanode).size()); + + final IncrementalContainerReportProto containerReport = + getIncrementalContainerReportProto(container.containerID(), + CLOSED, + datanode.getUuidString()); + final IncrementalContainerReportFromDatanode icr = + new IncrementalContainerReportFromDatanode( + datanode, containerReport); + + 
final ContainerReportsProto fullReport = TestContainerReportHandler + .getContainerReportsProto(containerTwo.containerID(), CLOSED, + datanode.getUuidString()); + final ContainerReportFromDatanode fcr =new ContainerReportFromDatanode( + datanode, fullReport); + + // We need to run the FCR and ICR at the same time via the executor so we + // can try to simulate the race condition. + ThreadPoolExecutor executor = + (ThreadPoolExecutor)Executors.newFixedThreadPool(2); + try { + // Running this test 10 times to ensure the race condition we are testing + // for does not occur. In local tests, before the code was fixed, this + // test failed consistently every time (reproducing the issue). + for (int i=0; i<10; i++) { + Future<?> t1 = + executor.submit(() -> fullReportHandler.onMessage(fcr, publisher)); + Future<?> t2 = + executor.submit(() -> reportHandler.onMessage(icr, publisher)); + t1.get(); + t2.get(); + + Set<ContainerID> nmContainers = nodeManager.getContainers(datanode); + if (nmContainers.contains(container.containerID())) { + // If we find "container" in the NM, then we must also have it in + // Container Manager. + Assert.assertEquals(1, containerStateManager + .getContainerReplicas(container.containerID()).size()); + Assert.assertEquals(2, nmContainers.size()); + } else { + // If the race condition occurs as mentioned in HDDS-5249, then this + // assert should fail. We will have found nothing for "container" in + // NM, but have found something for it in ContainerManager, and that + // should not happen. It should be in both, or neither. + Assert.assertEquals(0, containerStateManager + .getContainerReplicas(container.containerID()).size()); + Assert.assertEquals(1, nmContainers.size()); + } + Assert.assertEquals(1, containerStateManager + .getContainerReplicas(containerTwo.containerID()).size()); + } + } finally { + executor.shutdown(); + } + } + private static IncrementalContainerReportProto getIncrementalContainerReportProto( final ContainerID containerId,