diff --git a/docs/guide/durability/dead-letter-storage.md b/docs/guide/durability/dead-letter-storage.md
index ca67288b1..7551e5651 100644
--- a/docs/guide/durability/dead-letter-storage.md
+++ b/docs/guide/durability/dead-letter-storage.md
@@ -48,7 +48,7 @@ using var host = await Host.CreateDefaultBuilder()
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
Note that Wolverine will use the message's `DeliverBy` value as the expiration if that exists, otherwise, Wolverine will
diff --git a/docs/guide/durability/marten/operations.md b/docs/guide/durability/marten/operations.md
index 1d8e73657..0f6ea4631 100644
--- a/docs/guide/durability/marten/operations.md
+++ b/docs/guide/durability/marten/operations.md
@@ -29,7 +29,7 @@ public interface IMartenOp : ISideEffect
void Execute(IDocumentSession session);
}
```
-snippet source | anchor
+snippet source | anchor
The built in side effects can all be used from the `MartenOps` static class like this HTTP endpoint example:
diff --git a/docs/guide/extensions.md b/docs/guide/extensions.md
index db9f0cd76..a99dfba35 100644
--- a/docs/guide/extensions.md
+++ b/docs/guide/extensions.md
@@ -203,7 +203,7 @@ public class SampleAsyncExtension : IAsyncWolverineExtension
}
}
```
-snippet source | anchor
+snippet source | anchor
Which can be added to your application with this extension method on `IServiceCollection`:
@@ -222,7 +222,7 @@ using var host = await Host.CreateDefaultBuilder()
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
### Asynchronous Extensions and Wolverine.HTTP
@@ -287,5 +287,5 @@ using var host = await Microsoft.Extensions.Hosting.Host.CreateDefaultBuilder()
.StartAsync();
```
-snippet source | anchor
+snippet source | anchor
diff --git a/docs/guide/handlers/error-handling.md b/docs/guide/handlers/error-handling.md
index 689107f1c..7ab0e76c6 100644
--- a/docs/guide/handlers/error-handling.md
+++ b/docs/guide/handlers/error-handling.md
@@ -63,7 +63,7 @@ using var host = await Host.CreateDefaultBuilder()
opts.OnException().MoveToErrorQueue();
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
@@ -82,7 +82,7 @@ using var host = await Host.CreateDefaultBuilder()
.Discard();
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
You have to explicitly discard a message or it will eventually be sent to a dead letter queue when the message has exhausted its configured retries or requeues.
@@ -110,7 +110,7 @@ using var host = await Host.CreateDefaultBuilder()
.RetryWithCooldown(50.Milliseconds(), 100.Milliseconds(), 250.Milliseconds());
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
Or through attributes on a single message:
@@ -124,7 +124,7 @@ public class MessageWithBackoff
// whatever members
}
```
-snippet source | anchor
+snippet source | anchor
@@ -153,7 +153,7 @@ using var host = await Host.CreateDefaultBuilder()
.Requeue().AndPauseProcessing(10.Minutes());
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
@@ -190,7 +190,7 @@ public class AttributeUsingHandler
}
}
```
-snippet source | anchor
+snippet source | anchor
You can also use the fluent interface approach on a specific message type if you put a method with the signature `public static void Configure(HandlerChain chain)`
@@ -221,7 +221,7 @@ public class MyErrorCausingHandler
}
}
```
-snippet source | anchor
+snippet source | anchor
To specify global error handling rules, use the fluent interface directly on `WolverineOptions.Handlers` as shown below:
@@ -242,7 +242,7 @@ using var host = await Host.CreateDefaultBuilder()
.ScheduleRetry(5.Seconds());
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
TODO -- link to chain policies, after that exists:)
@@ -267,7 +267,7 @@ public class ErrorHandlingPolicy : IHandlerPolicy
}
}
```
-snippet source | anchor
+snippet source | anchor
## Exception Filtering
diff --git a/docs/guide/handlers/middleware.md b/docs/guide/handlers/middleware.md
index 23ced0018..b3944ceea 100644
--- a/docs/guide/handlers/middleware.md
+++ b/docs/guide/handlers/middleware.md
@@ -35,7 +35,7 @@ finally
logger.LogInformation("Ran something in " + stopwatch.ElapsedMilliseconds);
}
```
-snippet source | anchor
+snippet source | anchor
You've got a couple different options, but the easiest by far is to use Wolverine's conventional middleware approach.
@@ -69,7 +69,7 @@ public class StopwatchMiddleware
}
}
```
-snippet source | anchor
+snippet source | anchor
and that can be added to our application at bootstrapping time like this:
@@ -87,7 +87,7 @@ using var host = await Host.CreateDefaultBuilder()
chain.MessageType.IsInNamespace("MyApp.Messages.Important"));
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
And just for the sake of completeness, here's another version of the same functionality, but
@@ -118,7 +118,7 @@ public static class StopwatchMiddleware2
}
}
```
-snippet source | anchor
+snippet source | anchor
Alright, let's talk about what's happening in the code samples above:
@@ -155,7 +155,7 @@ finally
middleware.Finally();
}
```
-snippet source | anchor
+snippet source | anchor
Here's the rules for these conventional middleware classes:
@@ -350,7 +350,7 @@ public static class SomeHandler
}
}
```
-snippet source | anchor
+snippet source | anchor
Note that this attribute will accept multiple middleware types. Also note that the `[Middleware]` attribute can be placed either
@@ -410,7 +410,7 @@ public class StopwatchFrame : SyncFrame
}
}
```
-snippet source | anchor
+snippet source | anchor
@@ -430,7 +430,7 @@ public class StopwatchAttribute : ModifyChainAttribute
}
}
```
-snippet source | anchor
+snippet source | anchor
This attribute can now be placed either on a specific HTTP route endpoint method or message handler method to **only** apply to
@@ -450,7 +450,7 @@ public class ClockedEndpoint
}
}
```
-snippet source | anchor
+snippet source | anchor
Now, when the application is bootstrapped, this is the code that would be generated to handle the "GET /clocked" route:
@@ -515,7 +515,7 @@ public interface IHandlerPolicy : IWolverinePolicy
void Apply(IReadOnlyList chains, GenerationRules rules, IServiceContainer container);
}
```
-snippet source | anchor
+snippet source | anchor
Here's a simple sample that registers middleware on each handler chain:
@@ -531,7 +531,7 @@ public class WrapWithSimple : IHandlerPolicy
}
}
```
-snippet source | anchor
+snippet source | anchor
Then register your custom `IHandlerPolicy` with a Wolverine application like this:
@@ -542,7 +542,7 @@ Then register your custom `IHandlerPolicy` with a Wolverine application like thi
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts => { opts.Policies.Add(); }).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
## Using Configure(chain) Methods
diff --git a/docs/guide/handlers/persistence.md b/docs/guide/handlers/persistence.md
index a32b9c6d9..680be5834 100644
--- a/docs/guide/handlers/persistence.md
+++ b/docs/guide/handlers/persistence.md
@@ -137,7 +137,7 @@ public enum ValueSource
FromQueryString
}
```
-snippet source | anchor
+snippet source | anchor
Some other facts to know about `[Entity]` usage:
diff --git a/docs/guide/handlers/return-values.md b/docs/guide/handlers/return-values.md
index 807dd3f66..859a63760 100644
--- a/docs/guide/handlers/return-values.md
+++ b/docs/guide/handlers/return-values.md
@@ -65,7 +65,7 @@ public record WriteFile(string Path, string Contents)
}
}
```
-snippet source | anchor
+snippet source | anchor
```cs
// ISideEffect is a Wolverine marker interface
@@ -133,7 +133,7 @@ internal class WriteFilePolicy : IChainPolicy
}
}
```
-snippet source | anchor
+snippet source | anchor
and lastly, I'll register that policy in my Wolverine application at configuration time:
@@ -144,6 +144,6 @@ and lastly, I'll register that policy in my Wolverine application at configurati
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts => { opts.Policies.Add(); }).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
diff --git a/docs/guide/handlers/side-effects.md b/docs/guide/handlers/side-effects.md
index d3909e47d..3de55a213 100644
--- a/docs/guide/handlers/side-effects.md
+++ b/docs/guide/handlers/side-effects.md
@@ -35,7 +35,7 @@ public record WriteFile(string Path, string Contents)
}
}
```
-snippet source | anchor
+snippet source | anchor
```cs
// ISideEffect is a Wolverine marker interface
diff --git a/docs/guide/handlers/sticky.md b/docs/guide/handlers/sticky.md
index 41e22dc67..7dba52b26 100644
--- a/docs/guide/handlers/sticky.md
+++ b/docs/guide/handlers/sticky.md
@@ -25,7 +25,7 @@ message as an input.
```cs
public class StickyMessage;
```
-snippet source | anchor
+snippet source | anchor
And we're going to handle that `StickyMessage` message separately with two different handler types:
@@ -51,7 +51,7 @@ public static class GreenStickyHandler
}
}
```
-snippet source | anchor
+snippet source | anchor
::: tip
@@ -79,7 +79,7 @@ using var host = await Host.CreateDefaultBuilder()
opts.ListenAtPort(4000).Named("blue");
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
With all of that being said, the end result of the two `StickyMessage` handlers that are marked with `[StickyHandler]`
@@ -119,7 +119,7 @@ using var host = await Host.CreateDefaultBuilder()
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
## Configuring Local Queues
diff --git a/docs/guide/http/endpoints.md b/docs/guide/http/endpoints.md
index 355301f87..d8646315b 100644
--- a/docs/guide/http/endpoints.md
+++ b/docs/guide/http/endpoints.md
@@ -145,7 +145,7 @@ public static class TodoCreationEndpoint
}
}
```
-snippet source | anchor
+snippet source | anchor
In the case above, `TodoCreationResponse` is the first item in the tuple, so Wolverine treats that as
@@ -282,7 +282,7 @@ public interface IParameterStrategy
bool TryMatch(HttpChain chain, IServiceContainer container, ParameterInfo parameter, out Variable? variable);
}
```
-snippet source | anchor
+snippet source | anchor
As an example, let's say that you want any parameter of type `DateTimeOffset` that's named "now" to receive the current
@@ -308,7 +308,7 @@ public class NowParameterStrategy : IParameterStrategy
}
}
```
-snippet source | anchor
+snippet source | anchor
and register that strategy within our `MapWolverineEndpoints()` set up like so:
diff --git a/docs/guide/http/forms.md b/docs/guide/http/forms.md
index 60a6d6f9e..baa011409 100644
--- a/docs/guide/http/forms.md
+++ b/docs/guide/http/forms.md
@@ -70,7 +70,7 @@ public async Task use_decimal_form_hit()
body.ReadAsText().ShouldBe("Amount is 42.1");
}
```
-snippet source | anchor
+snippet source | anchor
diff --git a/docs/guide/http/index.md b/docs/guide/http/index.md
index 591d1edf8..29cf21b8c 100644
--- a/docs/guide/http/index.md
+++ b/docs/guide/http/index.md
@@ -57,7 +57,7 @@ public class CreateTodoHandler
}
}
```
-snippet source | anchor
+snippet source | anchor
Okay, but we still need to expose a web service endpoint for this functionality. We *could* utilize Wolverine within an MVC controller
@@ -81,7 +81,7 @@ public class TodoController : ControllerBase
}
}
```
-snippet source | anchor
+snippet source | anchor
Or we could do the same thing with Minimal API:
@@ -96,7 +96,7 @@ app.MapPost("/todoitems", async (CreateTodo command, IMessageBus bus) =>
return Results.Created($"/todoitems/{todo.Id}", todo);
}).Produces(201);
```
-snippet source | anchor
+snippet source | anchor
While the code above is certainly functional, and many teams are succeeding today using a similar strategy with older tools like
@@ -121,7 +121,7 @@ efficient delegation to the underlying Wolverine message handler:
// code of 200 instead of 201. If you care about that anyway.
app.MapPostToWolverine("/todoitems");
```
-snippet source | anchor
+snippet source | anchor
The code up above is very close to a functional equivalent to our early Minimal API or MVC Controller usage, but there's a
@@ -173,7 +173,7 @@ public static class TodoCreationEndpoint
}
}
```
-snippet source | anchor
+snippet source | anchor
The code above will actually generate the exact same OpenAPI documentation as the MVC Controller or Minimal API samples
diff --git a/docs/guide/http/metadata.md b/docs/guide/http/metadata.md
index 3b2313523..fba62d4fb 100644
--- a/docs/guide/http/metadata.md
+++ b/docs/guide/http/metadata.md
@@ -161,7 +161,7 @@ public record CreationResponse([StringSyntax("Route")]string Url) : IHttpAware
public static CreationResponse For(T value, string url) => new CreationResponse(url, value);
}
```
-snippet source | anchor
+snippet source | anchor
Any endpoint that returns `CreationResponse` or a sub class will automatically expose a status code of `201` for successful
diff --git a/docs/guide/http/middleware.md b/docs/guide/http/middleware.md
index e0244e871..9a966328a 100644
--- a/docs/guide/http/middleware.md
+++ b/docs/guide/http/middleware.md
@@ -177,7 +177,7 @@ public static class RequestIdMiddleware
}
}
```
-snippet source | anchor
+snippet source | anchor
And a matching `IHttpPolicy` to apply that middleware to any HTTP endpoint where there is a dependency on Wolverine's `IMessageContext` or `IMessageBus`:
@@ -201,7 +201,7 @@ internal class RequestIdPolicy : IHttpPolicy
}
}
```
-snippet source | anchor
+snippet source | anchor
Lastly, this particular policy is included by default, but if it wasn't, this is the code to apply it explicitly:
@@ -216,7 +216,7 @@ app.MapWolverineEndpoints(opts =>
opts.AddPolicy();
});
```
-snippet source | anchor
+snippet source | anchor
For simpler middleware application, you could also use this feature:
@@ -232,7 +232,7 @@ app.MapWolverineEndpoints(opts =>
c => c.HandlerCalls().Any(x => x.HandlerType.IsInNamespace("MyApp.Authenticated")));
});
```
-snippet source | anchor
+snippet source | anchor
## Required Inputs
@@ -273,7 +273,7 @@ public static class UpdateEndpoint
}
}
```
-snippet source | anchor
+snippet source | anchor
You'll notice that the `LoadAsync()` method is looking up the `Todo` entity for the route parameter, where Wolverine would
diff --git a/docs/guide/http/policies.md b/docs/guide/http/policies.md
index ccb9554f5..5492e27c9 100644
--- a/docs/guide/http/policies.md
+++ b/docs/guide/http/policies.md
@@ -20,7 +20,7 @@ public interface IHttpPolicy
void Apply(IReadOnlyList chains, GenerationRules rules, IServiceContainer container);
}
```
-snippet source | anchor
+snippet source | anchor
And then adding a policy to the `WolverineHttpOptions` like this code from the Fluent Validation extension for HTTP:
diff --git a/docs/guide/http/querystring.md b/docs/guide/http/querystring.md
index 2cf59819d..c0b068e6b 100644
--- a/docs/guide/http/querystring.md
+++ b/docs/guide/http/querystring.md
@@ -69,7 +69,7 @@ public async Task use_decimal_querystring_hit()
body.ReadAsText().ShouldBe("Amount is 42.1");
}
```
-snippet source | anchor
+snippet source | anchor
diff --git a/docs/guide/http/routing.md b/docs/guide/http/routing.md
index d82931ab8..75bf88e66 100644
--- a/docs/guide/http/routing.md
+++ b/docs/guide/http/routing.md
@@ -66,7 +66,7 @@ public static readonly Dictionary TypeOutputs = new()
{ typeof(DateOnly), typeof(DateOnly).FullName! }
};
```
-snippet source | anchor
+snippet source | anchor
::: warning
diff --git a/docs/guide/http/security.md b/docs/guide/http/security.md
index 6e3de66bc..d5979f5bf 100644
--- a/docs/guide/http/security.md
+++ b/docs/guide/http/security.md
@@ -26,5 +26,5 @@ public void RequireAuthorizeOnAll()
ConfigureEndpoints(e => e.RequireAuthorization());
}
```
-snippet source | anchor
+snippet source | anchor
diff --git a/docs/guide/messages.md b/docs/guide/messages.md
index 43ab7a0de..5e4b41fce 100644
--- a/docs/guide/messages.md
+++ b/docs/guide/messages.md
@@ -197,7 +197,7 @@ return new JsonSerializerSettings
PreserveReferencesHandling = PreserveReferencesHandling.Objects
};
```
-snippet source | anchor
+snippet source | anchor
To customize the Newtonsoft.Json serialization, use this option:
diff --git a/docs/guide/messaging/endpoint-operations.md b/docs/guide/messaging/endpoint-operations.md
index 681696aec..1394d4f5f 100644
--- a/docs/guide/messaging/endpoint-operations.md
+++ b/docs/guide/messaging/endpoint-operations.md
@@ -33,7 +33,7 @@ await bus.EndpointFor("One").InvokeAsync(new SomeMessage());
var answer = bus.EndpointFor("One")
.InvokeAsync(new Question());
```
-snippet source | anchor
+snippet source | anchor
There's another option to reference a messaging endpoint by `Uri` as shown below:
@@ -45,5 +45,5 @@ There's another option to reference a messaging endpoint by `Uri` as shown below
await bus.EndpointFor(new Uri("rabbitmq://queue/rabbit-one"))
.InvokeAsync(new SomeMessage());
```
-snippet source | anchor
+snippet source | anchor
diff --git a/docs/guide/messaging/message-bus.md b/docs/guide/messaging/message-bus.md
index ac3a8d52d..4cb24db3d 100644
--- a/docs/guide/messaging/message-bus.md
+++ b/docs/guide/messaging/message-bus.md
@@ -202,7 +202,7 @@ using var host = Host.CreateDefaultBuilder()
opts.EnableRemoteInvocation = false;
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
## Sending or Publishing Messages
@@ -230,7 +230,7 @@ public ValueTask SendMessage(IMessageContext bus)
return bus.SendAsync(@event);
}
```
-snippet source | anchor
+snippet source | anchor
That by itself will send the `InvoiceCreated` message to whatever subscribers are interested in
@@ -258,7 +258,7 @@ public ValueTask PublishMessage(IMessageContext bus)
return bus.PublishAsync(@event);
}
```
-snippet source | anchor
+snippet source | anchor
## Scheduling Message Delivery or Execution
diff --git a/docs/guide/messaging/subscriptions.md b/docs/guide/messaging/subscriptions.md
index dbfb1e5f8..13a138af1 100644
--- a/docs/guide/messaging/subscriptions.md
+++ b/docs/guide/messaging/subscriptions.md
@@ -229,7 +229,7 @@ public interface IMessageRoute
MessageSubscriptionDescriptor Describe();
}
```
-snippet source | anchor
+snippet source | anchor
This type "knows" about any endpoint or model sending customizations like delivery expiration
diff --git a/docs/guide/messaging/transports/azureservicebus/interoperability.md b/docs/guide/messaging/transports/azureservicebus/interoperability.md
index 4d695b3b1..d28b185b5 100644
--- a/docs/guide/messaging/transports/azureservicebus/interoperability.md
+++ b/docs/guide/messaging/transports/azureservicebus/interoperability.md
@@ -1,5 +1,9 @@
# Interoperability
+::: tip
+Also see the more generic [Wolverine Guide on Interoperability](/tutorials/interop)
+:::
+
Hey, it's a complicated world and Wolverine is a relative newcomer, so it's somewhat likely you'll find yourself needing to make a Wolverine application talk via Azure Service Bus to
a non-Wolverine application. Not to worry (too much), Wolverine has you covered with the ability to customize Wolverine to Azure Service Bus mapping.
@@ -30,14 +34,9 @@ public class CustomAzureServiceBusMapper : IAzureServiceBusEnvelopeMapper
// is for a listening endpoint
envelope.MessageType = typeof(Message1).ToMessageTypeName();
}
-
- public IEnumerable AllHeaders()
- {
- yield break;
- }
}
```
-snippet source | anchor
+snippet source | anchor
To apply that mapper to specific endpoints, use this syntax on any type of Azure Service Bus endpoint:
diff --git a/docs/guide/messaging/transports/azureservicebus/scheduled.md b/docs/guide/messaging/transports/azureservicebus/scheduled.md
index 74523be59..d5c70c677 100644
--- a/docs/guide/messaging/transports/azureservicebus/scheduled.md
+++ b/docs/guide/messaging/transports/azureservicebus/scheduled.md
@@ -27,7 +27,7 @@ public async Task SendScheduledMessage(IMessageContext bus, Guid invoiceId)
await bus.ScheduleAsync(message, DateTimeOffset.Now.AddDays(30));
}
```
-snippet source | anchor
+snippet source | anchor
And also use Azure Service Bus scheduled delivery for scheduled retries (assuming that the listening endpoint was an **inline** Azure Service Bus listener):
diff --git a/docs/guide/messaging/transports/gcp-pubsub/interoperability.md b/docs/guide/messaging/transports/gcp-pubsub/interoperability.md
index 3b92a8a98..e6d5b8673 100644
--- a/docs/guide/messaging/transports/gcp-pubsub/interoperability.md
+++ b/docs/guide/messaging/transports/gcp-pubsub/interoperability.md
@@ -1,5 +1,9 @@
# Interoperability
+::: tip
+Also see the more generic [Wolverine Guide on Interoperability](/tutorials/interop)
+:::
+
Hey, it's a complicated world and Wolverine is a relative newcomer, so it's somewhat likely you'll find yourself needing to make a Wolverine application talk via GCP Pub/Sub to
a non-Wolverine application. Not to worry (too much), Wolverine has you covered with the ability to customize Wolverine to GCP Pub/Sub mapping.
@@ -74,8 +78,8 @@ using var host = await Host.CreateDefaultBuilder()
{
opts.UsePubsub("your-project-id")
.UseConventionalRouting()
- .ConfigureListeners(l => l.InteropWith(e => new CustomPubsubMapper(e)))
- .ConfigureSenders(s => s.InteropWith(e => new CustomPubsubMapper(e)));
+ .ConfigureListeners(l => l.UseInterop((e, _) => new CustomPubsubMapper(e)))
+ .ConfigureSenders(s => s.UseInterop((e, _) => new CustomPubsubMapper(e)));
}).StartAsync();
```
snippet source | anchor
diff --git a/docs/guide/messaging/transports/kafka.md b/docs/guide/messaging/transports/kafka.md
index 411fba748..606f4aaf5 100644
--- a/docs/guide/messaging/transports/kafka.md
+++ b/docs/guide/messaging/transports/kafka.md
@@ -105,7 +105,7 @@ using var host = await Host.CreateDefaultBuilder()
opts.Services.AddResourceSetupOnStartup();
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
The various `Configure*****()` methods provide quick access to the full API of the Confluent Kafka library for security
@@ -129,6 +129,10 @@ public static ValueTask publish_by_partition_key(IMessageBus bus)
## Interoperability
+::: tip
+Also see the more generic [Wolverine Guide on Interoperability](/tutorials/interop)
+:::
+
It's a complex world out there, and it's more than likely you'll need your Wolverine application to interact with system
that aren't also Wolverine applications. At this time, it's possible to send or receive raw JSON through Kafka and Wolverine
by using the options shown below in test harness code:
@@ -194,7 +198,7 @@ public static class KafkaInstrumentation
}
}
```
-snippet source | anchor
+snippet source | anchor
## Connecting to Multiple Brokers
@@ -225,7 +229,7 @@ using var host = await Host.CreateDefaultBuilder()
// Other configuration
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
Note that the `Uri` scheme within Wolverine for any endpoints from a "named" Kafka broker is the name that you supply
diff --git a/docs/guide/messaging/transports/local.md b/docs/guide/messaging/transports/local.md
index 74409fd78..3322d03be 100644
--- a/docs/guide/messaging/transports/local.md
+++ b/docs/guide/messaging/transports/local.md
@@ -49,7 +49,7 @@ public ValueTask EnqueueToQueue(IMessageContext bus)
return bus.EndpointFor("highpriority").SendAsync(@event);
}
```
-snippet source | anchor
+snippet source | anchor
## Scheduling Local Execution
@@ -78,7 +78,7 @@ public async Task ScheduleLocally(IMessageContext bus, Guid invoiceId)
await bus.ScheduleAsync(message, DateTimeOffset.Now.AddDays(30));
}
```
-snippet source | anchor
+snippet source | anchor
@@ -315,21 +315,13 @@ using var host = await Host.CreateDefaultBuilder()
opts.LocalQueue("two")
.MaximumParallelMessages(5);
- // Or just edit the ActionBlock options directly
- opts.LocalQueue("three")
- .ConfigureExecution(options =>
- {
- options.MaxDegreeOfParallelism = 5;
- options.BoundedCapacity = 1000;
- });
-
// And finally, this enrolls a queue into the persistent inbox
// so that messages can happily be retained and processed
// after the service is restarted
opts.LocalQueue("four").UseDurableInbox();
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
diff --git a/docs/guide/messaging/transports/mqtt.md b/docs/guide/messaging/transports/mqtt.md
index 22b5d3f64..5a011506a 100644
--- a/docs/guide/messaging/transports/mqtt.md
+++ b/docs/guide/messaging/transports/mqtt.md
@@ -158,7 +158,7 @@ public interface ITenantMessage
string TenantId { get; }
}
```
-snippet source | anchor
+snippet source | anchor
To publish any message implementing that interface to an MQTT topic, you could specify the topic name logic like this:
@@ -193,7 +193,7 @@ builder.UseWolverine(opts =>
using var host = builder.Build();
await host.StartAsync();
```
-snippet source | anchor
+snippet source | anchor
## Listening by Topic Filter
@@ -287,14 +287,9 @@ public class MyMqttEnvelopeMapper : IMqttEnvelopeMapper
// is not successfully processed
// within 5 seconds
}
-
- public IEnumerable AllHeaders()
- {
- yield break;
- }
}
```
-snippet source | anchor
+snippet source | anchor
And apply that to an MQTT topic like so:
@@ -310,7 +305,7 @@ builder.UseWolverine(opts =>
opts.UseMqtt(mqtt =>
{
var mqttServer = builder.Configuration["mqtt_server"];
-
+
mqtt
.WithMaxPendingMessages(3)
.WithClientOptions(client => { client.WithTcpServer(mqttServer); });
@@ -325,12 +320,13 @@ builder.UseWolverine(opts =>
// with our custom strategy
.UseInterop(new MyMqttEnvelopeMapper())
.QualityOfService(MqttQualityOfServiceLevel.AtMostOnce);
+
});
using var host = builder.Build();
await host.StartAsync();
```
-snippet source | anchor
+snippet source | anchor
## Clearing Out Retained Messages
@@ -361,5 +357,81 @@ public static ClearMqttTopic Handle(TriggerZero message)
snippet source | anchor
+## Interoperability
+
+::: tip
+Also see the more generic [Wolverine Guide on Interoperability](/tutorials/interop)
+:::
+
+The Wolverine MQTT transport supports pluggable interoperability strategies through the `Wolverine.MQTT.IMqttEnvelopeMapper`
+interface to map from Wolverine's `Envelope` structure and MQTT's `MqttApplicationMessage` structure.
+
+Here's a simple example:
+
+
+
+```cs
+public class MyMqttEnvelopeMapper : IMqttEnvelopeMapper
+{
+ public void MapEnvelopeToOutgoing(Envelope envelope, MqttApplicationMessage outgoing)
+ {
+ // This is the only absolutely mandatory item
+ outgoing.PayloadSegment = envelope.Data;
+
+ // Maybe enrich this more?
+ outgoing.ContentType = envelope.ContentType;
+ }
+
+ public void MapIncomingToEnvelope(Envelope envelope, MqttApplicationMessage incoming)
+ {
+ // These are the absolute minimums necessary for Wolverine to function
+ envelope.MessageType = typeof(PaymentMade).ToMessageTypeName();
+ envelope.Data = incoming.PayloadSegment.Array;
+ // Optional items
+ envelope.DeliverWithin = 5.Seconds(); // throw away the message if it
+ // is not successfully processed
+ // within 5 seconds
+ }
+}
+```
+snippet source | anchor
+
+
+You will need to apply that mapper to each endpoint like so:
+
+
+
+```cs
+var builder = Host.CreateApplicationBuilder();
+
+builder.UseWolverine(opts =>
+{
+ // Connect to the MQTT broker
+ opts.UseMqtt(mqtt =>
+ {
+ var mqttServer = builder.Configuration["mqtt_server"];
+
+ mqtt
+ .WithMaxPendingMessages(3)
+ .WithClientOptions(client => { client.WithTcpServer(mqttServer); });
+ });
+
+ // Publish messages to MQTT topics based on
+ // the message type
+ opts.PublishAllMessages()
+ .ToMqttTopics()
+
+ // Tell Wolverine to map envelopes to MQTT messages
+ // with our custom strategy
+ .UseInterop(new MyMqttEnvelopeMapper())
+ .QualityOfService(MqttQualityOfServiceLevel.AtMostOnce);
+
+});
+
+using var host = builder.Build();
+await host.StartAsync();
+```
+snippet source | anchor
+
diff --git a/docs/guide/messaging/transports/pulsar.md b/docs/guide/messaging/transports/pulsar.md
index 06084caab..0e6fee769 100644
--- a/docs/guide/messaging/transports/pulsar.md
+++ b/docs/guide/messaging/transports/pulsar.md
@@ -118,3 +118,11 @@ builder.UseWolverine(opts =>
```
snippet source | anchor
+
+## Interoperability
+
+::: tip
+Also see the more generic [Wolverine Guide on Interoperability](/tutorials/interop)
+:::
+
+Pulsar interoperability is done through the `IPulsarEnvelopeMapper` interface.
diff --git a/docs/guide/messaging/transports/rabbitmq/conventional-routing.md b/docs/guide/messaging/transports/rabbitmq/conventional-routing.md
index 773063b89..05c8a6fe5 100644
--- a/docs/guide/messaging/transports/rabbitmq/conventional-routing.md
+++ b/docs/guide/messaging/transports/rabbitmq/conventional-routing.md
@@ -20,7 +20,7 @@ using var host = await Host.CreateDefaultBuilder()
.UseConventionalRouting();
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
With the defaults from above, for each message that the application can handle
@@ -73,7 +73,7 @@ using var host = await Host.CreateDefaultBuilder()
});
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
## Adjusting Routing Conventions
@@ -122,7 +122,7 @@ var receiver = WolverineHost.For(opts =>
});
});
```
-snippet source | anchor
+snippet source | anchor
diff --git a/docs/guide/messaging/transports/rabbitmq/deadletterqueues.md b/docs/guide/messaging/transports/rabbitmq/deadletterqueues.md
index 4b6fc6488..0681230a6 100644
--- a/docs/guide/messaging/transports/rabbitmq/deadletterqueues.md
+++ b/docs/guide/messaging/transports/rabbitmq/deadletterqueues.md
@@ -31,7 +31,7 @@ using var host = await Host.CreateDefaultBuilder()
.DeadLetterQueueing(new DeadLetterQueue("incoming-errors"));
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
::: warning
@@ -66,7 +66,7 @@ using var host = await Host.CreateDefaultBuilder()
.DeadLetterQueueing(new DeadLetterQueue("incoming-errors", DeadLetterQueueMode.InteropFriendly));
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
And lastly, if you don't particularly want to have any Rabbit MQ dead letter queues and you quite like the [database backed
@@ -93,7 +93,7 @@ using var host = await Host.CreateDefaultBuilder()
opts.ListenToRabbitQueue("incoming").DisableDeadLetterQueueing();
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
diff --git a/docs/guide/messaging/transports/rabbitmq/index.md b/docs/guide/messaging/transports/rabbitmq/index.md
index 6b225361b..b079357ca 100644
--- a/docs/guide/messaging/transports/rabbitmq/index.md
+++ b/docs/guide/messaging/transports/rabbitmq/index.md
@@ -80,7 +80,7 @@ using var host = await Host.CreateDefaultBuilder()
});
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
To only send Rabbit MQ messages, but never receive them:
@@ -109,7 +109,7 @@ using var host = await Host.CreateDefaultBuilder()
});
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
## Aspire Integration
@@ -155,7 +155,7 @@ using var host = await Host.CreateDefaultBuilder()
.EnableWolverineControlQueues();
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
@@ -191,7 +191,7 @@ using var host = await Host.CreateDefaultBuilder()
});
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
Of course, doing so means that you will not be able to do request/reply through Rabbit MQ with your Wolverine application.
diff --git a/docs/guide/messaging/transports/rabbitmq/interoperability.md b/docs/guide/messaging/transports/rabbitmq/interoperability.md
index 833c93903..1347ab71a 100644
--- a/docs/guide/messaging/transports/rabbitmq/interoperability.md
+++ b/docs/guide/messaging/transports/rabbitmq/interoperability.md
@@ -1,5 +1,9 @@
# Interoperability
+::: tip
+Also see the more generic [Wolverine Guide on Interoperability](/tutorials/interop)
+:::
+
Hey, it's a complicated world and Wolverine is a relative newcomer, so it's somewhat likely you'll find yourself needing to make a Wolverine application talk via Rabbit MQ to
a non-Wolverine application. Not to worry (too much), Wolverine has you covered with the ability to customize Wolverine to Rabbit MQ mapping and some built in recipes for
interoperability with commonly used .NET messaging frameworks.
@@ -39,7 +43,7 @@ builder.UseWolverine(opts =>
using var host = builder.Build();
await host.StartAsync();
```
-snippet source | anchor
+snippet source | anchor
With this setting, there is **no other required headers** for Wolverine to process incoming messages. However, Wolverine will be
@@ -99,14 +103,9 @@ public class SpecialMapper : IRabbitMqEnvelopeMapper
envelope.TenantId = (string)tenantId;
}
}
-
- public IEnumerable AllHeaders()
- {
- yield break;
- }
}
```
-snippet source | anchor
+snippet source | anchor
And register that special mapper like this:
@@ -135,7 +134,7 @@ builder.UseWolverine(opts =>
using var host = builder.Build();
await host.StartAsync();
```
-snippet source | anchor
+snippet source | anchor
@@ -161,8 +160,6 @@ At this point, the interoperability is only built and tested for the [Rabbit MQ
Here's a sample:
-
-
```cs
Wolverine = await Host.CreateDefaultBuilder().UseWolverine(opts =>
{
@@ -188,8 +185,6 @@ Wolverine = await Host.CreateDefaultBuilder().UseWolverine(opts =>
opts.Policies.RegisterInteropMessageAssembly(typeof(IInterfaceMessage).Assembly);
}).StartAsync();
```
-snippet source | anchor
-
## Interoperability with Mass Transit
@@ -204,8 +199,6 @@ with MassTransit, and don't try to use that endpoint for Wolverine to Wolverine
The configuration to do this is shown below:
-
-
```cs
Wolverine = await Host.CreateDefaultBuilder().UseWolverine(opts =>
{
@@ -233,5 +226,3 @@ Wolverine = await Host.CreateDefaultBuilder().UseWolverine(opts =>
.DefaultIncomingMessage().UseForReplies();
}).StartAsync();
```
-snippet source | anchor
-
diff --git a/docs/guide/messaging/transports/rabbitmq/listening.md b/docs/guide/messaging/transports/rabbitmq/listening.md
index 560ed08a9..f83a38fb2 100644
--- a/docs/guide/messaging/transports/rabbitmq/listening.md
+++ b/docs/guide/messaging/transports/rabbitmq/listening.md
@@ -38,7 +38,7 @@ using var host = await Host.CreateDefaultBuilder()
});
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
To optimize and tune the message processing, you may want to read more about the [Rabbit MQ prefetch count and prefetch
@@ -80,5 +80,5 @@ using var host = await Host.CreateDefaultBuilder()
});
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
diff --git a/docs/guide/messaging/transports/rabbitmq/multiple-brokers.md b/docs/guide/messaging/transports/rabbitmq/multiple-brokers.md
index 1a4562d62..b6c04c19e 100644
--- a/docs/guide/messaging/transports/rabbitmq/multiple-brokers.md
+++ b/docs/guide/messaging/transports/rabbitmq/multiple-brokers.md
@@ -39,7 +39,7 @@ builder.UseWolverine(opts =>
opts.PublishAllMessages().ToRabbitTopicsOnNamedBroker(external, "topics");
});
```
-snippet source | anchor
+snippet source | anchor
The `Uri` values for endpoints to the additional broker follows the same rules as the normal usage of the Rabbit MQ
diff --git a/docs/guide/messaging/transports/rabbitmq/object-management.md b/docs/guide/messaging/transports/rabbitmq/object-management.md
index 10397daf9..9516a48da 100644
--- a/docs/guide/messaging/transports/rabbitmq/object-management.md
+++ b/docs/guide/messaging/transports/rabbitmq/object-management.md
@@ -30,7 +30,7 @@ using var host = await Host.CreateDefaultBuilder()
opts.PublishAllMessages().ToRabbitExchange("exchange1");
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
At development time -- or occasionally in production systems -- you may want to have the messaging
@@ -47,7 +47,7 @@ using var host = await Host.CreateDefaultBuilder()
.AutoPurgeOnStartup();
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
Or you can be more selective and only have certain queues of volatile messages purged
@@ -64,7 +64,7 @@ using var host = await Host.CreateDefaultBuilder()
.DeclareQueue("queue2", q => q.PurgeOnStartup = true);
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
Wolverine's Rabbit MQ integration also supports the [Oakton stateful resource](https://jasperfx.github.io/oakton/guide/host/resources.html) model,
@@ -205,7 +205,7 @@ builder.UseWolverine(opts =>
});
```
-snippet source | anchor
+snippet source | anchor
There are just a few things to know:
@@ -235,6 +235,6 @@ public class MyModuleExtension : IWolverineExtension
}
}
```
-snippet source | anchor
+snippet source | anchor
diff --git a/docs/guide/messaging/transports/rabbitmq/publishing.md b/docs/guide/messaging/transports/rabbitmq/publishing.md
index b6352655b..161517fe0 100644
--- a/docs/guide/messaging/transports/rabbitmq/publishing.md
+++ b/docs/guide/messaging/transports/rabbitmq/publishing.md
@@ -23,7 +23,7 @@ using var host = await Host.CreateDefaultBuilder()
opts.PublishAllMessages().ToRabbitQueue("special", queue => { queue.IsExclusive = true; });
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
## Publish to an Exchange
@@ -55,7 +55,7 @@ using var host = await Host.CreateDefaultBuilder()
});
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
## Publish to a Routing Key
@@ -84,6 +84,6 @@ using var host = await Host.CreateDefaultBuilder()
opts.PublishAllMessages().ToRabbitExchange("exchange1");
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
diff --git a/docs/guide/messaging/transports/rabbitmq/topics.md b/docs/guide/messaging/transports/rabbitmq/topics.md
index a87bdc093..8fa6f0e1b 100644
--- a/docs/guide/messaging/transports/rabbitmq/topics.md
+++ b/docs/guide/messaging/transports/rabbitmq/topics.md
@@ -23,7 +23,7 @@ using var host = await Host.CreateDefaultBuilder()
opts.ListenToRabbitQueue("");
}).StartAsync();
```
-snippet source | anchor
+snippet source | anchor
While we're specifying the exchange name ("topics-exchange"), we did nothing to specify the topic
@@ -35,7 +35,7 @@ name. With this set up, when you publish a message in this application like so:
var publisher = host.MessageBus();
await publisher.SendAsync(new Message1());
```
-snippet source | anchor
+snippet source | anchor
You will be sending that message to the "topics-exchange" with a topic name derived from
@@ -70,7 +70,7 @@ Of course, you can always explicitly send a message to a specific topic with thi
```cs
await publisher.BroadcastToTopicAsync("color.*", new Message1());
```
-snippet source | anchor
+snippet source | anchor
Note two things about the code above:
@@ -125,7 +125,7 @@ public interface ITenantMessage
string TenantId { get; }
}
```
-snippet source | anchor
+snippet source | anchor
Let's say that any message that implements that interface, we want published to the
@@ -152,5 +152,5 @@ builder.UseWolverine(opts =>
using var host = builder.Build();
await host.StartAsync();
```
-snippet source | anchor
+snippet source | anchor
diff --git a/docs/guide/messaging/transports/sns.md b/docs/guide/messaging/transports/sns.md
index 9656523e9..c61dcfaeb 100644
--- a/docs/guide/messaging/transports/sns.md
+++ b/docs/guide/messaging/transports/sns.md
@@ -1,7 +1,7 @@
# Using Amazon SNS
::: warning
-At this moment, Wolverine cannot support request/reply mechanics (`IMessageBus.InvokeAsync()`).
+At this moment, Wolverine cannot support request/reply mechanics (`IMessageBus.InvokeAsync()`) with SNS~~~~.
:::
:::tip
@@ -190,3 +190,12 @@ var host = await Host.CreateDefaultBuilder()
```
snippet source | anchor
+
+## Interoperability
+
+::: tip
+Also see the more generic [Wolverine Guide on Interoperability](/tutorials/interop)
+:::
+
+SNS interoperability is done through the `ISnsEnvelopeMapper`. At this point, SNS supports interoperability through
+MassTransit, NServiceBus, CloudEvents, or user defined mapping strategies.
diff --git a/docs/guide/messaging/transports/sqs/interoperability.md b/docs/guide/messaging/transports/sqs/interoperability.md
index 5f0f24a01..bd8d8c3aa 100644
--- a/docs/guide/messaging/transports/sqs/interoperability.md
+++ b/docs/guide/messaging/transports/sqs/interoperability.md
@@ -1,5 +1,9 @@
# Interoperability
+::: tip
+Also see the more generic [Wolverine Guide on Interoperability](/tutorials/interop)
+:::
+
Hey, it's a complicated world and Wolverine is a relative newcomer, so it's somewhat likely you'll find yourself needing to make a Wolverine application talk via AWS SQS to
a non-Wolverine application. Not to worry (too much), Wolverine has you covered with the ability to customize Wolverine to Amazon SQS mapping.
diff --git a/docs/guide/messaging/transports/tcp.md b/docs/guide/messaging/transports/tcp.md
index 6ce1f7dbc..a457bc0cf 100644
--- a/docs/guide/messaging/transports/tcp.md
+++ b/docs/guide/messaging/transports/tcp.md
@@ -73,7 +73,7 @@ await bus.EndpointFor("One").InvokeAsync(new SomeMessage());
var answer = bus.EndpointFor("One")
.InvokeAsync(new Question());
```
-snippet source | anchor
+snippet source | anchor
or use `ToServerAndPort()` to send messages to a port on another machine:
diff --git a/docs/tutorials/cqrs-with-marten.md b/docs/tutorials/cqrs-with-marten.md
index 187c0172c..b5c404cf4 100644
--- a/docs/tutorials/cqrs-with-marten.md
+++ b/docs/tutorials/cqrs-with-marten.md
@@ -312,7 +312,7 @@ public void unit_test()
]);
}
```
-snippet source | anchor
+snippet source | anchor
::: tip
@@ -366,6 +366,7 @@ public class AppFixture : IAsyncLifetime
// its implied Program.Main() set up
Host = await AlbaHost.For(x =>
{
+
// Just showing that you *can* override service
// registrations for testing if that's useful
x.ConfigureServices(services =>
@@ -373,6 +374,10 @@ public class AppFixture : IAsyncLifetime
// If wolverine were using Rabbit MQ / SQS / Azure Service Bus,
// turn that off for now
services.DisableAllExternalWolverineTransports();
+
+ /// THIS IS IMPORTANT!
+ services.MartenDaemonModeIsSolo();
+ services.RunWolverineInSoloMode();
});
});
@@ -385,7 +390,7 @@ public class AppFixture : IAsyncLifetime
}
}
```
-snippet source | anchor
+snippet source | anchor
And I like to add a base class for integration tests with some convenience methods that have
@@ -418,6 +423,9 @@ public abstract class IntegrationContext : IAsyncLifetime
// Using Marten, wipe out all data and reset the state
// back to exactly what we described in InitialAccountData
await Store.Advanced.ResetAllData();
+
+ // SWitch to this instead please!!!! A super set of the above ^^^
+ await Host.ResetAllMartenDataAsync();
}
// This is required because of the IAsyncLifetime
@@ -455,7 +463,7 @@ public abstract class IntegrationContext : IAsyncLifetime
}
}
```
-snippet source | anchor
+snippet source | anchor
With all of that in place (and if you're using Docker for your infrastructure, a quick `docker compose up -d` command),
@@ -483,12 +491,16 @@ public async Task happy_path_end_to_end()
// Reaching into Marten to build the current state of the new Incident
// just to check the expected outcome
using var session = Host.DocumentStore().LightweightSession();
- var incident = await session.Events.AggregateStreamAsync(response.Value);
+
+
+
+ // This wallpapers over the exact projection lifecycle....
+ var incident = await session.Events.FetchLatest(response.Value);
incident.Status.ShouldBe(IncidentStatus.Pending);
}
```
-snippet source | anchor
+snippet source | anchor
## Appending Events to an Existing Stream
diff --git a/docs/tutorials/interop.md b/docs/tutorials/interop.md
index a210e6403..88f6e9bba 100644
--- a/docs/tutorials/interop.md
+++ b/docs/tutorials/interop.md
@@ -1,5 +1,12 @@
# Interoperability with Non Wolverine Systems
+::: warning
+We greatly expanded the interoperability options in Wolverine for 5.0, but some of the integrations may not have widely been used
+in real applications outside of testing by the time you try to use especially the MassTransit or NServiceBus for transports besides
+Rabbit MQ or CloudEvents with any transport. Please feel free to post issues to GitHub or use the Discord server to report
+any issues.
+:::
+
It's a complicated world, Wolverine is a relative newcomer in the asynchronous messaging space in the .NET ecosystem,
and who knows what other systems on completely different technical platforms you might have going on. As Wolverine has
gained adoption, and as a prerequisite for other folks to even consider adopting Wolverine, we've had to improve Wolverine's
@@ -18,6 +25,38 @@ that holds the .NET message object and/or the binary representation of the messa
* Information about expected replies and a `ReplyUri` that tells Wolverine where to send any responses to the current message
* Other headers
+Here's a little sample of how an `Envelope` might be used internally by Wolverine:
+
+
+
+```cs
+var message = new ApproveInvoice("1234");
+
+// I'm really creating an outgoing message here
+var envelope = new Envelope(message);
+
+// This information is assigned internally,
+// but it's good to know that it exists
+envelope.CorrelationId = "AAA";
+
+// This would refer to whatever Wolverine message
+// started a set of related activity
+envelope.ConversationId = Guid.NewGuid();
+
+// For both outgoing and incoming messages,
+// this identifies how the message data is structured
+envelope.ContentType = "application/json";
+
+// When using multi-tenancy, this is used to track
+// what tenant a message applies to
+envelope.TenantId = "222";
+
+// Not every broker cares about this of course
+envelope.GroupId = "BBB";
+```
+snippet source | anchor
+
+
As you can probably imagine, Wolverine uses this structure all throughout its internals to handle, send, track, and otherwise
coordinate message processing. When using Wolverine with external transport brokers like Kafka, Pulsar, Google Pubsub,
or Rabbit MQ, Wolverine goes through a bi-directional mapping from whatever each broker's own representation of a "message"
@@ -29,4 +68,361 @@ it's having to map its `Envelope` to the transport's outgoing message structure
As you can probably surmise from the diagram, there's an important abstraction in Wolverine called an "envelope mapper"
that does the work of translating Wolverine's `Envelope` structure to and from each message broker's own model for messages.
-MORE HERE!!!
+These abstractions are a little bit different for each external broker, and Wolverine provides some built in mappers for
+common interoperability scenarios:
+
+| Transport | Envelope Mapper Name | Built In Interop |
+|-------------------------------------------------------------------|------------------------------------------------------------------------------------------------|-------------------------------------------------|
+| [Rabbit MQ](/guide/messaging/transports/rabbitmq/) | [IRabbitMqEnvelpoeMapper](/guide/messaging/transports/rabbitmq/interoperability) | MassTransit, NServiceBus, CloudEvents, Raw Json |
+| [Azure Service Bus](/guide/messaging/transports/azureservicebus/) | [IAzureServiceBusEnvelopeMapper](/guide/messaging/transports/azureservicebus/interoperability) | MassTransit, NServiceBus, CloudEvents, Raw Json |
+| [Amazon SQS](/guide/messaging/transports/sqs/) | [ISqsEnvelopeMapper](/guide/messaging/transports/sqs/interoperability) | MassTransit, NServiceBus, CloudEvents, Raw Json |
+| [Amazon SNS](/guide/messaging/transports/sns) | [ISnsEnvelopeMapper](/guide/messaging/transports/sns.html#interoperability) | MassTransit, NServiceBus, CloudEvents, Raw Json |
+| [Kafka](/guide/messaging/transports/kafka) | [IKafkaEnvelopeMapper](/guide/messaging/transports/kafka.html#interoperability) | CloudEvents, Raw Json |
+| [Apache Pulsar](/guide/messaging/transports/pulsar) | [IPulsarEnvelopeMapper](/guide/messaging/transports/pulsar.html#interoperability) | CloudEvents |
+| [MQTT](/guide/messaging/transports/mqtt) | [IMqttEnvelopeMapper](/guide/messaging/transports/mqtt.html#interoperability)] | CloudEvents |
+
+## Writing a Custom Envelope Mapper
+
+Let's say that you're needing to interact with an upstream system that publishes messages to Wolverine
+through an external message broker in a format that's
+completely different than what Wolverine itself uses or any built in envelope mapping recipe -- which is actually quite common.
+
+When you map incoming transport messages to Wolverine's `Envelope`, **at a bare minimum**, Wolverine needs to know the binary data that Wolverine will later try to deserialize to a .NET type
+in its own execution pipeline (`Envelope.Data`) and how to read that binary data into a .NET message object. When Wolverine
+tries to handle an incoming `Envelope` in its execution pipeline, it will:
+
+1. Start some Open Telemetry span tracking using the metadata from the incoming `Envelope` to create traceability between the
+ upstream publisher and the current message execution. You don't *have* to support this in your custom mapper, but you'd ideally *like* to have this.
+2. Checks if the `Envelope` has expired based on its `DeliverBy` property, and discards the `Envelope` if so
+3. Tries to choose a [message serializer](https://wolverinefx.net/guide/messages.html) based on the `Envelope.Serializer`, then the matching serializer based on `Envelope.ContentType` if that exists, then it falls through to
+ the default serializer for the application (SystemTextJson by default) just in case the default serializer.
+
+As is hopefully clear from that series of steps above, when you are writing to the incoming `Envelope` in a custom message,
+you have to set the binary data for the incoming message, you'd ideally like to set the correlation information on `Envelope`
+to reflect the incoming data, and you need to either set at least `Envelope.MessageType` so Wolverine knows what
+message type to try to deserialize to, or just set a specific `IMessageSerializer` on `Envelope.Serializer` that Wolverine
+assumes will "know" how to build out the right type and maybe even infer more valuable metadata to the `Envelope` from
+the raw binary data (the MassTransit and CloudEvents interoperability works this way).
+
+In this first sample, I'm going to write a simplistic mapper for Kafka that assumes everything coming into an
+endpoint is JSON and a specific type:
+
+
+
+```cs
+// Simplistic envelope mapper that expects every message to be of
+// type "T" and serialized as JSON that works perfectly well w/ our
+// application's default JSON serialization
+public class OurKafkaJsonMapper : IKafkaEnvelopeMapper
+{
+ // Wolverine needs to know the
+ private readonly string _messageTypeName = typeof(TMessage).ToMessageTypeName();
+
+ // Map the Wolverine Envelope structure to the outgoing Kafka structure
+ public void MapEnvelopeToOutgoing(Envelope envelope, Message outgoing)
+ {
+ // We'll come back to this later...
+ throw new NotSupportedException();
+ }
+
+ // Map the incoming message from Kafka to the incoming Wolverine envelope
+ public void MapIncomingToEnvelope(Envelope envelope, Message incoming)
+ {
+ // We're making an assumption here that only one type of message
+ // is coming in on this particular Kafka topic, so we're telling
+ // Wolverine what the message type is according to Wolverine's own
+ // message naming scheme
+ envelope.MessageType = _messageTypeName;
+
+ // Tell Wolverine to use JSON serialization for the message
+ // data
+ envelope.ContentType = "application/json";
+
+ // Put the raw binary data right on the Envelope where
+ // Wolverine "knows" how to get at it later
+ envelope.Data = incoming.Value;
+ }
+}
+```
+snippet source | anchor
+
+
+Which is essentially how the built in "Raw JSON" mapper works in external transport mappers. In the envelope mapper above
+we can assume that the actual message data is something that a straightforward serializer can deal with the raw data, and
+we really just need to deal with setting a few headers.
+
+In some cases you might just have to do a little bit different mapping of header information to `Envelope` properties
+than Wolverine's built in protocol. For most transports (Amazon SQS and SNS are the exceptions), you can just modify
+the "header name to Envelope" mappings something like this example from Azure Service Bus:
+
+
+
+```cs
+var builder = Host.CreateApplicationBuilder();
+builder.UseWolverine(opts =>
+{
+ // One way or another, you're probably pulling the Azure Service Bus
+ // connection string out of configuration
+ var azureServiceBusConnectionString = builder
+ .Configuration
+ .GetConnectionString("azure-service-bus");
+
+ // Connect to the broker in the simplest possible way
+ opts.UseAzureServiceBus(azureServiceBusConnectionString).AutoProvision();
+
+ // I overrode the buffering limits just to show
+ // that they exist for "back pressure"
+ opts.ListenToAzureServiceBusQueue("incoming")
+ .UseInterop((queue, mapper) =>
+ {
+ // Not sure how useful this would be, but we can start from
+ // the baseline Wolverine mapping and just override a few mappings
+ mapper.MapPropertyToHeader(x => x.ContentType, "OtherTool.ContentType");
+ mapper.MapPropertyToHeader(x => x.CorrelationId, "OtherTool.CorrelationId");
+ // and more
+
+ // or a little uglier where you might be mapping and transforming data between
+ // the transport's model and the Wolverine Envelope
+ mapper.MapProperty(x => x.ReplyUri,
+ (e, msg) => e.ReplyUri = new Uri($"asb://queue/{msg.ReplyTo}"),
+ (e, msg) => msg.ReplyTo = "response");
+
+ });
+
+});
+```
+snippet source | anchor
+
+
+That code isn't necessarily for the feint of heart, but that will sometimes be an easier recipe than trying to write
+a custom mapper from scratch. The NServiceBus interoperability for everything but Amazon SQS/SNS transports uses this
+approach:
+
+
+
+```cs
+public void UseNServiceBusInterop()
+{
+ // We haven't tried to address this yet, but NSB can stick in some characters
+ // that STJ chokes on, but good ol' Newtonsoft handles just fine
+ DefaultSerializer = new NewtonsoftSerializer(new JsonSerializerSettings());
+
+ customizeMapping((m, _) =>
+ {
+ m.MapPropertyToHeader(x => x.ConversationId, "NServiceBus.ConversationId");
+ m.MapPropertyToHeader(x => x.SentAt, "NServiceBus.TimeSent");
+ m.MapPropertyToHeader(x => x.CorrelationId!, "NServiceBus.CorrelationId");
+
+ var replyAddress = new Lazy(() =>
+ {
+ var replyEndpoint = (RabbitMqEndpoint)_parent.ReplyEndpoint()!;
+ return replyEndpoint.RoutingKey();
+ });
+
+ void WriteReplyToAddress(Envelope e, IBasicProperties props)
+ {
+ props.Headers["NServiceBus.ReplyToAddress"] = replyAddress.Value;
+ }
+
+ void ReadReplyUri(Envelope e, IReadOnlyBasicProperties props)
+ {
+ if (props.Headers.TryGetValue("NServiceBus.ReplyToAddress", out var raw))
+ {
+ var queueName = (raw is byte[] b ? Encoding.Default.GetString(b) : raw.ToString())!;
+ e.ReplyUri = new Uri($"{_parent.Protocol}://queue/{queueName}");
+ }
+ }
+
+ m.MapProperty(x => x.ReplyUri!, ReadReplyUri, WriteReplyToAddress);
+ });
+}
+```
+snippet source | anchor
+
+
+Finally, here's another example that works quite differently where the mapper sets a serializer directly on the `Envelope`:
+
+
+
+```cs
+// This guy is the envelope mapper for interoperating
+// with MassTransit
+internal class MassTransitMapper : ISqsEnvelopeMapper
+{
+ private readonly IMassTransitInteropEndpoint _endpoint;
+ private MassTransitJsonSerializer _serializer;
+
+ public MassTransitMapper(IMassTransitInteropEndpoint endpoint)
+ {
+ _endpoint = endpoint;
+ _serializer = new MassTransitJsonSerializer(endpoint);
+ }
+
+ public MassTransitJsonSerializer Serializer => _serializer;
+
+ public string BuildMessageBody(Envelope envelope)
+ {
+ return Encoding.UTF8.GetString(_serializer.Write(envelope));
+ }
+
+ public IEnumerable> ToAttributes(Envelope envelope)
+ {
+ yield break;
+ }
+
+ public void ReadEnvelopeData(Envelope envelope, string messageBody, IDictionary attributes)
+ {
+ // TODO -- this could be more efficient of course
+ envelope.Data = Encoding.UTF8.GetBytes(messageBody);
+
+ // This is the really important part
+ // of the mapping
+ envelope.Serializer = _serializer;
+ }
+}
+```
+snippet source | anchor
+
+
+In the case above, the `MassTransitSerializer` is a two step process that first deserializes a JSON document that contains
+metadata about the message and also embedded JSON for the actual message, then figures out the proper message type to deserialize
+the inner JSON and *finally* sends the real message and all the expected correlation metadata about the message on to
+Wolverine's execution pipeline in such a way that Wolverine can create traceability between MassTransit on the other side and
+Wolverine.
+
+## Interop with MassTransit
+
+AWS SQS, Azure Service Bus, or Rabbit MQ can interoperate with MassTransit by opting into this setting on an endpoint
+by endpoint basis as shown in this sample with Rabbit MQ:
+
+
+
+```cs
+using var host = await Host.CreateDefaultBuilder()
+ .UseWolverine(opts =>
+ {
+ // *A* way to configure Rabbit MQ using their Uri schema
+ // documented here: https://www.rabbitmq.com/uri-spec.html
+ opts.UseRabbitMq(new Uri("amqp://localhost"));
+
+ // Set up a listener for a queue
+ opts.ListenToRabbitQueue("incoming1")
+
+ // There is a limitation here in that you will also
+ // have to tell Wolverine what the message type is
+ // because it cannot today figure out what the Wolverine
+ // message type in the current application is from
+ // MassTransit's metadata
+ .DefaultIncomingMessage()
+ .UseMassTransitInterop(
+
+ // This is optional, but just letting you know it's there
+ interop =>
+ {
+ interop.UseSystemTextJsonForSerialization(stj =>
+ {
+ // Don't worry all of this is optional, but
+ // just making sure you know that you can configure
+ // JSON serialization to work seamlessly with whatever
+ // the application on the other end is doing
+ });
+ });
+ }).StartAsync();
+```
+snippet source | anchor
+
+
+Here's some details that you will need to know:
+
+* While Wolverine *can* send message type information to MassTransit, Wolverine is not (yet) able to glean the message
+ type from MassTransit metadata, so you will have to hard code the incoming message type for a particular Wolverine endpoint
+ that is receiving messages from a MassTransit application
+* Wolverine is able to do request/reply semantics with MassTransit, but there might be hiccups using Wolverine's automatic reply queues just because
+ of differing naming conventions or reserved characters leaking through.
+* You probably want to use the `RegisterInteropMessageAssembly(Assembly)` for any assemblies of reused DTO message types between
+ MassTransit and your Wolverine application to help Wolverine be able to map from NServiceBus publishing by an interface and Wolverine only
+ handling concrete types
+
+## Interop with NServiceBus
+
+NServiceBus has a wire protocol that is much more similar to Wolverine and works a little more cleanly -- except for Amazon SQS or SNS that is again, weird.
+
+For the transports that support NServiceBus, opt into the interoperability on an endpoint by endpoint basis with this syntax:
+
+
+
+```cs
+var builder = Host.CreateApplicationBuilder();
+builder.UseWolverine(opts =>
+{
+ // One way or another, you're probably pulling the Azure Service Bus
+ // connection string out of configuration
+ var azureServiceBusConnectionString = builder
+ .Configuration
+ .GetConnectionString("azure-service-bus");
+
+ // Connect to the broker in the simplest possible way
+ opts.UseAzureServiceBus(azureServiceBusConnectionString).AutoProvision();
+
+ // I overrode the buffering limits just to show
+ // that they exist for "back pressure"
+ opts.ListenToAzureServiceBusQueue("incoming")
+ .UseNServiceBusInterop();
+
+ // This facilitates messaging from NServiceBus (or MassTransit) sending as interface
+ // types, whereas Wolverine only wants to deal with concrete types
+ opts.Policies.RegisterInteropMessageAssembly(typeof(IInterfaceMessage).Assembly);
+});
+```
+snippet source | anchor
+
+
+And some details that you will need to know:
+
+* Wolverine is able to detect the message type from the standard NServiceBus headers. You *might* need to utilize the [message type aliasing](/guide/messages.html#message-type-name-or-alias) to match
+ the NServiceBus name for a message type
+* You probably want to use the `RegisterInteropMessageAssembly(Assembly)` for any assemblies of reused DTO message types between
+ NServiceBus and your Wolverine application to help Wolverine be able to map from NServiceBus publishing by an interface and Wolverine only
+ handling concrete types
+* Wolverine does support request/reply interactions with NServiceBus. Wolverine is able to interpret and also translate to NServiceBus's version of Wolverine's `Envelope.ReplyUri`
+
+## Interop with CloudEvents
+
+We're honestly not sure how pervasive the [CloudEvents specification](https://cloudevents.io/) is really used outside of
+Microsoft's [Dapr](https://dapr.io/), but there have been enough mentions of this from the Wolverine community to justify its adoption.
+
+CloudEvents works by publishing messages in its own standardized JSON [envelope wrapper](). The Wolverine to CloudEvents interoperability
+is mapping between Wolverine's `Envelope` and the CloudEvents JSON payload, with the actual message data being embedded in
+the CloudEvents JSON.
+
+For the transports that support CloudEvents, you need to opt into the CloudEvents interoperability on an endpoint by endpoint
+basis like this:
+
+
+
+```cs
+using var host = await Host.CreateDefaultBuilder()
+ .UseWolverine(opts =>
+ {
+ // *A* way to configure Rabbit MQ using their Uri schema
+ // documented here: https://www.rabbitmq.com/uri-spec.html
+ opts.UseRabbitMq(new Uri("amqp://localhost"));
+
+ // Set up a listener for a queue
+ opts.ListenToRabbitQueue("incoming1")
+
+ // Just note that you *can* override the STJ serialization
+ // settings for messages coming in with the CloudEvents
+ // wrapper
+ .InteropWithCloudEvents(new JsonSerializerOptions());
+ }).StartAsync();
+```
+snippet source | anchor
+
+
+With CloudEvents interoperability:
+
+* Basic correlation and causation is mapped for Open Telemetry style traceability
+* Wolverine is again depending on [message type aliases](/guide/messages.html#message-type-name-or-alias) to "know" what message type the CloudEvents envelopes are referring to, and you might very well
+ have to explicitly register message type aliases to bridge the gap between CloudEvents and your Wolverine application.
+
diff --git a/src/Samples/DocumentationSamples/InteropSamples.cs b/src/Samples/DocumentationSamples/InteropSamples.cs
new file mode 100644
index 000000000..8f4f8af43
--- /dev/null
+++ b/src/Samples/DocumentationSamples/InteropSamples.cs
@@ -0,0 +1,41 @@
+using Wolverine;
+
+namespace DocumentationSamples;
+
+public class InteropSamples
+{
+ public static void build_envelope()
+ {
+ #region sample_create_an_outgoing_envelope
+
+ var message = new ApproveInvoice("1234");
+
+ // I'm really creating an outgoing message here
+ var envelope = new Envelope(message);
+
+ // This information is assigned internally,
+ // but it's good to know that it exists
+ envelope.CorrelationId = "AAA";
+
+ // This would refer to whatever Wolverine message
+ // started a set of related activity
+ envelope.ConversationId = Guid.NewGuid();
+
+ // For both outgoing and incoming messages,
+ // this identifies how the message data is structured
+ envelope.ContentType = "application/json";
+
+ // When using multi-tenancy, this is used to track
+ // what tenant a message applies to
+ envelope.TenantId = "222";
+
+ // Not every broker cares about this of course
+ envelope.GroupId = "BBB";
+
+ #endregion
+ }
+
+
+}
+
+public record ApproveInvoice(string Id);
\ No newline at end of file
diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/MassTransitMapper.cs b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/MassTransitMapper.cs
index 9b0bc361f..db7f22c80 100644
--- a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/MassTransitMapper.cs
+++ b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/MassTransitMapper.cs
@@ -4,6 +4,10 @@
namespace Wolverine.AmazonSqs.Internal;
+#region sample_MassTransitMapper_for_SQS
+
+// This guy is the envelope mapper for interoperating
+// with MassTransit
internal class MassTransitMapper : ISqsEnvelopeMapper
{
private readonly IMassTransitInteropEndpoint _endpoint;
@@ -31,7 +35,11 @@ public void ReadEnvelopeData(Envelope envelope, string messageBody, IDictionary<
{
// TODO -- this could be more efficient of course
envelope.Data = Encoding.UTF8.GetBytes(messageBody);
+
+ // This is the really important part
+ // of the mapping
envelope.Serializer = _serializer;
}
+}
-}
\ No newline at end of file
+#endregion
\ No newline at end of file
diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs
index c90d003f5..06cc50dfe 100644
--- a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs
+++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs
@@ -459,4 +459,82 @@ public async Task message_expiration(IMessageBus bus)
#endregion
public record StatusUpdate(string Status);
-}
\ No newline at end of file
+
+
+ public static async Task custom_mapping()
+ {
+ #region sample_customized_envelope_mapping
+
+ var builder = Host.CreateApplicationBuilder();
+ builder.UseWolverine(opts =>
+ {
+ // One way or another, you're probably pulling the Azure Service Bus
+ // connection string out of configuration
+ var azureServiceBusConnectionString = builder
+ .Configuration
+ .GetConnectionString("azure-service-bus");
+
+ // Connect to the broker in the simplest possible way
+ opts.UseAzureServiceBus(azureServiceBusConnectionString).AutoProvision();
+
+ // I overrode the buffering limits just to show
+ // that they exist for "back pressure"
+ opts.ListenToAzureServiceBusQueue("incoming")
+ .UseInterop((queue, mapper) =>
+ {
+ // Not sure how useful this would be, but we can start from
+ // the baseline Wolverine mapping and just override a few mappings
+ mapper.MapPropertyToHeader(x => x.ContentType, "OtherTool.ContentType");
+ mapper.MapPropertyToHeader(x => x.CorrelationId, "OtherTool.CorrelationId");
+ // and more
+
+ // or a little uglier where you might be mapping and transforming data between
+ // the transport's model and the Wolverine Envelope
+ mapper.MapProperty(x => x.ReplyUri,
+ (e, msg) => e.ReplyUri = new Uri($"asb://queue/{msg.ReplyTo}"),
+ (e, msg) => msg.ReplyTo = "response");
+
+ });
+
+ });
+
+ #endregion
+
+ using var host = builder.Build();
+ await host.StartAsync();
+ }
+
+ public static async Task nservicebus()
+ {
+ #region sample_opting_into_nservicebus
+
+ var builder = Host.CreateApplicationBuilder();
+ builder.UseWolverine(opts =>
+ {
+ // One way or another, you're probably pulling the Azure Service Bus
+ // connection string out of configuration
+ var azureServiceBusConnectionString = builder
+ .Configuration
+ .GetConnectionString("azure-service-bus");
+
+ // Connect to the broker in the simplest possible way
+ opts.UseAzureServiceBus(azureServiceBusConnectionString).AutoProvision();
+
+ // I overrode the buffering limits just to show
+ // that they exist for "back pressure"
+ opts.ListenToAzureServiceBusQueue("incoming")
+ .UseNServiceBusInterop();
+
+ // This facilitates messaging from NServiceBus (or MassTransit) sending as interface
+ // types, whereas Wolverine only wants to deal with concrete types
+ opts.Policies.RegisterInteropMessageAssembly(typeof(IInterfaceMessage).Assembly);
+ });
+
+ #endregion
+
+ using var host = builder.Build();
+ await host.StartAsync();
+ }
+}
+
+public interface IInterfaceMessage;
\ No newline at end of file
diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/Samples.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/Samples.cs
index ac1e9ac02..3fb565aaf 100644
--- a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/Samples.cs
+++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/Samples.cs
@@ -211,11 +211,6 @@ public void MapIncomingToEnvelope(Envelope envelope, ServiceBusReceivedMessage i
// is for a listening endpoint
envelope.MessageType = typeof(Message1).ToMessageTypeName();
}
-
- public IEnumerable AllHeaders()
- {
- yield break;
- }
}
#endregion
\ No newline at end of file
diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs
index 7d8e72914..09f5520da 100644
--- a/src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs
+++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs
@@ -1,6 +1,10 @@
+using System.Text.Json;
+using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using JasperFx.Resources;
+using Wolverine.Util;
+
namespace Wolverine.Kafka.Tests;
public class DocumentationSamples
@@ -140,3 +144,48 @@ public static void Before(Envelope envelope, ILogger logger)
#endregion
+#region sample_OurKafkaJsonMapper
+
+// Simplistic envelope mapper that expects every message to be of
+// type "T" and serialized as JSON that works perfectly well w/ our
+// application's default JSON serialization
+public class OurKafkaJsonMapper : IKafkaEnvelopeMapper
+{
+ // Wolverine needs to know the
+ private readonly string _messageTypeName = typeof(TMessage).ToMessageTypeName();
+
+ // Map the Wolverine Envelope structure to the outgoing Kafka structure
+ public void MapEnvelopeToOutgoing(Envelope envelope, Message outgoing)
+ {
+ // We'll come back to this later...
+ throw new NotSupportedException();
+ }
+
+ // Map the incoming message from Kafka to the incoming Wolverine envelope
+ public void MapIncomingToEnvelope(Envelope envelope, Message incoming)
+ {
+ // We're making an assumption here that only one type of message
+ // is coming in on this particular Kafka topic, so we're telling
+ // Wolverine what the message type is according to Wolverine's own
+ // message naming scheme
+ envelope.MessageType = _messageTypeName;
+
+ // Tell Wolverine to use JSON serialization for the message
+ // data
+ envelope.ContentType = "application/json";
+
+ // Put the raw binary data right on the Envelope where
+ // Wolverine "knows" how to get at it later
+ envelope.Data = incoming.Value;
+ }
+}
+
+#endregion
+
+/*
+// Who knows, maybe the upstream app uses a different JSON naming
+// scheme than our .NET message types, so let's have the ability
+// to specify JSON serialization policies just in case
+_options = options;
+*/
+
diff --git a/src/Transports/Kafka/Wolverine.Kafka/IKafkaEnvelopeMapper.cs b/src/Transports/Kafka/Wolverine.Kafka/IKafkaEnvelopeMapper.cs
index eb75dcabe..1770ce6f4 100644
--- a/src/Transports/Kafka/Wolverine.Kafka/IKafkaEnvelopeMapper.cs
+++ b/src/Transports/Kafka/Wolverine.Kafka/IKafkaEnvelopeMapper.cs
@@ -47,9 +47,4 @@ public void MapIncomingToEnvelope(Envelope envelope, Message inc
envelope.Data = incoming.Value;
envelope.MessageType = _messageTypeName;
}
-
- public IEnumerable AllHeaders()
- {
- yield break;
- }
}
\ No newline at end of file
diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/Samples.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/Samples.cs
index faddcd0a2..37217d228 100644
--- a/src/Transports/MQTT/Wolverine.MQTT.Tests/Samples.cs
+++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/Samples.cs
@@ -135,7 +135,7 @@ public static async Task apply_custom_envelope_mapper()
opts.UseMqtt(mqtt =>
{
var mqttServer = builder.Configuration["mqtt_server"];
-
+
mqtt
.WithMaxPendingMessages(3)
.WithClientOptions(client => { client.WithTcpServer(mqttServer); });
@@ -150,6 +150,7 @@ public static async Task apply_custom_envelope_mapper()
// with our custom strategy
.UseInterop(new MyMqttEnvelopeMapper())
.QualityOfService(MqttQualityOfServiceLevel.AtMostOnce);
+
});
using var host = builder.Build();
@@ -228,11 +229,6 @@ public void MapIncomingToEnvelope(Envelope envelope, MqttApplicationMessage inco
// is not successfully processed
// within 5 seconds
}
-
- public IEnumerable AllHeaders()
- {
- yield break;
- }
}
#endregion
\ No newline at end of file
diff --git a/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttEnvelopeMapper.cs b/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttEnvelopeMapper.cs
index ced3b9597..ef458988a 100644
--- a/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttEnvelopeMapper.cs
+++ b/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttEnvelopeMapper.cs
@@ -112,9 +112,4 @@ public void MapIncomingToEnvelope(Envelope envelope, MqttApplicationMessage inco
EnvelopeSerializer.ReadDataElement(envelope, property.Name, property.Value);
}
}
-
- public IEnumerable AllHeaders()
- {
- yield break;
- }
}
\ No newline at end of file
diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEnvelopeMapper.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEnvelopeMapper.cs
index 992299639..3ce205d4f 100644
--- a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEnvelopeMapper.cs
+++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEnvelopeMapper.cs
@@ -14,7 +14,6 @@ namespace Wolverine.Pulsar;
///
public interface IPulsarEnvelopeMapper
{
- IEnumerable AllHeaders();
void MapIncomingToEnvelope(Envelope envelope, IMessage> incoming);
void MapEnvelopeToOutgoing(Envelope envelope, MessageMetadata outgoing);
}
diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs
index d275a15e2..8af2dd30e 100644
--- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs
+++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs
@@ -1,3 +1,4 @@
+using System.Text.Json;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Microsoft.Extensions.Configuration;
@@ -187,6 +188,67 @@ public static async Task listen_to_queue()
#endregion
}
+ public static async Task interop_with_masstransit()
+ {
+ #region sample_rabbitmq_interop_with_masstransit
+
+ using var host = await Host.CreateDefaultBuilder()
+ .UseWolverine(opts =>
+ {
+ // *A* way to configure Rabbit MQ using their Uri schema
+ // documented here: https://www.rabbitmq.com/uri-spec.html
+ opts.UseRabbitMq(new Uri("amqp://localhost"));
+
+ // Set up a listener for a queue
+ opts.ListenToRabbitQueue("incoming1")
+
+ // There is a limitation here in that you will also
+ // have to tell Wolverine what the message type is
+ // because it cannot today figure out what the Wolverine
+ // message type in the current application is from
+ // MassTransit's metadata
+ .DefaultIncomingMessage()
+ .UseMassTransitInterop(
+
+ // This is optional, but just letting you know it's there
+ interop =>
+ {
+ interop.UseSystemTextJsonForSerialization(stj =>
+ {
+ // Don't worry all of this is optional, but
+ // just making sure you know that you can configure
+ // JSON serialization to work seamlessly with whatever
+ // the application on the other end is doing
+ });
+ });
+ }).StartAsync();
+
+ #endregion
+ }
+
+ public static async Task interop_with_cloudevents()
+ {
+ #region sample_rabbitmq_interop_with_cloudevents
+
+ using var host = await Host.CreateDefaultBuilder()
+ .UseWolverine(opts =>
+ {
+ // *A* way to configure Rabbit MQ using their Uri schema
+ // documented here: https://www.rabbitmq.com/uri-spec.html
+ opts.UseRabbitMq(new Uri("amqp://localhost"));
+
+ // Set up a listener for a queue
+ opts.ListenToRabbitQueue("incoming1")
+
+ // Just note that you *can* override the STJ serialization
+ // settings for messages coming in with the CloudEvents
+ // wrapper
+ .InteropWithCloudEvents(new JsonSerializerOptions());
+ }).StartAsync();
+
+ #endregion
+ }
+
public static async Task publish_to_queue()
{
#region sample_publish_to_rabbitmq_queue
diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/SpecialMapper.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/SpecialMapper.cs
index 1601a7ce8..e73561b58 100644
--- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/SpecialMapper.cs
+++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/SpecialMapper.cs
@@ -4,6 +4,7 @@
namespace Wolverine.RabbitMQ.Tests;
+
#region sample_rabbit_special_mapper
public class SpecialMapper : IRabbitMqEnvelopeMapper
@@ -49,11 +50,6 @@ public void MapIncomingToEnvelope(Envelope envelope, IReadOnlyBasicProperties in
envelope.TenantId = (string)tenantId;
}
}
-
- public IEnumerable AllHeaders()
- {
- yield break;
- }
}
#endregion
\ No newline at end of file
diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqEndpoint.NServiceBus.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqEndpoint.NServiceBus.cs
index 830c88454..30c3ba437 100644
--- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqEndpoint.NServiceBus.cs
+++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqEndpoint.NServiceBus.cs
@@ -7,8 +7,12 @@ namespace Wolverine.RabbitMQ.Internal;
public abstract partial class RabbitMqEndpoint
{
+ #region sample_show_the_NServiceBus_mapping
+
public void UseNServiceBusInterop()
{
+ // We haven't tried to address this yet, but NSB can stick in some characters
+ // that STJ chokes on, but good ol' Newtonsoft handles just fine
DefaultSerializer = new NewtonsoftSerializer(new JsonSerializerSettings());
customizeMapping((m, _) =>
@@ -40,4 +44,6 @@ void ReadReplyUri(Envelope e, IReadOnlyBasicProperties props)
m.MapProperty(x => x.ReplyUri!, ReadReplyUri, WriteReplyToAddress);
});
}
+
+ #endregion
}
\ No newline at end of file
diff --git a/src/Wolverine/Transports/EnvelopeMapper.cs b/src/Wolverine/Transports/EnvelopeMapper.cs
index 0f6d24466..9e2f4a14d 100644
--- a/src/Wolverine/Transports/EnvelopeMapper.cs
+++ b/src/Wolverine/Transports/EnvelopeMapper.cs
@@ -18,15 +18,12 @@ public interface IOutgoingMapper
public interface IIncomingMapper
{
public void MapIncomingToEnvelope(Envelope envelope, TIncoming incoming);
- public IEnumerable AllHeaders();
}
public interface IEnvelopeMapper : IOutgoingMapper, IIncomingMapper;
public interface IEnvelopeMapper
{
- public IEnumerable AllHeaders();
-
///
/// This endpoint will assume that any unidentified incoming message types
/// are the supplied message type. This is meant primarily for interaction
@@ -88,11 +85,6 @@ public EnvelopeMapper(Endpoint endpoint)
MapPropertyToHeader(x => x.Attempts, EnvelopeConstants.AttemptsKey);
}
- public IEnumerable AllHeaders()
- {
- return _envelopeToHeader.Values;
- }
-
public void MapIncomingToEnvelope(Envelope envelope, TIncoming incoming)
{
_mapIncoming.Value(envelope, incoming);