diff --git a/src/Weasel.Postgresql.Tests/PostgresqlMigratorConcurrencyRetryTests.cs b/src/Weasel.Postgresql.Tests/PostgresqlMigratorConcurrencyRetryTests.cs
index 98738bd..f2494a2 100644
--- a/src/Weasel.Postgresql.Tests/PostgresqlMigratorConcurrencyRetryTests.cs
+++ b/src/Weasel.Postgresql.Tests/PostgresqlMigratorConcurrencyRetryTests.cs
@@ -1,3 +1,7 @@
+using System.Data;
+using System.Data.Common;
+using System.Threading;
+using System.Threading.Tasks;
 using Npgsql;
 using Shouldly;
 using Weasel.Postgresql;
@@ -77,4 +81,128 @@ public void unrelated_sqlstates_are_not_transient(string? sqlState)
         PostgresqlMigrator.IsTransientCatalogConcurrency(sqlState, "tuple concurrently updated")
             .ShouldBeFalse();
     }
+
+    // Reopen-on-retry behavior: a transient PostgreSQL error can move the
+    // Npgsql connection to Closed or Broken. The retry has to put it back to
+    // Open before re-invoking the command; otherwise the next call throws
+    // "Connection is not open" (an InvalidOperationException, NOT a
+    // PostgresException — so the retry filter wouldn't catch it again).
+
+    [Fact]
+    public async Task ensure_connection_open_is_a_noop_when_already_open()
+    {
+        var conn = new RetryStubConnection(ConnectionState.Open);
+        var cmd = new RetryStubCommand(conn);
+
+        await PostgresqlMigrator.EnsureConnectionOpenAsync(cmd, CancellationToken.None);
+
+        conn.OpenCallCount.ShouldBe(0);
+        conn.CloseCallCount.ShouldBe(0);
+    }
+
+    [Fact]
+    public async Task ensure_connection_open_reopens_a_closed_connection()
+    {
+        var conn = new RetryStubConnection(ConnectionState.Closed);
+        var cmd = new RetryStubCommand(conn);
+
+        await PostgresqlMigrator.EnsureConnectionOpenAsync(cmd, CancellationToken.None);
+
+        conn.OpenCallCount.ShouldBe(1);
+        conn.CloseCallCount.ShouldBe(0); // No need to close before open
+    }
+
+    [Fact]
+    public async Task ensure_connection_open_closes_then_reopens_a_broken_connection()
+    {
+        // OpenAsync on a Broken connection throws — must Close first.
+        var conn = new RetryStubConnection(ConnectionState.Broken);
+        var cmd = new RetryStubCommand(conn);
+
+        await PostgresqlMigrator.EnsureConnectionOpenAsync(cmd, CancellationToken.None);
+
+        conn.CloseCallCount.ShouldBe(1);
+        conn.OpenCallCount.ShouldBe(1);
+        conn.LastOperationOrder.ShouldBe(new[] { "Close", "Open" });
+    }
+
+    [Fact]
+    public async Task ensure_connection_open_does_nothing_when_command_has_no_connection()
+    {
+        var cmd = new RetryStubCommand(connection: null);
+
+        // No throw — the helper short-circuits on a null Connection
+        // (DbCommand.Connection is nullable; defensive guard against
+        // pathological callers).
+        await PostgresqlMigrator.EnsureConnectionOpenAsync(cmd, CancellationToken.None);
+    }
+
+    #region Test stubs — minimal DbConnection / DbCommand mocks for the reopen-rules test
+
+    private sealed class RetryStubConnection: DbConnection
+    {
+        private ConnectionState _state;
+        public int OpenCallCount { get; private set; }
+        public int CloseCallCount { get; private set; }
+        public System.Collections.Generic.List<string> LastOperationOrder { get; } = new();
+
+        public RetryStubConnection(ConnectionState initialState)
+        {
+            _state = initialState;
+        }
+
+        public override ConnectionState State => _state;
+
+        public override void Open()
+        {
+            OpenCallCount++;
+            LastOperationOrder.Add("Open");
+            _state = ConnectionState.Open;
+        }
+
+        public override void Close()
+        {
+            CloseCallCount++;
+            LastOperationOrder.Add("Close");
+            _state = ConnectionState.Closed;
+        }
+
+        // The rest of DbConnection — none of these are exercised by
+        // EnsureConnectionOpenAsync. The methods throw so that a future
+        // change to that helper which reaches further into the connection
+        // surface fails loudly; the properties return inert empty values.
+        public override string ConnectionString { get; set; } = string.Empty;
+        public override string Database => string.Empty;
+        public override string DataSource => string.Empty;
+        public override string ServerVersion => string.Empty;
+        public override void ChangeDatabase(string databaseName) => throw new System.NotSupportedException();
+        protected override DbTransaction BeginDbTransaction(IsolationLevel isolationLevel) => throw new System.NotSupportedException();
+        protected override DbCommand CreateDbCommand() => throw new System.NotSupportedException();
+    }
+
+    private sealed class RetryStubCommand: DbCommand
+    {
+        public RetryStubCommand(DbConnection? connection)
+        {
+            DbConnection = connection;
+        }
+
+        protected override DbConnection? DbConnection { get; set; }
+        public override string CommandText { get; set; } = string.Empty;
+        public override int CommandTimeout { get; set; }
+        public override CommandType CommandType { get; set; }
+        public override bool DesignTimeVisible { get; set; }
+        public override UpdateRowSource UpdatedRowSource { get; set; }
+        protected override DbParameterCollection DbParameterCollection => throw new System.NotSupportedException();
+        protected override DbTransaction? DbTransaction { get; set; }
+
+        public override void Cancel() => throw new System.NotSupportedException();
+        public override int ExecuteNonQuery() => throw new System.NotSupportedException();
+        public override object? ExecuteScalar() => throw new System.NotSupportedException();
+        public override void Prepare() => throw new System.NotSupportedException();
+        protected override DbParameter CreateDbParameter() => throw new System.NotSupportedException();
+        protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior) => throw new System.NotSupportedException();
+    }
+
+    #endregion
 }
