diff --git a/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs b/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs index f6a139322..db3478172 100644 --- a/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs +++ b/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs @@ -38,7 +38,6 @@ public PostgresqlMessageStore(DatabaseSettings databaseSettings, DurabilitySetti internal class PostgresqlMessageStore : MessageDatabase, IDatabaseSagaStorage { - private readonly string _deleteIncomingEnvelopesSql; private readonly string _deleteOutgoingEnvelopesSql; private readonly string _discardAndReassignOutgoingSql; private readonly string _findAtLargeEnvelopesSql; @@ -65,9 +64,6 @@ public PostgresqlMessageStore(DatabaseSettings databaseSettings, DurabilitySetti ILogger logger, IEnumerable sagaTypes) : base(databaseSettings, dataSource, settings, logger, new PostgresqlMigrator(), "public") { - _deleteIncomingEnvelopesSql = - $"delete from {SchemaName}.{DatabaseConstants.IncomingTable} WHERE id = ANY(@ids);"; - _reassignIncomingSql = $"update {SchemaName}.{DatabaseConstants.IncomingTable} set owner_id = @owner where id = ANY(@ids)"; _deleteOutgoingEnvelopesSql = @@ -196,41 +192,7 @@ public override async Task FetchCountsAsync() return counts; } - - public override async Task MoveToDeadLetterStorageAsync(Envelope envelope, Exception? exception) - { - if (HasDisposed) return; - - try - { - var builder = ToCommandBuilder(); - builder.Append(_deleteIncomingEnvelopesSql); - var param = (NpgsqlParameter)builder.AddNamedParameter("ids", DBNull.Value); - param.Value = new[] { envelope.Id }; - // ReSharper disable once BitwiseOperatorOnEnumWithoutFlags - param.NpgsqlDbType = NpgsqlDbType.Uuid | NpgsqlDbType.Array; - - DatabasePersistence.ConfigureDeadLetterCommands(envelope, exception, builder, this); - - var cmd = builder.Compile(); - await using var conn = await DataSource.OpenConnectionAsync(_cancellation); - cmd.Connection = conn; - try - { - await cmd.ExecuteNonQueryAsync(_cancellation); - } - finally - { - await conn.CloseAsync(); - } - } - catch (Exception e) - { - if (isExceptionFromDuplicateEnvelope(e)) return; - throw; - } - } - + public override async Task MarkDeadLetterEnvelopesAsReplayableAsync(Guid[] ids, string? tenantId = null) { var builder = ToCommandBuilder(); diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs index 24212a8e3..e6fc3fe13 100644 --- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs +++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs @@ -38,7 +38,29 @@ public Task StoreIncomingAsync(DbTransaction tx, Envelope[] envelopes) return cmd.ExecuteNonQueryAsync(_cancellation); } - public abstract Task MoveToDeadLetterStorageAsync(Envelope envelope, Exception? exception); + public async Task MoveToDeadLetterStorageAsync(Envelope envelope, Exception? exception) + { + if (HasDisposed) return; + + try + { + var builder = ToCommandBuilder(); + builder.Append($"delete from {SchemaName}.{DatabaseConstants.IncomingTable} WHERE id = "); + builder.AppendParameter(envelope.Id); + builder.Append($" and {DatabaseConstants.ReceivedAt} = "); + builder.AppendParameter(envelope.Destination.ToString()); + builder.Append(';'); + + DatabasePersistence.ConfigureDeadLetterCommands(envelope, exception, builder, this); + + await executeCommandBatch(builder); + } + catch (Exception e) + { + if (isExceptionFromDuplicateEnvelope(e)) return; + throw; + } + } public Task MarkIncomingEnvelopeAsHandledAsync(Envelope envelope) { diff --git a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs index 979565b2b..583e34217 100644 --- a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs +++ b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs @@ -1,12 +1,10 @@ using System.Data; using System.Data.Common; -using System.Runtime.InteropServices; using JasperFx.Core; using JasperFx.Core.Reflection; using Microsoft.Data.SqlClient; using Microsoft.Extensions.Logging; using Weasel.Core; -using Weasel.Core.Migrations; using Weasel.SqlServer; using Weasel.SqlServer.Tables; using Wolverine.Logging; @@ -109,43 +107,6 @@ public override async Task FetchCountsAsync() /// public string DatabasePrincipal { get; set; } = "dbo"; - public override async Task MoveToDeadLetterStorageAsync(Envelope envelope, Exception? exception) - { - if (HasDisposed) return; - - var table = new DataTable(); - table.Columns.Add(new DataColumn("ID", typeof(Guid))); - table.Rows.Add(envelope.Id); - - var builder = ToCommandBuilder(); - - var list = builder.AddNamedParameter("IDLIST", table).As(); - list.SqlDbType = SqlDbType.Structured; - list.TypeName = $"{SchemaName}.EnvelopeIdList"; - - builder.Append(_moveToDeadLetterStorageSql); - - DatabasePersistence.ConfigureDeadLetterCommands(envelope, exception, builder, this); - - var cmd = builder.Compile(); - await using var conn = await DataSource.OpenConnectionAsync(_cancellation); - cmd.Connection = conn; - - try - { - await cmd.ExecuteNonQueryAsync(_cancellation); - } - catch (Exception e) - { - if (isExceptionFromDuplicateEnvelope(e)) return; - throw; - } - finally - { - await conn.CloseAsync(); - } - } - public override Task MarkDeadLetterEnvelopesAsReplayableAsync(Guid[] ids, string? tenantId = null) { var table = new DataTable();