diff --git a/docs/guide/extensions.md b/docs/guide/extensions.md
index 1564fd37f..5c33d2336 100644
--- a/docs/guide/extensions.md
+++ b/docs/guide/extensions.md
@@ -109,7 +109,7 @@ internal class DisableExternalTransports : IWolverineExtension
}
}
```
-snippet source | anchor
+snippet source | anchor
And that extension is just added to the application's IoC container at test bootstrapping time like this:
@@ -123,7 +123,7 @@ public static IServiceCollection DisableAllExternalWolverineTransports(this ISer
return services;
}
```
-snippet source | anchor
+snippet source | anchor
In usage, the `IWolverineExtension` objects added to the IoC container are applied *after* the inner configuration
diff --git a/docs/guide/handlers/middleware.md b/docs/guide/handlers/middleware.md
index 39c357912..3d71f4cd0 100644
--- a/docs/guide/handlers/middleware.md
+++ b/docs/guide/handlers/middleware.md
@@ -249,7 +249,7 @@ public static class MaybeBadThingHandler
}
}
```
-snippet source | anchor
+snippet source | anchor
Or by returning `OutgoingMessages` from a middleware method as shown below:
@@ -275,7 +275,7 @@ public static class MaybeBadThing2Handler
}
}
```
-snippet source | anchor
+snippet source | anchor
@@ -608,7 +608,35 @@ HTTP endpoint types to enable you to send messages through the usage of `Outgoin
You can now write middleware method like this:
-snippet: sample_send_messages_through_outgoing_messages_with_external_middleware
+
+
+```cs
+public record MaybeBadThing4(int Number);
+
+public static class MaybeBadThing4Middleware
+{
+ public static (OutgoingMessages, HandlerContinuation) Validate(MaybeBadThing4 thing)
+ {
+ if (thing.Number > 10)
+ {
+ return ([new RejectYourThing(thing.Number)], HandlerContinuation.Stop);
+ }
+
+ return ([], HandlerContinuation.Continue);
+ }
+}
+
+[Middleware(typeof(MaybeBadThing4Middleware))]
+public static class MaybeBadThing4Handler
+{
+ public static void Handle(MaybeBadThing4 message)
+ {
+ Debug.WriteLine("Got " + message);
+ }
+}
+```
+snippet source | anchor
+
And any objects in the `OutgoingMessages` return value from the middleware method will be sent as cascaded
messages. Wolverine will also apply a "maybe stop" frame from the `IHandlerContinuation` as well.
diff --git a/docs/guide/http/endpoints.md b/docs/guide/http/endpoints.md
index a36a9a3bb..85db8a975 100644
--- a/docs/guide/http/endpoints.md
+++ b/docs/guide/http/endpoints.md
@@ -1,5 +1,15 @@
# HTTP Endpoints
+::: warning
+While Wolverine.HTTP has a relaxed view of naming conventions since it depends on the routing attributes for discovery. It
+is very possible to utilize the same method as both an HTTP endpoint and Wolverine message handler if the method both
+follows the correct naming conventions for message handler discovery and is decorated with one of the `[WolverineVerb]` attributes.
+
+This can lead to unexpected code generation errors on the message handler side if the method refers to HTTP route arguments,
+query string values, or other AspNetCore services. Our strong advice is to use the `Endpoint` class name nomenclature for HTTP
+endpoints unless you are explicitly meaning for a method to be both an HTTP endpoint and message handler.
+:::
+
First, a little terminology about Wolverine HTTP endpoints. Consider the following endpoint method:
diff --git a/src/Persistence/EfCoreTests/Optimistic_concurrency_with_ef_core.cs b/src/Persistence/EfCoreTests/Optimistic_concurrency_with_ef_core.cs
index 7de6002bf..dc1264767 100644
--- a/src/Persistence/EfCoreTests/Optimistic_concurrency_with_ef_core.cs
+++ b/src/Persistence/EfCoreTests/Optimistic_concurrency_with_ef_core.cs
@@ -14,6 +14,7 @@
using Weasel.SqlServer.Tables;
using Wolverine;
using Wolverine.Attributes;
+using Wolverine.ComplianceTests;
using Wolverine.EntityFrameworkCore;
using Wolverine.Runtime.Handlers;
using Wolverine.SqlServer;
@@ -38,13 +39,13 @@ public async Task detect_concurrency_exception_as_SagaConcurrencyException()
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opt =>
{
+ opt.DisableConventionalDiscovery().IncludeType(typeof(ConcurrencyTestSaga));
+
opt.Services.AddDbContextWithWolverineIntegration(o =>
{
o.UseSqlServer(Servers.SqlServerConnectionString);
});
- opt.Services.AddScoped();
-
opt.PersistMessagesWithSqlServer(Servers.SqlServerConnectionString);
opt.UseEntityFrameworkCoreTransactions();
opt.Policies.UseDurableLocalQueues();
diff --git a/src/Persistence/Wolverine.EntityFrameworkCore/Codegen/EFCorePersistenceFrameProvider.cs b/src/Persistence/Wolverine.EntityFrameworkCore/Codegen/EFCorePersistenceFrameProvider.cs
index cd6084db5..110855cc6 100644
--- a/src/Persistence/Wolverine.EntityFrameworkCore/Codegen/EFCorePersistenceFrameProvider.cs
+++ b/src/Persistence/Wolverine.EntityFrameworkCore/Codegen/EFCorePersistenceFrameProvider.cs
@@ -76,6 +76,12 @@ public Frame CommitUnitOfWorkFrame(Variable saga, IServiceContainer container)
public Frame DetermineUpdateFrame(Variable saga, IServiceContainer container)
{
+ var version = saga.VariableType.GetProperty("Version");
+ if (version == null || !(version.CanRead && version.CanWrite))
+ {
+ return new CommentFrame("No explicit update necessary with EF Core without a Version property");
+ }
+
var dbContextType = DetermineDbContextType(saga.VariableType, container);
return new IncrementSagaVersionIfNecessary(dbContextType, saga);
}
@@ -375,8 +381,8 @@ public override IEnumerable FindVariables(IMethodVariables chain)
public override void GenerateCode(GeneratedMethod method, ISourceWriter writer)
{
writer.WriteLine("");
- writer.WriteComment("If the saga state changed, then increment it's version to support optimistic concurrency");
- writer.WriteLine($"if ({_context!.Usage}.Entry({_saga.Usage}.Type == EntityState.Modified) {{ {_saga.Usage}.Version += 1; }}");
+ writer.WriteComment("If the saga state changed, then increment its version to support optimistic concurrency");
+ writer.WriteLine($"if ({_context!.Usage}.Entry({_saga.Usage}).State == {typeof(EntityState).FullName}.Modified) {{ {_saga.Usage}.Version += 1; }}");
Next?.GenerateCode(method, writer);
}
@@ -400,15 +406,15 @@ public override IEnumerable FindVariables(IMethodVariables chain)
public override void GenerateCode(GeneratedMethod method, ISourceWriter writer)
{
- writer.WriteLine("BLOCK:try");
+ writer.Write("BLOCK:try");
_frame.GenerateCode(method, writer);
writer.FinishBlock();
- writer.WriteLine("BLOCK:catch (DbUpdateConcurrencyException error)");
+ writer.Write($"BLOCK:catch ({typeof(DbUpdateConcurrencyException).FullNameInCode()} error)");
writer.WriteComment("Only intercepts concurrency error on the saga itself");
- writer.WriteLine($"BLOCK:if (error.Entries.Any(e => e.Entity == ${_saga.Usage})");
- writer.WriteLine($"throw new SagaConcurrencyException($\"Saga of type {_saga.VariableType.FullNameInCode()} and id {{ {SagaChain.SagaIdVariableName} }} cannot be updated because of optimistic concurrency violations\");");
+ writer.Write($"BLOCK:if ({typeof(Enumerable).FullNameInCode()}.Any(error.Entries, e => e.Entity == {_saga.Usage}))");
+ writer.WriteLine($"throw new {typeof(SagaConcurrencyException).FullNameInCode()}($\"Saga of type {_saga.VariableType.FullNameInCode()} and identity {SagaChain.SagaIdVariableName} cannot be updated because of optimistic concurrency violations\");");
writer.FinishBlock();
writer.WriteComment("Rethrow any other exception");
@@ -417,6 +423,7 @@ public override void GenerateCode(GeneratedMethod method, ISourceWriter writer)
Next?.GenerateCode(method, writer);
}
+
}
public class CommitDbContextTransactionIfNecessary : SyncFrame
diff --git a/src/Testing/Wolverine.ComplianceTests/Compliance/TransportCompliance.cs b/src/Testing/Wolverine.ComplianceTests/Compliance/TransportCompliance.cs
index e84676ad5..81b5f2238 100644
--- a/src/Testing/Wolverine.ComplianceTests/Compliance/TransportCompliance.cs
+++ b/src/Testing/Wolverine.ComplianceTests/Compliance/TransportCompliance.cs
@@ -349,8 +349,9 @@ public async Task can_request_reply()
var (session, response) = await theSender.TrackActivity()
.AlsoTrack(theReceiver)
+ .IncludeExternalTransports()
.Timeout(30.Seconds())
- .InvokeAndWaitAsync(request);
+ .InvokeAndWaitAsync(request, 30.Seconds());
response.Name.ShouldBe(request.Name);
}
diff --git a/src/Transports/Kafka/BatchMessaging/Program.cs b/src/Transports/Kafka/BatchMessaging/Program.cs
index e627fa105..27f41323d 100644
--- a/src/Transports/Kafka/BatchMessaging/Program.cs
+++ b/src/Transports/Kafka/BatchMessaging/Program.cs
@@ -10,7 +10,7 @@
builder.Host.UseWolverine(opts =>
{
- opts.UseKafka("localhost:9092").AutoProvision();
+ opts.UseKafka("localhost:9092").AutoProvision().AutoPurgeOnStartup();
opts.PublishAllMessages().ToKafkaTopic("topic_0");
diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs
index 106474568..a860cfa43 100644
--- a/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs
+++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/broadcast_to_topic_rules.cs
@@ -2,8 +2,11 @@
using JasperFx.Core;
using Microsoft.Extensions.Hosting;
using JasperFx.Resources;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
using Shouldly;
using Wolverine.Attributes;
+using Wolverine.ComplianceTests;
using Wolverine.Tracking;
using Xunit.Abstractions;
@@ -38,6 +41,7 @@ public async Task InitializeAsync()
opts.ServiceName = "receiver";
opts.Services.AddResourceSetupOnStartup();
+ opts.Services.AddSingleton(new OutputLoggerProvider(_output));
}).StartAsync();
@@ -52,6 +56,7 @@ public async Task InitializeAsync()
opts.ServiceName = "sender";
opts.Services.AddResourceSetupOnStartup();
+ opts.Services.AddSingleton(new OutputLoggerProvider(_output));
}).StartAsync();
}
diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/compliance.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/compliance.cs
index 1de6055d1..1355360af 100644
--- a/src/Transports/Kafka/Wolverine.Kafka.Tests/compliance.cs
+++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/compliance.cs
@@ -1,6 +1,11 @@
using Confluent.Kafka;
using JasperFx.Resources;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using Wolverine.ComplianceTests;
using Wolverine.ComplianceTests.Compliance;
+using Xunit.Abstractions;
+using Xunit.Sdk;
namespace Wolverine.Kafka.Tests;
diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs
index 6688fa635..ee3a1939c 100644
--- a/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs
+++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs
@@ -86,7 +86,7 @@ public async Task DisposeAsync()
public async Task can_receive_the_group_id_for_the_consumer_on_the_envelope()
{
Task Send(IMessageContext c) => c.EndpointFor("red").SendAsync(new RedMessage("one")).AsTask();
- var session = await _host.ExecuteAndWaitAsync(Send);
+ var session = await _host.TrackActivity().IncludeExternalTransports().ExecuteAndWaitAsync(Send);
session.Received.SingleEnvelope()
.GroupId.ShouldBe("foo");
diff --git a/src/Transports/Kafka/Wolverine.Kafka/Wolverine.Kafka.csproj b/src/Transports/Kafka/Wolverine.Kafka/Wolverine.Kafka.csproj
index 548a32055..01e4241a9 100644
--- a/src/Transports/Kafka/Wolverine.Kafka/Wolverine.Kafka.csproj
+++ b/src/Transports/Kafka/Wolverine.Kafka/Wolverine.Kafka.csproj
@@ -15,7 +15,7 @@
-
+
diff --git a/src/Wolverine/Tracking/TrackedSessionConfiguration.cs b/src/Wolverine/Tracking/TrackedSessionConfiguration.cs
index be6578762..e72685ba5 100644
--- a/src/Wolverine/Tracking/TrackedSessionConfiguration.cs
+++ b/src/Wolverine/Tracking/TrackedSessionConfiguration.cs
@@ -239,11 +239,11 @@ public Task PublishMessageAndWaitAsync(object? message, Deliver
///
///
///
- public async Task<(ITrackedSession, T?)> InvokeAndWaitAsync(object request)
+ public async Task<(ITrackedSession, T?)> InvokeAndWaitAsync(object request, TimeSpan? timeout = null)
{
T? response = default;
- Func invocation = async c => { response = await c.InvokeAsync(request); };
+ Func invocation = async c => { response = await c.InvokeAsync(request, timeout:timeout); };
var session = await ExecuteAndWaitAsync(invocation);
diff --git a/src/Wolverine/Transports/ListeningAgent.cs b/src/Wolverine/Transports/ListeningAgent.cs
index ab64bb7d3..6a956e698 100644
--- a/src/Wolverine/Transports/ListeningAgent.cs
+++ b/src/Wolverine/Transports/ListeningAgent.cs
@@ -152,6 +152,8 @@ public async ValueTask StopAndDrainAsync()
{
using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.StoppingListener);
activity?.SetTag(WolverineTracing.EndpointAddress, Uri);
+
+ if (Listener == null) return;
await Listener.StopAsync();
await _receiver!.DrainAsync();
@@ -210,7 +212,7 @@ public async ValueTask PauseAsync(TimeSpan pauseTime)
try
{
using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.PausingListener);
- activity?.SetTag(WolverineTracing.EndpointAddress, Listener.Address);
+ activity?.SetTag(WolverineTracing.EndpointAddress, Uri);
await StopAndDrainAsync();
}
catch (Exception e)
diff --git a/src/Wolverine/Wolverine.csproj b/src/Wolverine/Wolverine.csproj
index c78850b02..f38dcedaf 100644
--- a/src/Wolverine/Wolverine.csproj
+++ b/src/Wolverine/Wolverine.csproj
@@ -5,7 +5,7 @@
-
+