Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions src/Persistence/Wolverine.Marten/FlushOutgoingMessagesOnCommit.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Marten;
using Marten.Services;
using Wolverine.Marten.Persistence.Operations;
using Wolverine.Persistence.Durability;
using Wolverine.Postgresql;
using Wolverine.RDBMS;
using Wolverine.Runtime;
Expand All @@ -25,27 +26,36 @@ public override Task BeforeSaveChangesAsync(IDocumentSession session, Cancellati
{
return Task.CompletedTask;
}

// Mark as handled!
if (_context.Envelope.Destination != null)
{
if (_context.Envelope.WasPersistedInInbox)
{
// GH-2155: When using ancillary stores (e.g., [MartenStore]), the incoming
// envelope was persisted in the main store by DurableReceiver. We should only
// mark it as handled within this Marten session if the session's store is
// the same store. Otherwise, let DurableReceiver handle it via the main store.
if (_context.Envelope.Store == null && _messageStore.Role == MessageStoreRole.Ancillary)
{
return Task.CompletedTask;
}

var keepUntil = DateTimeOffset.UtcNow.Add(_context.Runtime.Options.Durability.KeepAfterMessageHandling);
session.QueueSqlCommand($"update {_messageStore.IncomingFullName} set {DatabaseConstants.Status} = '{EnvelopeStatus.Handled}', {DatabaseConstants.KeepUntil} = ? where id = ?", keepUntil, _context.Envelope.Id);
_context.Envelope.Status = EnvelopeStatus.Handled;
}
// This was buggy in real usage.

// This was buggy in real usage.
// else
// {
// var envelope = Envelope.ForPersistedHandled(_context.Envelope, DateTimeOffset.UtcNow, _context.Runtime.Options.Durability);
// session.QueueOperation(new StoreIncomingEnvelope(_messageStore.IncomingFullName, envelope));
// }


}

return Task.CompletedTask;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
using IntegrationTests;
using JasperFx.Core;
using JasperFx.Resources;
using Marten;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.Attributes;
using Wolverine.Marten;
using Wolverine.Persistence.Durability;
using Wolverine.Runtime;
using Wolverine.Util;
using Wolverine.Tracking;
using Xunit;

namespace Wolverine.RabbitMQ.Tests.Bugs;

public interface IAncillaryStore2155 : IDocumentStore;

public record AncillaryMessage2155(Guid Id);

[MartenStore(typeof(IAncillaryStore2155))]
public static class AncillaryMessage2155Handler
{
[Transactional]
public static void Handle(AncillaryMessage2155 message, IDocumentSession session)
{
// Just touch the session so the transactional middleware commits
session.Store(new AncillaryDoc2155 { Id = message.Id });
}
}

public class AncillaryDoc2155
{
public Guid Id { get; set; }
}

public class Bug_2155_ancillary_store_inbox_persistence : IAsyncLifetime
{
private IHost _host;
private string _queueName;

public async Task InitializeAsync()
{
_queueName = RabbitTesting.NextQueueName();

_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Durability.Mode = DurabilityMode.Solo;

// Main Marten store
opts.Services.AddMarten(m =>
{
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "bug2155_main";
m.DisableNpgsqlLogging = true;
}).IntegrateWithWolverine(x => x.MessageStorageSchemaName = "bug2155_main");

// Ancillary Marten store on same database but different schema
opts.Services.AddMartenStore<IAncillaryStore2155>(m =>
{
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "bug2155_ancillary";
m.DisableNpgsqlLogging = true;
}).IntegrateWithWolverine(x => x.SchemaName = "bug2155_ancillary");

opts.UseRabbitMq().AutoProvision().AutoPurgeOnStartup();

opts.PublishMessage<AncillaryMessage2155>().ToRabbitQueue(_queueName).UseDurableOutbox();
opts.ListenToRabbitQueue(_queueName).UseDurableInbox();

opts.Policies.AutoApplyTransactions();

opts.Services.AddResourceSetupOnStartup();
}).StartAsync();

await _host.ResetResourceState();
}

public async Task DisposeAsync()
{
await _host.StopAsync();
_host.Dispose();
}

[Fact]
public async Task incoming_envelope_should_be_marked_as_handled_in_main_store()
{
var message = new AncillaryMessage2155(Guid.NewGuid());

await _host
.TrackActivity()
.IncludeExternalTransports()
.SendMessageAndWaitAsync(message);

// Give a moment for the async mark-as-handled to complete
await Task.Delay(500);

// The main store should have the envelope marked as Handled (not stuck as Incoming)
var runtime = _host.GetRuntime();
var incoming = await runtime.Storage.Admin.AllIncomingAsync();
var stuck = incoming.Where(x =>
x.MessageType == typeof(AncillaryMessage2155).ToMessageTypeName()
&& x.Status == EnvelopeStatus.Incoming).ToList();

stuck.ShouldBeEmpty("Incoming envelopes should not be stuck in 'Incoming' status in the main store");
}
}
Loading