Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 52 additions & 11 deletions src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -427,23 +427,30 @@ public async Task ReleaseLockAsync(int lockId)
return;
}

if (_conn == null || _conn.State == ConnectionState.Closed)
if (_conn == null || _conn.State != ConnectionState.Open)
{
_locks.Remove(lockId);
return;
}

var cancellation = new CancellationTokenSource();
cancellation.CancelAfter(1.Seconds());
try
{
var cancellation = new CancellationTokenSource();
cancellation.CancelAfter(1.Seconds());

await _conn.ReleaseGlobalLock(lockId, cancellation.Token).ConfigureAwait(false);
}
catch (Exception e)
{
_logger.LogDebug(e, "Error trying to release advisory lock {LockId} for database {Identifier}",
lockId, _databaseName);
}

await _conn.ReleaseGlobalLock(lockId, cancellation.Token).ConfigureAwait(false);
_locks.Remove(lockId);

if (!_locks.Any())
{
await _conn.CloseAsync().ConfigureAwait(false);
await _conn.DisposeAsync().ConfigureAwait(false);
_conn = null;
await safeCloseConnectionAsync().ConfigureAwait(false);
}
}

Expand All @@ -456,19 +463,53 @@ public async ValueTask DisposeAsync()

try
{
foreach (var i in _locks) await _conn.ReleaseGlobalLock(i, CancellationToken.None).ConfigureAwait(false);
if (_conn.State == ConnectionState.Open)
{
foreach (var i in _locks)
{
try
{
await _conn.ReleaseGlobalLock(i, CancellationToken.None).ConfigureAwait(false);
}
catch (Exception e)
{
_logger.LogDebug(e,
"Error trying to release advisory lock {LockId} during dispose for database {Identifier}",
i, _databaseName);
}
}
}

await safeCloseConnectionAsync().ConfigureAwait(false);
}
catch (Exception e)
{
_logger.LogDebug(e, "Error trying to dispose of advisory locks for database {Identifier}",
_databaseName);
}
}

private async Task safeCloseConnectionAsync()
{
if (_conn == null) return;

try
{
if (_conn.State == ConnectionState.Open)
{
await _conn.CloseAsync().ConfigureAwait(false);
}

await _conn.CloseAsync().ConfigureAwait(false);
await _conn.DisposeAsync().ConfigureAwait(false);
}
catch (Exception e)
{
_logger.LogError(e, "Error trying to dispose of advisory locks for database {Identifier}",
_logger.LogDebug(e, "Error trying to close advisory lock connection for database {Identifier}",
_databaseName);
}
finally
{
await _conn.DisposeAsync().ConfigureAwait(false);
_conn = null;
}
}
}
Loading