diff --git a/Directory.Build.props b/Directory.Build.props index f474e11bb..ef74843a3 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -11,7 +11,7 @@ 1570;1571;1572;1573;1574;1587;1591;1701;1702;1711;1735;0618 true enable - 5.9.1 + 5.9.2 $(PackageProjectUrl) true true diff --git a/src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs b/src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs index e749717ad..61e711716 100644 --- a/src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs +++ b/src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs @@ -362,3 +362,113 @@ private async Task readNodeAsync(DbDataReader reader) return null; } } + +internal class AdvisoryLock : IAdvisoryLock +{ + private readonly string _databaseName; + private readonly List _locks = new(); + private readonly ILogger _logger; + private readonly NpgsqlDataSource _source; + private NpgsqlConnection _conn; + + public AdvisoryLock(NpgsqlDataSource source, ILogger logger, string databaseName) + { + _source = source; + _logger = logger; + _databaseName = databaseName; + } + + public bool HasLock(int lockId) + { + return _conn is not { State: ConnectionState.Closed } && _locks.Contains(lockId); + } + + public async Task TryAttainLockAsync(int lockId, CancellationToken token) + { + if (_conn == null) + { + _conn = _source.CreateConnection(); + await _conn.OpenAsync(token).ConfigureAwait(false); + } + + if (_conn.State == ConnectionState.Closed) + { + try + { + await _conn.DisposeAsync().ConfigureAwait(false); + } + catch (Exception e) + { + _logger.LogError(e, "Error trying to clean up and restart an advisory lock connection"); + } + finally + { + _conn = null; + } + + return false; + } + + + var attained = await _conn.TryGetGlobalLock(lockId, token).ConfigureAwait(false); + if (attained == AttainLockResult.Success) + { + _locks.Add(lockId); + return true; + } + + return false; + } + + public async Task ReleaseLockAsync(int lockId) + { + if (!_locks.Contains(lockId)) + { + return; + } + + if (_conn == null || _conn.State == ConnectionState.Closed) + { + _locks.Remove(lockId); + return; + } + + var cancellation = new CancellationTokenSource(); + cancellation.CancelAfter(1.Seconds()); + + await _conn.ReleaseGlobalLock(lockId, cancellation.Token).ConfigureAwait(false); + _locks.Remove(lockId); + + if (!_locks.Any()) + { + await _conn.CloseAsync().ConfigureAwait(false); + await _conn.DisposeAsync().ConfigureAwait(false); + _conn = null; + } + } + + public async ValueTask DisposeAsync() + { + if (_conn == null) + { + return; + } + + try + { + foreach (var i in _locks) await _conn.ReleaseGlobalLock(i, CancellationToken.None).ConfigureAwait(false); + + await _conn.CloseAsync().ConfigureAwait(false); + await _conn.DisposeAsync().ConfigureAwait(false); + } + catch (Exception e) + { + _logger.LogError(e, "Error trying to dispose of advisory locks for database {Identifier}", + _databaseName); + } + finally + { + await _conn.DisposeAsync().ConfigureAwait(false); + } + } +}