Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

package org.apache.hadoop.ozone.lease;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
Expand All @@ -28,6 +25,8 @@
import java.util.concurrent.Executors;

import static org.apache.hadoop.ozone.lease.Lease.messageForResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* LeaseManager is someone who can provide you leases based on your
Expand All @@ -46,6 +45,7 @@ public class LeaseManager<T> {

private final String name;
private final long defaultTimeout;
private final Object monitor = new Object();
private Map<T, Lease<T>> activeLeases;
private LeaseMonitor leaseMonitor;
private Thread leaseMonitorThread;
Expand Down Expand Up @@ -115,12 +115,14 @@ public synchronized Lease<T> acquire(T resource, long timeout)
if (LOG.isDebugEnabled()) {
LOG.debug("Acquiring lease on {} for {} milliseconds", resource, timeout);
}
if(activeLeases.containsKey(resource)) {
if (activeLeases.containsKey(resource)) {
throw new LeaseAlreadyExistException(messageForResource(resource));
}
Lease<T> lease = new Lease<>(resource, timeout);
activeLeases.put(resource, lease);
leaseMonitorThread.interrupt();
synchronized (monitor) {
monitor.notifyAll();
}
return lease;
}

Expand All @@ -135,7 +137,7 @@ public synchronized Lease<T> acquire(T resource, long timeout)
public Lease<T> get(T resource) throws LeaseNotFoundException {
checkStatus();
Lease<T> lease = activeLeases.get(resource);
if(lease != null) {
if (lease != null) {
return lease;
}
throw new LeaseNotFoundException(messageForResource(resource));
Expand All @@ -156,7 +158,7 @@ public synchronized void release(T resource)
LOG.debug("Releasing lease on {}", resource);
}
Lease<T> lease = activeLeases.remove(resource);
if(lease == null) {
if (lease == null) {
throw new LeaseNotFoundException(messageForResource(resource));
}
lease.invalidate();
Expand All @@ -171,11 +173,13 @@ public void shutdown() {
checkStatus();
LOG.debug("Shutting down LeaseManager service");
leaseMonitor.disable();
leaseMonitorThread.interrupt();
for(T resource : activeLeases.keySet()) {
synchronized (monitor) {
monitor.notifyAll();
}
for (T resource : activeLeases.keySet()) {
try {
release(resource);
} catch(LeaseNotFoundException ex) {
} catch (LeaseNotFoundException ex) {
//Ignore the exception, someone might have released the lease
}
}
Expand All @@ -187,7 +191,7 @@ public void shutdown() {
* running.
*/
private void checkStatus() {
if(!isRunning) {
if (!isRunning) {
throw new LeaseManagerNotRunningException("LeaseManager not running.");
}
}
Expand All @@ -198,16 +202,16 @@ private void checkStatus() {
*/
private final class LeaseMonitor implements Runnable {

private volatile boolean monitor = true;
private final ExecutorService executorService;
private volatile boolean running = true;

private LeaseMonitor() {
this.executorService = Executors.newCachedThreadPool();
}

@Override
public void run() {
while (monitor) {
while (running) {
LOG.debug("{}-LeaseMonitor: checking for lease expiry", name);
long sleepTime = Long.MAX_VALUE;

Expand All @@ -230,12 +234,12 @@ public void run() {
}

try {
if(!Thread.interrupted()) {
Thread.sleep(sleepTime);
synchronized (monitor) {
monitor.wait(sleepTime);
}
} catch (InterruptedException e) {
// This means a new lease is added to activeLeases.
LOG.error("Execution was interrupted ", e);
LOG.warn("Lease manager is interrupted. Shutting down...", e);
Thread.currentThread().interrupt();
}
}
Expand All @@ -246,7 +250,7 @@ public void run() {
* will stop lease monitor.
*/
public void disable() {
monitor = false;
running = false;
}
}

Expand Down