Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
<PackageVersion Include="Grpc.Core" Version="2.46.6" />
<PackageVersion Include="Grpc.Tools" Version="2.72.0" />
<PackageVersion Include="HtmlTags" Version="9.0.0" />
<PackageVersion Include="JasperFx" Version="1.17.2" />
<PackageVersion Include="JasperFx" Version="1.18.0" />
<PackageVersion Include="JasperFx.Events" Version="1.19.1" />
<PackageVersion Include="JasperFx.RuntimeCompiler" Version="4.3.2" />
<PackageVersion Include="JasperFx.RuntimeCompiler" Version="4.4.0" />
<PackageVersion Include="Lamar.Microsoft.DependencyInjection" Version="15.0.1" />
<PackageVersion Include="Marten" Version="8.21.0" />
<PackageVersion Include="Microsoft.Azure.Cosmos" Version="3.46.1" />
<PackageVersion Include="Marten.AspNetCore" Version="8.19.0" />
<PackageVersion Include="MemoryPack" Version="1.21.3" />
<PackageVersion Include="MessagePack" Version="3.1.3" />
Expand Down Expand Up @@ -79,13 +80,13 @@
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="9.0.5" />
<PackageVersion Include="System.Net.NameResolution" Version="4.3.0" />
<PackageVersion Include="System.Threading.Tasks.Dataflow" Version="9.0.5" />
<PackageVersion Include="Weasel.Core" Version="8.6.0" />
<PackageVersion Include="Weasel.EntityFrameworkCore" Version="8.6.0" />
<PackageVersion Include="Weasel.MySql" Version="8.5.1" />
<PackageVersion Include="Weasel.Oracle" Version="8.6.1" />
<PackageVersion Include="Weasel.Postgresql" Version="8.6.0" />
<PackageVersion Include="Weasel.SqlServer" Version="8.6.0" />
<PackageVersion Include="Weasel.Sqlite" Version="8.6.0" />
<PackageVersion Include="Weasel.Core" Version="8.6.2" />
<PackageVersion Include="Weasel.EntityFrameworkCore" Version="8.6.2" />
<PackageVersion Include="Weasel.MySql" Version="8.6.2" />
<PackageVersion Include="Weasel.Oracle" Version="8.6.2" />
<PackageVersion Include="Weasel.Postgresql" Version="8.6.2" />
<PackageVersion Include="Weasel.SqlServer" Version="8.6.2" />
<PackageVersion Include="Weasel.Sqlite" Version="8.6.2" />
<PackageVersion Include="xunit" Version="2.9.3" />
<PackageVersion Include="xunit.assemblyfixture" Version="2.2.0" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2" />
Expand Down
8 changes: 8 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,14 @@ services:
networks:
sb-emulator:

cosmosdb:
image: "mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator:vnext-preview"
ports:
- "8081:8081"
- "1234:1234"
environment:
- "PROTOCOL=https"

asb-sql-2:
image: "mcr.microsoft.com/azure-sql-edge"
environment:
Expand Down
119 changes: 119 additions & 0 deletions docs/guide/durability/cosmosdb.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# CosmosDb Integration

