Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/guide/extensions.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ internal class DisableExternalTransports : IWolverineExtension
}
}
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Wolverine/HostBuilderExtensions.cs#L435-L445' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_disableexternaltransports' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Wolverine/HostBuilderExtensions.cs#L384-L394' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_disableexternaltransports' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

And that extension is just added to the application's IoC container at test bootstrapping time like this:
Expand All @@ -123,7 +123,7 @@ public static IServiceCollection DisableAllExternalWolverineTransports(this ISer
return services;
}
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Wolverine/HostBuilderExtensions.cs#L412-L420' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_extension_method_to_disable_external_transports' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Wolverine/HostBuilderExtensions.cs#L361-L369' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_extension_method_to_disable_external_transports' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

In usage, the `IWolverineExtension` objects added to the IoC container are applied *after* the inner configuration
Expand Down
34 changes: 31 additions & 3 deletions docs/guide/handlers/middleware.md
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public static class MaybeBadThingHandler
}
}
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/compound_handlers.cs#L134-L155' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_sending_messages_in_before_middleware' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/compound_handlers.cs#L163-L184' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_sending_messages_in_before_middleware' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Or by returning `OutgoingMessages` from a middleware method as shown below:
Expand All @@ -275,7 +275,7 @@ public static class MaybeBadThing2Handler
}
}
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/compound_handlers.cs#L157-L177' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_outgoing_messages_from_before_middleware' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/compound_handlers.cs#L206-L226' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_outgoing_messages_from_before_middleware' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->


Expand Down Expand Up @@ -608,7 +608,35 @@ HTTP endpoint types to enable you to send messages through the usage of `Outgoin

You can now write middleware method like this:

snippet: sample_send_messages_through_outgoing_messages_with_external_middleware
<!-- snippet: sample_send_messages_through_outgoing_messages_with_external_middleware -->
<a id='snippet-sample_send_messages_through_outgoing_messages_with_external_middleware'></a>
```cs
public record MaybeBadThing4(int Number);

public static class MaybeBadThing4Middleware
{
public static (OutgoingMessages, HandlerContinuation) Validate(MaybeBadThing4 thing)
{
if (thing.Number > 10)
{
return ([new RejectYourThing(thing.Number)], HandlerContinuation.Stop);
}

return ([], HandlerContinuation.Continue);
}
}

[Middleware(typeof(MaybeBadThing4Middleware))]
public static class MaybeBadThing4Handler
{
public static void Handle(MaybeBadThing4 message)
{
Debug.WriteLine("Got " + message);
}
}
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/compound_handlers.cs#L256-L282' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_send_messages_through_outgoing_messages_with_external_middleware' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

And any objects in the `OutgoingMessages` return value from the middleware method will be sent as cascaded
messages. Wolverine will also apply a "maybe stop" frame from the `IHandlerContinuation` as well.
Expand Down
10 changes: 10 additions & 0 deletions docs/guide/http/endpoints.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# HTTP Endpoints

::: warning
While Wolverine.HTTP has a relaxed view of naming conventions since it depends on the routing attributes for discovery. It
is very possible to utilize the same method as both an HTTP endpoint and Wolverine message handler if the method both
follows the correct naming conventions for message handler discovery and is decorated with one of the `[WolverineVerb]` attributes.

This can lead to unexpected code generation errors on the message handler side if the method refers to HTTP route arguments,
query string values, or other AspNetCore services. Our strong advice is to use the `Endpoint` class name nomenclature for HTTP
endpoints unless you are explicitly meaning for a method to be both an HTTP endpoint and message handler.
:::

First, a little terminology about Wolverine HTTP endpoints. Consider the following endpoint method:

