@@ -114,31 +114,36 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
}
final ContainerReportsProto containerReport =
reportFromDatanode.getReport();

try {
final List<ContainerReplicaProto> replicas =
containerReport.getReportsList();
final Set<ContainerID> containersInSCM =
nodeManager.getContainers(datanodeDetails);

final Set<ContainerID> containersInDn = replicas.parallelStream()
.map(ContainerReplicaProto::getContainerID)
.map(ContainerID::valueOf).collect(Collectors.toSet());

final Set<ContainerID> missingReplicas = new HashSet<>(containersInSCM);
missingReplicas.removeAll(containersInDn);

processContainerReplicas(datanodeDetails, replicas, publisher);
processMissingReplicas(datanodeDetails, missingReplicas);
updateDeleteTransaction(datanodeDetails, replicas, publisher);

/*
* Update the latest set of containers for this datanode in
* NodeManager
*/
nodeManager.setContainers(datanodeDetails, containersInDn);

containerManager.notifyContainerReportProcessing(true, true);
// HDDS-5249 - we must ensure that an ICR and FCR for the same datanode
// do not run at the same time or it can result in a data consistency
// issue between the container list in NodeManager and the replicas in
// ContainerManager.
synchronized (datanodeDetails) {
final List<ContainerReplicaProto> replicas =
containerReport.getReportsList();
final Set<ContainerID> containersInSCM =
nodeManager.getContainers(datanodeDetails);

final Set<ContainerID> containersInDn = replicas.parallelStream()
.map(ContainerReplicaProto::getContainerID)
.map(ContainerID::valueOf).collect(Collectors.toSet());

final Set<ContainerID> missingReplicas = new HashSet<>(containersInSCM);
missingReplicas.removeAll(containersInDn);

processContainerReplicas(datanodeDetails, replicas, publisher);
processMissingReplicas(datanodeDetails, missingReplicas);
updateDeleteTransaction(datanodeDetails, replicas, publisher);

/*
* Update the latest set of containers for this datanode in
* NodeManager
*/
nodeManager.setContainers(datanodeDetails, containersInDn);

containerManager.notifyContainerReportProcessing(true, true);
}
} catch (NodeNotFoundException ex) {
containerManager.notifyContainerReportProcessing(true, false);
LOG.error("Received container report from unknown datanode {}.",
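For context on the change above: the full container report (FCR) handler performs a multi-step read-modify-write over the datanode's container set in NodeManager and the replicas in ContainerManager, so it must not interleave with an incremental report (ICR) for the same datanode. The sketch below is a minimal, self-contained illustration of that locking idea, not the Ozone code; the names (PerDatanodeLockSketch, Datanode, nodeManagerView, containerManagerView) are invented stand-ins, and it assumes both handlers lock on the same DatanodeDetails-like instance, as the synchronized blocks in the diff rely on.

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PerDatanodeLockSketch {

  /** Stand-in for DatanodeDetails; both handlers must lock on the same instance. */
  static final class Datanode { }

  /** Simplified stand-in for NodeManager's per-datanode container set. */
  private final Set<Long> nodeManagerView = new HashSet<>();
  /** Simplified stand-in for ContainerManager's replica view for this datanode. */
  private final Set<Long> containerManagerView = new HashSet<>();

  /** Stand-in for the full container report handler. */
  void onFullReport(Datanode dn, Set<Long> reported) {
    synchronized (dn) {
      // Multi-step read-modify-write; an ICR landing between these steps
      // could leave the two views disagreeing about a container.
      nodeManagerView.clear();
      nodeManagerView.addAll(reported);
      containerManagerView.retainAll(reported);
      containerManagerView.addAll(reported);
    }
  }

  /** Stand-in for the incremental container report handler. */
  void onIncrementalReport(Datanode dn, long containerId) {
    synchronized (dn) {
      nodeManagerView.add(containerId);
      containerManagerView.add(containerId);
    }
  }

  public static void main(String[] args) throws Exception {
    PerDatanodeLockSketch scm = new PerDatanodeLockSketch();
    Datanode dn = new Datanode();
    ExecutorService pool = Executors.newFixedThreadPool(2);
    Set<Long> fullReport = new HashSet<>();
    fullReport.add(1L);
    for (int i = 0; i < 10; i++) {
      Future<?> f1 = pool.submit(() -> scm.onFullReport(dn, fullReport));
      Future<?> f2 = pool.submit(() -> scm.onIncrementalReport(dn, 2L));
      f1.get();
      f2.get();
      // With the per-datanode lock the two views always agree; without it the
      // FCR's reconcile step could overwrite the ICR's update in one view only.
      if (!scm.nodeManagerView.equals(scm.containerManagerView)) {
        throw new AssertionError("views diverged");
      }
    }
    pool.shutdown();
  }
}

Because the lock is the per-datanode object itself, reports for different datanodes can still be processed in parallel; only FCR/ICR pairs for the same datanode are serialized.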
@@ -71,31 +71,37 @@ public void onMessage(final IncrementalContainerReportFromDatanode report,
}

boolean success = true;
for (ContainerReplicaProto replicaProto :
report.getReport().getReportList()) {
try {
final ContainerID id = ContainerID.valueOf(
replicaProto.getContainerID());
if (!replicaProto.getState().equals(
ContainerReplicaProto.State.DELETED)) {
nodeManager.addContainer(dd, id);
// HDDS-5249 - we must ensure that an ICR and FCR for the same datanode
// do not run at the same time or it can result in a data consistency
// issue between the container list in NodeManager and the replicas in
// ContainerManager.
synchronized (dd) {
for (ContainerReplicaProto replicaProto :
report.getReport().getReportList()) {
try {
final ContainerID id = ContainerID.valueOf(
replicaProto.getContainerID());
if (!replicaProto.getState().equals(
ContainerReplicaProto.State.DELETED)) {
nodeManager.addContainer(dd, id);
}
processContainerReplica(dd, replicaProto, publisher);
} catch (ContainerNotFoundException e) {
success = false;
LOG.warn("Container {} not found!", replicaProto.getContainerID());
} catch (NodeNotFoundException ex) {
success = false;
LOG.error("Received ICR from unknown datanode {}",
report.getDatanodeDetails(), ex);
} catch (ContainerReplicaNotFoundException e) {
success = false;
LOG.warn("Container {} replica not found!",
replicaProto.getContainerID());
} catch (IOException | InvalidStateTransitionException e) {
success = false;
LOG.error("Exception while processing ICR for container {}",
replicaProto.getContainerID(), e);
}
processContainerReplica(dd, replicaProto, publisher);
} catch (ContainerNotFoundException e) {
success = false;
LOG.warn("Container {} not found!", replicaProto.getContainerID());
} catch (NodeNotFoundException ex) {
success = false;
LOG.error("Received ICR from unknown datanode {}",
report.getDatanodeDetails(), ex);
} catch (ContainerReplicaNotFoundException e){
success = false;
LOG.warn("Container {} replica not found!",
replicaProto.getContainerID());
} catch (IOException | InvalidStateTransitionException e) {
success = false;
LOG.error("Exception while processing ICR for container {}",
replicaProto.getContainerID(), e);
}
}

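A side note on the loop above: each replica is handled in its own try/catch, so one bad entry marks the whole report as failed without stopping the remaining replicas from being processed. A minimal sketch of that pattern, with invented names rather than the Ozone APIs:

import java.util.Arrays;
import java.util.List;

public class PerReplicaErrorHandlingSketch {

  /** Throws for an "unknown" container, otherwise applies the replica. */
  static void process(long containerId) {
    if (containerId < 0) {
      throw new IllegalArgumentException("unknown container " + containerId);
    }
    System.out.println("applied replica for container " + containerId);
  }

  public static void main(String[] args) {
    List<Long> reported = Arrays.asList(1L, -1L, 3L);
    boolean success = true;
    for (long id : reported) {
      try {
        process(id);
      } catch (IllegalArgumentException e) {
        // Remember the failure, but keep processing the remaining replicas.
        success = false;
        System.err.println("skipping container " + id + ": " + e.getMessage());
      }
    }
    System.out.println("report fully processed: " + success);
  }
}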
@@ -663,14 +663,14 @@ private ContainerReportFromDatanode getContainerReportFromDatanode(
return new ContainerReportFromDatanode(dn, containerReport);
}

private static ContainerReportsProto getContainerReportsProto(
protected static ContainerReportsProto getContainerReportsProto(
final ContainerID containerId, final ContainerReplicaProto.State state,
final String originNodeId) {
return getContainerReportsProto(containerId, state, originNodeId,
2000000000L, 100000000L);
}

private static ContainerReportsProto getContainerReportsProto(
protected static ContainerReportsProto getContainerReportsProto(
final ContainerID containerId, final ContainerReplicaProto.State state,
final String originNodeId, final long usedBytes, final long keyCount) {
final ContainerReportsProto.Builder crBuilder =
@@ -22,6 +22,7 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.protocol.proto
@@ -31,6 +32,8 @@
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.IncrementalContainerReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
@@ -49,8 +52,13 @@
import java.nio.file.Paths;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;

@@ -109,6 +117,15 @@ public void setup() throws IOException, InvalidStateTransitionException {
Mockito.any(ContainerID.class),
Mockito.any(HddsProtos.LifeCycleEvent.class));

Mockito.doAnswer(invocation -> {
containerStateManager
.updateContainerReplica((ContainerID)invocation.getArguments()[0],
(ContainerReplica) invocation.getArguments()[1]);
return null;
}).when(containerManager).updateContainerReplica(
Mockito.any(ContainerID.class),
Mockito.any(ContainerReplica.class));

}

@After
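The doAnswer stub added to setup() forwards updateContainerReplica calls on the mocked ContainerManager to the real ContainerStateManager, so the new test can assert against actual replica state. Below is a minimal sketch of that Mockito forwarding pattern, using an invented Registry interface rather than the Ozone types:

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import java.util.HashSet;
import java.util.Set;

public class ForwardingStubSketch {

  interface Registry {
    void add(String id);
  }

  public static void main(String[] args) {
    Set<String> realState = new HashSet<>();
    Registry mockRegistry = mock(Registry.class);

    // Forward add() calls on the mock into the real backing set.
    doAnswer(invocation -> {
      realState.add(invocation.getArgument(0));
      return null;
    }).when(mockRegistry).add(anyString());

    mockRegistry.add("container-1");
    System.out.println(realState.contains("container-1"));  // true
  }
}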
@@ -227,7 +244,7 @@ public void testQuasiClosedToClosed() throws IOException {

final IncrementalContainerReportProto containerReport =
getIncrementalContainerReportProto(container.containerID(),
ContainerReplicaProto.State.CLOSED,
CLOSED,
datanodeThree.getUuidString());
final IncrementalContainerReportFromDatanode icr =
new IncrementalContainerReportFromDatanode(
@@ -250,7 +267,7 @@ public void testDeleteContainer() throws IOException {
nodeManager.register(datanodeThree, null, null);
final Set<ContainerReplica> containerReplicas = getReplicas(
container.containerID(),
ContainerReplicaProto.State.CLOSED,
CLOSED,
datanodeOne, datanodeTwo, datanodeThree);

containerStateManager.loadContainer(container);
@@ -276,6 +293,81 @@
.getContainerReplicas(container.containerID()).size());
}

@Test
// HDDS-5249 - This test reproduces the race condition described in the Jira,
// which existed until the code was changed to fix it.
public void testICRFCRRace() throws IOException, NodeNotFoundException,
ExecutionException, InterruptedException {
final IncrementalContainerReportHandler reportHandler =
new IncrementalContainerReportHandler(
nodeManager, containerManager, scmContext);
final ContainerReportHandler fullReportHandler =
new ContainerReportHandler(nodeManager, containerManager);

final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED);
final DatanodeDetails datanode = randomDatanodeDetails();
nodeManager.register(datanode, null, null);

containerStateManager.loadContainer(container);
containerStateManager.loadContainer(containerTwo);

Assert.assertEquals(0, nodeManager.getContainers(datanode).size());

final IncrementalContainerReportProto containerReport =
getIncrementalContainerReportProto(container.containerID(),
CLOSED,
datanode.getUuidString());
final IncrementalContainerReportFromDatanode icr =
new IncrementalContainerReportFromDatanode(
datanode, containerReport);

final ContainerReportsProto fullReport = TestContainerReportHandler
.getContainerReportsProto(containerTwo.containerID(), CLOSED,
datanode.getUuidString());
final ContainerReportFromDatanode fcr = new ContainerReportFromDatanode(
datanode, fullReport);

// We need to run the FCR and ICR at the same time via the executor so we
// can try to simulate the race condition.
ThreadPoolExecutor executor =
(ThreadPoolExecutor)Executors.newFixedThreadPool(2);
try {
// Running this test 10 times to ensure the race condition we are testing
// for does not occur. In local tests, before the code was fixed, this
// test failed consistently every time (reproducing the issue).
for (int i=0; i<10; i++) {
Future<?> t1 =
executor.submit(() -> fullReportHandler.onMessage(fcr, publisher));
Future<?> t2 =
executor.submit(() -> reportHandler.onMessage(icr, publisher));
t1.get();
t2.get();

Set<ContainerID> nmContainers = nodeManager.getContainers(datanode);
if (nmContainers.contains(container.containerID())) {
// If we find "container" in the NM, then we must also have it in
// Container Manager.
Assert.assertEquals(1, containerStateManager
.getContainerReplicas(container.containerID()).size());
Assert.assertEquals(2, nmContainers.size());
} else {
// If the race condition occurs as mentioned in HDDS-5249, then this
// assert should fail. We will have found nothing for "container" in
// NM, but have found something for it in ContainerManager, and that
// should not happen. It should be in both, or neither.
Assert.assertEquals(0, containerStateManager
.getContainerReplicas(container.containerID()).size());
Assert.assertEquals(1, nmContainers.size());
}
Assert.assertEquals(1, containerStateManager
.getContainerReplicas(containerTwo.containerID()).size());
}
} finally {
executor.shutdown();
}
}

private static IncrementalContainerReportProto
getIncrementalContainerReportProto(
final ContainerID containerId,