Wolverine supports an [Azure CosmosDB](https://learn.microsoft.com/en-us/azure/cosmos-db/) backed message persistence strategy
option as well as CosmosDB-backed transactional middleware and saga persistence. To get started, add the `WolverineFx.CosmosDb` dependency to your application:

```bash
dotnet add package WolverineFx.CosmosDb
```

and in your application, tell Wolverine to use CosmosDB for message persistence:

```cs
var builder = Host.CreateApplicationBuilder();

// Register CosmosClient with DI
builder.Services.AddSingleton(new CosmosClient(
"your-connection-string",
new CosmosClientOptions { /* options */ }
));

builder.UseWolverine(opts =>
{
// Tell Wolverine to use CosmosDB, specifying the database name
opts.UseCosmosDbPersistence("your-database-name");

// The CosmosDB integration supports basic transactional
// middleware just fine
opts.Policies.AutoApplyTransactions();
});
```

## Container Setup

Wolverine uses a single CosmosDB container named `wolverine` with a partition key path of `/partitionKey`.
The container is automatically created during database migration if it does not exist.

All Wolverine document types are stored in the same container, differentiated by a `docType` field:
- `incoming` - Incoming message envelopes
- `outgoing` - Outgoing message envelopes
- `deadletter` - Dead letter queue messages
- `node` - Node registration documents
- `agent-assignment` - Agent assignment documents
- `lock` - Distributed lock documents

## Message Persistence

The [durable inbox and outbox](/guide/durability/) options in Wolverine are completely supported with
CosmosDB as the persistence mechanism. This includes scheduled execution (and retries), dead letter queue storage,
and the ability to replay designated messages in the dead letter queue storage.

## Saga Persistence

The CosmosDB integration can serve as a [Wolverine Saga persistence mechanism](/guide/durability/sagas). The only limitation is that
all saga identity values must be `string` types. The saga id is used as both the CosmosDB document id and partition key.

## Transactional Middleware

Wolverine's CosmosDB integration supports [transactional middleware](/guide/durability/marten/transactional-middleware)
using the CosmosDB `Container` type. When using `AutoApplyTransactions()`, Wolverine will automatically detect
handlers that use `Container` and apply the transactional middleware.

## Storage Side Effects (ICosmosDbOp)

Use `ICosmosDbOp` as return values from handlers for a cleaner approach to CosmosDB operations:

```cs
public static class MyHandler
{
public static ICosmosDbOp Handle(CreateOrder command)
{
var order = new Order { id = command.Id, Name = command.Name };
return CosmosDbOps.Store(order);
}
}
```

Available side effect operations:
- `CosmosDbOps.Store<T>(document)` - Upsert a document
- `CosmosDbOps.Delete(id, partitionKey)` - Delete a document by id and partition key

## Outbox Pattern

You can use the `ICosmosDbOutbox` interface to combine CosmosDB operations with outgoing messages
in a single logical transaction:

```cs
public class MyService
{
private readonly ICosmosDbOutbox _outbox;

public MyService(ICosmosDbOutbox outbox)
{
_outbox = outbox;
}

public async Task DoWorkAsync(Container container)
{
_outbox.Enroll(container);

// Send messages through the outbox
await _outbox.SendAsync(new MyMessage());

// Flush outgoing messages
await _outbox.SaveChangesAsync();
}
}
```

## Dead Letter Queue Management

Dead letter messages are stored in the same CosmosDB container with `docType = "deadletter"` and can be
managed through the standard Wolverine dead letter queue APIs. Messages can be marked as replayable and
will be moved back to the incoming queue.

## Distributed Locking

The CosmosDB integration implements distributed locking using document-based locks with ETag-based
optimistic concurrency. Lock documents have a 5-minute expiration time and are automatically
reclaimed if a node fails to renew them.
85 changes: 85 additions & 0 deletions src/Persistence/CosmosDbTests/AppFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
using System.Net;
using Microsoft.Azure.Cosmos;
using Wolverine;
using Wolverine.CosmosDb;
using Wolverine.CosmosDb.Internals;
using Wolverine.Persistence.Durability;

namespace CosmosDbTests;

public class AppFixture : IAsyncLifetime
{
// CosmosDB Linux emulator defaults
public const string ConnectionString =
"AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";

public const string DatabaseName = "wolverine_tests";

public CosmosClient Client { get; private set; } = null!;
public Container Container { get; private set; } = null!;

public async Task InitializeAsync()
{
var clientOptions = new CosmosClientOptions
{
HttpClientFactory = () =>
{
HttpMessageHandler httpMessageHandler = new HttpClientHandler
{
ServerCertificateCustomValidationCallback =
HttpClientHandler.DangerousAcceptAnyServerCertificateValidator
};
return new HttpClient(httpMessageHandler);
},
ConnectionMode = ConnectionMode.Gateway,
SerializerOptions = new CosmosSerializationOptions
{
PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase
}
};

Client = new CosmosClient(ConnectionString, clientOptions);

// Retry database/container creation since the vnext emulator can be slow to initialize
for (var attempt = 1; attempt <= 10; attempt++)
{
try
{
var databaseResponse = await Client.CreateDatabaseIfNotExistsAsync(DatabaseName);
var containerProperties =
new ContainerProperties(DocumentTypes.ContainerName, DocumentTypes.PartitionKeyPath);
var containerResponse =
await databaseResponse.Database.CreateContainerIfNotExistsAsync(containerProperties);
Container = containerResponse.Container;
return;
}
catch (CosmosException e) when (e.StatusCode == HttpStatusCode.ServiceUnavailable ||
e.StatusCode == HttpStatusCode.InternalServerError)
{
if (attempt == 10) throw;
await Task.Delay(TimeSpan.FromSeconds(3));
}
}
}

public async Task DisposeAsync()
{
Client?.Dispose();
}

public CosmosDbMessageStore BuildMessageStore()
{
return new CosmosDbMessageStore(Client, DatabaseName, Container, new WolverineOptions());
}

public async Task ClearAll()
{
var store = BuildMessageStore();
await store.Admin.ClearAllAsync();
}
}

[CollectionDefinition("cosmosdb")]
public class CosmosDbCollection : ICollectionFixture<AppFixture>
{
}
34 changes: 34 additions & 0 deletions src/Persistence/CosmosDbTests/CosmosDbTests.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net8.0;net9.0</TargetFrameworks>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>

<IsPackable>false</IsPackable>
<IsTestProject>true</IsTestProject>
</PropertyGroup>

<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App"/>
<PackageReference Include="Microsoft.NET.Test.Sdk" />
<PackageReference Include="GitHubActionsTestLogger" PrivateAssets="All" />
<PackageReference Include="Shouldly" />
<PackageReference Include="xunit"/>
<PackageReference Include="xunit.assemblyfixture" />
<PackageReference Include="xunit.runner.visualstudio">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
<Using Include="Xunit"/>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\Testing\Wolverine.ComplianceTests\Wolverine.ComplianceTests.csproj" />
<ProjectReference Include="..\Wolverine.CosmosDb\Wolverine.CosmosDb.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// <auto-generated/>
#pragma warning disable
using Microsoft.Azure.Cosmos;

namespace Internal.Generated.WolverineHandlers
{
// START: CompleteFourHandler1230864511
[global::System.CodeDom.Compiler.GeneratedCode("JasperFx", "1.0.0")]
public sealed class CompleteFourHandler1230864511 : Wolverine.Runtime.Handlers.MessageHandler
{
private readonly Microsoft.Azure.Cosmos.Container _container;

public CompleteFourHandler1230864511(Microsoft.Azure.Cosmos.Container container)
{
_container = container;
}



public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime.MessageContext context, System.Threading.CancellationToken cancellation)
{
// The actual message body
var completeFour = (Wolverine.ComplianceTests.Sagas.CompleteFour)context.Envelope.Message;


// Enlist in CosmosDB outbox transaction
context.EnlistInOutbox(new Wolverine.CosmosDb.Internals.CosmosDbEnvelopeTransaction(_container, context));
var sagaId = context.Envelope.SagaId;
if (string.IsNullOrEmpty(sagaId)) throw new Wolverine.Persistence.Sagas.IndeterminateSagaStateIdException(context.Envelope);

// Try to load the existing saga document from CosmosDB
Wolverine.ComplianceTests.Sagas.StringBasicWorkflow stringBasicWorkflow = default;
try
{
var _cosmosResponse = await _container.ReadItemAsync<Wolverine.ComplianceTests.Sagas.StringBasicWorkflow>(sagaId, Microsoft.Azure.Cosmos.PartitionKey.None, cancellationToken: cancellation).ConfigureAwait(false);
stringBasicWorkflow = _cosmosResponse.Resource;
}
catch (Microsoft.Azure.Cosmos.CosmosException e) when (e.StatusCode == System.Net.HttpStatusCode.NotFound)
{
stringBasicWorkflow = default;
}
if (stringBasicWorkflow == null)
{
throw new Wolverine.Persistence.Sagas.UnknownSagaException(typeof(Wolverine.ComplianceTests.Sagas.StringBasicWorkflow), sagaId);
}

else
{
context.SetSagaId(sagaId);

// The actual message execution
stringBasicWorkflow.Handle(completeFour);

// Delete the saga if completed, otherwise update it
if (stringBasicWorkflow.IsCompleted())
{
await _container.DeleteItemAsync<Wolverine.ComplianceTests.Sagas.StringBasicWorkflow>(sagaId, Microsoft.Azure.Cosmos.PartitionKey.None).ConfigureAwait(false);
}

else
{
await _container.UpsertItemAsync(stringBasicWorkflow).ConfigureAwait(false);
}


// Have to flush outgoing messages just in case Marten did nothing because of https://github.com/JasperFx/wolverine/issues/536
await context.FlushOutgoingMessagesAsync().ConfigureAwait(false);

}

}

}

// END: CompleteFourHandler1230864511


}

Loading
Loading