Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import java.util.concurrent.Executors;

import static org.apache.hadoop.ozone.lease.Lease.messageForResource;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,8 +48,8 @@ public class LeaseManager<T> {

private final String name;
private final long defaultTimeout;
private final Object monitor = new Object();
private Map<T, Lease<T>> activeLeases;
private Semaphore semaphore = new Semaphore(0);
private LeaseMonitor leaseMonitor;
private Thread leaseMonitorThread;
private boolean isRunning;
Expand Down Expand Up @@ -120,9 +123,7 @@ public synchronized Lease<T> acquire(T resource, long timeout)
}
Lease<T> lease = new Lease<>(resource, timeout);
activeLeases.put(resource, lease);
synchronized (monitor) {
monitor.notifyAll();
}
semaphore.release();
return lease;
}

Expand Down Expand Up @@ -151,9 +152,7 @@ public synchronized Lease<T> acquire(
Lease<T> lease = new Lease<>(resource, timeout);
lease.registerCallBack(callback);
activeLeases.put(resource, lease);
synchronized (monitor) {
monitor.notifyAll();
}
semaphore.release();
return lease;
}

Expand Down Expand Up @@ -204,9 +203,11 @@ public void shutdown() {
checkStatus();
LOG.debug("Shutting down LeaseManager service");
leaseMonitor.disable();
synchronized (monitor) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shutdown() should be synchronized.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@szetszwo currently I have reverted this as shutdown is threadsafe. Adding synchronization needs protect all member variables as findbug, which is not required.

monitor.notifyAll();
}
// added extra release for case when interrupt is called
// before going to semaphore's tryAcquire. This will ensure release
// of wait and exit of while loop as leaseMonitor.disable() is done.
semaphore.release();
leaseMonitorThread.interrupt();
for (T resource : activeLeases.keySet()) {
try {
release(resource);
Expand Down Expand Up @@ -265,11 +266,9 @@ public void run() {
}

try {
synchronized (monitor) {
monitor.wait(sleepTime);
}
// ignore return value, just used for wait
boolean b = semaphore.tryAcquire(sleepTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// This means a new lease is added to activeLeases.
LOG.warn("Lease manager is interrupted. Shutting down...", e);
Thread.currentThread().interrupt();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.lease.Lease;
import org.apache.hadoop.ozone.lease.LeaseManager;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
Expand All @@ -52,6 +53,7 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;

import org.apache.ozone.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -137,18 +139,36 @@ public void testCloseContainerWithDelayByLeaseManager()
.thenReturn(pipeline);
LeaseManager<Object> leaseManager = new LeaseManager<>("test", timeoutInMs);
leaseManager.start();
LeaseManager mockLeaseManager = Mockito.mock(LeaseManager.class);
List<Lease<Object>> leaseList = new ArrayList<>(1);
Mockito.when(mockLeaseManager.acquire(any(), anyLong(), any())).thenAnswer(
invocation -> {
leaseList.add(leaseManager.acquire(
invocation.getArgument(0, Object.class),
invocation.getArgument(1),
invocation.getArgument(2, Callable.class)));
return leaseList.get(0);
});
CloseContainerEventHandler closeHandler = new CloseContainerEventHandler(
pipelineManager, containerManager, scmContext,
leaseManager, timeoutInMs);
mockLeaseManager, timeoutInMs);
closeHandler.onMessage(container.containerID(), eventPublisher);
// immediate check if event is published, it should not publish in 500ms
Thread.sleep(500);
Mockito.verify(mockLeaseManager, atLeastOnce())
.acquire(any(), anyLong(), any());
Assert.assertTrue(leaseList.size() > 0);
// immediate check if event is published
Mockito.verify(eventPublisher, never())
.fireEvent(eq(DATANODE_COMMAND), commandCaptor.capture());
Thread.sleep(timeoutInMs * 2);
// event publish already after waiting 4+ seconds
Mockito.verify(eventPublisher, atLeastOnce())
.fireEvent(eq(DATANODE_COMMAND), commandCaptor.capture());
// wait for event to happen
GenericTestUtils.waitFor(() -> {
try {
Mockito.verify(eventPublisher, atLeastOnce())
.fireEvent(eq(DATANODE_COMMAND), commandCaptor.capture());
} catch (Throwable ex) {
return false;
}
return true;
}, 1000, (int) timeoutInMs * 3);
leaseManager.shutdown();
}

Expand Down