diff --git a/src/Weasel.Postgresql/PostgresqlMigrator.cs b/src/Weasel.Postgresql/PostgresqlMigrator.cs
index 7a729b6..955af1d 100644
--- a/src/Weasel.Postgresql/PostgresqlMigrator.cs
+++ b/src/Weasel.Postgresql/PostgresqlMigrator.cs
@@ -184,10 +184,50 @@ private static async Task executeWithConcurrencyRetryAsync(DbCommand cmd, Cancel
             {
                 var delayMs = (50 * attempt) + Random.Shared.Next(0, 50);
                 await Task.Delay(delayMs, ct).ConfigureAwait(false);
+
+                // Some transient PostgreSQL errors (40P01 deadlock_detected and
+                // XX000 "tuple concurrently updated" in particular) leave the
+                // underlying Npgsql connection in a Closed or Broken state —
+                // Npgsql moves a connection to Broken when the server-side
+                // error has aborted the session. Without this guard, the next
+                // ExecuteNonQueryAsync on the same cmd throws
+                // InvalidOperationException("Connection is not open"), which
+                // falls out of the catch filter (not a PostgresException) and
+                // surfaces as a hard migration failure — defeating the retry.
+                //
+                // Repro before this guard: recurring intermittent failure on
+                // EventSourcingTests.end_to_end_event_capture_and_fetching_the_stream.
+                // query_before_saving(tenancyStyle: Conjoined) in JasperFx/marten
+                // PRs #4576, #4578, #4582, #4584 — all hit this exact path.
+                await EnsureConnectionOpenAsync(cmd, ct).ConfigureAwait(false);
             }
         }
     }
 
+    /// <summary>
+    /// Re-open a <see cref="DbCommand"/>'s connection if a previous
+    /// transient failure has left it in a non-<see cref="System.Data.ConnectionState.Open"/>
+    /// state. A <see cref="System.Data.ConnectionState.Broken"/> connection must be
+    /// closed first — OpenAsync on a Broken connection throws. Pure
+    /// over <see cref="DbCommand"/> + <see cref="CancellationToken"/> so the
+    /// reopen rules can be exercised with a fake command in
+    /// <c>PostgresqlMigratorConcurrencyRetryTests</c>.
+    /// </summary>
+    /// <param name="cmd">Command whose <see cref="DbCommand.Connection"/> is inspected; a null connection is a no-op.</param>
+    /// <param name="ct">Token passed to <c>OpenAsync</c>; the preceding <c>CloseAsync</c> call does not receive it.</param>
+    internal static async Task EnsureConnectionOpenAsync(DbCommand cmd, CancellationToken ct)
+    {
+        if (cmd.Connection is null) return;
+        if (cmd.Connection.State == System.Data.ConnectionState.Open) return;
+
+        if (cmd.Connection.State == System.Data.ConnectionState.Broken)
+        {
+            await cmd.Connection.CloseAsync().ConfigureAwait(false);
+        }
+
+        await cmd.Connection.OpenAsync(ct).ConfigureAwait(false);
+    }
+
     /// <summary>
     /// True for the PostgreSQL SQLSTATEs that signal a transient, retry-safe
     /// concurrency conflict while applying idempotent migration DDL: