Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,6 @@ private boolean updateContainerState(final DatanodeDetails datanode,
}

if (replica.getState() == State.CLOSED) {
Preconditions.checkArgument(replica.getBlockCommitSequenceId()
== container.getSequenceId());

/*
For an EC container, only the first index and the parity indexes are
guaranteed to have block data. So, update the container's state in SCM
Expand All @@ -305,8 +302,14 @@ private boolean updateContainerState(final DatanodeDetails datanode,
}
}

logger.info("Moving container {} to CLOSED state, datanode {} " +
"reported CLOSED replica with index {}.", containerId, datanode,
if (!verifyBcsId(replica.getBlockCommitSequenceId(), container.getSequenceId(), datanode, containerId)) {
logger.warn("Ignored moving container {} from CLOSING to CLOSED state because replica bcsId ({}) " +
"reported by datanode {} does not match sequenceId ({}).",
containerId, replica.getBlockCommitSequenceId(), datanode, container.getSequenceId());
return true;
}
logger.info("Moving container {} from CLOSING to CLOSED state, datanode {} " +
"reported CLOSED replica with index {}.", containerId, datanode,
replica.getReplicaIndex());
containerManager.updateContainerState(containerId,
LifeCycleEvent.CLOSE);
Expand All @@ -330,10 +333,15 @@ private boolean updateContainerState(final DatanodeDetails datanode,
*
*/
if (replica.getState() == State.CLOSED) {
logger.info("Moving container {} to CLOSED state, datanode {} " +
"reported CLOSED replica.", containerId, datanode);
Preconditions.checkArgument(replica.getBlockCommitSequenceId()
== container.getSequenceId());
if (!verifyBcsId(replica.getBlockCommitSequenceId(), container.getSequenceId(), datanode, containerId)) {
logger.warn("Ignored moving container {} from QUASI_CLOSED to CLOSED state because replica bcsId ({}) " +
"reported by datanode {} does not match sequenceId ({}).",
containerId, replica.getBlockCommitSequenceId(), datanode, container.getSequenceId());
return true;
}
logger.info("Moving container {} from QUASI_CLOSED to CLOSED state, datanode {} " +
"reported CLOSED replica with index {}.", containerId, datanode,
replica.getReplicaIndex());
containerManager.updateContainerState(containerId,
LifeCycleEvent.FORCE_CLOSE);
}
Expand Down Expand Up @@ -372,6 +380,32 @@ private boolean updateContainerState(final DatanodeDetails datanode,
return ignored;
}

/**
* Helper method to verify that the replica's bcsId matches the container's in SCM.
* Throws IOException if the bcsIds do not match.
* <p>
* @param replicaBcsId Replica bcsId
* @param containerBcsId Container bcsId in SCM
* @param datanode DatanodeDetails for logging
* @param containerId ContainerID for logging
* @return true if verification has passed, false otherwise
*/
private boolean verifyBcsId(long replicaBcsId, long containerBcsId,
DatanodeDetails datanode, ContainerID containerId) {

if (replicaBcsId != containerBcsId) {
final String errMsg = "Unexpected bcsId for container " + containerId +
" from datanode " + datanode + ". replica's: " + replicaBcsId +
", SCM's: " + containerBcsId +
". Ignoring container report for " + containerId;

logger.error(errMsg);
return false;
} else {
return true;
}
}

private void updateContainerReplica(final DatanodeDetails datanodeDetails,
final ContainerID containerId,
final ContainerReplicaProto replicaProto)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getECContainer;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getReplicas;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -74,6 +75,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/**
* Test the behaviour of the ContainerReportHandler.
Expand Down Expand Up @@ -166,7 +169,7 @@ private void testReplicaIndexUpdate(ContainerInfo container,
Map<DatanodeDetails, Integer> expectedReplicaMap) {
final ContainerReportsProto containerReport = getContainerReportsProto(
container.containerID(), ContainerReplicaProto.State.CLOSED,
dn.getUuidString(), 2000000000L, 100000000L, replicaIndex);
dn.getUuidString(), 2000000000L, 100000000L, 10000L, replicaIndex);
final ContainerReportFromDatanode containerReportFromDatanode =
new ContainerReportFromDatanode(dn, containerReport);
final ContainerReportHandler reportHandler = new ContainerReportHandler(
Expand Down Expand Up @@ -601,7 +604,7 @@ private void createAndHandleContainerReport(ContainerID containerID,

@Test
public void testClosingToQuasiClosed()
throws NodeNotFoundException, IOException, TimeoutException {
throws NodeNotFoundException, IOException {
/*
* The container is in CLOSING state and all the replicas are in
* OPEN/CLOSING state.
Expand Down Expand Up @@ -668,7 +671,7 @@ public void testClosingToQuasiClosed()

@Test
public void testQuasiClosedToClosed()
throws NodeNotFoundException, IOException, TimeoutException {
throws NodeNotFoundException, IOException {
/*
* The container is in QUASI_CLOSED state.
* - One of the replica is in QUASI_CLOSED state
Expand Down Expand Up @@ -737,6 +740,52 @@ public void testQuasiClosedToClosed()
assertEquals(LifeCycleState.CLOSED, containerManager.getContainer(containerOne.containerID()).getState());
}

@ParameterizedTest
@EnumSource(value = LifeCycleState.class, names = {"CLOSING", "QUASI_CLOSED"})
public void testContainerStateTransitionToClosedWithMismatchingBCSID(LifeCycleState lcState)
throws NodeNotFoundException, IOException {
/*
* Negative test. When a replica with a (lower) mismatching bcsId gets reported,
* expect the ContainerReportHandler thread to not throw uncaught exception.
* (That exception lead to ContainerReportHandler thread crash before HDDS-12150.)
*/
final ContainerReportHandler reportHandler =
new ContainerReportHandler(nodeManager, containerManager);
final Iterator<DatanodeDetails> nodeIterator =
nodeManager.getNodes(NodeStatus.inServiceHealthy()).iterator();

final DatanodeDetails dn1 = nodeIterator.next();
final DatanodeDetails dn2 = nodeIterator.next();
final DatanodeDetails dn3 = nodeIterator.next();

// Initial sequenceId 10000L is set here
final ContainerInfo container1 = getContainer(lcState);

nodeManager.addContainer(dn1, container1.containerID());
nodeManager.addContainer(dn2, container1.containerID());
nodeManager.addContainer(dn3, container1.containerID());

containerStateManager.addContainer(container1.getProtobuf());

// Generate container report with replica in CLOSED state with intentional lower bcsId
final ContainerReportsProto containerReport = getContainerReportsProto(
container1.containerID(), ContainerReplicaProto.State.CLOSED,
dn1.getUuidString(),
2000L);
final ContainerReportFromDatanode containerReportFromDatanode =
new ContainerReportFromDatanode(dn1, containerReport);

// Handler should NOT throw IllegalArgumentException
try {
reportHandler.onMessage(containerReportFromDatanode, publisher);
} catch (IllegalArgumentException iaEx) {
fail("Handler should not throw IllegalArgumentException: " + iaEx.getMessage());
}

// Because the container report is ignored, the container remains in the same previous state in SCM
assertEquals(lcState, containerManager.getContainer(container1.containerID()).getState());
}

