Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.hadoop.ozone.lease.Lease.messageForResource;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,8 +49,8 @@ public class LeaseManager<T> {

private final String name;
private final long defaultTimeout;
private final Object monitor = new Object();
private Map<T, Lease<T>> activeLeases;
private BlockingQueue<T> leaseKeyBlockingQueue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@szetszwo Thanks for review, I have done changes as per patch, but added extra release in shutdown() as possibility for concurrency issue. i.e.

  1. shutdown --> disable MonitorThread (set running to false)
  2. interrupt Monitor thread

For step 2, if MonitorThread waiting, then interrupt will work else ignored. So later moves to waiting for semaphore, then there is no way to exist thread.

So added extra "semaphore.release()" in shutdown to ensure exit from tryAcquire() wait in above case.

private LeaseMonitor leaseMonitor;
private Thread leaseMonitorThread;
private boolean isRunning;
Expand All @@ -62,6 +66,7 @@ public class LeaseManager<T> {
public LeaseManager(String name, long defaultTimeout) {
this.name = name;
this.defaultTimeout = defaultTimeout;
leaseKeyBlockingQueue = new LinkedBlockingQueue<>();
}

/**
Expand Down Expand Up @@ -120,9 +125,7 @@ public synchronized Lease<T> acquire(T resource, long timeout)
}
Lease<T> lease = new Lease<>(resource, timeout);
activeLeases.put(resource, lease);
synchronized (monitor) {
monitor.notifyAll();
}
leaseKeyBlockingQueue.add(resource);
return lease;
}

Expand Down Expand Up @@ -151,9 +154,7 @@ public synchronized Lease<T> acquire(
Lease<T> lease = new Lease<>(resource, timeout);
lease.registerCallBack(callback);
activeLeases.put(resource, lease);
synchronized (monitor) {
monitor.notifyAll();
}
leaseKeyBlockingQueue.add(resource);
return lease;
}

Expand Down Expand Up @@ -204,9 +205,8 @@ public void shutdown() {
checkStatus();
LOG.debug("Shutting down LeaseManager service");
leaseMonitor.disable();
synchronized (monitor) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shutdown() should be synchronized.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@szetszwo currently I have reverted this as shutdown is threadsafe. Adding synchronization needs protect all member variables as findbug, which is not required.

monitor.notifyAll();
}
leaseMonitorThread.interrupt();
leaseKeyBlockingQueue.clear();
for (T resource : activeLeases.keySet()) {
try {
release(resource);
Expand Down Expand Up @@ -265,11 +265,12 @@ public void run() {
}

try {
synchronized (monitor) {
monitor.wait(sleepTime);
}
// block for event and clear all events as will be
// handled by activeLeases before going for next wait
// ignore return value
T task = leaseKeyBlockingQueue.poll(sleepTime, TimeUnit.MILLISECONDS);
leaseKeyBlockingQueue.clear();
} catch (InterruptedException e) {
// This means a new lease is added to activeLeases.
LOG.warn("Lease manager is interrupted. Shutting down...", e);
Thread.currentThread().interrupt();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.lease.Lease;
import org.apache.hadoop.ozone.lease.LeaseManager;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
Expand All @@ -52,6 +53,7 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;

import org.apache.ozone.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -137,18 +139,36 @@ public void testCloseContainerWithDelayByLeaseManager()
.thenReturn(pipeline);
LeaseManager<Object> leaseManager = new LeaseManager<>("test", timeoutInMs);
leaseManager.start();
LeaseManager mockLeaseManager = Mockito.mock(LeaseManager.class);
List<Lease<Object>> leaseList = new ArrayList<>(1);
Mockito.when(mockLeaseManager.acquire(any(), anyLong(), any())).thenAnswer(
invocation -> {
leaseList.add(leaseManager.acquire(
invocation.getArgument(0, Object.class),
invocation.getArgument(1),
invocation.getArgument(2, Callable.class)));
return leaseList.get(0);
});
CloseContainerEventHandler closeHandler = new CloseContainerEventHandler(
pipelineManager, containerManager, scmContext,
leaseManager, timeoutInMs);
mockLeaseManager, timeoutInMs);
closeHandler.onMessage(container.containerID(), eventPublisher);
Mockito.verify(mockLeaseManager, atLeastOnce())
.acquire(any(), anyLong(), any());
Assert.assertTrue(leaseList.size() > 0);
// immediate check if event is published, it should not publish in 500ms
Thread.sleep(500);
Mockito.verify(eventPublisher, never())
.fireEvent(eq(DATANODE_COMMAND), commandCaptor.capture());
Thread.sleep(timeoutInMs * 2);
// event publish already after waiting 4+ seconds
Mockito.verify(eventPublisher, atLeastOnce())
.fireEvent(eq(DATANODE_COMMAND), commandCaptor.capture());
// wait for event to happen
GenericTestUtils.waitFor(() -> {
try {
Mockito.verify(eventPublisher, atLeastOnce())
.fireEvent(eq(DATANODE_COMMAND), commandCaptor.capture());
} catch (Throwable ex) {
return false;
}
return true;
}, 1000, (int) timeoutInMs * 3);
leaseManager.shutdown();
}

Expand Down