Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.ExitUtil;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.GeneratedMessage;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;

Expand Down Expand Up @@ -246,16 +244,6 @@ public boolean isRunning() {
return true;
}

/**
* Process all the containers immediately.
*/
@VisibleForTesting
@SuppressFBWarnings(value="NN_NAKED_NOTIFY",
justification="Used only for testing")
public synchronized void processContainersNow() {
notifyAll();
}

/**
* Stops Replication Monitor thread.
*/
Expand All @@ -272,22 +260,29 @@ public synchronized void stop() {
}
}

/**
* Process all the containers now, and wait for the processing to complete.
* This in intended to be used in tests.
*/
public synchronized void processAll() {
final long start = clock.millis();
final List<ContainerInfo> containers =
containerManager.getContainers();
containers.forEach(this::processContainer);

LOG.info("Replication Monitor Thread took {} milliseconds for" +
" processing {} containers.", clock.millis() - start,
containers.size());
}

/**
* ReplicationMonitor thread runnable. This wakes up at configured
* interval and processes all the containers in the system.
*/
private synchronized void run() {
try {
while (running) {
final long start = clock.millis();
final List<ContainerInfo> containers =
containerManager.getContainers();
containers.forEach(this::processContainer);

LOG.info("Replication Monitor Thread took {} milliseconds for" +
" processing {} containers.", clock.millis() - start,
containers.size());

processAll();
wait(rmConf.getInterval());
}
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,9 @@ public void testReplicationManagerRestart() throws InterruptedException {
public void testOpenContainer() throws SCMException, InterruptedException {
final ContainerInfo container = getContainer(LifeCycleState.OPEN);
containerStateManager.loadContainer(container);
replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);
Assert.assertEquals(0, datanodeCommandHandler.getInvocation());

}
Expand Down Expand Up @@ -241,9 +241,9 @@ public void testClosingContainer() throws
final int currentCloseCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand);

replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);
Assert.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand));

Expand All @@ -252,9 +252,9 @@ public void testClosingContainer() throws
containerStateManager.updateContainerReplica(id, replica);
}

replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);
Assert.assertEquals(currentCloseCommandCount + 6, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
}
Expand Down Expand Up @@ -287,9 +287,9 @@ public void testQuasiClosedContainerWithTwoOpenReplica() throws
final int currentCloseCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
// Two of the replicas are in OPEN state
replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);
Assert.assertEquals(currentCloseCommandCount + 2, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
Assert.assertTrue(datanodeCommandHandler.received(
Expand Down Expand Up @@ -325,9 +325,9 @@ public void testHealthyQuasiClosedContainer() throws

// All the QUASI_CLOSED replicas have same originNodeId, so the
// container will not be closed. ReplicationManager should take no action.
replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);
Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
}

Expand Down Expand Up @@ -366,9 +366,9 @@ public void testQuasiClosedContainerWithUnhealthyReplica()

// All the QUASI_CLOSED replicas have same originNodeId, so the
// container will not be closed. ReplicationManager should take no action.
replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);
Assert.assertEquals(0, datanodeCommandHandler.getInvocation());

// Make the first replica unhealthy
Expand All @@ -377,9 +377,9 @@ public void testQuasiClosedContainerWithUnhealthyReplica()
replicaOne.getDatanodeDetails());
containerStateManager.updateContainerReplica(id, unhealthyReplica);

replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);
Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
Assert.assertTrue(datanodeCommandHandler.received(
Expand All @@ -390,9 +390,9 @@ public void testQuasiClosedContainerWithUnhealthyReplica()
containerStateManager.removeContainerReplica(id, replicaOne);

// The container is under replicated as unhealthy replica is removed
replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);

// We should get replicate command
Assert.assertEquals(currentReplicateCommandCount + 1,
Expand Down Expand Up @@ -428,9 +428,9 @@ public void testOverReplicatedQuasiClosedContainer() throws
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);

replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);
Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
}
Expand Down Expand Up @@ -465,9 +465,9 @@ public void testOverReplicatedQuasiClosedContainerWithUnhealthyReplica()
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);

replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);
Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
Assert.assertTrue(datanodeCommandHandler.received(
Expand Down Expand Up @@ -497,9 +497,9 @@ public void testUnderReplicatedQuasiClosedContainer() throws
final int currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);

replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);
Assert.assertEquals(currentReplicateCommandCount + 1,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
Expand Down Expand Up @@ -545,7 +545,7 @@ public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica()
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);

replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
GenericTestUtils.waitFor(
() -> (currentReplicateCommandCount + 1) == datanodeCommandHandler
Expand All @@ -571,9 +571,9 @@ public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica()
* iteration it should delete the unhealthy replica.
*/

replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);
Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
// ReplicaTwo should be deleted, that is the unhealthy one
Expand All @@ -589,9 +589,9 @@ public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica()
* is under replicated now
*/

replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);
Assert.assertEquals(currentReplicateCommandCount + 2,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
Expand Down Expand Up @@ -621,9 +621,9 @@ public void testQuasiClosedToClosed() throws
final int currentCloseCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand);

replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);

// All the replicas have same BCSID, so all of them will be closed.
Assert.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
Expand Down Expand Up @@ -651,9 +651,9 @@ public void testHealthyClosedContainer()
containerStateManager.updateContainerReplica(id, replica);
}

replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);
Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
}

Expand All @@ -679,10 +679,9 @@ public void testUnhealthyOpenContainer()
Mockito.mock(CloseContainerEventHandler.class);
eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);

replicationManager.processContainersNow();

replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);
Mockito.verify(closeContainerHandler, Mockito.times(1))
.onMessage(id, eventQueue);
}
Expand Down Expand Up @@ -730,9 +729,9 @@ public void additionalReplicaScheduledWhenMisReplicated()
int currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);

replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);
// At this stage, due to the mocked calls to validteContainerPlacement
// the mis-replicated racks will not have improved, so expect to see nothing
// scheduled.
Expand All @@ -752,9 +751,9 @@ public void additionalReplicaScheduledWhenMisReplicated()
currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);

replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);
// At this stage, due to the mocked calls to validteContainerPlacement
// the mis-replicated racks will not have improved, so expect to see nothing
// scheduled.
Expand Down Expand Up @@ -798,9 +797,9 @@ public void overReplicatedButRemovingMakesMisReplicated()
int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);

replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);
// The unhealthy replica should be removed, but not the other replica
// as each time we test with 3 replicas, Mockitor ensures it returns
// mis-replicated
Expand Down Expand Up @@ -842,9 +841,9 @@ public void testOverReplicatedAndPolicySatisfied() throws
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);

replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);
Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
}
Expand Down Expand Up @@ -882,9 +881,9 @@ public void testOverReplicatedAndPolicyUnSatisfiedAndDeleted() throws
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);

replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);
Assert.assertEquals(currentDeleteCommandCount + 2, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
}
Expand Down Expand Up @@ -1049,9 +1048,9 @@ public void testOverReplicatedClosedContainerWithDecomAndMaint()
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);

replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);
Assert.assertEquals(currentDeleteCommandCount + 2, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
// Get the DECOM and Maint replica and ensure none of them are scheduled
Expand Down Expand Up @@ -1136,9 +1135,9 @@ private void assertReplicaScheduled(int delta) throws InterruptedException {
final int currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);

replicationManager.processContainersNow();
replicationManager.processAll();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
eventQueue.processAll(1000);
Assert.assertEquals(currentReplicateCommandCount + delta,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ public void testCloseContainerCommandOnRestart() throws Exception {
Thread.sleep(5000);
// Give ReplicationManager some time to process the containers.
cluster.getStorageContainerManager()
.getReplicationManager().processContainersNow();
.getReplicationManager().processAll();
Thread.sleep(5000);

verify(publisher).fireEvent(eq(SCMEvents.DATANODE_COMMAND), argThat(new
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,9 +325,7 @@ public void testContainerStatisticsAfterDelete() throws Exception {
});

cluster.shutdownHddsDatanode(0);
scm.getReplicationManager().processContainersNow();
// Wait for container state change to DELETING
Thread.sleep(100);
scm.getReplicationManager().processAll();
containerInfos = scm.getContainerManager().getContainers();
containerInfos.stream().forEach(container ->
Assert.assertEquals(HddsProtos.LifeCycleState.DELETING,
Expand All @@ -336,17 +334,13 @@ public void testContainerStatisticsAfterDelete() throws Exception {
LogCapturer.captureLogs(ReplicationManager.LOG);
logCapturer.clearOutput();

scm.getReplicationManager().processContainersNow();
Thread.sleep(100);
// Wait for delete replica command resend
scm.getReplicationManager().processAll();
GenericTestUtils.waitFor(() -> logCapturer.getOutput()
.contains("Resend delete Container"), 500, 5000);
cluster.restartHddsDatanode(0, true);
Thread.sleep(100);

scm.getReplicationManager().processContainersNow();
// Wait for container state change to DELETED
Thread.sleep(100);
scm.getReplicationManager().processAll();
containerInfos = scm.getContainerManager().getContainers();
containerInfos.stream().forEach(container -> {
Assert.assertEquals(HddsProtos.LifeCycleState.DELETED,
Expand Down
Loading