@Test
public void openContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas()
throws IOException, TimeoutException {
Expand Down Expand Up @@ -1092,7 +1141,7 @@ private ContainerReportFromDatanode getContainerReportFromDatanode(
DatanodeDetails dn, long bytesUsed, long keyCount, int replicaIndex) {
ContainerReportsProto containerReport = getContainerReportsProto(
containerId, state, dn.getUuidString(), bytesUsed, keyCount,
replicaIndex);
10000L, replicaIndex);

return new ContainerReportFromDatanode(dn, containerReport);
}
Expand All @@ -1101,20 +1150,34 @@ protected static ContainerReportsProto getContainerReportsProto(
final ContainerID containerId, final ContainerReplicaProto.State state,
final String originNodeId) {
return getContainerReportsProto(containerId, state, originNodeId,
2000000000L, 100000000L, 0);
2000000000L, 100000000L, 10000L, 0);
}

protected static ContainerReportsProto getContainerReportsProto(
final ContainerID containerId, final ContainerReplicaProto.State state,
final String originNodeId, final long bcsId) {
return getContainerReportsProto(containerId, state, originNodeId,
2000000000L, 100000000L, bcsId, 0);
}

protected static ContainerReportsProto getContainerReportsProto(
final ContainerID containerId, final ContainerReplicaProto.State state,
final String originNodeId, int replicaIndex) {
return getContainerReportsProto(containerId, state, originNodeId,
2000000000L, 100000000L, replicaIndex);
2000000000L, 100000000L, 10000L, replicaIndex);
}

protected static ContainerReportsProto getContainerReportsProto(
final ContainerID containerId, final ContainerReplicaProto.State state,
final String originNodeId, final long bcsId, int replicaIndex) {
return getContainerReportsProto(containerId, state, originNodeId,
2000000000L, 100000000L, bcsId, replicaIndex);
}

