diff --git a/src/Hangfire.PostgreSql/PostgreSqlJobQueue.cs b/src/Hangfire.PostgreSql/PostgreSqlJobQueue.cs index 89aa724..16f4a96 100644 --- a/src/Hangfire.PostgreSql/PostgreSqlJobQueue.cs +++ b/src/Hangfire.PostgreSql/PostgreSqlJobQueue.cs @@ -75,14 +75,10 @@ public IFetchedJob Dequeue(string[] queues, CancellationToken cancellationToken) cancelListenSource?.Cancel(); listenTask?.Wait(); } - catch (AggregateException ex) + catch (AggregateException) { - if (ex.InnerException is not TaskCanceledException) - { - throw; - } - - // Otherwise do nothing, cancel exception is expected. + // Swallow all exceptions from listen task cleanup. + // The main dequeue operation already succeeded or failed independently. } finally { @@ -282,30 +278,45 @@ private Task ListenForNotificationsAsync(CancellationToken cancellationToken) // CreateAnOpenConnection can return the same connection over and over if an existing connection // is passed in the constructor of PostgreSqlStorage. We must use a separate dedicated // connection to listen for notifications. - NpgsqlConnection clonedConnection = connection.CloneWith(connection.ConnectionString); + string connectionString = connection.ConnectionString; + NpgsqlConnection clonedConnection = connection.CloneWith(connectionString); return Task.Run(async () => { + NpgsqlConnection currentConnection = clonedConnection; try { - if (clonedConnection.State != ConnectionState.Open) - { - await clonedConnection.OpenAsync(cancellationToken); // Open so that Dapper doesn't auto-close. - } - while (!cancellationToken.IsCancellationRequested) { - await clonedConnection.ExecuteAsync($"LISTEN {JobNotificationChannel}"); - await clonedConnection.WaitAsync(cancellationToken); - JobQueueNotification.Set(); + try + { + if (currentConnection.State != ConnectionState.Open) + { + await currentConnection.OpenAsync(cancellationToken); + } + + await currentConnection.ExecuteAsync($"LISTEN {JobNotificationChannel}"); + await currentConnection.WaitAsync(cancellationToken); + JobQueueNotification.Set(); + } + catch (OperationCanceledException) + { + throw; + } + catch (Exception ex) when (ex.IsCatchableExceptionType()) + { + currentConnection?.Dispose(); + await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken); + currentConnection = new NpgsqlConnection(connectionString); + } } } - catch (TaskCanceledException) + catch (OperationCanceledException) { // Do nothing, cancellation requested so just end. } finally { - _storage.ReleaseConnection(clonedConnection); + currentConnection?.Dispose(); } }, cancellationToken); diff --git a/tests/Hangfire.PostgreSql.Tests/PostgreSqlJobQueueFacts.cs b/tests/Hangfire.PostgreSql.Tests/PostgreSqlJobQueueFacts.cs index f25cea1..f49c872 100644 --- a/tests/Hangfire.PostgreSql.Tests/PostgreSqlJobQueueFacts.cs +++ b/tests/Hangfire.PostgreSql.Tests/PostgreSqlJobQueueFacts.cs @@ -8,6 +8,7 @@ using Hangfire.PostgreSql.Tests.Utils; using Hangfire.PostgreSql.Utils; using Hangfire.Storage; +using Npgsql; using Xunit; namespace Hangfire.PostgreSql.Tests @@ -538,6 +539,54 @@ private void Enqueue_AddsAJobToTheQueue(bool useNativeDatabaseTransactions) }); } + [Fact] + [CleanDatabase] + public void Dequeue_ShouldSelfHeal_WhenListenConnectionFails() + { + UseConnection((_, storage) => { + storage.Options.QueuePollInterval = TimeSpan.FromMilliseconds(500); + PostgreSqlJobQueue queue = CreateJobQueue(storage, false, true); + Exception thrownException = null; + IFetchedJob job = null; + + CancellationTokenSource cts = new(TimeSpan.FromSeconds(10)); + + Task dequeueTask = Task.Run(() => { + try + { + job = queue.Dequeue(new[] { "default" }, cts.Token); + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + thrownException = ex; + } + }); + + Thread.Sleep(1000); + + using (NpgsqlConnection adminConnection = ConnectionUtils.CreateMasterConnection()) + { + adminConnection.Execute(@" + SELECT pg_terminate_backend(pid) + FROM pg_stat_activity + WHERE query LIKE '%LISTEN%' + AND pid <> pg_backend_pid()"); + } + + Thread.Sleep(500); + + using (NpgsqlConnection enqueueConnection = ConnectionUtils.CreateConnection()) + { + queue.Enqueue(enqueueConnection, "default", "1"); + } + + dequeueTask.Wait(TimeSpan.FromSeconds(5)); + + Assert.Null(thrownException); + Assert.NotNull(job); + }); + } + private static CancellationToken CreateTimingOutCancellationToken() { CancellationTokenSource source = new CancellationTokenSource(TimeSpan.FromSeconds(10));