Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 99 additions & 9 deletions src/DistributedLock.Postgres/PostgresAdvisoryLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,19 @@ private PostgresAdvisoryLock(bool isShared)
return null;
}

// Only in the case where we will try to acquire a transaction-scoped lock, we will define a save point, but we won't be able to roll it back
// in case of a successful lock acquisition becuase the lock will be released. Therefore, in such cases, we capture the timeout settings values before
// we set a save point, and then we try to restore the values after the attempt to acquire the lock.
// NOTE: the save point functionality can't be removed in favor of capturing and restoring the values for all cases.
// When an error occurs while attempting to acquire the lock, the transaction is aborted and we can't run any other query, unless either
// the transaction or the save point are rolled back.
var capturedTimeoutSettings = await CaptureTimeoutSettingsIfNeededAsync(connection, cancellationToken).ConfigureAwait(false);

// Our acquire command will use SET LOCAL to set up statement timeouts. This lasts until the end
// of the current transaction instead of just the current batch if we're in a transaction. To make sure
// we don't leak those settings, in the case of a transaction, we first set up a save point which we can
// later roll back (taking the settings changes with it but NOT the lock). Because we can't confidently
// roll back a save point without knowing that it has been set up, we start the save point in its own
// later roll back (only in cases where we don't acquire a transaction scoped lock - taking the settings changes with it but NOT the lock).
// Because we can't confidently roll back a save point without knowing that it has been set up, we start the save point in its own
// query before we try-catch.
var needsSavePoint = await ShouldDefineSavePoint(connection).ConfigureAwait(false);

