diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java index 5e52b4050acc..68ae49b26a27 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java @@ -17,9 +17,6 @@ package org.apache.hadoop.ozone.lease; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.List; import java.util.Map; import java.util.concurrent.Callable; @@ -28,6 +25,8 @@ import java.util.concurrent.Executors; import static org.apache.hadoop.ozone.lease.Lease.messageForResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * LeaseManager is someone who can provide you leases based on your @@ -46,6 +45,7 @@ public class LeaseManager { private final String name; private final long defaultTimeout; + private final Object monitor = new Object(); private Map> activeLeases; private LeaseMonitor leaseMonitor; private Thread leaseMonitorThread; @@ -115,12 +115,14 @@ public synchronized Lease acquire(T resource, long timeout) if (LOG.isDebugEnabled()) { LOG.debug("Acquiring lease on {} for {} milliseconds", resource, timeout); } - if(activeLeases.containsKey(resource)) { + if (activeLeases.containsKey(resource)) { throw new LeaseAlreadyExistException(messageForResource(resource)); } Lease lease = new Lease<>(resource, timeout); activeLeases.put(resource, lease); - leaseMonitorThread.interrupt(); + synchronized (monitor) { + monitor.notifyAll(); + } return lease; } @@ -135,7 +137,7 @@ public synchronized Lease acquire(T resource, long timeout) public Lease get(T resource) throws LeaseNotFoundException { checkStatus(); Lease lease = activeLeases.get(resource); - if(lease != null) { + if (lease != null) { return lease; } throw new LeaseNotFoundException(messageForResource(resource)); @@ -156,7 +158,7 @@ public synchronized void release(T resource) LOG.debug("Releasing lease on {}", resource); } Lease lease = activeLeases.remove(resource); - if(lease == null) { + if (lease == null) { throw new LeaseNotFoundException(messageForResource(resource)); } lease.invalidate(); @@ -171,11 +173,13 @@ public void shutdown() { checkStatus(); LOG.debug("Shutting down LeaseManager service"); leaseMonitor.disable(); - leaseMonitorThread.interrupt(); - for(T resource : activeLeases.keySet()) { + synchronized (monitor) { + monitor.notifyAll(); + } + for (T resource : activeLeases.keySet()) { try { release(resource); - } catch(LeaseNotFoundException ex) { + } catch (LeaseNotFoundException ex) { //Ignore the exception, someone might have released the lease } } @@ -187,7 +191,7 @@ public void shutdown() { * running. */ private void checkStatus() { - if(!isRunning) { + if (!isRunning) { throw new LeaseManagerNotRunningException("LeaseManager not running."); } } @@ -198,8 +202,8 @@ private void checkStatus() { */ private final class LeaseMonitor implements Runnable { - private volatile boolean monitor = true; private final ExecutorService executorService; + private volatile boolean running = true; private LeaseMonitor() { this.executorService = Executors.newCachedThreadPool(); @@ -207,7 +211,7 @@ private LeaseMonitor() { @Override public void run() { - while (monitor) { + while (running) { LOG.debug("{}-LeaseMonitor: checking for lease expiry", name); long sleepTime = Long.MAX_VALUE; @@ -230,12 +234,12 @@ public void run() { } try { - if(!Thread.interrupted()) { - Thread.sleep(sleepTime); + synchronized (monitor) { + monitor.wait(sleepTime); } } catch (InterruptedException e) { // This means a new lease is added to activeLeases. - LOG.error("Execution was interrupted ", e); + LOG.warn("Lease manager is interrupted. Shutting down...", e); Thread.currentThread().interrupt(); } } @@ -246,7 +250,7 @@ public void run() { * will stop lease monitor. */ public void disable() { - monitor = false; + running = false; } }