From d637cc4d8ba11bca30dc0d5a8041f3594856b037 Mon Sep 17 00:00:00 2001 From: sumitagrawl Date: Mon, 17 Apr 2023 17:15:48 +0530 Subject: [PATCH 1/5] HDDS-8418. Lease Manager unsafe method fix --- .../src/main/java/org/apache/hadoop/ozone/lease/Lease.java | 6 +++--- .../java/org/apache/hadoop/ozone/lease/LeaseManager.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java index 2fd7a9d4940..afa67440559 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java @@ -88,7 +88,7 @@ public boolean hasExpired() { * @throws LeaseExpiredException * If the lease has already timed out */ - public void registerCallBack(Callable callback) + public synchronized void registerCallBack(Callable callback) throws LeaseExpiredException { if (hasExpired()) { throw new LeaseExpiredException(messageForResource(resource)); @@ -143,7 +143,7 @@ public long getLeaseLifeTime() throws LeaseExpiredException { * @throws LeaseExpiredException * If the lease has already timed out */ - public void renew(long timeout) throws LeaseExpiredException { + public synchronized void renew(long timeout) throws LeaseExpiredException { if (hasExpired()) { throw new LeaseExpiredException(messageForResource(resource)); } @@ -185,7 +185,7 @@ List> getCallbacks() { /** * Expires/Invalidates the lease. */ - void invalidate() { + synchronized void invalidate() { callbacks = null; expired = true; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java index 7c3c3300e05..4b2c7b9362f 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java @@ -252,8 +252,8 @@ public void run() { long remainingTime = lease.getRemainingTime(); if (remainingTime <= 0) { //Lease has timed out - List> leaseCallbacks = lease.getCallbacks(); release(resource); + List> leaseCallbacks = lease.getCallbacks(); executorService.execute( new LeaseCallbackExecutor<>(resource, leaseCallbacks)); } else { From 2e82e60871ad575d4babca45b3508e459dda4af1 Mon Sep 17 00:00:00 2001 From: sumitagrawl Date: Tue, 18 Apr 2023 14:45:54 +0530 Subject: [PATCH 2/5] HDDS-8418. Lease Manager unsafe method fix --- .../src/main/java/org/apache/hadoop/ozone/lease/Lease.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java index afa67440559..599c3e6de98 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java @@ -186,7 +186,6 @@ List> getCallbacks() { * Expires/Invalidates the lease. */ synchronized void invalidate() { - callbacks = null; expired = true; } From 4bc0cc057129664915ee5a938b369a3d29449215 Mon Sep 17 00:00:00 2001 From: sumitagrawl Date: Mon, 26 Jun 2023 18:25:01 +0530 Subject: [PATCH 3/5] HDDS-8418. Lease Manager unsafe method fix --- .../org/apache/hadoop/ozone/lease/Lease.java | 32 +++++++--------- .../hadoop/ozone/lease/LeaseManager.java | 21 +++++++++-- .../hadoop/ozone/lease/TestLeaseManager.java | 37 ++++++++----------- .../hdds/server/events/EventWatcher.java | 18 ++------- 4 files changed, 52 insertions(+), 56 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java index 599c3e6de98..ba438bc0d63 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java @@ -71,29 +71,25 @@ public Lease(T resource, long timeout) { } /** - * Returns true if the lease has expired, else false. + * Creates a lease on the specified resource with given timeout. * - * @return true if expired, else false + * @param resource + * Resource for which the lease has to be created + * @param timeout + * Lease lifetime in milliseconds */ - public boolean hasExpired() { - return expired; + public Lease(T resource, long timeout, Callable callback) { + this(resource, timeout); + callbacks.add(callback); } /** - * Registers a callback which will be executed in case of timeout. Callbacks - * are executed in a separate Thread (by {@link LeaseManager}). + * Returns true if the lease has expired, else false. * - * @param callback - * The Callable which has to be executed - * @throws LeaseExpiredException - * If the lease has already timed out + * @return true if expired, else false */ - public synchronized void registerCallBack(Callable callback) - throws LeaseExpiredException { - if (hasExpired()) { - throw new LeaseExpiredException(messageForResource(resource)); - } - callbacks.add(callback); + public boolean hasExpired() { + return expired; } /** @@ -143,7 +139,7 @@ public long getLeaseLifeTime() throws LeaseExpiredException { * @throws LeaseExpiredException * If the lease has already timed out */ - public synchronized void renew(long timeout) throws LeaseExpiredException { + public void renew(long timeout) throws LeaseExpiredException { if (hasExpired()) { throw new LeaseExpiredException(messageForResource(resource)); } @@ -185,7 +181,7 @@ List> getCallbacks() { /** * Expires/Invalidates the lease. */ - synchronized void invalidate() { + void invalidate() { expired = true; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java index 9ce4a5ad47f..ab26c776df3 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java @@ -149,13 +149,28 @@ public synchronized Lease acquire( if (activeLeases.containsKey(resource)) { throw new LeaseAlreadyExistException(messageForResource(resource)); } - Lease lease = new Lease<>(resource, timeout); - lease.registerCallBack(callback); + Lease lease = new Lease<>(resource, timeout, callback); activeLeases.put(resource, lease); semaphore.release(); return lease; } + /** + * Returns a lease for the specified resource with the timeout provided. + * + * @param resource + * Resource for which lease has to be created + * @param callback + * The callback trigger when lease expire + * @throws LeaseAlreadyExistException + * If there is already a lease on the resource + */ + public synchronized Lease acquire( + T resource, Callable callback) + throws LeaseAlreadyExistException, LeaseExpiredException { + return acquire(resource, defaultTimeout, callback); + } + /** * Returns a lease associated with the specified resource. * @@ -253,8 +268,8 @@ public void run() { long remainingTime = lease.getRemainingTime(); if (remainingTime <= 0) { //Lease has timed out - release(resource); List> leaseCallbacks = lease.getCallbacks(); + release(resource); executorService.execute( new LeaseCallbackExecutor<>(resource, leaseCallbacks)); } else { diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lease/TestLeaseManager.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lease/TestLeaseManager.java index e6270790204..9acde4a76c9 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lease/TestLeaseManager.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lease/TestLeaseManager.java @@ -183,12 +183,12 @@ public void testLeaseCallback() throws LeaseException, InterruptedException { LeaseManager manager = new LeaseManager<>("Test", 5000); manager.start(); DummyResource resourceOne = new DummyResource("one"); - Lease leaseOne = manager.acquire(resourceOne); - leaseStatus.put(resourceOne, "lease in use"); - leaseOne.registerCallBack(() -> { + Lease leaseOne = manager.acquire(resourceOne, () -> { leaseStatus.put(resourceOne, "lease expired"); + System.out.println("lease expired"); return null; }); + leaseStatus.put(resourceOne, "lease in use"); // wait for lease to expire long sleepTime = leaseOne.getRemainingTime() + 1000; try { @@ -200,6 +200,7 @@ public void testLeaseCallback() throws LeaseException, InterruptedException { Assertions.assertTrue(leaseOne.hasExpired()); Assertions.assertThrowsExactly(LeaseNotFoundException.class, () -> manager.get(resourceOne), "Resource: " + resourceOne); + System.out.println(leaseOne.hasExpired()); // check if callback has been executed Assertions.assertEquals("lease expired", leaseStatus.get(resourceOne)); } @@ -212,12 +213,11 @@ public void testCallbackExecutionInCaseOfLeaseRelease() LeaseManager manager = new LeaseManager<>("Test", 5000); manager.start(); DummyResource resourceOne = new DummyResource("one"); - Lease leaseOne = manager.acquire(resourceOne); - leaseStatus.put(resourceOne, "lease in use"); - leaseOne.registerCallBack(() -> { + Lease leaseOne = manager.acquire(resourceOne, () -> { leaseStatus.put(resourceOne, "lease expired"); return null; }); + leaseStatus.put(resourceOne, "lease in use"); leaseStatus.put(resourceOne, "lease released"); manager.release(resourceOne); Assertions.assertTrue(leaseOne.hasExpired()); @@ -237,36 +237,31 @@ public void testLeaseCallbackWithMultipleLeases() DummyResource resourceThree = new DummyResource("three"); DummyResource resourceFour = new DummyResource("four"); DummyResource resourceFive = new DummyResource("five"); - Lease leaseOne = manager.acquire(resourceOne); - Lease leaseTwo = manager.acquire(resourceTwo); - Lease leaseThree = manager.acquire(resourceThree); - Lease leaseFour = manager.acquire(resourceFour); - Lease leaseFive = manager.acquire(resourceFive); - leaseStatus.put(resourceOne, "lease in use"); - leaseStatus.put(resourceTwo, "lease in use"); - leaseStatus.put(resourceThree, "lease in use"); - leaseStatus.put(resourceFour, "lease in use"); - leaseStatus.put(resourceFive, "lease in use"); - leaseOne.registerCallBack(() -> { + Lease leaseOne = manager.acquire(resourceOne, () -> { leaseStatus.put(resourceOne, "lease expired"); return null; }); - leaseTwo.registerCallBack(() -> { + Lease leaseTwo = manager.acquire(resourceTwo, () -> { leaseStatus.put(resourceTwo, "lease expired"); return null; }); - leaseThree.registerCallBack(() -> { + Lease leaseThree = manager.acquire(resourceThree, () -> { leaseStatus.put(resourceThree, "lease expired"); return null; }); - leaseFour.registerCallBack(() -> { + Lease leaseFour = manager.acquire(resourceFour, () -> { leaseStatus.put(resourceFour, "lease expired"); return null; }); - leaseFive.registerCallBack(() -> { + Lease leaseFive = manager.acquire(resourceFive, () -> { leaseStatus.put(resourceFive, "lease expired"); return null; }); + leaseStatus.put(resourceOne, "lease in use"); + leaseStatus.put(resourceTwo, "lease in use"); + leaseStatus.put(resourceThree, "lease in use"); + leaseStatus.put(resourceFour, "lease in use"); + leaseStatus.put(resourceFive, "lease in use"); // release lease one, two and three leaseStatus.put(resourceOne, "lease released"); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java index 3a3d59369e6..5b9f2ffee5b 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java @@ -28,7 +28,6 @@ import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.ozone.lease.Lease; import org.apache.hadoop.ozone.lease.LeaseAlreadyExistException; import org.apache.hadoop.ozone.lease.LeaseExpiredException; import org.apache.hadoop.ozone.lease.LeaseManager; @@ -124,20 +123,10 @@ private synchronized void handleStartEvent(TIMEOUT_PAYLOAD payload, trackedEventsByID.put(identifier, payload); trackedEvents.add(payload); try { - Lease lease = leaseManager.acquire(identifier); - listenForTimeout(lease, publisher, identifier); + leaseManager.acquire(identifier, + () -> handleTimeout(publisher, identifier)); } catch (LeaseAlreadyExistException e) { //No problem at all. But timer is not reset. - } - } - - private void listenForTimeout(Lease lease, EventPublisher publisher, - long identifier) { - try { - lease.registerCallBack(() -> { - handleTimeout(publisher, identifier); - return null; - }); } catch (LeaseExpiredException e) { handleTimeout(publisher, identifier); } @@ -157,13 +146,14 @@ protected synchronized void handleCompletion(COMPLETION_PAYLOAD } } - private synchronized void handleTimeout(EventPublisher publisher, + private synchronized Void handleTimeout(EventPublisher publisher, long identifier) { metrics.incrementTimedOutEvents(); TIMEOUT_PAYLOAD payload = trackedEventsByID.remove(identifier); trackedEvents.remove(payload); startTrackingTimes.remove(payload.getId()); onTimeout(publisher, payload); + return null; } From e17ec7347485a92b59b036daa62d527ef7e1d8ae Mon Sep 17 00:00:00 2001 From: sumitagrawl Date: Mon, 26 Jun 2023 18:26:58 +0530 Subject: [PATCH 4/5] HDDS-8418. Lease Manager unsafe method fix --- .../java/org/apache/hadoop/ozone/lease/TestLeaseManager.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lease/TestLeaseManager.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lease/TestLeaseManager.java index 9acde4a76c9..637c54f2857 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lease/TestLeaseManager.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lease/TestLeaseManager.java @@ -185,7 +185,6 @@ public void testLeaseCallback() throws LeaseException, InterruptedException { DummyResource resourceOne = new DummyResource("one"); Lease leaseOne = manager.acquire(resourceOne, () -> { leaseStatus.put(resourceOne, "lease expired"); - System.out.println("lease expired"); return null; }); leaseStatus.put(resourceOne, "lease in use"); @@ -200,7 +199,6 @@ public void testLeaseCallback() throws LeaseException, InterruptedException { Assertions.assertTrue(leaseOne.hasExpired()); Assertions.assertThrowsExactly(LeaseNotFoundException.class, () -> manager.get(resourceOne), "Resource: " + resourceOne); - System.out.println(leaseOne.hasExpired()); // check if callback has been executed Assertions.assertEquals("lease expired", leaseStatus.get(resourceOne)); } From d9fb8b1366ed8154024a2dc0fb2154f7d128e17b Mon Sep 17 00:00:00 2001 From: sumitagrawl Date: Mon, 26 Jun 2023 18:32:50 +0530 Subject: [PATCH 5/5] HDDS-8418. Lease Manager unsafe method fix --- .../src/main/java/org/apache/hadoop/ozone/lease/Lease.java | 3 +++ .../main/java/org/apache/hadoop/ozone/lease/LeaseManager.java | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java index ba438bc0d63..ccf33019aeb 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java @@ -77,6 +77,8 @@ public Lease(T resource, long timeout) { * Resource for which the lease has to be created * @param timeout * Lease lifetime in milliseconds + * @param callback + * Callback registered to be triggered when lease expire */ public Lease(T resource, long timeout, Callable callback) { this(resource, timeout); @@ -182,6 +184,7 @@ List> getCallbacks() { * Expires/Invalidates the lease. */ void invalidate() { + callbacks = null; expired = true; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java index ab26c776df3..ba86eefa791 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java @@ -156,7 +156,7 @@ public synchronized Lease acquire( } /** - * Returns a lease for the specified resource with the timeout provided. + * Returns a lease for the specified resource with the default timeout. * * @param resource * Resource for which lease has to be created