diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
index d0fb94a9401b..aeb96cbf3196 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
@@ -287,9 +287,6 @@ private boolean updateContainerState(final DatanodeDetails datanode,
}
if (replica.getState() == State.CLOSED) {
- Preconditions.checkArgument(replica.getBlockCommitSequenceId()
- == container.getSequenceId());
-
/*
For an EC container, only the first index and the parity indexes are
guaranteed to have block data. So, update the container's state in SCM
@@ -305,8 +302,14 @@ private boolean updateContainerState(final DatanodeDetails datanode,
}
}
- logger.info("Moving container {} to CLOSED state, datanode {} " +
- "reported CLOSED replica with index {}.", containerId, datanode,
+ if (!verifyBcsId(replica.getBlockCommitSequenceId(), container.getSequenceId(), datanode, containerId)) {
+ logger.warn("Ignored moving container {} from CLOSING to CLOSED state because replica bcsId ({}) " +
+ "reported by datanode {} does not match sequenceId ({}).",
+ containerId, replica.getBlockCommitSequenceId(), datanode, container.getSequenceId());
+ return true;
+ }
+ logger.info("Moving container {} from CLOSING to CLOSED state, datanode {} " +
+ "reported CLOSED replica with index {}.", containerId, datanode,
replica.getReplicaIndex());
containerManager.updateContainerState(containerId,
LifeCycleEvent.CLOSE);
@@ -330,10 +333,15 @@ private boolean updateContainerState(final DatanodeDetails datanode,
*
*/
if (replica.getState() == State.CLOSED) {
- logger.info("Moving container {} to CLOSED state, datanode {} " +
- "reported CLOSED replica.", containerId, datanode);
- Preconditions.checkArgument(replica.getBlockCommitSequenceId()
- == container.getSequenceId());
+ if (!verifyBcsId(replica.getBlockCommitSequenceId(), container.getSequenceId(), datanode, containerId)) {
+ logger.warn("Ignored moving container {} from QUASI_CLOSED to CLOSED state because replica bcsId ({}) " +
+ "reported by datanode {} does not match sequenceId ({}).",
+ containerId, replica.getBlockCommitSequenceId(), datanode, container.getSequenceId());
+ return true;
+ }
+ logger.info("Moving container {} from QUASI_CLOSED to CLOSED state, datanode {} " +
+ "reported CLOSED replica with index {}.", containerId, datanode,
+ replica.getReplicaIndex());
containerManager.updateContainerState(containerId,
LifeCycleEvent.FORCE_CLOSE);
}
@@ -372,6 +380,32 @@ private boolean updateContainerState(final DatanodeDetails datanode,
return ignored;
}
+ /**
+ * Helper method to verify that the replica's bcsId matches the container's in SCM.
+ * Throws IOException if the bcsIds do not match.
+ *
+ * @param replicaBcsId Replica bcsId
+ * @param containerBcsId Container bcsId in SCM
+ * @param datanode DatanodeDetails for logging
+ * @param containerId ContainerID for logging
+ * @return true if verification has passed, false otherwise
+ */
+ private boolean verifyBcsId(long replicaBcsId, long containerBcsId,
+ DatanodeDetails datanode, ContainerID containerId) {
+
+ if (replicaBcsId != containerBcsId) {
+ final String errMsg = "Unexpected bcsId for container " + containerId +
+ " from datanode " + datanode + ". replica's: " + replicaBcsId +
+ ", SCM's: " + containerBcsId +
+ ". Ignoring container report for " + containerId;
+
+ logger.error(errMsg);
+ return false;
+ } else {
+ return true;
+ }
+ }
+
private void updateContainerReplica(final DatanodeDetails datanodeDetails,
final ContainerID containerId,
final ContainerReplicaProto replicaProto)
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
index 2041c83b368a..f1a9091a991c 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -23,6 +23,7 @@
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getECContainer;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getReplicas;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -74,6 +75,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
/**
* Test the behaviour of the ContainerReportHandler.
@@ -166,7 +169,7 @@ private void testReplicaIndexUpdate(ContainerInfo container,
Map expectedReplicaMap) {
final ContainerReportsProto containerReport = getContainerReportsProto(
container.containerID(), ContainerReplicaProto.State.CLOSED,
- dn.getUuidString(), 2000000000L, 100000000L, replicaIndex);
+ dn.getUuidString(), 2000000000L, 100000000L, 10000L, replicaIndex);
final ContainerReportFromDatanode containerReportFromDatanode =
new ContainerReportFromDatanode(dn, containerReport);
final ContainerReportHandler reportHandler = new ContainerReportHandler(
@@ -601,7 +604,7 @@ private void createAndHandleContainerReport(ContainerID containerID,
@Test
public void testClosingToQuasiClosed()
- throws NodeNotFoundException, IOException, TimeoutException {
+ throws NodeNotFoundException, IOException {
/*
* The container is in CLOSING state and all the replicas are in
* OPEN/CLOSING state.
@@ -668,7 +671,7 @@ public void testClosingToQuasiClosed()
@Test
public void testQuasiClosedToClosed()
- throws NodeNotFoundException, IOException, TimeoutException {
+ throws NodeNotFoundException, IOException {
/*
* The container is in QUASI_CLOSED state.
* - One of the replica is in QUASI_CLOSED state
@@ -737,6 +740,52 @@ public void testQuasiClosedToClosed()
assertEquals(LifeCycleState.CLOSED, containerManager.getContainer(containerOne.containerID()).getState());
}
+ @ParameterizedTest
+ @EnumSource(value = LifeCycleState.class, names = {"CLOSING", "QUASI_CLOSED"})
+ public void testContainerStateTransitionToClosedWithMismatchingBCSID(LifeCycleState lcState)
+ throws NodeNotFoundException, IOException {
+ /*
+ * Negative test. When a replica with a (lower) mismatching bcsId gets reported,
+ * expect the ContainerReportHandler thread to not throw uncaught exception.
+ * (That exception lead to ContainerReportHandler thread crash before HDDS-12150.)
+ */
+ final ContainerReportHandler reportHandler =
+ new ContainerReportHandler(nodeManager, containerManager);
+ final Iterator nodeIterator =
+ nodeManager.getNodes(NodeStatus.inServiceHealthy()).iterator();
+
+ final DatanodeDetails dn1 = nodeIterator.next();
+ final DatanodeDetails dn2 = nodeIterator.next();
+ final DatanodeDetails dn3 = nodeIterator.next();
+
+ // Initial sequenceId 10000L is set here
+ final ContainerInfo container1 = getContainer(lcState);
+
+ nodeManager.addContainer(dn1, container1.containerID());
+ nodeManager.addContainer(dn2, container1.containerID());
+ nodeManager.addContainer(dn3, container1.containerID());
+
+ containerStateManager.addContainer(container1.getProtobuf());
+
+ // Generate container report with replica in CLOSED state with intentional lower bcsId
+ final ContainerReportsProto containerReport = getContainerReportsProto(
+ container1.containerID(), ContainerReplicaProto.State.CLOSED,
+ dn1.getUuidString(),
+ 2000L);
+ final ContainerReportFromDatanode containerReportFromDatanode =
+ new ContainerReportFromDatanode(dn1, containerReport);
+
+ // Handler should NOT throw IllegalArgumentException
+ try {
+ reportHandler.onMessage(containerReportFromDatanode, publisher);
+ } catch (IllegalArgumentException iaEx) {
+ fail("Handler should not throw IllegalArgumentException: " + iaEx.getMessage());
+ }
+
+ // Because the container report is ignored, the container remains in the same previous state in SCM
+ assertEquals(lcState, containerManager.getContainer(container1.containerID()).getState());
+ }
+
@Test
public void openContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas()
throws IOException, TimeoutException {
@@ -1092,7 +1141,7 @@ private ContainerReportFromDatanode getContainerReportFromDatanode(
DatanodeDetails dn, long bytesUsed, long keyCount, int replicaIndex) {
ContainerReportsProto containerReport = getContainerReportsProto(
containerId, state, dn.getUuidString(), bytesUsed, keyCount,
- replicaIndex);
+ 10000L, replicaIndex);
return new ContainerReportFromDatanode(dn, containerReport);
}
@@ -1101,20 +1150,34 @@ protected static ContainerReportsProto getContainerReportsProto(
final ContainerID containerId, final ContainerReplicaProto.State state,
final String originNodeId) {
return getContainerReportsProto(containerId, state, originNodeId,
- 2000000000L, 100000000L, 0);
+ 2000000000L, 100000000L, 10000L, 0);
+ }
+
+ protected static ContainerReportsProto getContainerReportsProto(
+ final ContainerID containerId, final ContainerReplicaProto.State state,
+ final String originNodeId, final long bcsId) {
+ return getContainerReportsProto(containerId, state, originNodeId,
+ 2000000000L, 100000000L, bcsId, 0);
}
protected static ContainerReportsProto getContainerReportsProto(
final ContainerID containerId, final ContainerReplicaProto.State state,
final String originNodeId, int replicaIndex) {
return getContainerReportsProto(containerId, state, originNodeId,
- 2000000000L, 100000000L, replicaIndex);
+ 2000000000L, 100000000L, 10000L, replicaIndex);
+ }
+
+ protected static ContainerReportsProto getContainerReportsProto(
+ final ContainerID containerId, final ContainerReplicaProto.State state,
+ final String originNodeId, final long bcsId, int replicaIndex) {
+ return getContainerReportsProto(containerId, state, originNodeId,
+ 2000000000L, 100000000L, bcsId, replicaIndex);
}
protected static ContainerReportsProto getContainerReportsProto(
final ContainerID containerId, final ContainerReplicaProto.State state,
final String originNodeId, final long usedBytes, final long keyCount,
- final int replicaIndex) {
+ final long bcsId, final int replicaIndex) {
final ContainerReportsProto.Builder crBuilder =
ContainerReportsProto.newBuilder();
final ContainerReplicaProto replicaProto =
@@ -1130,7 +1193,7 @@ protected static ContainerReportsProto getContainerReportsProto(
.setWriteCount(100000000L)
.setReadBytes(2000000000L)
.setWriteBytes(2000000000L)
- .setBlockCommitSequenceId(10000L)
+ .setBlockCommitSequenceId(bcsId)
.setDeleteTransactionId(0)
.setReplicaIndex(replicaIndex)
.build();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
index a4872c1f7456..fa92fa1f7749 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
@@ -27,6 +27,7 @@
import static org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager.maxLayoutVersion;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -87,6 +88,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
/**
* Test cases to verify the functionality of IncrementalContainerReportHandler.
@@ -339,7 +342,7 @@ private List setupECContainerForTesting(
}
@Test
- public void testClosingToQuasiClosed() throws IOException, TimeoutException {
+ public void testClosingToQuasiClosed() throws IOException {
final IncrementalContainerReportHandler reportHandler =
new IncrementalContainerReportHandler(
nodeManager, containerManager, scmContext);
@@ -372,7 +375,7 @@ public void testClosingToQuasiClosed() throws IOException, TimeoutException {
}
@Test
- public void testQuasiClosedToClosed() throws IOException, TimeoutException {
+ public void testQuasiClosedToClosed() throws IOException {
final IncrementalContainerReportHandler reportHandler =
new IncrementalContainerReportHandler(
nodeManager, containerManager, scmContext);
@@ -407,6 +410,59 @@ public void testQuasiClosedToClosed() throws IOException, TimeoutException {
assertEquals(LifeCycleState.CLOSED, containerManager.getContainer(container.containerID()).getState());
}
+ @ParameterizedTest
+ @EnumSource(value = LifeCycleState.class, names = {"CLOSING", "QUASI_CLOSED"})
+ public void testContainerStateTransitionToClosedWithMismatchingBCSID(LifeCycleState lcState) throws IOException {
+ /*
+ * Negative test. When a replica with a (lower) mismatching bcsId gets reported,
+ * expect the ContainerReportHandler thread to not throw uncaught exception.
+ * (That exception lead to ContainerReportHandler thread crash before HDDS-12150.)
+ */
+ final IncrementalContainerReportHandler reportHandler =
+ new IncrementalContainerReportHandler(nodeManager, containerManager, scmContext);
+
+ // Initial sequenceId 10000L is set here
+ final ContainerInfo container = getContainer(lcState);
+ final DatanodeDetails datanodeOne = randomDatanodeDetails();
+ final DatanodeDetails datanodeTwo = randomDatanodeDetails();
+ final DatanodeDetails datanodeThree = randomDatanodeDetails();
+ nodeManager.register(datanodeOne, null, null);
+ nodeManager.register(datanodeTwo, null, null);
+ nodeManager.register(datanodeThree, null, null);
+
+ final Set containerReplicas = getReplicas(
+ container.containerID(),
+ ContainerReplicaProto.State.CLOSING,
+ datanodeOne, datanodeTwo);
+ containerReplicas.addAll(getReplicas(
+ container.containerID(),
+ ContainerReplicaProto.State.QUASI_CLOSED,
+ datanodeThree));
+
+ containerStateManager.addContainer(container.getProtobuf());
+ containerReplicas.forEach(r -> containerStateManager.updateContainerReplica(
+ container.containerID(), r));
+
+ // Generate incremental container report with replica in CLOSED state with intentional lower bcsId
+ final IncrementalContainerReportProto containerReport =
+ getIncrementalContainerReportProto(container.containerID(),
+ CLOSED, datanodeThree.getUuidString(), false, 0,
+ 2000L);
+ final IncrementalContainerReportFromDatanode icr =
+ new IncrementalContainerReportFromDatanode(
+ datanodeOne, containerReport);
+
+ // Handler should NOT throw IllegalArgumentException
+ try {
+ reportHandler.onMessage(icr, publisher);
+ } catch (IllegalArgumentException iaEx) {
+ fail("Handler should not throw IllegalArgumentException: " + iaEx.getMessage());
+ }
+
+ // Because the container report is ignored, the container remains in the same previous state in SCM
+ assertEquals(lcState, containerManager.getContainer(container.containerID()).getState());
+ }
+
@Test
public void testOpenWithUnhealthyReplica() throws IOException {
final IncrementalContainerReportHandler reportHandler =
@@ -580,11 +636,23 @@ public void testICRFCRRace() throws IOException, NodeNotFoundException,
private static IncrementalContainerReportProto
getIncrementalContainerReportProto(
- final ContainerID containerId,
- final ContainerReplicaProto.State state,
- final String originNodeId,
- final boolean hasReplicaIndex,
- final int replicaIndex) {
+ final ContainerID containerId,
+ final ContainerReplicaProto.State state,
+ final String originNodeId,
+ final boolean hasReplicaIndex,
+ final int replicaIndex) {
+ return getIncrementalContainerReportProto(containerId, state, originNodeId,
+ hasReplicaIndex, replicaIndex, 10000L);
+ }
+
+ private static IncrementalContainerReportProto
+ getIncrementalContainerReportProto(
+ final ContainerID containerId,
+ final ContainerReplicaProto.State state,
+ final String originNodeId,
+ final boolean hasReplicaIndex,
+ final int replicaIndex,
+ final long bcsId) {
final ContainerReplicaProto.Builder replicaProto =
ContainerReplicaProto.newBuilder()
.setContainerID(containerId.getId())
@@ -598,7 +666,7 @@ public void testICRFCRRace() throws IOException, NodeNotFoundException,
.setWriteCount(100000000L)
.setReadBytes(2000000000L)
.setWriteBytes(2000000000L)
- .setBlockCommitSequenceId(10000L)
+ .setBlockCommitSequenceId(bcsId)
.setDeleteTransactionId(0);
if (hasReplicaIndex) {
replicaProto.setReplicaIndex(replicaIndex);