Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions src/Weasel.Postgresql.Tests/PostgresqlMigratorConcurrencyRetryTests.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
using System.Data;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
using Npgsql;
using Shouldly;
using Weasel.Postgresql;
Expand Down Expand Up @@ -77,4 +81,128 @@ public void unrelated_sqlstates_are_not_transient(string? sqlState)
PostgresqlMigrator.IsTransientCatalogConcurrency(sqlState, "tuple concurrently updated")
.ShouldBeFalse();
}

// Reopen-on-retry behavior: a transient PostgreSQL error can move the
// Npgsql connection to Closed or Broken. The retry has to put it back to
// Open before re-invoking the command; otherwise the next call throws
// "Connection is not open" (an InvalidOperationException, NOT a
// PostgresException — so the retry filter wouldn't catch it again).

[Fact]
public async Task ensure_connection_open_is_a_noop_when_already_open()
{
var conn = new RetryStubConnection(ConnectionState.Open);
var cmd = new RetryStubCommand(conn);

await PostgresqlMigrator.EnsureConnectionOpenAsync(cmd, CancellationToken.None);

conn.OpenCallCount.ShouldBe(0);
conn.CloseCallCount.ShouldBe(0);
}

[Fact]
public async Task ensure_connection_open_reopens_a_closed_connection()
{
var conn = new RetryStubConnection(ConnectionState.Closed);
var cmd = new RetryStubCommand(conn);

await PostgresqlMigrator.EnsureConnectionOpenAsync(cmd, CancellationToken.None);

conn.OpenCallCount.ShouldBe(1);
conn.CloseCallCount.ShouldBe(0); // No need to close before open
}

[Fact]
public async Task ensure_connection_open_closes_then_reopens_a_broken_connection()
{
// OpenAsync on a Broken connection throws — must Close first.
var conn = new RetryStubConnection(ConnectionState.Broken);
var cmd = new RetryStubCommand(conn);

await PostgresqlMigrator.EnsureConnectionOpenAsync(cmd, CancellationToken.None);

conn.CloseCallCount.ShouldBe(1);
conn.OpenCallCount.ShouldBe(1);
conn.LastOperationOrder.ShouldBe(new[] { "Close", "Open" });
}

[Fact]
public async Task ensure_connection_open_does_nothing_when_command_has_no_connection()
{
var cmd = new RetryStubCommand(connection: null);

// No throw — the helper short-circuits on a null Connection
// (DbCommand.Connection is nullable; defensive guard against
// pathological callers).
await PostgresqlMigrator.EnsureConnectionOpenAsync(cmd, CancellationToken.None);
}

#region Test stubs — minimal DbConnection / DbCommand mocks for the reopen-rules test

private sealed class RetryStubConnection: DbConnection
{
private ConnectionState _state;
public int OpenCallCount { get; private set; }
public int CloseCallCount { get; private set; }
public System.Collections.Generic.List<string> LastOperationOrder { get; } = new();

public RetryStubConnection(ConnectionState initialState)
{
_state = initialState;
}

public override ConnectionState State => _state;

public override void Open()
{
OpenCallCount++;
LastOperationOrder.Add("Open");
_state = ConnectionState.Open;
}

public override void Close()
{
CloseCallCount++;
LastOperationOrder.Add("Close");
_state = ConnectionState.Closed;
}

// The rest of DbConnection — none of these are exercised by
// EnsureConnectionOpenAsync, throw if anything calls them so a
// future change to that helper that reaches further into the
// connection surface fails loud rather than silently.
public override string ConnectionString { get; set; } = string.Empty;
public override string Database => string.Empty;
public override string DataSource => string.Empty;
public override string ServerVersion => string.Empty;
public override void ChangeDatabase(string databaseName) => throw new System.NotSupportedException();
protected override DbTransaction BeginDbTransaction(IsolationLevel isolationLevel) => throw new System.NotSupportedException();
protected override DbCommand CreateDbCommand() => throw new System.NotSupportedException();
}

private sealed class RetryStubCommand: DbCommand
{
public RetryStubCommand(DbConnection? connection)
{
DbConnection = connection;
}

protected override DbConnection? DbConnection { get; set; }
public override string CommandText { get; set; } = string.Empty;
public override int CommandTimeout { get; set; }
public override CommandType CommandType { get; set; }
public override bool DesignTimeVisible { get; set; }
public override UpdateRowSource UpdatedRowSource { get; set; }
protected override DbParameterCollection DbParameterCollection => throw new System.NotSupportedException();
protected override DbTransaction? DbTransaction { get; set; }

public override void Cancel() => throw new System.NotSupportedException();
public override int ExecuteNonQuery() => throw new System.NotSupportedException();
public override object? ExecuteScalar() => throw new System.NotSupportedException();
public override void Prepare() => throw new System.NotSupportedException();
protected override DbParameter CreateDbParameter() => throw new System.NotSupportedException();
protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior) => throw new System.NotSupportedException();
}

#endregion
}
38 changes: 38 additions & 0 deletions src/Weasel.Postgresql/PostgresqlMigrator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,48 @@ private static async Task executeWithConcurrencyRetryAsync(DbCommand cmd, Cancel
{
var delayMs = (50 * attempt) + Random.Shared.Next(0, 50);
await Task.Delay(delayMs, ct).ConfigureAwait(false);

// Some transient PostgreSQL errors (40P01 deadlock_detected and
// XX000 "tuple concurrently updated" in particular) leave the
// underlying Npgsql connection in a Closed or Broken state —
// Npgsql moves a connection to Broken when the server-side
// error has aborted the session. Without this guard, the next
// ExecuteNonQueryAsync on the same cmd throws
// InvalidOperationException("Connection is not open"), which
// falls out of the catch filter (not a PostgresException) and
// surfaces as a hard migration failure — defeating the retry.
//
// Repro before this guard: recurring intermittent failure on
// EventSourcingTests.end_to_end_event_capture_and_fetching_the_stream.
// query_before_saving(tenancyStyle: Conjoined) in JasperFx/marten
// PRs #4576, #4578, #4582, #4584 — all hit this exact path.
await EnsureConnectionOpenAsync(cmd, ct).ConfigureAwait(false);
}
}
}

/// <summary>
/// Re-open a <see cref="DbCommand"/>'s connection if a previous
/// transient failure has left it in a non-<see cref="ConnectionState.Open"/>
/// state. A <see cref="ConnectionState.Broken"/> connection must be
/// closed first — <c>OpenAsync</c> on a Broken connection throws. Pure
/// over <see cref="DbCommand"/> + <see cref="ConnectionState"/> so the
/// reopen rules can be exercised with a fake command in
/// <c>PostgresqlMigratorConcurrencyRetryTests</c>.
/// </summary>
internal static async Task EnsureConnectionOpenAsync(DbCommand cmd, CancellationToken ct)
{
if (cmd.Connection is null) return;
if (cmd.Connection.State == System.Data.ConnectionState.Open) return;

if (cmd.Connection.State == System.Data.ConnectionState.Broken)
{
await cmd.Connection.CloseAsync().ConfigureAwait(false);
}

await cmd.Connection.OpenAsync(ct).ConfigureAwait(false);
}

/// <summary>
/// True for the PostgreSQL SQLSTATEs that signal a transient, retry-safe
/// concurrency conflict while applying idempotent migration DDL:
Expand Down
Loading