<!-- snippet: sample_simple_wolverine_http_endpoint -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Weasel.SqlServer.Tables;
using Wolverine;
using Wolverine.Attributes;
using Wolverine.ComplianceTests;
using Wolverine.EntityFrameworkCore;
using Wolverine.Runtime.Handlers;
using Wolverine.SqlServer;
Expand All @@ -38,13 +39,13 @@ public async Task detect_concurrency_exception_as_SagaConcurrencyException()
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opt =>
{
opt.DisableConventionalDiscovery().IncludeType(typeof(ConcurrencyTestSaga));

opt.Services.AddDbContextWithWolverineIntegration<OptConcurrencyDbContext>(o =>
{
o.UseSqlServer(Servers.SqlServerConnectionString);
});

opt.Services.AddScoped<IOrderRepository, OrderRepository>();

opt.PersistMessagesWithSqlServer(Servers.SqlServerConnectionString);
opt.UseEntityFrameworkCoreTransactions();
opt.Policies.UseDurableLocalQueues();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ public Frame CommitUnitOfWorkFrame(Variable saga, IServiceContainer container)

public Frame DetermineUpdateFrame(Variable saga, IServiceContainer container)
{
var version = saga.VariableType.GetProperty("Version");
if (version == null || !(version.CanRead && version.CanWrite))
{
return new CommentFrame("No explicit update necessary with EF Core without a Version property");
}

var dbContextType = DetermineDbContextType(saga.VariableType, container);
return new IncrementSagaVersionIfNecessary(dbContextType, saga);
}
Expand Down Expand Up @@ -375,8 +381,8 @@ public override IEnumerable<Variable> FindVariables(IMethodVariables chain)
public override void GenerateCode(GeneratedMethod method, ISourceWriter writer)
{
writer.WriteLine("");
writer.WriteComment("If the saga state changed, then increment it's version to support optimistic concurrency");
writer.WriteLine($"if ({_context!.Usage}.Entry({_saga.Usage}.Type == EntityState.Modified) {{ {_saga.Usage}.Version += 1; }}");
writer.WriteComment("If the saga state changed, then increment its version to support optimistic concurrency");
writer.WriteLine($"if ({_context!.Usage}.Entry({_saga.Usage}).State == {typeof(EntityState).FullName}.Modified) {{ {_saga.Usage}.Version += 1; }}");

Next?.GenerateCode(method, writer);
}
Expand All @@ -400,15 +406,15 @@ public override IEnumerable<Variable> FindVariables(IMethodVariables chain)

public override void GenerateCode(GeneratedMethod method, ISourceWriter writer)
{
writer.WriteLine("BLOCK:try");
writer.Write("BLOCK:try");
_frame.GenerateCode(method, writer);
writer.FinishBlock();

writer.WriteLine("BLOCK:catch (DbUpdateConcurrencyException error)");
writer.Write($"BLOCK:catch ({typeof(DbUpdateConcurrencyException).FullNameInCode()} error)");
writer.WriteComment("Only intercepts concurrency error on the saga itself");

writer.WriteLine($"BLOCK:if (error.Entries.Any(e => e.Entity == ${_saga.Usage})");
writer.WriteLine($"throw new SagaConcurrencyException($\"Saga of type {_saga.VariableType.FullNameInCode()} and id {{ {SagaChain.SagaIdVariableName} }} cannot be updated because of optimistic concurrency violations\");");
writer.Write($"BLOCK:if ({typeof(Enumerable).FullNameInCode()}.Any(error.Entries, e => e.Entity == {_saga.Usage}))");
writer.WriteLine($"throw new {typeof(SagaConcurrencyException).FullNameInCode()}($\"Saga of type {_saga.VariableType.FullNameInCode()} and identity {SagaChain.SagaIdVariableName} cannot be updated because of optimistic concurrency violations\");");
writer.FinishBlock();

writer.WriteComment("Rethrow any other exception");
Expand All @@ -417,6 +423,7 @@ public override void GenerateCode(GeneratedMethod method, ISourceWriter writer)

Next?.GenerateCode(method, writer);
}

}

public class CommitDbContextTransactionIfNecessary : SyncFrame
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,9 @@ public async Task can_request_reply()

var (session, response) = await theSender.TrackActivity()
.AlsoTrack(theReceiver)
.IncludeExternalTransports()
.Timeout(30.Seconds())
.InvokeAndWaitAsync<Response>(request);
.InvokeAndWaitAsync<Response>(request, 30.Seconds());

response.Name.ShouldBe(request.Name);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Transports/Kafka/BatchMessaging/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

