Skip to content

Commit 5abb6fc

Browse files
committed
Addressed an error with PostgreSQL transport configuration w/o an explicit message store schema configuration in IntegrateWitWolverine(). Closes GH-1175
1 parent b28c4c3 commit 5abb6fc

File tree

2 files changed

+95
-1
lines changed

2 files changed

+95
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
using System.Diagnostics;
2+
using IntegrationTests;
3+
using JasperFx.Core;
4+
using Marten;
5+
using Marten.Storage;
6+
using Microsoft.Extensions.Hosting;
7+
using Oakton.Resources;
8+
using Shouldly;
9+
using Weasel.Core;
10+
using Wolverine;
11+
using Wolverine.Configuration;
12+
using Wolverine.Marten;
13+
using Wolverine.Postgresql;
14+
using Wolverine.Tracking;
15+
16+
namespace MartenTests.Bugs;
17+
18+
public class Bug_1175_schema_name_with_queues
19+
{
20+
[Fact]
21+
public async Task send_messages_with_postgresql_queueing()
22+
{
23+
using var sender = await Host.CreateDefaultBuilder()
24+
.UseWolverine(opts =>
25+
{
26+
opts.ServiceName = "Service";
27+
28+
opts.ListenToPostgresqlQueue("response").MaximumParallelMessages(14, ProcessingOrder.UnOrdered);
29+
opts.PublishMessage<ColorRequest>().ToPostgresqlQueue("request");
30+
31+
opts.Services.AddMarten(opt =>
32+
{
33+
opt.Connection(Servers.PostgresConnectionString);
34+
opt.Events.TenancyStyle = TenancyStyle.Conjoined;
35+
})
36+
.UseLightweightSessions()
37+
.IntegrateWithWolverine(options =>
38+
{
39+
options.AutoCreate = AutoCreate.CreateOrUpdate;
40+
options.MessageStorageSchemaName = "sender";
41+
});
42+
43+
opts.Services.AddResourceSetupOnStartup();
44+
45+
}).StartAsync();
46+
47+
using var listener = await Host.CreateDefaultBuilder()
48+
.UseWolverine(opts =>
49+
{
50+
opts.ServiceName = "Listener";
51+
52+
opts.ListenToPostgresqlQueue("request").MaximumParallelMessages(14, ProcessingOrder.UnOrdered);
53+
opts.PublishMessage<ColorResponse>().ToPostgresqlQueue("response");
54+
55+
opts.Services.AddMarten(opt =>
56+
{
57+
opt.Connection(Servers.PostgresConnectionString);
58+
opt.Events.TenancyStyle = TenancyStyle.Conjoined;
59+
})
60+
.UseLightweightSessions()
61+
.IntegrateWithWolverine(options =>
62+
{
63+
options.AutoCreate = AutoCreate.CreateOrUpdate;
64+
options.MessageStorageSchemaName = "listener";
65+
});
66+
67+
opts.Services.AddResourceSetupOnStartup();
68+
69+
}).StartAsync();
70+
71+
var tracked = await sender.TrackActivity().AlsoTrack(listener).SendMessageAndWaitAsync(new ColorRequest("red"));
72+
tracked.Received.SingleMessage<ColorResponse>().Color.ShouldBe("red");
73+
tracked.Received.SingleEnvelope<ColorResponse>()
74+
.Destination.ShouldBe(new Uri("postgresql://response/"));
75+
76+
}
77+
}
78+
79+
public record ColorRequest(string Color);
80+
public record ColorResponse(string Color);
81+
82+
public static class ColorRequestHandler
83+
{
84+
public static async Task<ColorResponse> Handle(ColorRequest request)
85+
{
86+
await Task.Delay(Random.Shared.Next(0, 500).Milliseconds());
87+
return new ColorResponse(request.Color);
88+
}
89+
}
90+
91+
public static class ColorResponseHandler
92+
{
93+
public static void Handle(ColorResponse response) => Debug.WriteLine("Got color response for " + response.Color);
94+
}

src/Persistence/Wolverine.Marten/MartenIntegration.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void Configure(WolverineOptions options)
5757

5858
var transport = options.Transports.GetOrCreate<PostgresqlTransport>();
5959
transport.TransportSchemaName = TransportSchemaName;
60-
transport.MessageStorageSchemaName = MessageStorageSchemaName;
60+
transport.MessageStorageSchemaName = MessageStorageSchemaName ?? "public";
6161
}
6262

6363
/// <summary>

0 commit comments

Comments
 (0)