diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/HeartbeatManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/HeartbeatManager.java
new file mode 100644
index 0000000000000..97c9b864d498c
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/HeartbeatManager.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+/**
+ * The heartbeat manager interface is meant to manage the lifecycle of heartbeat tasks.
+ *
+ */
+public interface HeartbeatManager extends AutoCloseable {
+
+ /**
+ * Starts the heartbeat for the given thread and does not stop until stopHeartbeat is called or the thread has died.
+ * @param threadToMonitor The thread to pass to/monitor when running the heartbeat task.
+ * @return @return True when there is no previously active heartbeat and the heartbeat is successfully started. False
+ * otherwise.
+ */
+ boolean startHeartbeatForThread(Thread threadToMonitor);
+
+ /**
+ * Stops the heartbeat, if one is active.
+ * This is a blocking call, which drains any in-flight heart beat task execution before return.
+ * @param mayInterruptIfRunning Whether we may interrupt the underlying heartbeat task if it is in-flight.
+ * @return true: no heartbeat task is in-flight or to be executed.
+ * false: failed to stop the heartbeat, there can still be recurring execution of heartbeat tasks.
+ */
+ boolean stopHeartbeat(boolean mayInterruptIfRunning);
+
+ /**
+ * Whether the heartbeat manager has an active heartbeat task currently.
+ * @return A boolean.
+ */
+ boolean hasActiveHeartbeat();
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java
new file mode 100644
index 0000000000000..be73bd7cb2c0e
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+import org.apache.hudi.common.util.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * LockProviderHeartbeatManager is a helper class which handles the scheduling and stopping of heartbeat
+ * tasks. This is intended for use with the storage based lock provider, which requires
+ * a separate thread to spawn and renew the lock repeatedly.
+ * It should be responsible for the entire lifecycle of the heartbeat task.
+ * Importantly, a new instance should be created for each lock provider.
+ */
+@ThreadSafe
+public class LockProviderHeartbeatManager implements HeartbeatManager {
+ public static long DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS = 15_000L;
+ @GuardedBy("this")
+ private final ScheduledExecutorService scheduler;
+
+ // Constant does not need multi-threading protections.
+ private final String ownerId;
+ private final Logger logger;
+ private final long heartbeatTimeMs;
+
+ /**
+ * Contract for the heartbeat function execution.
+ *
+ *
Behavior of the heartbeat manager (consumer):
+ *
+ * - Executes heartBeatFuncToExec every heartbeatTimeMs when:
+ *
+ * - heartBeatFuncToExec returns true
+ *
+ *
+ * - Stops executing heartBeatFuncToExec when:
+ *
+ * - heartBeatFuncToExec returns false
+ * - heartBeatFuncToExec throws an exception
+ * - heart beat manager calls stopHeartbeat, which will interrupt any inflight execution
+ * and prevent further recurring executions
+ *
+ *
+ *
+ *
+ * Requirements for heartBeatFuncToExec implementation:
+ *
+ * - Should perform the logic of renewing lock lease
+ * - Should be super light-weight, typically runs within 1 second
+ * - Should handle thread interruptions so that the stopHeartbeat function will not wait long for any
+ * inflight execution to complete
+ * - Should almost always return true in cases like:
+ *
+ * - Successfully extending the lock lease
+ * - Transient failures (network partition, remote service errors) to allow automatic retry
+ *
+ *
+ * - Should return false only in specific cases:
+ *
+ * - When the lock is already expired (no point in extending an expired lock)
+ * - When the writer thread does not hold any lock
+ *
+ *
+ *
+ *
+ * Warning: Returning false stops all future lock renewal attempts. If the writer thread
+ * is still running, it will execute with a lock that can expire at any time, potentially
+ * leading to corrupted data.
+ */
+ private final Supplier heartbeatFuncToExec;
+ private final long stopHeartbeatTimeoutMs;
+
+ // We ensure within the context of LockProviderHeartbeatManager, heartbeatFuncToExec only execute in a single thread periodically.
+ @GuardedBy("this")
+ private ScheduledFuture> scheduledFuture;
+
+ /**
+ * Semaphore for managing heartbeat task execution synchronization.
+ *
+ * IMPORTANT: Thread Safety Warning
+ * This semaphore is mutually exclusive with {@code synchronized(this)}. Never synchronize
+ * on {@code this} while the thread is holding the heartbeatSemaphore.
+ *
+ *
Execution flow:
+ *
+ * - Heartbeat task always attempts to acquire the semaphore before proceeding and
+ * releases it before finishing the current round of execution
+ * - The heartbeat manager acquires the semaphore when it needs to:
+ *
+ * - Drain any inflight execution
+ * - Prevent further execution of the heartbeat task
+ *
+ *
+ *
+ */
+ private final Semaphore heartbeatSemaphore;
+
+ private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(LockProviderHeartbeatManager.class);
+
+ /**
+ * Initializes a heartbeat manager.
+ * @param ownerId The identifier for logging of who owns this heartbeat manager.
+ * @param heartbeatTimeMs The time between heartbeat executions.
+ * The first heartbeat will execute after this amount of time elapses.
+ * @param heartbeatFuncToExec The function to execute on each heartbeat. This should handle interrupts.
+ */
+ public LockProviderHeartbeatManager(String ownerId,
+ long heartbeatTimeMs,
+ Supplier heartbeatFuncToExec) {
+ this(
+ ownerId,
+ createThreadScheduler((ownerId != null && ownerId.length() >= 6) ? ownerId.substring(0, 6) : ""),
+ heartbeatTimeMs,
+ DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS,
+ heartbeatFuncToExec,
+ new Semaphore(1),
+ DEFAULT_LOGGER);
+ }
+
+ @VisibleForTesting
+ LockProviderHeartbeatManager(String ownerId,
+ ScheduledExecutorService scheduler,
+ long heartbeatTimeMs,
+ long stopHeartbeatTimeoutMs,
+ Supplier heartbeatFuncToExec,
+ Semaphore heartbeatSemaphore,
+ Logger testLogger) {
+ this.ownerId = ownerId;
+ this.heartbeatTimeMs = heartbeatTimeMs;
+ this.heartbeatFuncToExec = heartbeatFuncToExec;
+ this.logger = testLogger;
+ this.scheduler = scheduler;
+ this.heartbeatSemaphore = heartbeatSemaphore;
+ this.stopHeartbeatTimeoutMs = stopHeartbeatTimeoutMs;
+ }
+
+ /**
+ * Creates a new thread scheduler for heartbeat execution.
+ */
+ private static ScheduledExecutorService createThreadScheduler(String shortUuid) {
+ return Executors.newSingleThreadScheduledExecutor(
+ r -> new Thread(r, "LockProvider-HeartbeatManager-Thread-" + shortUuid));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public synchronized boolean startHeartbeatForThread(Thread threadToMonitor) {
+ if (threadToMonitor == null) {
+ throw new IllegalArgumentException("threadToMonitor cannot be null.");
+ }
+
+ if (this.hasActiveHeartbeat()) {
+ logger.warn("Owner {}: Heartbeat is already running.", ownerId);
+ return false;
+ }
+ try {
+ scheduledFuture = scheduler.scheduleAtFixedRate(() -> heartbeatTaskRunner(threadToMonitor), heartbeatTimeMs, heartbeatTimeMs, TimeUnit.MILLISECONDS);
+ logger.debug("Owner {}: Heartbeat started with interval: {} ms", ownerId, heartbeatTimeMs);
+ return true;
+ } catch (Exception e) {
+ logger.error("Owner {}: Unable to schedule heartbeat task. {}", ownerId, e);
+ return false;
+ }
+ }
+
+ /**
+ * Responsible for managing the execution and result of the heartbeat task.
+ * Maintains a semaphore which ensures thread safety for determining the state
+ * of the heartbeat (is the heartbeat executing or not).
+ * @param threadToMonitor The thread to monitor. Required by heartbeat execution.
+ */
+ private void heartbeatTaskRunner(Thread threadToMonitor) {
+ if (!heartbeatSemaphore.tryAcquire()) {
+ logger.error("Owner {}: Heartbeat semaphore should be acquirable at the start of every heartbeat!", ownerId);
+ return;
+ }
+
+ boolean heartbeatExecutionSuccessful;
+ try {
+ heartbeatExecutionSuccessful = executeHeartbeat(threadToMonitor);
+ } finally {
+ heartbeatSemaphore.release();
+ }
+
+ // Call synchronized method after releasing the semaphore
+ if (!heartbeatExecutionSuccessful) {
+ logger.warn("Owner {}: Heartbeat function did not succeed.", ownerId);
+ // Unschedule self from further execution if heartbeat was unsuccessful.
+ heartbeatTaskUnscheduleItself();
+ }
+ }
+
+ /**
+ * Executes the heartbeat task.
+ * @param threadToMonitor The thread to monitor. If we detect that
+ * this thread has stopped we should end the heartbeat.
+ * @return Whether the heartbeat task successfully ran.
+ */
+ private boolean executeHeartbeat(Thread threadToMonitor) {
+ // Check if monitored thread is dead
+ if (!threadToMonitor.isAlive()) {
+ logger.warn("Owner {}: Monitored thread is no longer alive.", ownerId);
+ return false;
+ }
+
+ // Execute heartbeat function
+ try {
+ return heartbeatFuncToExec.get();
+ } catch (Exception e) {
+ logger.error("Owner {}: Heartbeat function threw exception {}", ownerId, e);
+ }
+ return false;
+ }
+
+ /**
+ * This prevents further scheduling of the heartbeat task. Intended to be used by heartbeat task itself.
+ */
+ private synchronized void heartbeatTaskUnscheduleItself() {
+ // Do not interrupt this current task.
+ // This will cancel all future invocations.
+ if (scheduledFuture != null) {
+ boolean cancellationSuccessful = scheduledFuture.cancel(true);
+ logger.info("Owner {}: Requested termination of heartbeat task. Cancellation returned {}.", this.ownerId, cancellationSuccessful);
+ scheduledFuture = null;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean stopHeartbeat(boolean mayInterruptIfRunning) {
+ if (cancelRecurringHeartbeatTask(mayInterruptIfRunning)) {
+ return false;
+ }
+
+ // If we requested to stop heartbeat, here we ensure the cancel request results in heartbeat task
+ // exiting synchronously.
+ boolean heartbeatStillInflight = syncWaitInflightHeartbeatTaskToFinish();
+ if (heartbeatStillInflight) {
+ // If waiting for cancellation was interrupted, do not log an error.
+ if (Thread.currentThread().isInterrupted()) {
+ logger.warn("Owner {}: Heartbeat is still in flight due to interruption!", ownerId);
+ } else {
+ logger.error("Owner {}: Heartbeat is still in flight!", ownerId);
+ }
+ return false;
+ }
+
+ // We have stopped the heartbeat, now clean up any leftover states.
+ synchronized (this) {
+ logger.debug("Owner {}: Heartbeat task successfully terminated.", ownerId);
+ scheduledFuture = null;
+ }
+ return true;
+ }
+
+ /**
+ * Cancels the recurring heartbeat task.
+ * @param mayInterruptIfRunning Whether to interrupt the heartbeat task if it is currently running.
+ * @return True if the heartbeat task did not need to be stopped.
+ */
+ private synchronized boolean cancelRecurringHeartbeatTask(boolean mayInterruptIfRunning) {
+ if (!this.hasActiveHeartbeat()) {
+ logger.warn("Owner {}: No active heartbeat task to stop.", ownerId);
+ return true;
+ }
+
+ // Attempt to cancel the scheduled future
+ boolean cancellationSuccessful = scheduledFuture.cancel(mayInterruptIfRunning);
+ logger.debug("Owner {}: Requested termination of heartbeat task. Cancellation returned {}", ownerId, cancellationSuccessful);
+ return false;
+ }
+
+ private boolean syncWaitInflightHeartbeatTaskToFinish() {
+ // Wait for up to stopHeartbeatTimeoutMs for the currently executing heartbeat task to complete.
+ // It is assumed that the heartbeat task, when finishing its execution,
+ // sets heartbeatIsExecuting to false and calls notifyAll() on this object.
+ boolean heartbeatStillInflight = true;
+ try {
+ // Semaphore successfully acquired here excludes the heart execution task. tryAcquire with timeout
+ // means we wait any inflight task execution to finish synchronously.
+ heartbeatStillInflight = !heartbeatSemaphore.tryAcquire(stopHeartbeatTimeoutMs, TimeUnit.MILLISECONDS);
+ if (heartbeatStillInflight) {
+ logger.warn("Owner {}: Timed out while waiting for heartbeat termination.", ownerId);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.warn("Owner {}: Interrupted while waiting for heartbeat termination.", ownerId);
+ }
+ // If we successfully acquired the semaphore before, return it here.
+ heartbeatSemaphore.release(heartbeatStillInflight ? 0 : 1);
+ return heartbeatStillInflight;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public synchronized boolean hasActiveHeartbeat() {
+ return scheduledFuture != null;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public synchronized void close() throws Exception {
+ if (hasActiveHeartbeat()) {
+ stopHeartbeat(true);
+ }
+ scheduler.shutdown();
+
+ try {
+ if (!scheduler.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) {
+ scheduler.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ scheduler.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/TestLockProviderHeartbeatManager.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/TestLockProviderHeartbeatManager.java
new file mode 100644
index 0000000000000..cdeca536b3ae3
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/TestLockProviderHeartbeatManager.java
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import static org.apache.hudi.client.transaction.lock.models.LockProviderHeartbeatManager.DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestLockProviderHeartbeatManager {
+
+ private ScheduledExecutorService mockScheduler;
+ private Logger mockLogger;
+ private ScheduledFuture> mockFuture;
+ private HeartbeatManager manager;
+ private static final String LOGGER_ID = "test-owner";
+ private ScheduledExecutorService actualExecutorService;
+
+ @BeforeEach
+ void setUp() {
+ mockScheduler = mock(ScheduledExecutorService.class);
+ mockLogger = mock(Logger.class);
+ mockFuture = mock(ScheduledFuture.class);
+ actualExecutorService = Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "Heartbeat-Test-Thread");
+ t.setDaemon(true);
+ return t;
+ });
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ if (manager != null) {
+ manager.close();
+ manager = null;
+ }
+ actualExecutorService.shutdownNow();
+ }
+
+ @Test
+ void testStartHeartbeatSuccess() {
+ when(mockScheduler.scheduleAtFixedRate(any(Runnable.class), eq(100L), eq(100L), eq(TimeUnit.MILLISECONDS)))
+ .thenAnswer(invocation -> mockFuture);
+ manager = createDefaultManagerWithMocks(() -> true);
+ assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
+ }
+
+ @Test
+ void testStartHeartbeatAlreadyRunning() {
+ when(mockScheduler.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)))
+ .thenAnswer(invocation -> mockFuture);
+
+ manager = createDefaultManagerWithMocks(() -> true);
+
+ assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
+ assertFalse(manager.startHeartbeatForThread(Thread.currentThread()));
+ verify(mockLogger).warn("Owner {}: Heartbeat is already running.", LOGGER_ID);
+ }
+
+ @Test
+ void testStartHeartbeatSchedulerException() {
+ doThrow(new RejectedExecutionException("Scheduler failure"))
+ .when(mockScheduler)
+ .scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
+
+ manager = createDefaultManagerWithMocks(() -> true);
+
+ assertFalse(manager.startHeartbeatForThread(Thread.currentThread()));
+ verify(mockLogger).error(eq("Owner {}: Unable to schedule heartbeat task. {}"), eq(LOGGER_ID), any(RejectedExecutionException.class));
+ }
+
+ @Test
+ void testStopHeartbeatNeverStarted() {
+ manager = createDefaultManagerWithMocks(() -> true);
+
+ assertFalse(manager.stopHeartbeat(true));
+ verify(mockLogger).warn("Owner {}: No active heartbeat task to stop.", LOGGER_ID);
+ }
+
+ @Test
+ void testStopHeartbeatAlreadyRequested() {
+ when(mockScheduler.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)))
+ .thenAnswer(invocation -> mockFuture);
+
+ manager = createDefaultManagerWithMocks(() -> true);
+ assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
+
+ when(mockFuture.cancel(true)).thenReturn(true);
+ when(mockFuture.isDone()).thenReturn(false).thenReturn(true);
+
+ assertTrue(manager.stopHeartbeat(true));
+
+ // Call stop again
+ assertFalse(manager.stopHeartbeat(true));
+ verify(mockLogger).warn("Owner {}: No active heartbeat task to stop.", LOGGER_ID);
+ }
+
+ @Test
+ void testHeartbeatUnableToAcquireSemaphore() throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference t = new AtomicReference<>();
+ when(mockScheduler.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)))
+ .thenAnswer(invocation -> {
+ Runnable task = invocation.getArgument(0);
+ t.set(new Thread(() -> {
+ task.run();
+ latch.countDown();
+ }));
+ return mockFuture;
+ });
+
+ when(mockFuture.cancel(true)).thenReturn(true);
+ Semaphore semaphore = mock(Semaphore.class);
+
+ // Stub the tryAcquire() method to return false (for the heartbeat) and true (for stop)
+ when(semaphore.tryAcquire()).thenReturn(false);
+ when(semaphore.tryAcquire(eq(DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS), eq(TimeUnit.MILLISECONDS))).thenReturn(true);
+ manager = new LockProviderHeartbeatManager(
+ LOGGER_ID,
+ mockScheduler,
+ 100L,
+ DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS,
+ () -> true,
+ semaphore,
+ mockLogger);
+ assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
+ t.get().start();
+ assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
+ assertTrue(manager.stopHeartbeat(true));
+
+ verify(mockLogger).error("Owner {}: Heartbeat semaphore should be acquirable at the start of every heartbeat!", LOGGER_ID);
+ assertFalse(manager.hasActiveHeartbeat());
+ }
+
+ @Test
+ void testStopHeartbeatMockSuccessfulCancel() {
+ when(mockScheduler.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)))
+ .thenAnswer(invocation -> mockFuture);
+ when(mockFuture.cancel(true)).thenReturn(true);
+
+ manager = createDefaultManagerWithMocks(() -> true);
+ manager.startHeartbeatForThread(Thread.currentThread());
+
+ when(mockFuture.isDone()).thenReturn(false).thenReturn(true);
+ assertTrue(manager.stopHeartbeat(true));
+ }
+
+ @Test
+ void testHeartbeatTaskHandlesInterrupt() throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference t = new AtomicReference<>();
+ when(mockScheduler.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)))
+ .thenAnswer(invocation -> {
+ Runnable task = invocation.getArgument(0);
+ t.set(new Thread(() -> {
+ task.run();
+ latch.countDown();
+ }));
+ return mockFuture;
+ });
+
+ when(mockFuture.cancel(true)).thenReturn(true);
+
+ // Initialize heartbeat manager with a function that always returns false (renewal failure)
+ manager = createDefaultManagerWithMocks(() -> false);
+ assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
+ t.get().start();
+ t.get().interrupt();
+
+ assertTrue(latch.await(500, TimeUnit.MILLISECONDS), "Heartbeat task did not run in time");
+
+ // This call will wait for heartbeat task to stop itself, as the semaphore has already been acquired by the heartbeat task.
+ assertFalse(manager.stopHeartbeat(true));
+
+ verify(mockLogger).warn("Owner {}: No active heartbeat task to stop.", LOGGER_ID);
+ verify(mockLogger).debug(
+ "Owner {}: Heartbeat started with interval: {} ms",
+ "test-owner",
+ 100L
+ );
+ verify(mockLogger).info("Owner {}: Requested termination of heartbeat task. Cancellation returned {}.", LOGGER_ID, true);
+ assertFalse(manager.hasActiveHeartbeat());
+ }
+
+ @Test
+ void testHeartbeatTaskNullWriter() {
+ manager = createDefaultManagerWithMocks(() -> true);
+ assertThrows(IllegalArgumentException.class, () -> manager.startHeartbeatForThread(null));
+ }
+
+ @Test
+ void testHeartbeatTaskImmediateDeadMonitoringThread() throws InterruptedException {
+ // Use a real thread that will terminate immediately.
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference t = new AtomicReference<>();
+ when(mockScheduler.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)))
+ .thenAnswer(invocation -> {
+ Runnable task = invocation.getArgument(0);
+ t.set(new Thread(() -> {
+ task.run();
+ latch.countDown();
+ }));
+ return mockFuture;
+ });
+
+ when(mockFuture.cancel(false)).thenReturn(false);
+ Thread deadThread = new Thread(() -> {
+ });
+ deadThread.start();
+ deadThread.join();
+ manager = createDefaultManagerWithMocks(() -> true);
+
+ assertTrue(manager.startHeartbeatForThread(deadThread));
+ t.get().start();
+
+ assertTrue(latch.await(500, TimeUnit.MILLISECONDS), "Heartbeat task did not run in time");
+ verify(mockLogger).warn("Owner {}: Monitored thread is no longer alive.", LOGGER_ID);
+ verify(mockLogger).info("Owner {}: Requested termination of heartbeat task. Cancellation returned {}.", LOGGER_ID, false);
+ assertFalse(manager.hasActiveHeartbeat());
+ }
+
+ @Test
+ void testHeartbeatTaskRenewalException() throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference t = new AtomicReference<>();
+ when(mockScheduler.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)))
+ .thenAnswer(invocation -> {
+ Runnable task = invocation.getArgument(0);
+ t.set(new Thread(() -> {
+ task.run();
+ latch.countDown();
+ }));
+ return mockFuture;
+ });
+ manager = createDefaultManagerWithMocks(() -> {
+ throw new RuntimeException("Renewal error");
+ });
+
+ assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
+ t.get().start();
+ assertTrue(latch.await(500, TimeUnit.MILLISECONDS), "Heartbeat task did not run in time");
+ verify(mockLogger).error(
+ eq("Owner {}: Heartbeat function threw exception {}"),
+ eq(LOGGER_ID),
+ any(RuntimeException.class));
+ assertFalse(manager.hasActiveHeartbeat());
+ }
+
+ @Test
+ void testHeartbeatStopWaitsForHeartbeatTaskToFinish() throws InterruptedException {
+ // Use a real thread
+ CountDownLatch stopHeartbeatTaskLatch = new CountDownLatch(1);
+ manager = createDefaultManagerWithRealExecutor(() -> {
+ try {
+ // This will freeze the heartbeat task.
+ assertTrue(stopHeartbeatTaskLatch.await(500, TimeUnit.MILLISECONDS));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return true;
+ });
+
+ assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
+ CountDownLatch finishStopHeartbeatLatch = new CountDownLatch(1);
+ Thread t = new Thread(() -> {
+ assertTrue(manager.stopHeartbeat(false));
+ finishStopHeartbeatLatch.countDown();
+ });
+ t.start();
+ // Unblock the heartbeat task.
+ stopHeartbeatTaskLatch.countDown();
+ assertTrue(finishStopHeartbeatLatch.await(500, TimeUnit.MILLISECONDS), "Stop heartbeat task did not finish.");
+ assertFalse(manager.hasActiveHeartbeat());
+ verify(mockLogger).debug("Owner {}: Heartbeat task successfully terminated.", LOGGER_ID);
+ }
+
+ @Test
+ void testHeartbeatUnableToStopHeartbeatTask() throws InterruptedException {
+ CountDownLatch stopHeartbeatTaskLatch = new CountDownLatch(1);
+ CountDownLatch heartbeatStartedLatch = new CountDownLatch(1);
+ // Set stop heartbeat timeout to 5000ms
+ manager = new LockProviderHeartbeatManager(LOGGER_ID, actualExecutorService, 100L, 5000L, () -> {
+ try {
+ // Tells us that the heartbeat has started
+ heartbeatStartedLatch.countDown();
+ // This will freeze the heartbeat task.
+ assertTrue(stopHeartbeatTaskLatch.await(10000, TimeUnit.MILLISECONDS));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ // Regardless of whether we return true or false the future executions will be cancelled.
+ return true;
+ }, new Semaphore(1), mockLogger);
+
+ assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
+ CountDownLatch stopHeartbeatLatch = new CountDownLatch(1);
+ Thread stopHeartbeatThread = new Thread(() -> {
+ // Try to stop the heartbeat (this should hang for 15 seconds)
+ assertFalse(manager.stopHeartbeat(false));
+ stopHeartbeatLatch.countDown();
+ });
+ assertTrue(heartbeatStartedLatch.await(500, TimeUnit.MILLISECONDS), "Heartbeat task did not start.");
+ stopHeartbeatThread.start();
+ assertTrue(stopHeartbeatLatch.await(7000, TimeUnit.MILLISECONDS), "Stop heartbeat task did not finish.");
+ assertTrue(manager.hasActiveHeartbeat());
+ verify(mockLogger).error("Owner {}: Heartbeat is still in flight!", LOGGER_ID);
+ // Unblock the heartbeat task.
+ stopHeartbeatTaskLatch.countDown();
+ }
+
+ @Test
+ void testHeartbeatInterruptStopHeartbeatTask() throws InterruptedException {
+ CountDownLatch stopHeartbeatTaskLatch = new CountDownLatch(1);
+ CountDownLatch heartbeatStartedLatch = new CountDownLatch(1);
+ // Set stop heartbeat timeout to 5000ms
+ manager = new LockProviderHeartbeatManager(LOGGER_ID, actualExecutorService, 100L, 5000L, () -> {
+ try {
+ // Tells us that the heartbeat has started
+ heartbeatStartedLatch.countDown();
+ // This will freeze the heartbeat task.
+ assertTrue(stopHeartbeatTaskLatch.await(10000, TimeUnit.MILLISECONDS));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ // Regardless of whether we return true or false the future executions will be cancelled.
+ return true;
+ }, new Semaphore(1), mockLogger);
+
+ assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
+ CountDownLatch stopHeartbeatLatch = new CountDownLatch(1);
+ Thread stopHeartbeatThread = new Thread(() -> {
+ // Try to stop the heartbeat (this should hang for 15 seconds)
+ assertFalse(manager.stopHeartbeat(false));
+ stopHeartbeatLatch.countDown();
+ });
+ assertTrue(heartbeatStartedLatch.await(500, TimeUnit.MILLISECONDS), "Heartbeat task did not start.");
+ stopHeartbeatThread.start();
+ stopHeartbeatThread.interrupt();
+ assertTrue(stopHeartbeatLatch.await(7000, TimeUnit.MILLISECONDS), "Stop heartbeat task did not finish.");
+ assertTrue(manager.hasActiveHeartbeat());
+ verify(mockLogger).warn("Owner {}: Interrupted while waiting for heartbeat termination.", LOGGER_ID);
+ // Unblock the heartbeat task.
+ stopHeartbeatTaskLatch.countDown();
+ }
+
+ @Test
+ void testHeartbeatTaskValidateStop() throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(2);
+
+ manager = createDefaultManagerWithRealExecutor(() -> {
+ latch.countDown();
+ return true;
+ });
+
+ assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
+
+ // Wait until at least two heartbeat renewals have occurred
+ assertTrue(latch.await(2000, TimeUnit.MILLISECONDS), "Heartbeat did not renew twice in time");
+
+ assertEquals(0, latch.getCount(), "Heartbeat did not execute exactly twice");
+
+ assertTrue(manager.hasActiveHeartbeat());
+ assertTrue(manager.stopHeartbeat(false));
+ assertFalse(manager.hasActiveHeartbeat());
+ }
+
+ @Test
+ void testDefaultManagerRapidStartStop1Ms() {
+ manager = new LockProviderHeartbeatManager(LOGGER_ID, 1, () -> true);
+
+ for (int i = 0; i < 100; i++) {
+ assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
+ assertTrue(manager.hasActiveHeartbeat());
+ assertTrue(manager.stopHeartbeat(true));
+ assertFalse(manager.hasActiveHeartbeat());
+ }
+ }
+
+ @Test
+ void testClose() throws Exception {
+ manager = createDefaultManagerWithMocks(() -> true);
+ manager.close();
+ assertFalse(manager.hasActiveHeartbeat());
+ }
+
+ @Test
+ void testClose_StopsHeartbeatAndShutsDownScheduler() throws Exception {
+ when(mockScheduler.awaitTermination(5, TimeUnit.SECONDS)).thenReturn(true);
+ manager = createDefaultManagerWithMocks(() -> true);
+
+ manager.close();
+
+ verify(mockScheduler).shutdown();
+ verify(mockScheduler, never()).shutdownNow();
+ }
+
+ @Test
+ void testClose_ForceShutdownWhenTerminationTimesOut() throws Exception {
+ when(mockScheduler.awaitTermination(5, TimeUnit.SECONDS)).thenReturn(false);
+ manager = createDefaultManagerWithMocks(() -> true);
+
+ manager.close();
+
+ verify(mockScheduler).shutdown();
+ verify(mockScheduler).shutdownNow();
+ }
+
+ @Test
+ void testClose_HandlesInterruptedException() throws Exception {
+ when(mockScheduler.awaitTermination(5, TimeUnit.SECONDS)).thenThrow(new InterruptedException());
+ manager = createDefaultManagerWithMocks(() -> true);
+
+ manager.close();
+
+ verify(mockScheduler).shutdown();
+ verify(mockScheduler).shutdownNow();
+ assertTrue(Thread.currentThread().isInterrupted(), "Thread should be interrupted after exception handling");
+ }
+
+ private LockProviderHeartbeatManager createDefaultManagerWithMocks(Supplier heartbeatFunc) {
+ return new LockProviderHeartbeatManager(
+ LOGGER_ID,
+ mockScheduler,
+ 100L,
+ DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS,
+ heartbeatFunc,
+ new Semaphore(1),
+ mockLogger);
+ }
+
+ private LockProviderHeartbeatManager createDefaultManagerWithRealExecutor(Supplier heartbeatFunc) {
+ return new LockProviderHeartbeatManager(
+ LOGGER_ID,
+ actualExecutorService,
+ 100L,
+ DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS,
+ heartbeatFunc,
+ new Semaphore(1),
+ mockLogger);
+ }
+}