builder.Host.UseWolverine(opts =>
{
opts.UseKafka("localhost:9092").AutoProvision();
opts.UseKafka("localhost:9092").AutoProvision().AutoPurgeOnStartup();

opts.PublishAllMessages().ToKafkaTopic("topic_0");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
using JasperFx.Core;
using Microsoft.Extensions.Hosting;
using JasperFx.Resources;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Shouldly;
using Wolverine.Attributes;
using Wolverine.ComplianceTests;
using Wolverine.Tracking;
using Xunit.Abstractions;

Expand Down Expand Up @@ -38,6 +41,7 @@ public async Task InitializeAsync()
opts.ServiceName = "receiver";

opts.Services.AddResourceSetupOnStartup();
opts.Services.AddSingleton<ILoggerProvider>(new OutputLoggerProvider(_output));
}).StartAsync();


Expand All @@ -52,6 +56,7 @@ public async Task InitializeAsync()
opts.ServiceName = "sender";

opts.Services.AddResourceSetupOnStartup();
opts.Services.AddSingleton<ILoggerProvider>(new OutputLoggerProvider(_output));
}).StartAsync();
}

Expand Down
5 changes: 5 additions & 0 deletions src/Transports/Kafka/Wolverine.Kafka.Tests/compliance.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
using Confluent.Kafka;
using JasperFx.Resources;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Wolverine.ComplianceTests;
using Wolverine.ComplianceTests.Compliance;
using Xunit.Abstractions;
using Xunit.Sdk;

namespace Wolverine.Kafka.Tests;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public async Task DisposeAsync()
public async Task can_receive_the_group_id_for_the_consumer_on_the_envelope()
{
Task Send(IMessageContext c) => c.EndpointFor("red").SendAsync(new RedMessage("one")).AsTask();
var session = await _host.ExecuteAndWaitAsync(Send);
var session = await _host.TrackActivity().IncludeExternalTransports().ExecuteAndWaitAsync(Send);

session.Received.SingleEnvelope<RedMessage>()
.GroupId.ShouldBe("foo");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.10.0" />
<PackageReference Include="Confluent.Kafka" Version="2.12.0" />
</ItemGroup>

</Project>
4 changes: 2 additions & 2 deletions src/Wolverine/Tracking/TrackedSessionConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,11 @@ public Task<ITrackedSession> PublishMessageAndWaitAsync(object? message, Deliver
/// <param name="requestInvocation"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public async Task<(ITrackedSession, T?)> InvokeAndWaitAsync<T>(object request)
public async Task<(ITrackedSession, T?)> InvokeAndWaitAsync<T>(object request, TimeSpan? timeout = null)
{
T? response = default;

Func<IMessageContext, Task> invocation = async c => { response = await c.InvokeAsync<T>(request); };
Func<IMessageContext, Task> invocation = async c => { response = await c.InvokeAsync<T>(request, timeout:timeout); };

var session = await ExecuteAndWaitAsync(invocation);

Expand Down
4 changes: 3 additions & 1 deletion src/Wolverine/Transports/ListeningAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ public async ValueTask StopAndDrainAsync()
{
using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.StoppingListener);
activity?.SetTag(WolverineTracing.EndpointAddress, Uri);

if (Listener == null) return;

await Listener.StopAsync();
await _receiver!.DrainAsync();
Expand Down Expand Up @@ -210,7 +212,7 @@ public async ValueTask PauseAsync(TimeSpan pauseTime)
try
{
using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.PausingListener);
activity?.SetTag(WolverineTracing.EndpointAddress, Listener.Address);
activity?.SetTag(WolverineTracing.EndpointAddress, Uri);
await StopAndDrainAsync();
}
catch (Exception e)
Expand Down
2 changes: 1 addition & 1 deletion src/Wolverine/Wolverine.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="JasperFx" Version="1.9.0" />
<PackageReference Include="JasperFx.RuntimeCompiler" Version="4.0.0" />
<PackageReference Include="JasperFx.RuntimeCompiler" Version="4.1.0" />
<PackageReference Include="NewId" Version="4.0.1" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="System.Net.NameResolution" Version="4.3.0" />
Expand Down
Loading