Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
using IntegrationTests;
using JasperFx.Core;
using JasperFx.Resources;
using Marten;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Npgsql;
using Shouldly;
using Weasel.Postgresql;
using Weasel.Postgresql.Migrations;
using Wolverine;
using Wolverine.Marten;
using Wolverine.Tracking;
using Xunit;

namespace MartenTests.Bugs;

#region Test Infrastructure

public interface IAncillaryStore2669 : IDocumentStore;

public record DispatchAncillaryWorkFromMain2669(Guid Id);

public record AncillaryWorkFromMain2669(Guid Id);

public static class DispatchAncillaryWorkFromMain2669Handler
{
public static AncillaryWorkFromMain2669 Handle(DispatchAncillaryWorkFromMain2669 message)
{
return new AncillaryWorkFromMain2669(message.Id);
}
}

[MartenStore(typeof(IAncillaryStore2669))]
public static class AncillaryWorkFromMain2669Handler
{
public static IMartenOp Handle(AncillaryWorkFromMain2669 message)
{
return MartenOps.Store(new AncillaryWorkDocument2669 { Id = message.Id });
}
}

public class AncillaryWorkDocument2669
{
public Guid Id { get; set; }
}

#endregion

/// <summary>
/// Reproduces https://github.com/JasperFx/wolverine/issues/2669.
///
/// A durable local message published from a main-store handler can be handled
/// transactionally by an ancillary Marten store. The receiving handler's
/// ancillary store should own the inbox row, even when the envelope was
/// originally stamped by the publishing context.
/// </summary>
public class Bug_2669_ancillary_marten_store_local_message_from_main_store : IAsyncLifetime
{
private IHost _host = null!;
private string _mainConnectionString = null!;
private string _ancillaryConnectionString = null!;
private static readonly string TargetFrameworkSuffix = AppContext.TargetFrameworkName?
.Split("Version=v")
.LastOrDefault()?
.Replace(".", "_")
.ToLowerInvariant() ?? "default";

public async Task InitializeAsync()
{
await using var conn = new NpgsqlConnection(Servers.PostgresConnectionString);
await conn.OpenAsync();

_mainConnectionString = await CreateDatabaseIfNotExists(conn, $"bug_ancillary_from_main_{TargetFrameworkSuffix}");
_ancillaryConnectionString = await CreateDatabaseIfNotExists(conn, $"bug_ancillary_from_main_refs_{TargetFrameworkSuffix}");

await ResetSchema(_mainConnectionString, "public");
await ResetSchema(_ancillaryConnectionString, "public");
await ResetSchema(_ancillaryConnectionString, "organizations");

_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Durability.Mode = DurabilityMode.Solo;

opts.Services.AddMarten(m =>
{
m.Connection(_mainConnectionString);
m.DatabaseSchemaName = "public";
m.DisableNpgsqlLogging = true;
}).IntegrateWithWolverine();

opts.Services.AddMartenStore<IAncillaryStore2669>(m =>
{
m.Connection(_ancillaryConnectionString);
m.DatabaseSchemaName = "organizations";
m.DisableNpgsqlLogging = true;
}).IntegrateWithWolverine(x => x.SchemaName = "organizations");

opts.Policies.AutoApplyTransactions();
opts.Policies.UseDurableLocalQueues();

opts.Discovery.DisableConventionalDiscovery()
.IncludeType(typeof(DispatchAncillaryWorkFromMain2669Handler))
.IncludeType(typeof(AncillaryWorkFromMain2669Handler));

opts.Services.AddResourceSetupOnStartup();
}).StartAsync();
}

public async Task DisposeAsync()
{
await _host.StopAsync();
_host.Dispose();
}

[Fact]
public async Task durable_local_message_from_main_store_can_be_handled_by_ancillary_marten_store()
{
var id = Guid.NewGuid();

await _host
.TrackActivity()
.Timeout(30.Seconds())
.InvokeMessageAndWaitAsync(new DispatchAncillaryWorkFromMain2669(id));

await using var session = _host.Services
.GetRequiredService<IAncillaryStore2669>()
.LightweightSession();

var document = await session.LoadAsync<AncillaryWorkDocument2669>(id);
document.ShouldNotBeNull();
}