protected static ContainerReportsProto getContainerReportsProto(
final ContainerID containerId, final ContainerReplicaProto.State state,
final String originNodeId, final long usedBytes, final long keyCount,
final int replicaIndex) {
final long bcsId, final int replicaIndex) {
final ContainerReportsProto.Builder crBuilder =
ContainerReportsProto.newBuilder();
final ContainerReplicaProto replicaProto =
Expand All @@ -1130,7 +1193,7 @@ protected static ContainerReportsProto getContainerReportsProto(
.setWriteCount(100000000L)
.setReadBytes(2000000000L)
.setWriteBytes(2000000000L)
.setBlockCommitSequenceId(10000L)
.setBlockCommitSequenceId(bcsId)
.setDeleteTransactionId(0)
.setReplicaIndex(replicaIndex)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager.maxLayoutVersion;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -87,6 +88,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/**
* Test cases to verify the functionality of IncrementalContainerReportHandler.
Expand Down Expand Up @@ -339,7 +342,7 @@ private List<DatanodeDetails> setupECContainerForTesting(
}

@Test
public void testClosingToQuasiClosed() throws IOException, TimeoutException {
public void testClosingToQuasiClosed() throws IOException {
final IncrementalContainerReportHandler reportHandler =
new IncrementalContainerReportHandler(
nodeManager, containerManager, scmContext);
Expand Down Expand Up @@ -372,7 +375,7 @@ public void testClosingToQuasiClosed() throws IOException, TimeoutException {
}

@Test
public void testQuasiClosedToClosed() throws IOException, TimeoutException {
public void testQuasiClosedToClosed() throws IOException {
final IncrementalContainerReportHandler reportHandler =
new IncrementalContainerReportHandler(
nodeManager, containerManager, scmContext);
Expand Down Expand Up @@ -407,6 +410,59 @@ public void testQuasiClosedToClosed() throws IOException, TimeoutException {
assertEquals(LifeCycleState.CLOSED, containerManager.getContainer(container.containerID()).getState());
}

@ParameterizedTest
@EnumSource(value = LifeCycleState.class, names = {"CLOSING", "QUASI_CLOSED"})
public void testContainerStateTransitionToClosedWithMismatchingBCSID(LifeCycleState lcState) throws IOException {
/*
* Negative test. When a replica with a (lower) mismatching bcsId gets reported,
* expect the ContainerReportHandler thread to not throw uncaught exception.
* (That exception lead to ContainerReportHandler thread crash before HDDS-12150.)
*/
final IncrementalContainerReportHandler reportHandler =
new IncrementalContainerReportHandler(nodeManager, containerManager, scmContext);

// Initial sequenceId 10000L is set here
final ContainerInfo container = getContainer(lcState);
final DatanodeDetails datanodeOne = randomDatanodeDetails();
final DatanodeDetails datanodeTwo = randomDatanodeDetails();
final DatanodeDetails datanodeThree = randomDatanodeDetails();
nodeManager.register(datanodeOne, null, null);
nodeManager.register(datanodeTwo, null, null);
nodeManager.register(datanodeThree, null, null);

final Set<ContainerReplica> containerReplicas = getReplicas(
container.containerID(),
ContainerReplicaProto.State.CLOSING,
datanodeOne, datanodeTwo);
containerReplicas.addAll(getReplicas(
container.containerID(),
ContainerReplicaProto.State.QUASI_CLOSED,
datanodeThree));

containerStateManager.addContainer(container.getProtobuf());
containerReplicas.forEach(r -> containerStateManager.updateContainerReplica(
container.containerID(), r));

// Generate incremental container report with replica in CLOSED state with intentional lower bcsId
final IncrementalContainerReportProto containerReport =
getIncrementalContainerReportProto(container.containerID(),
CLOSED, datanodeThree.getUuidString(), false, 0,
2000L);
final IncrementalContainerReportFromDatanode icr =
new IncrementalContainerReportFromDatanode(
datanodeOne, containerReport);

// Handler should NOT throw IllegalArgumentException
try {
reportHandler.onMessage(icr, publisher);
} catch (IllegalArgumentException iaEx) {
fail("Handler should not throw IllegalArgumentException: " + iaEx.getMessage());
}

// Because the container report is ignored, the container remains in the same previous state in SCM
assertEquals(lcState, containerManager.getContainer(container.containerID()).getState());
}

@Test
public void testOpenWithUnhealthyReplica() throws IOException {
final IncrementalContainerReportHandler reportHandler =
Expand Down Expand Up @@ -580,11 +636,23 @@ public void testICRFCRRace() throws IOException, NodeNotFoundException,

private static IncrementalContainerReportProto
getIncrementalContainerReportProto(
final ContainerID containerId,
final ContainerReplicaProto.State state,
final String originNodeId,
final boolean hasReplicaIndex,
final int replicaIndex) {
final ContainerID containerId,
final ContainerReplicaProto.State state,
final String originNodeId,
final boolean hasReplicaIndex,
final int replicaIndex) {
return getIncrementalContainerReportProto(containerId, state, originNodeId,
hasReplicaIndex, replicaIndex, 10000L);
}

private static IncrementalContainerReportProto
getIncrementalContainerReportProto(
final ContainerID containerId,
final ContainerReplicaProto.State state,
final String originNodeId,
final boolean hasReplicaIndex,
final int replicaIndex,
final long bcsId) {
final ContainerReplicaProto.Builder replicaProto =
ContainerReplicaProto.newBuilder()
.setContainerID(containerId.getId())
Expand All @@ -598,7 +666,7 @@ public void testICRFCRRace() throws IOException, NodeNotFoundException,
.setWriteCount(100000000L)
.setReadBytes(2000000000L)
.setWriteBytes(2000000000L)
.setBlockCommitSequenceId(10000L)
.setBlockCommitSequenceId(bcsId)
.setDeleteTransactionId(0);
if (hasReplicaIndex) {
replicaProto.setReplicaIndex(replicaIndex);
Expand Down