Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,29 +71,27 @@ public Lease(T resource, long timeout) {
}

/**
* Returns true if the lease has expired, else false.
* Creates a lease on the specified resource with given timeout.
*
* @return true if expired, else false
* @param resource
* Resource for which the lease has to be created
* @param timeout
* Lease lifetime in milliseconds
* @param callback
* Callback registered to be triggered when lease expire
*/
public boolean hasExpired() {
return expired;
public Lease(T resource, long timeout, Callable<Void> callback) {
this(resource, timeout);
callbacks.add(callback);
}

/**
* Registers a callback which will be executed in case of timeout. Callbacks
* are executed in a separate Thread (by {@link LeaseManager}).
* Returns true if the lease has expired, else false.
*
* @param callback
* The Callable which has to be executed
* @throws LeaseExpiredException
* If the lease has already timed out
* @return true if expired, else false
*/
public void registerCallBack(Callable<Void> callback)
throws LeaseExpiredException {
if (hasExpired()) {
throw new LeaseExpiredException(messageForResource(resource));
}
callbacks.add(callback);
public boolean hasExpired() {
return expired;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,28 @@ public synchronized Lease<T> acquire(
if (activeLeases.containsKey(resource)) {
throw new LeaseAlreadyExistException(messageForResource(resource));
}
Lease<T> lease = new Lease<>(resource, timeout);
lease.registerCallBack(callback);
Lease<T> lease = new Lease<>(resource, timeout, callback);
activeLeases.put(resource, lease);
semaphore.release();
return lease;
}

/**
* Returns a lease for the specified resource with the default timeout.
*
* @param resource
* Resource for which lease has to be created
* @param callback
* The callback trigger when lease expire
* @throws LeaseAlreadyExistException
* If there is already a lease on the resource
*/
public synchronized Lease<T> acquire(
T resource, Callable<Void> callback)
throws LeaseAlreadyExistException, LeaseExpiredException {
return acquire(resource, defaultTimeout, callback);
}

/**
* Returns a lease associated with the specified resource.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,11 @@ public void testLeaseCallback() throws LeaseException, InterruptedException {
LeaseManager<DummyResource> manager = new LeaseManager<>("Test", 5000);
manager.start();
DummyResource resourceOne = new DummyResource("one");
Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
leaseStatus.put(resourceOne, "lease in use");
leaseOne.registerCallBack(() -> {
Lease<DummyResource> leaseOne = manager.acquire(resourceOne, () -> {
leaseStatus.put(resourceOne, "lease expired");
return null;
});
leaseStatus.put(resourceOne, "lease in use");
// wait for lease to expire
long sleepTime = leaseOne.getRemainingTime() + 1000;
try {
Expand All @@ -212,12 +211,11 @@ public void testCallbackExecutionInCaseOfLeaseRelease()
LeaseManager<DummyResource> manager = new LeaseManager<>("Test", 5000);
manager.start();
DummyResource resourceOne = new DummyResource("one");
Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
leaseStatus.put(resourceOne, "lease in use");
leaseOne.registerCallBack(() -> {
Lease<DummyResource> leaseOne = manager.acquire(resourceOne, () -> {
leaseStatus.put(resourceOne, "lease expired");
return null;
});
leaseStatus.put(resourceOne, "lease in use");
leaseStatus.put(resourceOne, "lease released");
manager.release(resourceOne);
Assertions.assertTrue(leaseOne.hasExpired());
Expand All @@ -237,36 +235,31 @@ public void testLeaseCallbackWithMultipleLeases()
DummyResource resourceThree = new DummyResource("three");
DummyResource resourceFour = new DummyResource("four");
DummyResource resourceFive = new DummyResource("five");
Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
Lease<DummyResource> leaseTwo = manager.acquire(resourceTwo);
Lease<DummyResource> leaseThree = manager.acquire(resourceThree);
Lease<DummyResource> leaseFour = manager.acquire(resourceFour);
Lease<DummyResource> leaseFive = manager.acquire(resourceFive);
leaseStatus.put(resourceOne, "lease in use");
leaseStatus.put(resourceTwo, "lease in use");
leaseStatus.put(resourceThree, "lease in use");
leaseStatus.put(resourceFour, "lease in use");
leaseStatus.put(resourceFive, "lease in use");
leaseOne.registerCallBack(() -> {
Lease<DummyResource> leaseOne = manager.acquire(resourceOne, () -> {
leaseStatus.put(resourceOne, "lease expired");
return null;
});
leaseTwo.registerCallBack(() -> {
Lease<DummyResource> leaseTwo = manager.acquire(resourceTwo, () -> {
leaseStatus.put(resourceTwo, "lease expired");
return null;
});
leaseThree.registerCallBack(() -> {
Lease<DummyResource> leaseThree = manager.acquire(resourceThree, () -> {
leaseStatus.put(resourceThree, "lease expired");
return null;
});
leaseFour.registerCallBack(() -> {
Lease<DummyResource> leaseFour = manager.acquire(resourceFour, () -> {
leaseStatus.put(resourceFour, "lease expired");
return null;
});
leaseFive.registerCallBack(() -> {
Lease<DummyResource> leaseFive = manager.acquire(resourceFive, () -> {
leaseStatus.put(resourceFive, "lease expired");
return null;
});
leaseStatus.put(resourceOne, "lease in use");
leaseStatus.put(resourceTwo, "lease in use");
leaseStatus.put(resourceThree, "lease in use");
leaseStatus.put(resourceFour, "lease in use");
leaseStatus.put(resourceFive, "lease in use");

// release lease one, two and three
leaseStatus.put(resourceOne, "lease released");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.ozone.lease.Lease;
import org.apache.hadoop.ozone.lease.LeaseAlreadyExistException;
import org.apache.hadoop.ozone.lease.LeaseExpiredException;
import org.apache.hadoop.ozone.lease.LeaseManager;
Expand Down Expand Up @@ -124,20 +123,10 @@ private synchronized void handleStartEvent(TIMEOUT_PAYLOAD payload,
trackedEventsByID.put(identifier, payload);
trackedEvents.add(payload);
try {
Lease<Long> lease = leaseManager.acquire(identifier);
listenForTimeout(lease, publisher, identifier);
leaseManager.acquire(identifier,
() -> handleTimeout(publisher, identifier));
} catch (LeaseAlreadyExistException e) {
//No problem at all. But timer is not reset.
}
}

private void listenForTimeout(Lease<Long> lease, EventPublisher publisher,
long identifier) {
try {
lease.registerCallBack(() -> {
handleTimeout(publisher, identifier);
return null;
});
} catch (LeaseExpiredException e) {
handleTimeout(publisher, identifier);
}
Expand All @@ -157,13 +146,14 @@ protected synchronized void handleCompletion(COMPLETION_PAYLOAD
}
}

private synchronized void handleTimeout(EventPublisher publisher,
private synchronized Void handleTimeout(EventPublisher publisher,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't get this part, what is the impact of changing void-> Void and returning null

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callback as getting registered has return type is Callback with "Void" Object, so this gives error if return type mismatch,

Syntax for acquire: public synchronized Lease<T> acquire( T resource, Callable<Void> callback)

Below is error if we keep void:
Bad return type in lambda expression: void cannot be converted to Void

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it , looks like autoboxing won't happen for void type. Thanks for explaining

long identifier) {
metrics.incrementTimedOutEvents();
TIMEOUT_PAYLOAD payload = trackedEventsByID.remove(identifier);
trackedEvents.remove(payload);
startTrackingTimes.remove(payload.getId());
onTimeout(publisher, payload);
return null;
}


Expand Down