private static async Task<string> CreateDatabaseIfNotExists(NpgsqlConnection conn, string databaseName)
{
var builder = new NpgsqlConnectionStringBuilder(Servers.PostgresConnectionString);

var exists = await conn.DatabaseExists(databaseName);
if (!exists)
{
try
{
await new DatabaseSpecification().BuildDatabase(conn, databaseName);
}
catch (PostgresException e) when (e.SqlState == PostgresErrorCodes.UniqueViolation)
{
// Parallel target framework runs can race to create the same database.
}
}

builder.Database = databaseName;

return builder.ConnectionString;
}

private static async Task ResetSchema(string connectionString, string schemaName)
{
await using var conn = new NpgsqlConnection(connectionString);
await conn.OpenAsync();

await conn.DropSchemaAsync(schemaName);

if (schemaName == "public")
{
await using var cmd = conn.CreateCommand();
cmd.CommandText = "create schema if not exists public;";
await cmd.ExecuteNonQueryAsync();
}
}
}
25 changes: 23 additions & 2 deletions src/Persistence/Wolverine.Marten/FlushOutgoingMessagesOnCommit.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,29 @@ public override Task BeforeSaveChangesAsync(IDocumentSession session, Cancellati
{
if (_context.Envelope.Store is PostgresqlMessageStore envelopeStore)
{
// Envelope was routed to a specific store (possibly this one)
incomingTableName = envelopeStore.IncomingFullName;
// Envelope was routed to a specific store. Only fold the
// handled-update into THIS Marten transaction if envelopeStore
// sits on the same connection / schema as _messageStore — the
// session is open against _messageStore's database, so an
// UPDATE against a different database's inbox table simply
// can't run here. Compare by Uri (the existing same-database
// heuristic in the envelope.Store==null branch below uses
// the same approach), which keeps this from depending on
// IMessageStore.Id and matches the local notion of "same
// store" the rest of this method already uses.
//
// Cross-store envelopes (e.g. a main-store handler dispatches
// a local message to an ancillary-store handler — GH-2669)
// are skipped here so the envelope's owning store handles
// the mark-handled separately via its own connection.
if (envelopeStore.Uri == _messageStore.Uri)
{
incomingTableName = envelopeStore.IncomingFullName;
}
else
{
return Task.CompletedTask;
}
}
else if (_context.Envelope.Store == null)
{
Expand Down
7 changes: 5 additions & 2 deletions src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,15 @@ public DurableReceiver(Endpoint endpoint, IWolverineRuntime runtime, IHandlerPip
/// <summary>
/// If the handler for this message type targets an ancillary store on a
/// different database, set envelope.Store so that the DelegatingMessageInbox
/// persists it in the correct store for transactional atomicity.
/// persists it in the correct store for transactional atomicity. The
/// receiving handler's store association wins over the publishing context's
/// store — see the equivalent method on
/// <see cref="Wolverine.Transports.Local.DurableLocalQueue"/> for the full
/// rationale (GH-2669).
/// </summary>
private void assignAncillaryStoreIfNeeded(Envelope envelope)
{
if (_runtime.Stores == null) return;
if (envelope.Store != null) return; // already stamped (e.g. from Option B at read time)
var store = _runtime.Stores.TryFindAncillaryStoreForMessageType(envelope.MessageType);
if (store != null)
{
Expand Down
13 changes: 8 additions & 5 deletions src/Wolverine/Transports/Local/DurableLocalQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,18 @@ public ValueTask StoreAndForwardAsync(Envelope envelope)
/// <summary>
/// If the handler for this message type targets an ancillary store on a
/// different database, set envelope.Store so that the DelegatingMessageInbox
/// persists it in the correct store for transactional atomicity.
/// This is a safety net for envelopes that arrive without Store already set
/// (e.g. from scheduled-job recovery). Envelopes published via PublishAsync
/// from a handler will already have Store stamped by MessageBus.
/// persists it in the correct store for transactional atomicity. The
/// receiving handler's store association wins over the publishing context's
/// store: a message published from a main-store handler can be persisted
/// transactionally by an ancillary-store handler. Without overriding here,
/// a publisher-stamped envelope.Store (the main store) would carry through
/// the inbox and cause FlushOutgoingMessagesOnCommit to point at the
/// publisher's inbox table while the receiving Marten/Polecat session was
/// connected to the ancillary database. See GH-2669.
/// </summary>
private void assignAncillaryStoreIfNeeded(Envelope envelope)
{
if (_runtime.Stores == null) return;
if (envelope.Store != null) return;
var store = _runtime.Stores.TryFindAncillaryStoreForMessageType(envelope.MessageType);
if (store != null)
{
Expand Down
Loading