diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java index 7c3c3300e05e..f04ce79569c1 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java @@ -25,6 +25,9 @@ import java.util.concurrent.Executors; import static org.apache.hadoop.ozone.lease.Lease.messageForResource; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,8 +48,8 @@ public class LeaseManager { private final String name; private final long defaultTimeout; - private final Object monitor = new Object(); private Map> activeLeases; + private Semaphore semaphore = new Semaphore(0); private LeaseMonitor leaseMonitor; private Thread leaseMonitorThread; private boolean isRunning; @@ -120,9 +123,7 @@ public synchronized Lease acquire(T resource, long timeout) } Lease lease = new Lease<>(resource, timeout); activeLeases.put(resource, lease); - synchronized (monitor) { - monitor.notifyAll(); - } + semaphore.release(); return lease; } @@ -151,9 +152,7 @@ public synchronized Lease acquire( Lease lease = new Lease<>(resource, timeout); lease.registerCallBack(callback); activeLeases.put(resource, lease); - synchronized (monitor) { - monitor.notifyAll(); - } + semaphore.release(); return lease; } @@ -204,9 +203,11 @@ public void shutdown() { checkStatus(); LOG.debug("Shutting down LeaseManager service"); leaseMonitor.disable(); - synchronized (monitor) { - monitor.notifyAll(); - } + // added extra release for case when interrupt is called + // before going to semaphore's tryAcquire. This will ensure release + // of wait and exit of while loop as leaseMonitor.disable() is done. + semaphore.release(); + leaseMonitorThread.interrupt(); for (T resource : activeLeases.keySet()) { try { release(resource); @@ -265,11 +266,9 @@ public void run() { } try { - synchronized (monitor) { - monitor.wait(sleepTime); - } + // ignore return value, just used for wait + boolean b = semaphore.tryAcquire(sleepTime, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { - // This means a new lease is added to activeLeases. LOG.warn("Lease manager is interrupted. Shutting down...", e); Thread.currentThread().interrupt(); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java index 77cd08251a93..e6b2d3940621 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException; +import org.apache.hadoop.ozone.lease.Lease; import org.apache.hadoop.ozone.lease.LeaseManager; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; @@ -52,6 +53,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; +import org.apache.ozone.test.GenericTestUtils; import org.junit.Assert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -137,18 +139,36 @@ public void testCloseContainerWithDelayByLeaseManager() .thenReturn(pipeline); LeaseManager leaseManager = new LeaseManager<>("test", timeoutInMs); leaseManager.start(); + LeaseManager mockLeaseManager = Mockito.mock(LeaseManager.class); + List> leaseList = new ArrayList<>(1); + Mockito.when(mockLeaseManager.acquire(any(), anyLong(), any())).thenAnswer( + invocation -> { + leaseList.add(leaseManager.acquire( + invocation.getArgument(0, Object.class), + invocation.getArgument(1), + invocation.getArgument(2, Callable.class))); + return leaseList.get(0); + }); CloseContainerEventHandler closeHandler = new CloseContainerEventHandler( pipelineManager, containerManager, scmContext, - leaseManager, timeoutInMs); + mockLeaseManager, timeoutInMs); closeHandler.onMessage(container.containerID(), eventPublisher); - // immediate check if event is published, it should not publish in 500ms - Thread.sleep(500); + Mockito.verify(mockLeaseManager, atLeastOnce()) + .acquire(any(), anyLong(), any()); + Assert.assertTrue(leaseList.size() > 0); + // immediate check if event is published Mockito.verify(eventPublisher, never()) .fireEvent(eq(DATANODE_COMMAND), commandCaptor.capture()); - Thread.sleep(timeoutInMs * 2); - // event publish already after waiting 4+ seconds - Mockito.verify(eventPublisher, atLeastOnce()) - .fireEvent(eq(DATANODE_COMMAND), commandCaptor.capture()); + // wait for event to happen + GenericTestUtils.waitFor(() -> { + try { + Mockito.verify(eventPublisher, atLeastOnce()) + .fireEvent(eq(DATANODE_COMMAND), commandCaptor.capture()); + } catch (Throwable ex) { + return false; + } + return true; + }, 1000, (int) timeoutInMs * 3); leaseManager.shutdown(); }