diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java index 2fd7a9d4940a..ccf33019aebc 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java @@ -71,29 +71,27 @@ public Lease(T resource, long timeout) { } /** - * Returns true if the lease has expired, else false. + * Creates a lease on the specified resource with given timeout. * - * @return true if expired, else false + * @param resource + * Resource for which the lease has to be created + * @param timeout + * Lease lifetime in milliseconds + * @param callback + * Callback registered to be triggered when lease expire */ - public boolean hasExpired() { - return expired; + public Lease(T resource, long timeout, Callable callback) { + this(resource, timeout); + callbacks.add(callback); } /** - * Registers a callback which will be executed in case of timeout. Callbacks - * are executed in a separate Thread (by {@link LeaseManager}). + * Returns true if the lease has expired, else false. * - * @param callback - * The Callable which has to be executed - * @throws LeaseExpiredException - * If the lease has already timed out + * @return true if expired, else false */ - public void registerCallBack(Callable callback) - throws LeaseExpiredException { - if (hasExpired()) { - throw new LeaseExpiredException(messageForResource(resource)); - } - callbacks.add(callback); + public boolean hasExpired() { + return expired; } /** diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java index f04ce79569c1..ba86eefa791f 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java @@ -149,13 +149,28 @@ public synchronized Lease acquire( if (activeLeases.containsKey(resource)) { throw new LeaseAlreadyExistException(messageForResource(resource)); } - Lease lease = new Lease<>(resource, timeout); - lease.registerCallBack(callback); + Lease lease = new Lease<>(resource, timeout, callback); activeLeases.put(resource, lease); semaphore.release(); return lease; } + /** + * Returns a lease for the specified resource with the default timeout. + * + * @param resource + * Resource for which lease has to be created + * @param callback + * The callback trigger when lease expire + * @throws LeaseAlreadyExistException + * If there is already a lease on the resource + */ + public synchronized Lease acquire( + T resource, Callable callback) + throws LeaseAlreadyExistException, LeaseExpiredException { + return acquire(resource, defaultTimeout, callback); + } + /** * Returns a lease associated with the specified resource. * diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lease/TestLeaseManager.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lease/TestLeaseManager.java index e6270790204b..637c54f28577 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lease/TestLeaseManager.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lease/TestLeaseManager.java @@ -183,12 +183,11 @@ public void testLeaseCallback() throws LeaseException, InterruptedException { LeaseManager manager = new LeaseManager<>("Test", 5000); manager.start(); DummyResource resourceOne = new DummyResource("one"); - Lease leaseOne = manager.acquire(resourceOne); - leaseStatus.put(resourceOne, "lease in use"); - leaseOne.registerCallBack(() -> { + Lease leaseOne = manager.acquire(resourceOne, () -> { leaseStatus.put(resourceOne, "lease expired"); return null; }); + leaseStatus.put(resourceOne, "lease in use"); // wait for lease to expire long sleepTime = leaseOne.getRemainingTime() + 1000; try { @@ -212,12 +211,11 @@ public void testCallbackExecutionInCaseOfLeaseRelease() LeaseManager manager = new LeaseManager<>("Test", 5000); manager.start(); DummyResource resourceOne = new DummyResource("one"); - Lease leaseOne = manager.acquire(resourceOne); - leaseStatus.put(resourceOne, "lease in use"); - leaseOne.registerCallBack(() -> { + Lease leaseOne = manager.acquire(resourceOne, () -> { leaseStatus.put(resourceOne, "lease expired"); return null; }); + leaseStatus.put(resourceOne, "lease in use"); leaseStatus.put(resourceOne, "lease released"); manager.release(resourceOne); Assertions.assertTrue(leaseOne.hasExpired()); @@ -237,36 +235,31 @@ public void testLeaseCallbackWithMultipleLeases() DummyResource resourceThree = new DummyResource("three"); DummyResource resourceFour = new DummyResource("four"); DummyResource resourceFive = new DummyResource("five"); - Lease leaseOne = manager.acquire(resourceOne); - Lease leaseTwo = manager.acquire(resourceTwo); - Lease leaseThree = manager.acquire(resourceThree); - Lease leaseFour = manager.acquire(resourceFour); - Lease leaseFive = manager.acquire(resourceFive); - leaseStatus.put(resourceOne, "lease in use"); - leaseStatus.put(resourceTwo, "lease in use"); - leaseStatus.put(resourceThree, "lease in use"); - leaseStatus.put(resourceFour, "lease in use"); - leaseStatus.put(resourceFive, "lease in use"); - leaseOne.registerCallBack(() -> { + Lease leaseOne = manager.acquire(resourceOne, () -> { leaseStatus.put(resourceOne, "lease expired"); return null; }); - leaseTwo.registerCallBack(() -> { + Lease leaseTwo = manager.acquire(resourceTwo, () -> { leaseStatus.put(resourceTwo, "lease expired"); return null; }); - leaseThree.registerCallBack(() -> { + Lease leaseThree = manager.acquire(resourceThree, () -> { leaseStatus.put(resourceThree, "lease expired"); return null; }); - leaseFour.registerCallBack(() -> { + Lease leaseFour = manager.acquire(resourceFour, () -> { leaseStatus.put(resourceFour, "lease expired"); return null; }); - leaseFive.registerCallBack(() -> { + Lease leaseFive = manager.acquire(resourceFive, () -> { leaseStatus.put(resourceFive, "lease expired"); return null; }); + leaseStatus.put(resourceOne, "lease in use"); + leaseStatus.put(resourceTwo, "lease in use"); + leaseStatus.put(resourceThree, "lease in use"); + leaseStatus.put(resourceFour, "lease in use"); + leaseStatus.put(resourceFive, "lease in use"); // release lease one, two and three leaseStatus.put(resourceOne, "lease released"); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java index 3a3d59369e65..5b9f2ffee5be 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java @@ -28,7 +28,6 @@ import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.ozone.lease.Lease; import org.apache.hadoop.ozone.lease.LeaseAlreadyExistException; import org.apache.hadoop.ozone.lease.LeaseExpiredException; import org.apache.hadoop.ozone.lease.LeaseManager; @@ -124,20 +123,10 @@ private synchronized void handleStartEvent(TIMEOUT_PAYLOAD payload, trackedEventsByID.put(identifier, payload); trackedEvents.add(payload); try { - Lease lease = leaseManager.acquire(identifier); - listenForTimeout(lease, publisher, identifier); + leaseManager.acquire(identifier, + () -> handleTimeout(publisher, identifier)); } catch (LeaseAlreadyExistException e) { //No problem at all. But timer is not reset. - } - } - - private void listenForTimeout(Lease lease, EventPublisher publisher, - long identifier) { - try { - lease.registerCallBack(() -> { - handleTimeout(publisher, identifier); - return null; - }); } catch (LeaseExpiredException e) { handleTimeout(publisher, identifier); } @@ -157,13 +146,14 @@ protected synchronized void handleCompletion(COMPLETION_PAYLOAD } } - private synchronized void handleTimeout(EventPublisher publisher, + private synchronized Void handleTimeout(EventPublisher publisher, long identifier) { metrics.incrementTimedOutEvents(); TIMEOUT_PAYLOAD payload = trackedEventsByID.remove(identifier); trackedEvents.remove(payload); startTrackingTimes.remove(payload.getId()); onTimeout(publisher, payload); + return null; }