From 973e142b55f347554fad0a29851dd649b49d2d22 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 20 Oct 2025 14:32:42 -0500 Subject: [PATCH 1/5] Warning about an http endpoint being discovered as a message handler. Closes GH-1769 --- docs/guide/extensions.md | 4 ++-- docs/guide/handlers/middleware.md | 34 ++++++++++++++++++++++++++++--- docs/guide/http/endpoints.md | 10 +++++++++ 3 files changed, 43 insertions(+), 5 deletions(-) diff --git a/docs/guide/extensions.md b/docs/guide/extensions.md index 1564fd37f..5c33d2336 100644 --- a/docs/guide/extensions.md +++ b/docs/guide/extensions.md @@ -109,7 +109,7 @@ internal class DisableExternalTransports : IWolverineExtension } } ``` -snippet source | anchor +snippet source | anchor And that extension is just added to the application's IoC container at test bootstrapping time like this: @@ -123,7 +123,7 @@ public static IServiceCollection DisableAllExternalWolverineTransports(this ISer return services; } ``` -snippet source | anchor +snippet source | anchor In usage, the `IWolverineExtension` objects added to the IoC container are applied *after* the inner configuration diff --git a/docs/guide/handlers/middleware.md b/docs/guide/handlers/middleware.md index 39c357912..3d71f4cd0 100644 --- a/docs/guide/handlers/middleware.md +++ b/docs/guide/handlers/middleware.md @@ -249,7 +249,7 @@ public static class MaybeBadThingHandler } } ``` -snippet source | anchor +snippet source | anchor Or by returning `OutgoingMessages` from a middleware method as shown below: @@ -275,7 +275,7 @@ public static class MaybeBadThing2Handler } } ``` -snippet source | anchor +snippet source | anchor @@ -608,7 +608,35 @@ HTTP endpoint types to enable you to send messages through the usage of `Outgoin You can now write middleware method like this: -snippet: sample_send_messages_through_outgoing_messages_with_external_middleware + + +```cs +public record MaybeBadThing4(int Number); + +public static class MaybeBadThing4Middleware +{ + public static (OutgoingMessages, HandlerContinuation) Validate(MaybeBadThing4 thing) + { + if (thing.Number > 10) + { + return ([new RejectYourThing(thing.Number)], HandlerContinuation.Stop); + } + + return ([], HandlerContinuation.Continue); + } +} + +[Middleware(typeof(MaybeBadThing4Middleware))] +public static class MaybeBadThing4Handler +{ + public static void Handle(MaybeBadThing4 message) + { + Debug.WriteLine("Got " + message); + } +} +``` +snippet source | anchor + And any objects in the `OutgoingMessages` return value from the middleware method will be sent as cascaded messages. Wolverine will also apply a "maybe stop" frame from the `IHandlerContinuation` as well. diff --git a/docs/guide/http/endpoints.md b/docs/guide/http/endpoints.md index a36a9a3bb..85db8a975 100644 --- a/docs/guide/http/endpoints.md +++ b/docs/guide/http/endpoints.md @@ -1,5 +1,15 @@ # HTTP Endpoints +::: warning +While Wolverine.HTTP has a relaxed view of naming conventions since it depends on the routing attributes for discovery. It +is very possible to utilize the same method as both an HTTP endpoint and Wolverine message handler if the method both +follows the correct naming conventions for message handler discovery and is decorated with one of the `[WolverineVerb]` attributes. + +This can lead to unexpected code generation errors on the message handler side if the method refers to HTTP route arguments, +query string values, or other AspNetCore services. Our strong advice is to use the `Endpoint` class name nomenclature for HTTP +endpoints unless you are explicitly meaning for a method to be both an HTTP endpoint and message handler. +::: + First, a little terminology about Wolverine HTTP endpoints. Consider the following endpoint method: From c3afe748692049afe3e5f08b799c3292fcd4d429 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 20 Oct 2025 14:41:03 -0500 Subject: [PATCH 2/5] Can call ListeningAgent.PauseAsync() even when it's already paused. Closes GH-1715 --- src/Wolverine/Transports/ListeningAgent.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Wolverine/Transports/ListeningAgent.cs b/src/Wolverine/Transports/ListeningAgent.cs index ab64bb7d3..6a956e698 100644 --- a/src/Wolverine/Transports/ListeningAgent.cs +++ b/src/Wolverine/Transports/ListeningAgent.cs @@ -152,6 +152,8 @@ public async ValueTask StopAndDrainAsync() { using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.StoppingListener); activity?.SetTag(WolverineTracing.EndpointAddress, Uri); + + if (Listener == null) return; await Listener.StopAsync(); await _receiver!.DrainAsync(); @@ -210,7 +212,7 @@ public async ValueTask PauseAsync(TimeSpan pauseTime) try { using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.PausingListener); - activity?.SetTag(WolverineTracing.EndpointAddress, Listener.Address); + activity?.SetTag(WolverineTracing.EndpointAddress, Uri); await StopAndDrainAsync(); } catch (Exception e) From 8c8f7b80e197b5c130125a4aa22feda14215e217 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 20 Oct 2025 16:17:22 -0500 Subject: [PATCH 3/5] Fixed the new EF Core concurrency --- .../Optimistic_concurrency_with_ef_core.cs | 5 +++-- .../Codegen/EFCorePersistenceFrameProvider.cs | 19 +++++++++++++------ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/src/Persistence/EfCoreTests/Optimistic_concurrency_with_ef_core.cs b/src/Persistence/EfCoreTests/Optimistic_concurrency_with_ef_core.cs index 7de6002bf..dc1264767 100644 --- a/src/Persistence/EfCoreTests/Optimistic_concurrency_with_ef_core.cs +++ b/src/Persistence/EfCoreTests/Optimistic_concurrency_with_ef_core.cs @@ -14,6 +14,7 @@ using Weasel.SqlServer.Tables; using Wolverine; using Wolverine.Attributes; +using Wolverine.ComplianceTests; using Wolverine.EntityFrameworkCore; using Wolverine.Runtime.Handlers; using Wolverine.SqlServer; @@ -38,13 +39,13 @@ public async Task detect_concurrency_exception_as_SagaConcurrencyException() using var host = await Host.CreateDefaultBuilder() .UseWolverine(opt => { + opt.DisableConventionalDiscovery().IncludeType(typeof(ConcurrencyTestSaga)); + opt.Services.AddDbContextWithWolverineIntegration(o => { o.UseSqlServer(Servers.SqlServerConnectionString); }); - opt.Services.AddScoped(); - opt.PersistMessagesWithSqlServer(Servers.SqlServerConnectionString); opt.UseEntityFrameworkCoreTransactions(); opt.Policies.UseDurableLocalQueues(); diff --git a/src/Persistence/Wolverine.EntityFrameworkCore/Codegen/EFCorePersistenceFrameProvider.cs b/src/Persistence/Wolverine.EntityFrameworkCore/Codegen/EFCorePersistenceFrameProvider.cs index cd6084db5..110855cc6 100644 --- a/src/Persistence/Wolverine.EntityFrameworkCore/Codegen/EFCorePersistenceFrameProvider.cs +++ b/src/Persistence/Wolverine.EntityFrameworkCore/Codegen/EFCorePersistenceFrameProvider.cs @@ -76,6 +76,12 @@ public Frame CommitUnitOfWorkFrame(Variable saga, IServiceContainer container) public Frame DetermineUpdateFrame(Variable saga, IServiceContainer container) { + var version = saga.VariableType.GetProperty("Version"); + if (version == null || !(version.CanRead && version.CanWrite)) + { + return new CommentFrame("No explicit update necessary with EF Core without a Version property"); + } + var dbContextType = DetermineDbContextType(saga.VariableType, container); return new IncrementSagaVersionIfNecessary(dbContextType, saga); } @@ -375,8 +381,8 @@ public override IEnumerable FindVariables(IMethodVariables chain) public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) { writer.WriteLine(""); - writer.WriteComment("If the saga state changed, then increment it's version to support optimistic concurrency"); - writer.WriteLine($"if ({_context!.Usage}.Entry({_saga.Usage}.Type == EntityState.Modified) {{ {_saga.Usage}.Version += 1; }}"); + writer.WriteComment("If the saga state changed, then increment its version to support optimistic concurrency"); + writer.WriteLine($"if ({_context!.Usage}.Entry({_saga.Usage}).State == {typeof(EntityState).FullName}.Modified) {{ {_saga.Usage}.Version += 1; }}"); Next?.GenerateCode(method, writer); } @@ -400,15 +406,15 @@ public override IEnumerable FindVariables(IMethodVariables chain) public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) { - writer.WriteLine("BLOCK:try"); + writer.Write("BLOCK:try"); _frame.GenerateCode(method, writer); writer.FinishBlock(); - writer.WriteLine("BLOCK:catch (DbUpdateConcurrencyException error)"); + writer.Write($"BLOCK:catch ({typeof(DbUpdateConcurrencyException).FullNameInCode()} error)"); writer.WriteComment("Only intercepts concurrency error on the saga itself"); - writer.WriteLine($"BLOCK:if (error.Entries.Any(e => e.Entity == ${_saga.Usage})"); - writer.WriteLine($"throw new SagaConcurrencyException($\"Saga of type {_saga.VariableType.FullNameInCode()} and id {{ {SagaChain.SagaIdVariableName} }} cannot be updated because of optimistic concurrency violations\");"); + writer.Write($"BLOCK:if ({typeof(Enumerable).FullNameInCode()}.Any(error.Entries, e => e.Entity == {_saga.Usage}))"); + writer.WriteLine($"throw new {typeof(SagaConcurrencyException).FullNameInCode()}($\"Saga of type {_saga.VariableType.FullNameInCode()} and identity {SagaChain.SagaIdVariableName} cannot be updated because of optimistic concurrency violations\");"); writer.FinishBlock(); writer.WriteComment("Rethrow any other exception"); @@ -417,6 +423,7 @@ public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) Next?.GenerateCode(method, writer); } + } public class CommitDbContextTransactionIfNecessary : SyncFrame From 5b73cc2862a2e0e08209c0ca0e5a8de35d3750d2 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 20 Oct 2025 16:21:41 -0500 Subject: [PATCH 4/5] Updated JasperFx.RuntimeCompiler so the new service location checks work --- src/Wolverine/Wolverine.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Wolverine/Wolverine.csproj b/src/Wolverine/Wolverine.csproj index c78850b02..f38dcedaf 100644 --- a/src/Wolverine/Wolverine.csproj +++ b/src/Wolverine/Wolverine.csproj @@ -5,7 +5,7 @@ - + From 0ad5e71af8efe06e31c5620b9518e0ad0c102c04 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 21 Oct 2025 12:40:28 -0500 Subject: [PATCH 5/5] Tweaks for the Kafka tests --- .../Compliance/TransportCompliance.cs | 3 ++- src/Transports/Kafka/BatchMessaging/Program.cs | 2 +- .../Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs | 5 +++++ src/Transports/Kafka/Wolverine.Kafka.Tests/compliance.cs | 5 +++++ .../configure_consumers_and_publishers.cs | 2 +- src/Transports/Kafka/Wolverine.Kafka/Wolverine.Kafka.csproj | 2 +- src/Wolverine/Tracking/TrackedSessionConfiguration.cs | 4 ++-- 7 files changed, 17 insertions(+), 6 deletions(-) diff --git a/src/Testing/Wolverine.ComplianceTests/Compliance/TransportCompliance.cs b/src/Testing/Wolverine.ComplianceTests/Compliance/TransportCompliance.cs index e84676ad5..81b5f2238 100644 --- a/src/Testing/Wolverine.ComplianceTests/Compliance/TransportCompliance.cs +++ b/src/Testing/Wolverine.ComplianceTests/Compliance/TransportCompliance.cs @@ -349,8 +349,9 @@ public async Task can_request_reply() var (session, response) = await theSender.TrackActivity() .AlsoTrack(theReceiver) + .IncludeExternalTransports() .Timeout(30.Seconds()) - .InvokeAndWaitAsync(request); + .InvokeAndWaitAsync(request, 30.Seconds()); response.Name.ShouldBe(request.Name); } diff --git a/src/Transports/Kafka/BatchMessaging/Program.cs b/src/Transports/Kafka/BatchMessaging/Program.cs index e627fa105..27f41323d 100644 --- a/src/Transports/Kafka/BatchMessaging/Program.cs +++ b/src/Transports/Kafka/BatchMessaging/Program.cs @@ -10,7 +10,7 @@ builder.Host.UseWolverine(opts => { - opts.UseKafka("localhost:9092").AutoProvision(); + opts.UseKafka("localhost:9092").AutoProvision().AutoPurgeOnStartup(); opts.PublishAllMessages().ToKafkaTopic("topic_0"); diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs index 106474568..a860cfa43 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs @@ -2,8 +2,11 @@ using JasperFx.Core; using Microsoft.Extensions.Hosting; using JasperFx.Resources; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; using Shouldly; using Wolverine.Attributes; +using Wolverine.ComplianceTests; using Wolverine.Tracking; using Xunit.Abstractions; @@ -38,6 +41,7 @@ public async Task InitializeAsync() opts.ServiceName = "receiver"; opts.Services.AddResourceSetupOnStartup(); + opts.Services.AddSingleton(new OutputLoggerProvider(_output)); }).StartAsync(); @@ -52,6 +56,7 @@ public async Task InitializeAsync() opts.ServiceName = "sender"; opts.Services.AddResourceSetupOnStartup(); + opts.Services.AddSingleton(new OutputLoggerProvider(_output)); }).StartAsync(); } diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/compliance.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/compliance.cs index 1de6055d1..1355360af 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/compliance.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/compliance.cs @@ -1,6 +1,11 @@ using Confluent.Kafka; using JasperFx.Resources; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Wolverine.ComplianceTests; using Wolverine.ComplianceTests.Compliance; +using Xunit.Abstractions; +using Xunit.Sdk; namespace Wolverine.Kafka.Tests; diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs index 6688fa635..ee3a1939c 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs @@ -86,7 +86,7 @@ public async Task DisposeAsync() public async Task can_receive_the_group_id_for_the_consumer_on_the_envelope() { Task Send(IMessageContext c) => c.EndpointFor("red").SendAsync(new RedMessage("one")).AsTask(); - var session = await _host.ExecuteAndWaitAsync(Send); + var session = await _host.TrackActivity().IncludeExternalTransports().ExecuteAndWaitAsync(Send); session.Received.SingleEnvelope() .GroupId.ShouldBe("foo"); diff --git a/src/Transports/Kafka/Wolverine.Kafka/Wolverine.Kafka.csproj b/src/Transports/Kafka/Wolverine.Kafka/Wolverine.Kafka.csproj index 548a32055..01e4241a9 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/Wolverine.Kafka.csproj +++ b/src/Transports/Kafka/Wolverine.Kafka/Wolverine.Kafka.csproj @@ -15,7 +15,7 @@ - + diff --git a/src/Wolverine/Tracking/TrackedSessionConfiguration.cs b/src/Wolverine/Tracking/TrackedSessionConfiguration.cs index be6578762..e72685ba5 100644 --- a/src/Wolverine/Tracking/TrackedSessionConfiguration.cs +++ b/src/Wolverine/Tracking/TrackedSessionConfiguration.cs @@ -239,11 +239,11 @@ public Task PublishMessageAndWaitAsync(object? message, Deliver /// /// /// - public async Task<(ITrackedSession, T?)> InvokeAndWaitAsync(object request) + public async Task<(ITrackedSession, T?)> InvokeAndWaitAsync(object request, TimeSpan? timeout = null) { T? response = default; - Func invocation = async c => { response = await c.InvokeAsync(request); }; + Func invocation = async c => { response = await c.InvokeAsync(request, timeout:timeout); }; var session = await ExecuteAndWaitAsync(invocation);