Expand All @@ -70,6 +78,8 @@ private PostgresAdvisoryLock(bool isShared)
{
await RollBackTransactionTimeoutVariablesIfNeededAsync(acquired: false).ConfigureAwait(false);

await RestoreTimeoutSettingsIfNeededAsync(capturedTimeoutSettings, connection).ConfigureAwait(false);

if (ex is PostgresException postgresException)
{
switch (postgresException.SqlState)
Expand Down Expand Up @@ -115,6 +125,8 @@ private PostgresAdvisoryLock(bool isShared)

await RollBackTransactionTimeoutVariablesIfNeededAsync(acquired: acquired == true).ConfigureAwait(false);

await RestoreTimeoutSettingsIfNeededAsync(capturedTimeoutSettings, connection).ConfigureAwait(false);

return acquired switch
{
false => null,
Expand All @@ -124,8 +136,11 @@ private PostgresAdvisoryLock(bool isShared)

async ValueTask RollBackTransactionTimeoutVariablesIfNeededAsync(bool acquired)
{
if (needsSavePoint
if (needsSavePoint
// For transaction scoped locks, we can't roll back the save point on success because that will roll back our hold on the lock.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This used to also say It's ok to "leak" the savepoint in that case because it's an internally-owned transaction/connection and the savepoint will be cleaned up with the disposal of the transaction.

The previous justification doesn't quite hold since it is now sometimes externally-owned transaction.

We should add back a comment justifying the leak.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add a comment after we'll decide what to do regarding the leaked save points.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment

// It's ok to "leak" the savepoint because if it's an internally-owned transaction then the savepoint will be cleaned up with the disposal of the transaction.
// If it's an externally-owned transaction then we must "leak" it or we will lose the lock. Also, we can't avoid using a save point in this case
// because otherwise if an exception had occurred the extrenally-owned transaction will be aborted and become completely unusable.
&& !(acquired && UseTransactionScopedLock(connection)))
{
// attempt to clear the timeout variables we set
Expand Down Expand Up @@ -181,31 +196,75 @@ private DatabaseCommand CreateAcquireCommand(DatabaseConnection connection, Post
return command;
}

private static async ValueTask<CapturedTimeoutSettings?> CaptureTimeoutSettingsIfNeededAsync(DatabaseConnection connection, CancellationToken cancellationToken)
{
var shouldCaptureTimeoutSettings = connection.IsExernallyOwned && UseTransactionScopedLock(connection);

// Return null in case we won't try to acquire an externally-owned transaction-scoped lock.
if (!shouldCaptureTimeoutSettings) { return null; }

var statementTimeout = await GetCurrentSetting("statement_timeout", connection, cancellationToken).ConfigureAwait(false);
var lockTimeout = await GetCurrentSetting("lock_timeout", connection, cancellationToken).ConfigureAwait(false);

var capturedTimeoutSettings = new CapturedTimeoutSettings(statementTimeout!, lockTimeout!);

return capturedTimeoutSettings;
}

private static async ValueTask<string?> GetCurrentSetting(string settingName, DatabaseConnection connection, CancellationToken cancellationToken)
{
using var getCurrentSettingCommand = connection.CreateCommand();

getCurrentSettingCommand.SetCommandText($"SELECT current_setting('{settingName}', 'true') AS {settingName};");

return (string?) await getCurrentSettingCommand.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
}

private static async ValueTask<bool> ShouldDefineSavePoint(DatabaseConnection connection)
{
// If the connection is internally-owned, we only define a save point if a transaction has been opened.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@madelson I added the functionality for capturing and restoring the timeout settings, please review it.

You should know that I tried to remove the save point functionality entirely, but turns out that you can't do that. In case you get an error when attempting to acquire the lock, you must roll back the transaction (or the save point) if you want to run any other DB command, otherwise you will get an error about the aborted transaction.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case you get an error when attempting to acquire the lock, you must roll back the transaction (or the save point) if you want to run any other DB command, otherwise you will get an error about the aborted transaction.

What exactly are the cases impacted by this? Cancellation? Timeout? Something else? I'm wondering because it seems like these would also be issues for the new APIs, e.g. someone passes in a cancellation token or does TryAcquire with a timeout and it silently dooms the transaction that feels like it would be pretty undesirable and unexpected, particularly in the former case. Thoughts?

Copy link
Author

@Tzachi009 Tzachi009 Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The transaction will not be doomed with the current code, because I didn't remove the save point functionality. They will be functional even if an exception will occur.

When I did try to remove it - it did impact the handling of the timeout (because it checks if the lock is held) and the new code that try to restore the previous timeout settings.

The RollBackTransactionTimeoutVariablesIfNeededAsync function still rolls back the save point for transactional locks in cases of any failure (when you get any exception - the lock is assumed as unobtained), and thus the transaction isn't aborted (see the if statement - if (needsSavePoint && !(acquired && UseTransactionScopedLock(connection)))). This was true even before my changes.

There is only one edge case that I can think of, where the save point roll back works against you in a way for transactional locks, but it probably existed before my changes (for internal connection's transactions) - when you get a timeout and still hold the lock because of the Postgres bug (lines 88-96), I assume the save point's roll back will release the lock beforehand, so you will get a timeout even though for a moment you acquired the lock every time, but this is an edge case that isn't really visible for the library's users.

Copy link
Owner

@madelson madelson Jan 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I wasn't clear. What I'm trying to understand is how this interacts with the new transactional locking APIs that do not get to use the savepoint.

Under what conditions does the transaction get doomed? Can this happen under "normal" operation of the lock?

Said another way, if the savepoint is critical for all the existing lock APIs, how is removing it not a problem for the new APIs?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am actually slightly more confused by your questions now :).

I think that maybe you didn't notice, but I changed the code so the new transactional locking APIs will always use save points, in addition to the capture/restore settings logic (see line 227) - this is because otherwise the transaction will get doomed when you get any exception while trying to acquire the lock.

So, the bottom line - the transaction will never get doomed in the new APIs, since we keep using save points in this scenario.
Just wanted to let you know that save points can't be removed from the code, because otherwise transactions will indeed reach an aborted state.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the new transactional locking APIs will always use save points

Ok sorry didn't realize this. However, now I'm wondering why it is OK to "leak" a SAVEPOINT on every lock acquisition. I'm not an expert on Postgres transactions, but this seems like a side-effect that could cause issues for people. At minimum, subsequent lock acquisitions on the same connection would try to recreate the same named SAVEPOINT which probably wouldn't work (would be good to add a test case for subsequent lock acquisitions on the same transaction).

I wonder if the real solution here is to restrict the new APIs wrt timeout/cancellation functionality. If we only support TryAcquire with timeout 0 and no cancellation, then we dodge all the issues with setting leaks and savepoint leaks (we can modify the code so that for 0-timeout acquires it doesn't issue any SET LOCAL commands and therefore also doesn't need the SAVEPOINT. Thoughts?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your point regarding a "nested" save point with the same name was really good, so I wrote a test and checked it, and it seems to work as intended. I wondered why that is, so I went and read postgres' documentation (https://www.postgresql.org/docs/current/sql-savepoint.html):

savepoint_name
The name to give to the new savepoint. If savepoints with the same name already exist, they will be inaccessible until newer identically-named savepoints are released.

SQL requires a savepoint to be destroyed automatically when another savepoint with the same name is established. In PostgreSQL, the old savepoint is kept, though only the more recent one will be used when rolling back or releasing. (Releasing the newer savepoint with RELEASE SAVEPOINT will cause the older one to again become accessible to ROLLBACK TO SAVEPOINT and RELEASE SAVEPOINT.) Otherwise, SAVEPOINT is fully SQL conforming.

If you have any other concern regarding this scenario, please do tell.

As for the timeout/cancellation restrictions in the new API as a solution for the leaked save point - I am against it. It will make the new API less useful but more importantly it won't solve the issue. We can't really dodge it, if you get any exception from the DB, not a timeout or a cancellation specifically, the transaction will be dead without a save point roll back, so the restrictions won't be a safe option to prevent this scenario from happening altogether.

I think that bottom line is that the save point must be leaked in this case, because if you roll it back after an acquisition you don't have a lock, and if you don't use a save point then the transaction may be aborted (which is the least favorable option IMO), so I'm not seeing a way around it.
We can add a note about it in the new API methods, so users will be able to choose whether they can/should use the API.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good to know that leaking a savepoint does not cause a conflict with further acquisitions!

I do want to confirm that there isn't any other observable side-effect to leaking savepoints that could cause issues for users. I'm not finding anything from my quick reading, but curious if you know either way. I want to avoid a scenario where calling the API has some hidden side-effect that causes issues for users, leading to bug reports here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not aware of any other issue that may occur because of that. Do you think we should add a note about it in the new API?

if (!connection.IsExernallyOwned) { return connection.HasTransaction; }

// If the connection is externally-owned with an established transaction, we don't want to pollute it with a save point
// which we won't be able to release in case the lock will be acquired.
if (connection.HasTransaction) { return false; }
// If the connection is externally-owned with an established transaction,
// it means that the connection came through the transactional locking APIs (see PostgresDistributedLock.Extensions class).
if (connection.HasTransaction) { return true; }

// The externally-owned connection might still be part of a transaction that we can't see.
// In that case, the only real way to detect it is to begin a new one.
// This can only be the case if the externally-owned connection didn't came through the transactional locking APIs (see PostgresDistributedLock.Extensions class).
// In that case, the only real way to detect the transaction is to begin a new one.
try
{
await connection.BeginTransactionAsync().ConfigureAwait(false);
}
catch (InvalidOperationException)
{
// If we reached this point, it means the externally-owned connection has a transaction, therefore we need to define a save point.
return true;
}

await connection.DisposeTransactionAsync().ConfigureAwait(false);

// If we reached this point, it means the externally-owned connection has no transaction, therefore we can't define a save point.
return false;
}

private static async ValueTask RestoreTimeoutSettingsIfNeededAsync(CapturedTimeoutSettings? settings, DatabaseConnection connection)
{
// Settings is expected to be null in case we didn't try to acquire an externally-owned transaction-scoped lock.
if (settings is null) { return; }

using var restoreTimeoutSettingsCommand = connection.CreateCommand();

var commandText = new StringBuilder();

commandText.AppendLine($"SET LOCAL statement_timeout = {settings.Value.StatementTimeout};");
commandText.AppendLine($"SET LOCAL lock_timeout = {settings.Value.LockTimeout};");

restoreTimeoutSettingsCommand.SetCommandText(commandText.ToString());

await restoreTimeoutSettingsCommand.ExecuteNonQueryAsync(CancellationToken.None).ConfigureAwait(false);
}

public ValueTask ReleaseAsync(DatabaseConnection connection, string resourceName, object lockCookie) =>
this.ReleaseAsync(connection, new PostgresAdvisoryLockKey(resourceName), isTry: false);

Expand Down Expand Up @@ -239,8 +298,8 @@ private static string AddKeyParametersAndGetKeyArguments(DatabaseCommand command
}

private static bool UseTransactionScopedLock(DatabaseConnection connection) =>
// Transaction-scoped locking is supported on both externally-owned and internally-owned connections,
// as long as the connection has a transaction.
// Transaction-scoped locking is supported on internally-owned connections and externally-owned connections which explicitly have a transaction
// (meaning that the external connection came through the transactional locking APIs, see PostgresDistributedLock.Extensions class).
connection.HasTransaction;

private static string AddPGLocksFilterParametersAndGetFilterExpression(DatabaseCommand command, PostgresAdvisoryLockKey key)
Expand Down Expand Up @@ -271,4 +330,35 @@ private static string AddPGLocksFilterParametersAndGetFilterExpression(DatabaseC

return $"(l.classid = @{classIdParameter} AND l.objid = @{objIdParameter} AND l.objsubid = {objSubId})";
}

private readonly struct CapturedTimeoutSettings
{
public CapturedTimeoutSettings(string statementTimeout, string lockTimeout)
{
this.StatementTimeout = ParsePostgresTimeout(statementTimeout);
this.LockTimeout = ParsePostgresTimeout(lockTimeout);
}

public int StatementTimeout { get; }

public int LockTimeout { get; }

private static int ParsePostgresTimeout(string timeout)
{
if (timeout == "0") { return 0; } // This will be the case if the timeout is disabled.

// In any other case we need to extract the timeout from the string, since Postgres returns timeouts with their unit attached, e.g. "5000ms".
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we actually need to parse the value? Can't we leave it as a string and just use the same string value when we restore the timeout?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortuanetly we can't just send a string. When you call SET LOCAL for a timeout Postgres expects a numeric value. If you send a string you will get an error - 42601: trailing junk after numeric literal at or near "{string_value}".

var timeoutOnlyDigits = string.Empty;

for (var i = 0; i < timeout.Length; ++i)
{
if (char.IsDigit(timeout[i]))
{
timeoutOnlyDigits += timeout[i];
}
}

return int.Parse(timeoutOnlyDigits);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,98 @@ public async Task TestWorksWithExternalTransaction()
transaction.Commit();
}
}

[Test]
public async Task TestTimeoutSettingsRestoredWithExternalTransaction()
{
bool isLockAcquired;

var key = new PostgresAdvisoryLockKey(0);

using var connection = new NpgsqlConnection(TestingPostgresDb.DefaultConnectionString);
await connection.OpenAsync();

using (var transaction = connection.BeginTransaction())
{
using var transactionCommand = connection.CreateCommand();
transactionCommand.Transaction = transaction;

transactionCommand.CommandText = "SET LOCAL statement_timeout = 1010;SET LOCAL lock_timeout = 510;";
await transactionCommand.ExecuteNonQueryAsync();

isLockAcquired = await PostgresDistributedLock.TryAcquireWithTransactionAsync(key, transaction).ConfigureAwait(false);
Assert.That(isLockAcquired, Is.True);

(await GetTimeoutAsync("statement_timeout", transactionCommand)).ShouldEqual("1010ms");
(await GetTimeoutAsync("lock_timeout", transactionCommand)).ShouldEqual("510ms");

isLockAcquired = PostgresDistributedLock.TryAcquireWithTransaction(key, transaction, TimeSpan.FromMilliseconds(10));
Assert.That(isLockAcquired, Is.False);

(await GetTimeoutAsync("statement_timeout", transactionCommand)).ShouldEqual("1010ms");
(await GetTimeoutAsync("lock_timeout", transactionCommand)).ShouldEqual("510ms");

transaction.Rollback();

(await GetTimeoutAsync("statement_timeout", transactionCommand)).ShouldEqual("0");
(await GetTimeoutAsync("lock_timeout", transactionCommand)).ShouldEqual("0");
}

using (var transaction = connection.BeginTransaction())
{
using var transactionCommand = connection.CreateCommand();
transactionCommand.Transaction = transaction;

transactionCommand.CommandText = "SET LOCAL statement_timeout = 1010;SET LOCAL lock_timeout = 510;";
await transactionCommand.ExecuteNonQueryAsync();

await PostgresDistributedLock.AcquireWithTransactionAsync(key, transaction).ConfigureAwait(false);

(await GetTimeoutAsync("statement_timeout", transactionCommand)).ShouldEqual("1010ms");
(await GetTimeoutAsync("lock_timeout", transactionCommand)).ShouldEqual("510ms");

Assert.Throws<TimeoutException>(() => PostgresDistributedLock.AcquireWithTransaction(key, transaction, TimeSpan.FromMilliseconds(10)));

(await GetTimeoutAsync("statement_timeout", transactionCommand)).ShouldEqual("1010ms");
(await GetTimeoutAsync("lock_timeout", transactionCommand)).ShouldEqual("510ms");

transaction.Commit();

(await GetTimeoutAsync("statement_timeout", transactionCommand)).ShouldEqual("0");
(await GetTimeoutAsync("lock_timeout", transactionCommand)).ShouldEqual("0");
}
}


[Test]
// Each lock acquisition creates the same named savepoint; this seems like it would create a conflict
// but it actually works fine in Postgres (see https://www.postgresql.org/docs/current/sql-savepoint.html)
public async Task TestWorksForMultipleLocksUnderTheSameConnectionWithExternalTransaction()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's leave a comment here along the lines of "Each acquisition creates the same named savepoint; this seems like it would create a conflict but it actually works fine in Postgres ".

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

{
var key1 = new PostgresAdvisoryLockKey(1);
var key2 = new PostgresAdvisoryLockKey(2);

using var connection = new NpgsqlConnection(TestingPostgresDb.DefaultConnectionString);
await connection.OpenAsync();

using (var transaction = connection.BeginTransaction())
{
var isFirstLockAcquired = await PostgresDistributedLock.TryAcquireWithTransactionAsync(key1, transaction).ConfigureAwait(false);
Assert.That(isFirstLockAcquired, Is.True);

var isSecondLockAcquired = await PostgresDistributedLock.TryAcquireWithTransactionAsync(key2, transaction).ConfigureAwait(false);
Assert.That(isSecondLockAcquired, Is.True);

isSecondLockAcquired = await PostgresDistributedLock.TryAcquireWithTransactionAsync(key2, transaction).ConfigureAwait(false);
Assert.That(isSecondLockAcquired, Is.False);

transaction.Rollback();
}
}

private static Task<object> GetTimeoutAsync(string timeoutName, NpgsqlCommand command)
{
command.CommandText = $"SHOW {timeoutName}";
return command.ExecuteScalarAsync()!;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,24 @@ public async Task TestInt64AndInt32PairKeyNamespacesAreDifferent()
Assert.That(handle2, Is.Not.Null);
}

[Test]
public async Task TestWorksWithInternalTransaction()
{
using var connection = new NpgsqlConnection(TestingPostgresDb.DefaultConnectionString);
await connection.OpenAsync();

using var command = connection.CreateCommand();

var transactionLock = new PostgresDistributedLock(new PostgresAdvisoryLockKey("InternTrans", true), TestingPostgresDb.DefaultConnectionString, o => o.UseTransaction());

using (var transactionLockHandle = await transactionLock.TryAcquireAsync(TimeSpan.FromSeconds(.3)))
{
(await GetTimeoutAsync("lock_timeout", command)).ShouldEqual("0");
}

(await GetTimeoutAsync("lock_timeout", command)).ShouldEqual("0");
}

[Test]
public async Task TestWorksWithAmbientTransaction()
{
Expand All @@ -79,26 +97,28 @@ public async Task TestWorksWithAmbientTransaction()

using (var timedOutHandle = await connectionLock.TryAcquireAsync(TimeSpan.FromSeconds(.2)))
{
(await GetTimeoutAsync("statement_timeout", transactionCommand)).ShouldEqual("1010ms");

Assert.That(timedOutHandle, Is.Null);
}

(await GetTimeoutAsync(transactionCommand)).ShouldEqual("1010ms");
(await GetTimeoutAsync("statement_timeout", transactionCommand)).ShouldEqual("1010ms");

var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(.3));
var task = connectionLock.AcquireAsync(cancellationToken: cancellationTokenSource.Token).AsTask();
task.ContinueWith(_ => { }).Wait(TimeSpan.FromSeconds(5)).ShouldEqual(true);
task.Status.ShouldEqual(TaskStatus.Canceled);

(await GetTimeoutAsync(transactionCommand)).ShouldEqual("1010ms");
(await GetTimeoutAsync("statement_timeout", transactionCommand)).ShouldEqual("1010ms");
}

using var connectionCommand = connection.CreateCommand();
(await GetTimeoutAsync(connectionCommand)).ShouldEqual("0");
(await GetTimeoutAsync("statement_timeout", connectionCommand)).ShouldEqual("0");
}

static Task<object> GetTimeoutAsync(NpgsqlCommand command)
{
command.CommandText = "SHOW statement_timeout";
return command.ExecuteScalarAsync()!;
}
private static Task<object> GetTimeoutAsync(string timeoutName, NpgsqlCommand command)
{
command.CommandText = $"SHOW {timeoutName}";
return command.ExecuteScalarAsync()!;
}
}