Skip to content

Commit 324e17c

Browse files
authored
Merge pull request #222 from Tzachi009/Add-support-for-transaction-scoped-advisory-locks-with-external-db-connections
#213 Postgres: Add support for transaction-scoped advisory locks with external transactions
2 parents 24a862b + a56a7c7 commit 324e17c

File tree

4 files changed

+208
-15
lines changed

4 files changed

+208
-15
lines changed

src/DistributedLock.Postgres/PostgresAdvisoryLock.cs

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,12 @@ private PostgresAdvisoryLock(bool isShared)
4646

4747
// Our acquire command will use SET LOCAL to set up statement timeouts. This lasts until the end
4848
// of the current transaction instead of just the current batch if we're in a transaction. To make sure
49-
// we don't leak those settings, in the case of a transaction we first set up a save point which we can
49+
// we don't leak those settings, in the case of a transaction, we first set up a save point which we can
5050
// later roll back (taking the settings changes with it but NOT the lock). Because we can't confidently
5151
// roll back a save point without knowing that it has been set up, we start the save point in its own
52-
// query before we try-catch
53-
var needsSavePoint = await HasTransactionAsync(connection).ConfigureAwait(false);
52+
// query before we try-catch.
53+
var needsSavePoint = await ShouldDefineSavePoint(connection).ConfigureAwait(false);
54+
5455
if (needsSavePoint)
5556
{
5657
using var setSavePointCommand = connection.CreateCommand();
@@ -124,9 +125,7 @@ private PostgresAdvisoryLock(bool isShared)
124125
async ValueTask RollBackTransactionTimeoutVariablesIfNeededAsync(bool acquired)
125126
{
126127
if (needsSavePoint
127-
// For transaction scoped locks, we can't roll back the save point on success because that will roll
128-
// back our hold on the lock. It's ok to "leak" the savepoint in that case because it's an internally-owned
129-
// transaction/connection and the savepoint will be cleaned up with the disposal of the transaction.
128+
// For transaction scoped locks, we can't roll back the save point on success because that will roll back our hold on the lock.
130129
&& !(acquired && UseTransactionScopedLock(connection)))
131130
{
132131
// attempt to clear the timeout variables we set
@@ -182,13 +181,17 @@ private DatabaseCommand CreateAcquireCommand(DatabaseConnection connection, Post
182181
return command;
183182
}
184183

185-
private static async ValueTask<bool> HasTransactionAsync(DatabaseConnection connection)
184+
private static async ValueTask<bool> ShouldDefineSavePoint(DatabaseConnection connection)
186185
{
187-
if (connection.HasTransaction) { return true; }
188-
if (!connection.IsExernallyOwned) { return false; }
186+
// If the connection is internally-owned, we only define a save point if a transaction has been opened.
187+
if (!connection.IsExernallyOwned) { return connection.HasTransaction; }
188+
189+
// If the connection is externally-owned with an established transaction, we don't want to pollute it with a save point
190+
// which we won't be able to release in case the lock will be acquired.
191+
if (connection.HasTransaction) { return false; }
189192

190-
// If the connection is externally owned, then it might be part of a transaction that we can't
191-
// see. In that case, the only real way to detect it is to begin a new one
193+
// The externally-owned connection might still be part of a transaction that we can't see.
194+
// In that case, the only real way to detect it is to begin a new one.
192195
try
193196
{
194197
await connection.BeginTransactionAsync().ConfigureAwait(false);
@@ -199,6 +202,7 @@ private static async ValueTask<bool> HasTransactionAsync(DatabaseConnection conn
199202
}
200203

201204
await connection.DisposeTransactionAsync().ConfigureAwait(false);
205+
202206
return false;
203207
}
204208

@@ -235,10 +239,9 @@ private static string AddKeyParametersAndGetKeyArguments(DatabaseCommand command
235239
}
236240

237241
private static bool UseTransactionScopedLock(DatabaseConnection connection) =>
238-
// This implementation (similar to what we do for SQL Server) is based on the fact that we only create transactions on
239-
// internally-owned connections when doing transaction-scoped locking, and we only support transaction-scoped locking on
240-
// internally-owned connections (since there's no explicit release).
241-
!connection.IsExernallyOwned && connection.HasTransaction;
242+
// Transaction-scoped locking is supported on both externally-owned and internally-owned connections,
243+
// as long as the connection has a transaction.
244+
connection.HasTransaction;
242245

243246
private static string AddPGLocksFilterParametersAndGetFilterExpression(DatabaseCommand command, PostgresAdvisoryLockKey key)
244247
{
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
using Medallion.Threading.Internal;
2+
using System.Data;
3+
4+
namespace Medallion.Threading.Postgres;
5+
6+
public partial class PostgresDistributedLock
7+
{
8+
/// <summary>
9+
/// Attempts to acquire a transaction-scoped advisory lock synchronously with an externally owned transaction. Usage:
10+
/// <code>
11+
/// var transaction = /* create a DB transaction */
12+
///
13+
/// var isLockAcquired = myLock.TryAcquireWithTransaction(..., transaction, ...)
14+
///
15+
/// if (isLockAcquired != null)
16+
/// {
17+
/// /* we have the lock! */
18+
///
19+
/// // Commit or Rollback the transaction, which in turn will release the lock
20+
/// }
21+
/// </code>
22+
///
23+
/// NOTE: The owner of the transaction is the responsible party for it - the owner must commit or rollback the transaction in order to release the acquired lock.
24+
/// </summary>
25+
/// <param name="key">The postgres advisory lock key which will be used to acquire the lock.</param>
26+
/// <param name="transaction">The externally owned transaction which will be used to acquire the lock. The owner of the transaction must commit or rollback it for the lock to be released.</param>
27+
/// <param name="timeout">How long to wait before giving up on the acquisition attempt. Defaults to 0.</param>
28+
/// <param name="cancellationToken">Specifies a token by which the wait can be canceled</param>
29+
/// <returns>Whether the lock has been acquired</returns>
30+
public static bool TryAcquireWithTransaction(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan timeout = default, CancellationToken cancellationToken = default) =>
31+
SyncViaAsync.Run(state => TryAcquireWithTransactionAsyncInternal(state.key, state.transaction, state.timeout, state.cancellationToken), (key, transaction, timeout, cancellationToken));
32+
33+
/// <summary>
34+
/// Acquires a transaction-scoped advisory lock synchronously, failing with <see cref="TimeoutException"/> if the attempt times out. Usage:
35+
/// <code>
36+
/// var transaction = /* create a DB transaction */
37+
///
38+
/// myLock.AcquireWithTransaction(..., transaction, ...)
39+
///
40+
/// /* we have the lock! */
41+
///
42+
/// // Commit or Rollback the transaction, which in turn will release the lock
43+
/// </code>
44+
///
45+
/// NOTE: The owner of the transaction is the responsible party for it - the owner must commit or rollback the transaction in order to release the acquired lock.
46+
/// </summary>
47+
/// <param name="key">The postgres advisory lock key which will be used to acquire the lock.</param>
48+
/// <param name="transaction">The externally owned transaction which will be used to acquire the lock. The owner of the transaction must commit or rollback it for the lock to be released.</param>
49+
/// <param name="timeout">How long to wait before giving up on the acquisition attempt. Defaults to <see cref="Timeout.InfiniteTimeSpan"/></param>
50+
/// <param name="cancellationToken">Specifies a token by which the wait can be canceled</param>
51+
public static void AcquireWithTransaction(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan? timeout = null, CancellationToken cancellationToken = default) =>
52+
SyncViaAsync.Run(state => AcquireWithTransactionAsyncInternal(state.key, state.transaction, state.timeout, state.cancellationToken), (key, transaction, timeout, cancellationToken));
53+
54+
/// <summary>
55+
/// Attempts to acquire a transaction-scoped advisory lock asynchronously with an externally owned transaction. Usage:
56+
/// <code>
57+
/// var transaction = /* create a DB transaction */
58+
///
59+
/// var isLockAcquired = await myLock.TryAcquireWithTransactionAsync(..., transaction, ...)
60+
///
61+
/// if (isLockAcquired != null)
62+
/// {
63+
/// /* we have the lock! */
64+
///
65+
/// // Commit or Rollback the transaction, which in turn will release the lock
66+
/// }
67+
/// </code>
68+
///
69+
/// NOTE: The owner of the transaction is the responsible party for it - the owner must commit or rollback the transaction in order to release the acquired lock.
70+
/// </summary>
71+
/// <param name="key">The postgres advisory lock key which will be used to acquire the lock.</param>
72+
/// <param name="transaction">The externally owned transaction which will be used to acquire the lock. The owner of the transaction must commit or rollback it for the lock to be released.</param>
73+
/// <param name="timeout">How long to wait before giving up on the acquisition attempt. Defaults to 0.</param>
74+
/// <param name="cancellationToken">Specifies a token by which the wait can be canceled</param>
75+
/// <returns>Whether the lock has been acquired</returns>
76+
public static ValueTask<bool> TryAcquireWithTransactionAsync(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan timeout = default, CancellationToken cancellationToken = default) =>
77+
TryAcquireWithTransactionAsyncInternal(key, transaction, timeout, cancellationToken);
78+
79+
/// <summary>
80+
/// Acquires a transaction-scoped advisory lock asynchronously, failing with <see cref="TimeoutException"/> if the attempt times out. Usage:
81+
/// <code>
82+
/// var transaction = /* create a DB transaction */
83+
///
84+
/// await myLock.AcquireWithTransaction(..., transaction, ...)
85+
///
86+
/// /* we have the lock! */
87+
///
88+
/// // Commit or Rollback the transaction, which in turn will release the lock
89+
/// </code>
90+
///
91+
/// NOTE: The owner of the transaction is the responsible party for it - the owner must commit or rollback the transaction in order to release the acquired lock.
92+
/// </summary>
93+
/// <param name="key">The postgres advisory lock key which will be used to acquire the lock.</param>
94+
/// <param name="transaction">The externally owned transaction which will be used to acquire the lock. The owner of the transaction must commit or rollback it for the lock to be released.</param>
95+
/// <param name="timeout">How long to wait before giving up on the acquisition attempt. Defaults to <see cref="Timeout.InfiniteTimeSpan"/></param>
96+
/// <param name="cancellationToken">Specifies a token by which the wait can be canceled</param>
97+
public static ValueTask AcquireWithTransactionAsync(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan? timeout = null, CancellationToken cancellationToken = default) =>
98+
AcquireWithTransactionAsyncInternal(key, transaction, timeout, cancellationToken);
99+
100+
internal static ValueTask<bool> TryAcquireWithTransactionAsyncInternal(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan timeout, CancellationToken cancellationToken)
101+
{
102+
if (key == null) { throw new ArgumentNullException(nameof(key)); }
103+
if (transaction == null) { throw new ArgumentNullException(nameof(transaction)); }
104+
105+
return TryAcquireAsync();
106+
107+
async ValueTask<bool> TryAcquireAsync()
108+
{
109+
var connection = new PostgresDatabaseConnection(transaction);
110+
111+
await using (connection.ConfigureAwait(false))
112+
{
113+
var lockAcquiredCookie = await PostgresAdvisoryLock.ExclusiveLock.TryAcquireAsync(connection, key.ToString(), timeout, cancellationToken).ConfigureAwait(false);
114+
115+
return lockAcquiredCookie != null;
116+
}
117+
}
118+
}
119+
120+
internal static ValueTask AcquireWithTransactionAsyncInternal(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan? timeout, CancellationToken cancellationToken)
121+
{
122+
if (key == null) { throw new ArgumentNullException(nameof(key)); }
123+
if (transaction == null) { throw new ArgumentNullException(nameof(transaction)); }
124+
125+
return AcquireAsync();
126+
127+
async ValueTask AcquireAsync()
128+
{
129+
var connection = new PostgresDatabaseConnection(transaction);
130+
131+
await using (connection.ConfigureAwait(false))
132+
{
133+
await PostgresAdvisoryLock.ExclusiveLock.TryAcquireAsync(connection, key.ToString(), timeout, cancellationToken).ThrowTimeoutIfNull().ConfigureAwait(false);
134+
}
135+
}
136+
}
137+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#nullable enable
2+
static Medallion.Threading.Postgres.PostgresDistributedLock.AcquireWithTransaction(Medallion.Threading.Postgres.PostgresAdvisoryLockKey key, System.Data.IDbTransaction! transaction, System.TimeSpan? timeout = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void
3+
static Medallion.Threading.Postgres.PostgresDistributedLock.AcquireWithTransactionAsync(Medallion.Threading.Postgres.PostgresAdvisoryLockKey key, System.Data.IDbTransaction! transaction, System.TimeSpan? timeout = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
4+
static Medallion.Threading.Postgres.PostgresDistributedLock.TryAcquireWithTransaction(Medallion.Threading.Postgres.PostgresAdvisoryLockKey key, System.Data.IDbTransaction! transaction, System.TimeSpan timeout = default(System.TimeSpan), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> bool
5+
static Medallion.Threading.Postgres.PostgresDistributedLock.TryAcquireWithTransactionAsync(Medallion.Threading.Postgres.PostgresAdvisoryLockKey key, System.Data.IDbTransaction! transaction, System.TimeSpan timeout = default(System.TimeSpan), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<bool>
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
using Medallion.Threading.Postgres;
2+
using Npgsql;
3+
using NUnit.Framework;
4+
5+
namespace Medallion.Threading.Tests.Postgres;
6+
7+
internal class PostgresDistributedLockExtensionsTest
8+
{
9+
[Test]
10+
public void TestValidatesConstructorArguments()
11+
{
12+
Assert.Throws<ArgumentNullException>(() => PostgresDistributedLock.TryAcquireWithTransaction(default, null!));
13+
Assert.ThrowsAsync<ArgumentNullException>(async () => await PostgresDistributedLock.TryAcquireWithTransactionAsync(default, null!).ConfigureAwait(false));
14+
Assert.Throws<ArgumentNullException>(() => PostgresDistributedLock.AcquireWithTransaction(default, null!));
15+
Assert.ThrowsAsync<ArgumentNullException>(async () => await PostgresDistributedLock.AcquireWithTransactionAsync(default, null!).ConfigureAwait(false));
16+
}
17+
18+
[Test]
19+
public async Task TestWorksWithExternalTransaction()
20+
{
21+
bool isLockAcquired;
22+
23+
var key = new PostgresAdvisoryLockKey(0);
24+
25+
using var connection = new NpgsqlConnection(TestingPostgresDb.DefaultConnectionString);
26+
await connection.OpenAsync();
27+
28+
using (var transaction = connection.BeginTransaction())
29+
{
30+
PostgresDistributedLock.AcquireWithTransaction(key, transaction);
31+
32+
isLockAcquired = PostgresDistributedLock.TryAcquireWithTransaction(key, transaction);
33+
Assert.That(isLockAcquired, Is.False);
34+
35+
transaction.Rollback();
36+
}
37+
38+
using (var transaction = connection.BeginTransaction())
39+
{
40+
isLockAcquired = await PostgresDistributedLock.TryAcquireWithTransactionAsync(key, transaction).ConfigureAwait(false);
41+
Assert.That(isLockAcquired, Is.True);
42+
43+
Assert.ThrowsAsync<TimeoutException>(async () => await PostgresDistributedLock.AcquireWithTransactionAsync(key, transaction, TimeSpan.FromMilliseconds(10)).ConfigureAwait(false));
44+
45+
transaction.Commit();
46+
}
47+
}
48+
}

0 commit comments

Comments
 (0)