Skip to content

Commit

Permalink
Moved some more operations on the message databases up to the message…
Browse files Browse the repository at this point in the history
… level and added Destination to the WHERE clauses
  • Loading branch information
jeremydmiller committed Jan 18, 2025
1 parent c9f2b7e commit 5783a83
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 79 deletions.
40 changes: 1 addition & 39 deletions src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public PostgresqlMessageStore(DatabaseSettings databaseSettings, DurabilitySetti

internal class PostgresqlMessageStore : MessageDatabase<NpgsqlConnection>, IDatabaseSagaStorage
{
private readonly string _deleteIncomingEnvelopesSql;
private readonly string _deleteOutgoingEnvelopesSql;
private readonly string _discardAndReassignOutgoingSql;
private readonly string _findAtLargeEnvelopesSql;
Expand All @@ -65,9 +64,6 @@ public PostgresqlMessageStore(DatabaseSettings databaseSettings, DurabilitySetti
ILogger<PostgresqlMessageStore> logger, IEnumerable<SagaTableDefinition> sagaTypes) : base(databaseSettings, dataSource,
settings, logger, new PostgresqlMigrator(), "public")
{
_deleteIncomingEnvelopesSql =
$"delete from {SchemaName}.{DatabaseConstants.IncomingTable} WHERE id = ANY(@ids);";

_reassignIncomingSql =
$"update {SchemaName}.{DatabaseConstants.IncomingTable} set owner_id = @owner where id = ANY(@ids)";
_deleteOutgoingEnvelopesSql =
Expand Down Expand Up @@ -196,41 +192,7 @@ public override async Task<PersistedCounts> FetchCountsAsync()

return counts;
}

public override async Task MoveToDeadLetterStorageAsync(Envelope envelope, Exception? exception)
{
if (HasDisposed) return;

try
{
var builder = ToCommandBuilder();
builder.Append(_deleteIncomingEnvelopesSql);
var param = (NpgsqlParameter)builder.AddNamedParameter("ids", DBNull.Value);
param.Value = new[] { envelope.Id };
// ReSharper disable once BitwiseOperatorOnEnumWithoutFlags
param.NpgsqlDbType = NpgsqlDbType.Uuid | NpgsqlDbType.Array;

DatabasePersistence.ConfigureDeadLetterCommands(envelope, exception, builder, this);

var cmd = builder.Compile();
await using var conn = await DataSource.OpenConnectionAsync(_cancellation);
cmd.Connection = conn;
try
{
await cmd.ExecuteNonQueryAsync(_cancellation);
}
finally
{
await conn.CloseAsync();
}
}
catch (Exception e)
{
if (isExceptionFromDuplicateEnvelope(e)) return;
throw;
}
}


public override async Task MarkDeadLetterEnvelopesAsReplayableAsync(Guid[] ids, string? tenantId = null)
{
var builder = ToCommandBuilder();
Expand Down
24 changes: 23 additions & 1 deletion src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,29 @@ public Task StoreIncomingAsync(DbTransaction tx, Envelope[] envelopes)
return cmd.ExecuteNonQueryAsync(_cancellation);
}

public abstract Task MoveToDeadLetterStorageAsync(Envelope envelope, Exception? exception);
public async Task MoveToDeadLetterStorageAsync(Envelope envelope, Exception? exception)
{
if (HasDisposed) return;

try
{
var builder = ToCommandBuilder();
builder.Append($"delete from {SchemaName}.{DatabaseConstants.IncomingTable} WHERE id = ");
builder.AppendParameter(envelope.Id);
builder.Append($" and {DatabaseConstants.ReceivedAt} = ");
builder.AppendParameter(envelope.Destination.ToString());
builder.Append(';');

DatabasePersistence.ConfigureDeadLetterCommands(envelope, exception, builder, this);

await executeCommandBatch(builder);
}
catch (Exception e)
{
if (isExceptionFromDuplicateEnvelope(e)) return;
throw;
}
}

public Task MarkIncomingEnvelopeAsHandledAsync(Envelope envelope)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
using System.Data;
using System.Data.Common;
using System.Runtime.InteropServices;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;
using Weasel.Core;
using Weasel.Core.Migrations;
using Weasel.SqlServer;
using Weasel.SqlServer.Tables;
using Wolverine.Logging;
Expand Down Expand Up @@ -109,43 +107,6 @@ public override async Task<PersistedCounts> FetchCountsAsync()
/// </summary>
public string DatabasePrincipal { get; set; } = "dbo";

public override async Task MoveToDeadLetterStorageAsync(Envelope envelope, Exception? exception)
{
if (HasDisposed) return;

var table = new DataTable();
table.Columns.Add(new DataColumn("ID", typeof(Guid)));
table.Rows.Add(envelope.Id);

var builder = ToCommandBuilder();

var list = builder.AddNamedParameter("IDLIST", table).As<SqlParameter>();
list.SqlDbType = SqlDbType.Structured;
list.TypeName = $"{SchemaName}.EnvelopeIdList";

builder.Append(_moveToDeadLetterStorageSql);

DatabasePersistence.ConfigureDeadLetterCommands(envelope, exception, builder, this);

var cmd = builder.Compile();
await using var conn = await DataSource.OpenConnectionAsync(_cancellation);
cmd.Connection = conn;

try
{
await cmd.ExecuteNonQueryAsync(_cancellation);
}
catch (Exception e)
{
if (isExceptionFromDuplicateEnvelope(e)) return;
throw;
}
finally
{
await conn.CloseAsync();
}
}

public override Task MarkDeadLetterEnvelopesAsReplayableAsync(Guid[] ids, string? tenantId = null)
{
var table = new DataTable();
Expand Down

0 comments on commit 5783a83

Please sign in to comment.