diff --git a/Directory.Packages.props b/Directory.Packages.props index d8a949727..37c9c1add 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -22,11 +22,12 @@ - + - + + @@ -79,13 +80,13 @@ - - - - - - - + + + + + + + diff --git a/docker-compose.yml b/docker-compose.yml index ed664d9a9..cd7363684 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -147,6 +147,14 @@ services: networks: sb-emulator: + cosmosdb: + image: "mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator:vnext-preview" + ports: + - "8081:8081" + - "1234:1234" + environment: + - "PROTOCOL=https" + asb-sql-2: image: "mcr.microsoft.com/azure-sql-edge" environment: diff --git a/docs/guide/durability/cosmosdb.md b/docs/guide/durability/cosmosdb.md new file mode 100644 index 000000000..597fe509b --- /dev/null +++ b/docs/guide/durability/cosmosdb.md @@ -0,0 +1,119 @@ +# CosmosDb Integration + +Wolverine supports an [Azure CosmosDB](https://learn.microsoft.com/en-us/azure/cosmos-db/) backed message persistence strategy +option as well as CosmosDB-backed transactional middleware and saga persistence. To get started, add the `WolverineFx.CosmosDb` dependency to your application: + +```bash +dotnet add package WolverineFx.CosmosDb +``` + +and in your application, tell Wolverine to use CosmosDB for message persistence: + +```cs +var builder = Host.CreateApplicationBuilder(); + +// Register CosmosClient with DI +builder.Services.AddSingleton(new CosmosClient( + "your-connection-string", + new CosmosClientOptions { /* options */ } +)); + +builder.UseWolverine(opts => +{ + // Tell Wolverine to use CosmosDB, specifying the database name + opts.UseCosmosDbPersistence("your-database-name"); + + // The CosmosDB integration supports basic transactional + // middleware just fine + opts.Policies.AutoApplyTransactions(); +}); +``` + +## Container Setup + +Wolverine uses a single CosmosDB container named `wolverine` with a partition key path of `/partitionKey`. +The container is automatically created during database migration if it does not exist. + +All Wolverine document types are stored in the same container, differentiated by a `docType` field: +- `incoming` - Incoming message envelopes +- `outgoing` - Outgoing message envelopes +- `deadletter` - Dead letter queue messages +- `node` - Node registration documents +- `agent-assignment` - Agent assignment documents +- `lock` - Distributed lock documents + +## Message Persistence + +The [durable inbox and outbox](/guide/durability/) options in Wolverine are completely supported with +CosmosDB as the persistence mechanism. This includes scheduled execution (and retries), dead letter queue storage, +and the ability to replay designated messages in the dead letter queue storage. + +## Saga Persistence + +The CosmosDB integration can serve as a [Wolverine Saga persistence mechanism](/guide/durability/sagas). The only limitation is that +all saga identity values must be `string` types. The saga id is used as both the CosmosDB document id and partition key. + +## Transactional Middleware + +Wolverine's CosmosDB integration supports [transactional middleware](/guide/durability/marten/transactional-middleware) +using the CosmosDB `Container` type. When using `AutoApplyTransactions()`, Wolverine will automatically detect +handlers that use `Container` and apply the transactional middleware. + +## Storage Side Effects (ICosmosDbOp) + +Use `ICosmosDbOp` as return values from handlers for a cleaner approach to CosmosDB operations: + +```cs +public static class MyHandler +{ + public static ICosmosDbOp Handle(CreateOrder command) + { + var order = new Order { id = command.Id, Name = command.Name }; + return CosmosDbOps.Store(order); + } +} +``` + +Available side effect operations: +- `CosmosDbOps.Store(document)` - Upsert a document +- `CosmosDbOps.Delete(id, partitionKey)` - Delete a document by id and partition key + +## Outbox Pattern + +You can use the `ICosmosDbOutbox` interface to combine CosmosDB operations with outgoing messages +in a single logical transaction: + +```cs +public class MyService +{ + private readonly ICosmosDbOutbox _outbox; + + public MyService(ICosmosDbOutbox outbox) + { + _outbox = outbox; + } + + public async Task DoWorkAsync(Container container) + { + _outbox.Enroll(container); + + // Send messages through the outbox + await _outbox.SendAsync(new MyMessage()); + + // Flush outgoing messages + await _outbox.SaveChangesAsync(); + } +} +``` + +## Dead Letter Queue Management + +Dead letter messages are stored in the same CosmosDB container with `docType = "deadletter"` and can be +managed through the standard Wolverine dead letter queue APIs. Messages can be marked as replayable and +will be moved back to the incoming queue. + +## Distributed Locking + +The CosmosDB integration implements distributed locking using document-based locks with ETag-based +optimistic concurrency. Lock documents have a 5-minute expiration time and are automatically +reclaimed if a node fails to renew them. diff --git a/src/Persistence/CosmosDbTests/AppFixture.cs b/src/Persistence/CosmosDbTests/AppFixture.cs new file mode 100644 index 000000000..3df787909 --- /dev/null +++ b/src/Persistence/CosmosDbTests/AppFixture.cs @@ -0,0 +1,85 @@ +using System.Net; +using Microsoft.Azure.Cosmos; +using Wolverine; +using Wolverine.CosmosDb; +using Wolverine.CosmosDb.Internals; +using Wolverine.Persistence.Durability; + +namespace CosmosDbTests; + +public class AppFixture : IAsyncLifetime +{ + // CosmosDB Linux emulator defaults + public const string ConnectionString = + "AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw=="; + + public const string DatabaseName = "wolverine_tests"; + + public CosmosClient Client { get; private set; } = null!; + public Container Container { get; private set; } = null!; + + public async Task InitializeAsync() + { + var clientOptions = new CosmosClientOptions + { + HttpClientFactory = () => + { + HttpMessageHandler httpMessageHandler = new HttpClientHandler + { + ServerCertificateCustomValidationCallback = + HttpClientHandler.DangerousAcceptAnyServerCertificateValidator + }; + return new HttpClient(httpMessageHandler); + }, + ConnectionMode = ConnectionMode.Gateway, + SerializerOptions = new CosmosSerializationOptions + { + PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase + } + }; + + Client = new CosmosClient(ConnectionString, clientOptions); + + // Retry database/container creation since the vnext emulator can be slow to initialize + for (var attempt = 1; attempt <= 10; attempt++) + { + try + { + var databaseResponse = await Client.CreateDatabaseIfNotExistsAsync(DatabaseName); + var containerProperties = + new ContainerProperties(DocumentTypes.ContainerName, DocumentTypes.PartitionKeyPath); + var containerResponse = + await databaseResponse.Database.CreateContainerIfNotExistsAsync(containerProperties); + Container = containerResponse.Container; + return; + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.ServiceUnavailable || + e.StatusCode == HttpStatusCode.InternalServerError) + { + if (attempt == 10) throw; + await Task.Delay(TimeSpan.FromSeconds(3)); + } + } + } + + public async Task DisposeAsync() + { + Client?.Dispose(); + } + + public CosmosDbMessageStore BuildMessageStore() + { + return new CosmosDbMessageStore(Client, DatabaseName, Container, new WolverineOptions()); + } + + public async Task ClearAll() + { + var store = BuildMessageStore(); + await store.Admin.ClearAllAsync(); + } +} + +[CollectionDefinition("cosmosdb")] +public class CosmosDbCollection : ICollectionFixture +{ +} diff --git a/src/Persistence/CosmosDbTests/CosmosDbTests.csproj b/src/Persistence/CosmosDbTests/CosmosDbTests.csproj new file mode 100644 index 000000000..f7a5b580d --- /dev/null +++ b/src/Persistence/CosmosDbTests/CosmosDbTests.csproj @@ -0,0 +1,34 @@ + + + + net8.0;net9.0 + enable + enable + + false + true + + + + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + + + + diff --git a/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/CompleteFourHandler1230864511.cs b/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/CompleteFourHandler1230864511.cs new file mode 100644 index 000000000..74d304438 --- /dev/null +++ b/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/CompleteFourHandler1230864511.cs @@ -0,0 +1,79 @@ +// +#pragma warning disable +using Microsoft.Azure.Cosmos; + +namespace Internal.Generated.WolverineHandlers +{ + // START: CompleteFourHandler1230864511 + [global::System.CodeDom.Compiler.GeneratedCode("JasperFx", "1.0.0")] + public sealed class CompleteFourHandler1230864511 : Wolverine.Runtime.Handlers.MessageHandler + { + private readonly Microsoft.Azure.Cosmos.Container _container; + + public CompleteFourHandler1230864511(Microsoft.Azure.Cosmos.Container container) + { + _container = container; + } + + + + public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime.MessageContext context, System.Threading.CancellationToken cancellation) + { + // The actual message body + var completeFour = (Wolverine.ComplianceTests.Sagas.CompleteFour)context.Envelope.Message; + + + // Enlist in CosmosDB outbox transaction + context.EnlistInOutbox(new Wolverine.CosmosDb.Internals.CosmosDbEnvelopeTransaction(_container, context)); + var sagaId = context.Envelope.SagaId; + if (string.IsNullOrEmpty(sagaId)) throw new Wolverine.Persistence.Sagas.IndeterminateSagaStateIdException(context.Envelope); + + // Try to load the existing saga document from CosmosDB + Wolverine.ComplianceTests.Sagas.StringBasicWorkflow stringBasicWorkflow = default; + try + { + var _cosmosResponse = await _container.ReadItemAsync(sagaId, Microsoft.Azure.Cosmos.PartitionKey.None, cancellationToken: cancellation).ConfigureAwait(false); + stringBasicWorkflow = _cosmosResponse.Resource; + } + catch (Microsoft.Azure.Cosmos.CosmosException e) when (e.StatusCode == System.Net.HttpStatusCode.NotFound) + { + stringBasicWorkflow = default; + } + if (stringBasicWorkflow == null) + { + throw new Wolverine.Persistence.Sagas.UnknownSagaException(typeof(Wolverine.ComplianceTests.Sagas.StringBasicWorkflow), sagaId); + } + + else + { + context.SetSagaId(sagaId); + + // The actual message execution + stringBasicWorkflow.Handle(completeFour); + + // Delete the saga if completed, otherwise update it + if (stringBasicWorkflow.IsCompleted()) + { + await _container.DeleteItemAsync(sagaId, Microsoft.Azure.Cosmos.PartitionKey.None).ConfigureAwait(false); + } + + else + { + await _container.UpsertItemAsync(stringBasicWorkflow).ConfigureAwait(false); + } + + + // Have to flush outgoing messages just in case Marten did nothing because of https://github.com/JasperFx/wolverine/issues/536 + await context.FlushOutgoingMessagesAsync().ConfigureAwait(false); + + } + + } + + } + + // END: CompleteFourHandler1230864511 + + +} + diff --git a/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/CompleteOneHandler1612253335.cs b/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/CompleteOneHandler1612253335.cs new file mode 100644 index 000000000..f69731f20 --- /dev/null +++ b/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/CompleteOneHandler1612253335.cs @@ -0,0 +1,83 @@ +// +#pragma warning disable +using Microsoft.Azure.Cosmos; + +namespace Internal.Generated.WolverineHandlers +{ + // START: CompleteOneHandler1612253335 + [global::System.CodeDom.Compiler.GeneratedCode("JasperFx", "1.0.0")] + public sealed class CompleteOneHandler1612253335 : Wolverine.Runtime.Handlers.MessageHandler + { + private readonly Microsoft.Azure.Cosmos.Container _container; + + public CompleteOneHandler1612253335(Microsoft.Azure.Cosmos.Container container) + { + _container = container; + } + + + + public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime.MessageContext context, System.Threading.CancellationToken cancellation) + { + // The actual message body + var completeOne = (Wolverine.ComplianceTests.Sagas.CompleteOne)context.Envelope.Message; + + + // Enlist in CosmosDB outbox transaction + context.EnlistInOutbox(new Wolverine.CosmosDb.Internals.CosmosDbEnvelopeTransaction(_container, context)); + var sagaId = context.Envelope.SagaId; + if (string.IsNullOrEmpty(sagaId)) throw new Wolverine.Persistence.Sagas.IndeterminateSagaStateIdException(context.Envelope); + + // Try to load the existing saga document from CosmosDB + Wolverine.ComplianceTests.Sagas.StringBasicWorkflow stringBasicWorkflow = default; + try + { + var _cosmosResponse = await _container.ReadItemAsync(sagaId, Microsoft.Azure.Cosmos.PartitionKey.None, cancellationToken: cancellation).ConfigureAwait(false); + stringBasicWorkflow = _cosmosResponse.Resource; + } + catch (Microsoft.Azure.Cosmos.CosmosException e) when (e.StatusCode == System.Net.HttpStatusCode.NotFound) + { + stringBasicWorkflow = default; + } + if (stringBasicWorkflow == null) + { + throw new Wolverine.Persistence.Sagas.UnknownSagaException(typeof(Wolverine.ComplianceTests.Sagas.StringBasicWorkflow), sagaId); + } + + else + { + context.SetSagaId(sagaId); + + // The actual message execution + var outgoing1 = stringBasicWorkflow.Handle(completeOne); + + + // Outgoing, cascaded message + await context.EnqueueCascadingAsync(outgoing1).ConfigureAwait(false); + + // Delete the saga if completed, otherwise update it + if (stringBasicWorkflow.IsCompleted()) + { + await _container.DeleteItemAsync(sagaId, Microsoft.Azure.Cosmos.PartitionKey.None).ConfigureAwait(false); + } + + else + { + await _container.UpsertItemAsync(stringBasicWorkflow).ConfigureAwait(false); + } + + + // Have to flush outgoing messages just in case Marten did nothing because of https://github.com/JasperFx/wolverine/issues/536 + await context.FlushOutgoingMessagesAsync().ConfigureAwait(false); + + } + + } + + } + + // END: CompleteOneHandler1612253335 + + +} + diff --git a/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/CompleteTwoHandler402398939.cs b/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/CompleteTwoHandler402398939.cs new file mode 100644 index 000000000..c15488299 --- /dev/null +++ b/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/CompleteTwoHandler402398939.cs @@ -0,0 +1,79 @@ +// +#pragma warning disable +using Microsoft.Azure.Cosmos; + +namespace Internal.Generated.WolverineHandlers +{ + // START: CompleteTwoHandler402398939 + [global::System.CodeDom.Compiler.GeneratedCode("JasperFx", "1.0.0")] + public sealed class CompleteTwoHandler402398939 : Wolverine.Runtime.Handlers.MessageHandler + { + private readonly Microsoft.Azure.Cosmos.Container _container; + + public CompleteTwoHandler402398939(Microsoft.Azure.Cosmos.Container container) + { + _container = container; + } + + + + public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime.MessageContext context, System.Threading.CancellationToken cancellation) + { + // The actual message body + var completeTwo = (Wolverine.ComplianceTests.Sagas.CompleteTwo)context.Envelope.Message; + + + // Enlist in CosmosDB outbox transaction + context.EnlistInOutbox(new Wolverine.CosmosDb.Internals.CosmosDbEnvelopeTransaction(_container, context)); + var sagaId = context.Envelope.SagaId; + if (string.IsNullOrEmpty(sagaId)) throw new Wolverine.Persistence.Sagas.IndeterminateSagaStateIdException(context.Envelope); + + // Try to load the existing saga document from CosmosDB + Wolverine.ComplianceTests.Sagas.StringBasicWorkflow stringBasicWorkflow = default; + try + { + var _cosmosResponse = await _container.ReadItemAsync(sagaId, Microsoft.Azure.Cosmos.PartitionKey.None, cancellationToken: cancellation).ConfigureAwait(false); + stringBasicWorkflow = _cosmosResponse.Resource; + } + catch (Microsoft.Azure.Cosmos.CosmosException e) when (e.StatusCode == System.Net.HttpStatusCode.NotFound) + { + stringBasicWorkflow = default; + } + if (stringBasicWorkflow == null) + { + throw new Wolverine.Persistence.Sagas.UnknownSagaException(typeof(Wolverine.ComplianceTests.Sagas.StringBasicWorkflow), sagaId); + } + + else + { + context.SetSagaId(sagaId); + + // The actual message execution + stringBasicWorkflow.Handle(completeTwo); + + // Delete the saga if completed, otherwise update it + if (stringBasicWorkflow.IsCompleted()) + { + await _container.DeleteItemAsync(sagaId, Microsoft.Azure.Cosmos.PartitionKey.None).ConfigureAwait(false); + } + + else + { + await _container.UpsertItemAsync(stringBasicWorkflow).ConfigureAwait(false); + } + + + // Have to flush outgoing messages just in case Marten did nothing because of https://github.com/JasperFx/wolverine/issues/536 + await context.FlushOutgoingMessagesAsync().ConfigureAwait(false); + + } + + } + + } + + // END: CompleteTwoHandler402398939 + + +} + diff --git a/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/FinishItAllHandler1534262635.cs b/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/FinishItAllHandler1534262635.cs new file mode 100644 index 000000000..7b985c499 --- /dev/null +++ b/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/FinishItAllHandler1534262635.cs @@ -0,0 +1,79 @@ +// +#pragma warning disable +using Microsoft.Azure.Cosmos; + +namespace Internal.Generated.WolverineHandlers +{ + // START: FinishItAllHandler1534262635 + [global::System.CodeDom.Compiler.GeneratedCode("JasperFx", "1.0.0")] + public sealed class FinishItAllHandler1534262635 : Wolverine.Runtime.Handlers.MessageHandler + { + private readonly Microsoft.Azure.Cosmos.Container _container; + + public FinishItAllHandler1534262635(Microsoft.Azure.Cosmos.Container container) + { + _container = container; + } + + + + public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime.MessageContext context, System.Threading.CancellationToken cancellation) + { + // The actual message body + var finishItAll = (Wolverine.ComplianceTests.Sagas.FinishItAll)context.Envelope.Message; + + + // Enlist in CosmosDB outbox transaction + context.EnlistInOutbox(new Wolverine.CosmosDb.Internals.CosmosDbEnvelopeTransaction(_container, context)); + var sagaId = context.Envelope.SagaId; + if (string.IsNullOrEmpty(sagaId)) throw new Wolverine.Persistence.Sagas.IndeterminateSagaStateIdException(context.Envelope); + + // Try to load the existing saga document from CosmosDB + Wolverine.ComplianceTests.Sagas.StringBasicWorkflow stringBasicWorkflow = default; + try + { + var _cosmosResponse = await _container.ReadItemAsync(sagaId, Microsoft.Azure.Cosmos.PartitionKey.None, cancellationToken: cancellation).ConfigureAwait(false); + stringBasicWorkflow = _cosmosResponse.Resource; + } + catch (Microsoft.Azure.Cosmos.CosmosException e) when (e.StatusCode == System.Net.HttpStatusCode.NotFound) + { + stringBasicWorkflow = default; + } + if (stringBasicWorkflow == null) + { + throw new Wolverine.Persistence.Sagas.UnknownSagaException(typeof(Wolverine.ComplianceTests.Sagas.StringBasicWorkflow), sagaId); + } + + else + { + context.SetSagaId(sagaId); + + // The actual message execution + stringBasicWorkflow.Handle(finishItAll); + + // Delete the saga if completed, otherwise update it + if (stringBasicWorkflow.IsCompleted()) + { + await _container.DeleteItemAsync(sagaId, Microsoft.Azure.Cosmos.PartitionKey.None).ConfigureAwait(false); + } + + else + { + await _container.UpsertItemAsync(stringBasicWorkflow).ConfigureAwait(false); + } + + + // Have to flush outgoing messages just in case Marten did nothing because of https://github.com/JasperFx/wolverine/issues/536 + await context.FlushOutgoingMessagesAsync().ConfigureAwait(false); + + } + + } + + } + + // END: FinishItAllHandler1534262635 + + +} + diff --git a/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/StringCompleteThreeHandler606415888.cs b/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/StringCompleteThreeHandler606415888.cs new file mode 100644 index 000000000..d348578ca --- /dev/null +++ b/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/StringCompleteThreeHandler606415888.cs @@ -0,0 +1,81 @@ +// +#pragma warning disable +using Microsoft.Azure.Cosmos; + +namespace Internal.Generated.WolverineHandlers +{ + // START: StringCompleteThreeHandler606415888 + [global::System.CodeDom.Compiler.GeneratedCode("JasperFx", "1.0.0")] + public sealed class StringCompleteThreeHandler606415888 : Wolverine.Runtime.Handlers.MessageHandler + { + private readonly Microsoft.Azure.Cosmos.Container _container; + + public StringCompleteThreeHandler606415888(Microsoft.Azure.Cosmos.Container container) + { + _container = container; + } + + + + public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime.MessageContext context, System.Threading.CancellationToken cancellation) + { + // The actual message body + var stringCompleteThree = (Wolverine.ComplianceTests.Sagas.StringCompleteThree)context.Envelope.Message; + + // Application-specific Open Telemetry auditing + System.Diagnostics.Activity.Current?.SetTag("SagaId", stringCompleteThree.SagaId); + + // Enlist in CosmosDB outbox transaction + context.EnlistInOutbox(new Wolverine.CosmosDb.Internals.CosmosDbEnvelopeTransaction(_container, context)); + string sagaId = stringCompleteThree.SagaId ?? context.Envelope.SagaId; + if (string.IsNullOrEmpty(sagaId)) throw new Wolverine.Persistence.Sagas.IndeterminateSagaStateIdException(context.Envelope); + + // Try to load the existing saga document from CosmosDB + Wolverine.ComplianceTests.Sagas.StringBasicWorkflow stringBasicWorkflow = default; + try + { + var _cosmosResponse = await _container.ReadItemAsync(sagaId, Microsoft.Azure.Cosmos.PartitionKey.None, cancellationToken: cancellation).ConfigureAwait(false); + stringBasicWorkflow = _cosmosResponse.Resource; + } + catch (Microsoft.Azure.Cosmos.CosmosException e) when (e.StatusCode == System.Net.HttpStatusCode.NotFound) + { + stringBasicWorkflow = default; + } + if (stringBasicWorkflow == null) + { + throw new Wolverine.Persistence.Sagas.UnknownSagaException(typeof(Wolverine.ComplianceTests.Sagas.StringBasicWorkflow), sagaId); + } + + else + { + context.SetSagaId(sagaId); + + // The actual message execution + stringBasicWorkflow.Handle(stringCompleteThree); + + // Delete the saga if completed, otherwise update it + if (stringBasicWorkflow.IsCompleted()) + { + await _container.DeleteItemAsync(sagaId, Microsoft.Azure.Cosmos.PartitionKey.None).ConfigureAwait(false); + } + + else + { + await _container.UpsertItemAsync(stringBasicWorkflow).ConfigureAwait(false); + } + + + // Have to flush outgoing messages just in case Marten did nothing because of https://github.com/JasperFx/wolverine/issues/536 + await context.FlushOutgoingMessagesAsync().ConfigureAwait(false); + + } + + } + + } + + // END: StringCompleteThreeHandler606415888 + + +} + diff --git a/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/StringDoThreeHandler1820069266.cs b/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/StringDoThreeHandler1820069266.cs new file mode 100644 index 000000000..3cfe0cf27 --- /dev/null +++ b/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/StringDoThreeHandler1820069266.cs @@ -0,0 +1,81 @@ +// +#pragma warning disable +using Microsoft.Azure.Cosmos; + +namespace Internal.Generated.WolverineHandlers +{ + // START: StringDoThreeHandler1820069266 + [global::System.CodeDom.Compiler.GeneratedCode("JasperFx", "1.0.0")] + public sealed class StringDoThreeHandler1820069266 : Wolverine.Runtime.Handlers.MessageHandler + { + private readonly Microsoft.Azure.Cosmos.Container _container; + + public StringDoThreeHandler1820069266(Microsoft.Azure.Cosmos.Container container) + { + _container = container; + } + + + + public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime.MessageContext context, System.Threading.CancellationToken cancellation) + { + // The actual message body + var stringDoThree = (Wolverine.ComplianceTests.Sagas.StringDoThree)context.Envelope.Message; + + // Application-specific Open Telemetry auditing + System.Diagnostics.Activity.Current?.SetTag("TheSagaId", stringDoThree.TheSagaId); + + // Enlist in CosmosDB outbox transaction + context.EnlistInOutbox(new Wolverine.CosmosDb.Internals.CosmosDbEnvelopeTransaction(_container, context)); + string sagaId = stringDoThree.TheSagaId ?? context.Envelope.SagaId; + if (string.IsNullOrEmpty(sagaId)) throw new Wolverine.Persistence.Sagas.IndeterminateSagaStateIdException(context.Envelope); + + // Try to load the existing saga document from CosmosDB + Wolverine.ComplianceTests.Sagas.StringBasicWorkflow stringBasicWorkflow = default; + try + { + var _cosmosResponse = await _container.ReadItemAsync(sagaId, Microsoft.Azure.Cosmos.PartitionKey.None, cancellationToken: cancellation).ConfigureAwait(false); + stringBasicWorkflow = _cosmosResponse.Resource; + } + catch (Microsoft.Azure.Cosmos.CosmosException e) when (e.StatusCode == System.Net.HttpStatusCode.NotFound) + { + stringBasicWorkflow = default; + } + if (stringBasicWorkflow == null) + { + throw new Wolverine.Persistence.Sagas.UnknownSagaException(typeof(Wolverine.ComplianceTests.Sagas.StringBasicWorkflow), sagaId); + } + + else + { + context.SetSagaId(sagaId); + + // The actual message execution + stringBasicWorkflow.Handles(stringDoThree); + + // Delete the saga if completed, otherwise update it + if (stringBasicWorkflow.IsCompleted()) + { + await _container.DeleteItemAsync(sagaId, Microsoft.Azure.Cosmos.PartitionKey.None).ConfigureAwait(false); + } + + else + { + await _container.UpsertItemAsync(stringBasicWorkflow).ConfigureAwait(false); + } + + + // Have to flush outgoing messages just in case Marten did nothing because of https://github.com/JasperFx/wolverine/issues/536 + await context.FlushOutgoingMessagesAsync().ConfigureAwait(false); + + } + + } + + } + + // END: StringDoThreeHandler1820069266 + + +} + diff --git a/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/StringStartHandler2085759971.cs b/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/StringStartHandler2085759971.cs new file mode 100644 index 000000000..6fc90922c --- /dev/null +++ b/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/StringStartHandler2085759971.cs @@ -0,0 +1,53 @@ +// +#pragma warning disable +using Microsoft.Azure.Cosmos; + +namespace Internal.Generated.WolverineHandlers +{ + // START: StringStartHandler2085759971 + [global::System.CodeDom.Compiler.GeneratedCode("JasperFx", "1.0.0")] + public sealed class StringStartHandler2085759971 : Wolverine.Runtime.Handlers.MessageHandler + { + private readonly Microsoft.Azure.Cosmos.Container _container; + + public StringStartHandler2085759971(Microsoft.Azure.Cosmos.Container container) + { + _container = container; + } + + + + public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime.MessageContext context, System.Threading.CancellationToken cancellation) + { + // The actual message body + var stringStart = (Wolverine.ComplianceTests.Sagas.StringStart)context.Envelope.Message; + + // Application-specific Open Telemetry auditing + System.Diagnostics.Activity.Current?.SetTag("Id", stringStart.Id); + + // Enlist in CosmosDB outbox transaction + context.EnlistInOutbox(new Wolverine.CosmosDb.Internals.CosmosDbEnvelopeTransaction(_container, context)); + var stringBasicWorkflow = new Wolverine.ComplianceTests.Sagas.StringBasicWorkflow(); + + // The actual message execution + stringBasicWorkflow.Start(stringStart); + + context.SetSagaId(stringStart.Id); + if (!stringBasicWorkflow.IsCompleted()) + { + await _container.UpsertItemAsync(stringBasicWorkflow).ConfigureAwait(false); + } + + + // Have to flush outgoing messages just in case Marten did nothing because of https://github.com/JasperFx/wolverine/issues/536 + await context.FlushOutgoingMessagesAsync().ConfigureAwait(false); + + } + + } + + // END: StringStartHandler2085759971 + + +} + diff --git a/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/WildcardStartHandler784149372.cs b/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/WildcardStartHandler784149372.cs new file mode 100644 index 000000000..afc377028 --- /dev/null +++ b/src/Persistence/CosmosDbTests/Internal/Generated/WolverineHandlers/WildcardStartHandler784149372.cs @@ -0,0 +1,53 @@ +// +#pragma warning disable +using Microsoft.Azure.Cosmos; + +namespace Internal.Generated.WolverineHandlers +{ + // START: WildcardStartHandler784149372 + [global::System.CodeDom.Compiler.GeneratedCode("JasperFx", "1.0.0")] + public sealed class WildcardStartHandler784149372 : Wolverine.Runtime.Handlers.MessageHandler + { + private readonly Microsoft.Azure.Cosmos.Container _container; + + public WildcardStartHandler784149372(Microsoft.Azure.Cosmos.Container container) + { + _container = container; + } + + + + public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime.MessageContext context, System.Threading.CancellationToken cancellation) + { + // The actual message body + var wildcardStart = (Wolverine.ComplianceTests.Sagas.WildcardStart)context.Envelope.Message; + + // Application-specific Open Telemetry auditing + System.Diagnostics.Activity.Current?.SetTag("Id", wildcardStart.Id); + + // Enlist in CosmosDB outbox transaction + context.EnlistInOutbox(new Wolverine.CosmosDb.Internals.CosmosDbEnvelopeTransaction(_container, context)); + var stringBasicWorkflow = new Wolverine.ComplianceTests.Sagas.StringBasicWorkflow(); + + // The actual message execution + stringBasicWorkflow.Starts(wildcardStart); + + context.SetSagaId(wildcardStart.Id); + if (!stringBasicWorkflow.IsCompleted()) + { + await _container.UpsertItemAsync(stringBasicWorkflow).ConfigureAwait(false); + } + + + // Have to flush outgoing messages just in case Marten did nothing because of https://github.com/JasperFx/wolverine/issues/536 + await context.FlushOutgoingMessagesAsync().ConfigureAwait(false); + + } + + } + + // END: WildcardStartHandler784149372 + + +} + diff --git a/src/Persistence/CosmosDbTests/end_to_end.cs b/src/Persistence/CosmosDbTests/end_to_end.cs new file mode 100644 index 000000000..c4d4dc114 --- /dev/null +++ b/src/Persistence/CosmosDbTests/end_to_end.cs @@ -0,0 +1,50 @@ +using Microsoft.Azure.Cosmos; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.CosmosDb; +using Wolverine.CosmosDb.Internals; +using Wolverine.Tracking; + +namespace CosmosDbTests; + +[Collection("cosmosdb")] +public class end_to_end +{ + private readonly AppFixture _fixture; + + public end_to_end(AppFixture fixture) + { + _fixture = fixture; + } + + [Fact] + public async Task can_send_and_receive_messages() + { + await _fixture.ClearAll(); + + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Durability.Mode = DurabilityMode.Solo; + opts.UseCosmosDbPersistence(AppFixture.DatabaseName); + opts.Services.AddSingleton(_fixture.Client); + opts.Discovery.IncludeAssembly(GetType().Assembly); + }).StartAsync(); + + var tracked = await host.InvokeMessageAndWaitAsync(new SmokeTestMessage("Hello, CosmosDb!")); + + tracked.Executed.MessagesOf().Any().ShouldBeTrue(); + } +} + +public record SmokeTestMessage(string Text); + +public static class SmokeTestMessageHandler +{ + public static void Handle(SmokeTestMessage message) + { + // no-op + } +} diff --git a/src/Persistence/CosmosDbTests/message_store_compliance.cs b/src/Persistence/CosmosDbTests/message_store_compliance.cs new file mode 100644 index 000000000..71856a584 --- /dev/null +++ b/src/Persistence/CosmosDbTests/message_store_compliance.cs @@ -0,0 +1,39 @@ +using Microsoft.Azure.Cosmos; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Wolverine; +using Wolverine.ComplianceTests; +using Wolverine.CosmosDb; +using Wolverine.CosmosDb.Internals; +using Wolverine.Transports.Tcp; + +namespace CosmosDbTests; + +[Collection("cosmosdb")] +public class message_store_compliance : MessageStoreCompliance +{ + private readonly AppFixture _fixture; + + public message_store_compliance(AppFixture fixture) + { + _fixture = fixture; + } + + public override async Task BuildCleanHost() + { + await _fixture.ClearAll(); + + var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Durability.Mode = DurabilityMode.Solo; + + opts.UseCosmosDbPersistence(AppFixture.DatabaseName); + opts.Services.AddSingleton(_fixture.Client); + + opts.ListenAtPort(2345).UseDurableInbox(); + }).StartAsync(); + + return host; + } +} diff --git a/src/Persistence/CosmosDbTests/node_persistence_compliance.cs b/src/Persistence/CosmosDbTests/node_persistence_compliance.cs new file mode 100644 index 000000000..99933d9ec --- /dev/null +++ b/src/Persistence/CosmosDbTests/node_persistence_compliance.cs @@ -0,0 +1,23 @@ +using Wolverine; +using Wolverine.ComplianceTests; +using Wolverine.CosmosDb.Internals; +using Wolverine.Persistence.Durability; + +namespace CosmosDbTests; + +[Collection("cosmosdb")] +public class node_persistence_compliance : NodePersistenceCompliance +{ + private readonly AppFixture _fixture; + + public node_persistence_compliance(AppFixture fixture) + { + _fixture = fixture; + } + + protected override async Task buildCleanMessageStore() + { + await _fixture.ClearAll(); + return _fixture.BuildMessageStore(); + } +} diff --git a/src/Persistence/CosmosDbTests/saga_storage_compliance.cs b/src/Persistence/CosmosDbTests/saga_storage_compliance.cs new file mode 100644 index 000000000..6f0b235b5 --- /dev/null +++ b/src/Persistence/CosmosDbTests/saga_storage_compliance.cs @@ -0,0 +1,75 @@ +using JasperFx.CodeGeneration; +using JasperFx.Core; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Azure.Cosmos; +using Wolverine; +using Wolverine.ComplianceTests.Sagas; +using Wolverine.CosmosDb; + +namespace CosmosDbTests; + +public class CosmosDbSagaHost : ISagaHost +{ + private readonly AppFixture _fixture; + + public CosmosDbSagaHost() + { + _fixture = new AppFixture(); + _fixture.InitializeAsync().GetAwaiter().GetResult(); + } + + public IHost BuildHost() + { + return Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Durability.Mode = DurabilityMode.Solo; + + opts.CodeGeneration.GeneratedCodeOutputPath = AppContext.BaseDirectory.ParentDirectory()! + .ParentDirectory()!.ParentDirectory()!.AppendPath("Internal", "Generated"); + opts.CodeGeneration.TypeLoadMode = TypeLoadMode.Auto; + + opts.Discovery.IncludeType(); + opts.Discovery.IncludeAssembly(typeof(StringBasicWorkflow).Assembly); + + opts.Services.AddSingleton(_fixture.Client); + opts.UseCosmosDbPersistence(AppFixture.DatabaseName); + }).Start(); + } + + public Task LoadState(Guid id) where T : Saga + { + throw new NotSupportedException(); + } + + public Task LoadState(int id) where T : Saga + { + throw new NotSupportedException(); + } + + public Task LoadState(long id) where T : Saga + { + throw new NotSupportedException(); + } + + public async Task LoadState(string id) where T : Saga + { + try + { + var response = await _fixture.Container.ReadItemAsync(id, PartitionKey.None); + return response.Resource; + } + catch (CosmosException e) when (e.StatusCode == System.Net.HttpStatusCode.NotFound) + { + return default!; + } + } +} + +public class saga_storage_compliance : StringIdentifiedSagaComplianceSpecs +{ + public saga_storage_compliance() + { + } +} diff --git a/src/Persistence/CosmosDbTests/scheduled_job_compliance.cs b/src/Persistence/CosmosDbTests/scheduled_job_compliance.cs new file mode 100644 index 000000000..0c98ca797 --- /dev/null +++ b/src/Persistence/CosmosDbTests/scheduled_job_compliance.cs @@ -0,0 +1,29 @@ +using Microsoft.Extensions.DependencyInjection; +using Wolverine; +using Wolverine.ComplianceTests; +using Wolverine.ComplianceTests.Scheduling; +using Wolverine.CosmosDb; +using Wolverine.Util; + +namespace CosmosDbTests; + +[Collection("cosmosdb")] +public class scheduled_job_compliance : ScheduledJobCompliance +{ + private readonly AppFixture _fixture; + + public scheduled_job_compliance(AppFixture fixture) + { + _fixture = fixture; + } + + public override void ConfigurePersistence(WolverineOptions opts) + { + opts.Services.AddSingleton(_fixture.Client); + opts.UseCosmosDbPersistence(AppFixture.DatabaseName); + + opts.Transports.NodeControlEndpoint = + opts.Transports.GetOrCreateEndpoint( + new Uri($"tcp://localhost:{PortFinder.GetAvailablePort()}")); + } +} diff --git a/src/Persistence/CosmosDbTests/using_storage_return_types_and_entity_attributes.cs b/src/Persistence/CosmosDbTests/using_storage_return_types_and_entity_attributes.cs new file mode 100644 index 000000000..4671e8c00 --- /dev/null +++ b/src/Persistence/CosmosDbTests/using_storage_return_types_and_entity_attributes.cs @@ -0,0 +1,57 @@ +using Microsoft.Azure.Cosmos; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.CosmosDb; +using Wolverine.CosmosDb.Internals; +using Wolverine.Tracking; + +namespace CosmosDbTests; + +[Collection("cosmosdb")] +public class using_storage_return_types_and_entity_attributes +{ + private readonly AppFixture _fixture; + + public using_storage_return_types_and_entity_attributes(AppFixture fixture) + { + _fixture = fixture; + } + + [Fact] + public async Task can_use_cosmosdb_ops_as_side_effects() + { + await _fixture.ClearAll(); + + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Durability.Mode = DurabilityMode.Solo; + opts.UseCosmosDbPersistence(AppFixture.DatabaseName); + opts.Services.AddSingleton(_fixture.Client); + opts.Discovery.IncludeAssembly(GetType().Assembly); + }).StartAsync(); + + var tracked = await host.InvokeMessageAndWaitAsync(new CreateDocument("doc1", "Test Document")); + tracked.Executed.MessagesOf().Any().ShouldBeTrue(); + } +} + +public record CreateDocument(string Id, string Name); + +public static class CreateDocumentHandler +{ + public static ICosmosDbOp Handle(CreateDocument command) + { + var doc = new TestDocument { id = command.Id, name = command.Name, partitionKey = "test" }; + return CosmosDbOps.Store(doc); + } +} + +public class TestDocument +{ + public string id { get; set; } = string.Empty; + public string name { get; set; } = string.Empty; + public string partitionKey { get; set; } = string.Empty; +} diff --git a/src/Persistence/Wolverine.CosmosDb/CosmosDbOutbox.cs b/src/Persistence/Wolverine.CosmosDb/CosmosDbOutbox.cs new file mode 100644 index 000000000..1dc241139 --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/CosmosDbOutbox.cs @@ -0,0 +1,30 @@ +using Microsoft.Azure.Cosmos; +using Wolverine.CosmosDb.Internals; +using Wolverine.Runtime; + +namespace Wolverine.CosmosDb; + +public class CosmosDbOutbox : MessageContext, ICosmosDbOutbox +{ + public CosmosDbOutbox(IWolverineRuntime runtime, Container container) : base(runtime) + { + Enroll(container); + } + + public void Enroll(Container container) + { + Container = container; + Transaction = new CosmosDbEnvelopeTransaction(container, this); + } + + /// + /// Flush out any outgoing outbox'd messages + /// + /// + public async Task SaveChangesAsync(CancellationToken cancellation = default) + { + await FlushOutgoingMessagesAsync(); + } + + public Container Container { get; private set; } = null!; +} diff --git a/src/Persistence/Wolverine.CosmosDb/ICosmosDbOp.cs b/src/Persistence/Wolverine.CosmosDb/ICosmosDbOp.cs new file mode 100644 index 000000000..3d1a296be --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/ICosmosDbOp.cs @@ -0,0 +1,53 @@ +using Microsoft.Azure.Cosmos; + +namespace Wolverine.CosmosDb; + +public interface ICosmosDbOp : ISideEffect +{ + Task Execute(Container container); +} + +public class StoreDoc(T Document) : ICosmosDbOp +{ + public async Task Execute(Container container) + { + await container.UpsertItemAsync(Document); + } +} + +public class DeleteById(string Id, string PartitionKeyValue) : ICosmosDbOp +{ + public async Task Execute(Container container) + { + try + { + await container.DeleteItemAsync(Id, new PartitionKey(PartitionKeyValue)); + } + catch (CosmosException e) when (e.StatusCode == System.Net.HttpStatusCode.NotFound) + { + // Already gone + } + } +} + +/// +/// Side effect helper class for Wolverine's integration with CosmosDb +/// +public static class CosmosDbOps +{ + /// + /// Store (upsert) a CosmosDb document + /// + /// + /// + /// + public static ICosmosDbOp Store(T document) => new StoreDoc(document); + + /// + /// Delete a CosmosDb document by its id and partition key + /// + /// + /// + /// + public static ICosmosDbOp Delete(string id, string partitionKey) => new DeleteById(id, partitionKey); +} diff --git a/src/Persistence/Wolverine.CosmosDb/ICosmosDbOutbox.cs b/src/Persistence/Wolverine.CosmosDb/ICosmosDbOutbox.cs new file mode 100644 index 000000000..42d99c987 --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/ICosmosDbOutbox.cs @@ -0,0 +1,20 @@ +using Microsoft.Azure.Cosmos; + +namespace Wolverine.CosmosDb; + +/// +/// Outbox-ed messaging sending with CosmosDb +/// +public interface ICosmosDbOutbox : IMessageBus +{ + /// + /// Current CosmosDB container + /// + Container Container { get; } + + /// + /// Enroll a CosmosDb container into the outbox'd sender + /// + /// + void Enroll(Container container); +} diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbEnvelopeTransaction.cs b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbEnvelopeTransaction.cs new file mode 100644 index 000000000..f834c2c0d --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbEnvelopeTransaction.cs @@ -0,0 +1,71 @@ +using Microsoft.Azure.Cosmos; +using Wolverine.Persistence.Durability; +using Wolverine.Runtime; + +namespace Wolverine.CosmosDb.Internals; + +public class CosmosDbEnvelopeTransaction : IEnvelopeTransaction +{ + private readonly int _nodeId; + private readonly CosmosDbMessageStore _store; + + public CosmosDbEnvelopeTransaction(Container container, MessageContext context) + { + if (context.Storage is CosmosDbMessageStore store) + { + _store = store; + _nodeId = context.Runtime.Options.Durability.AssignedNodeNumber; + } + else + { + throw new InvalidOperationException( + "This Wolverine application is not using CosmosDb as the backing message persistence"); + } + + Container = container; + } + + public Container Container { get; } + + public async Task TryMakeEagerIdempotencyCheckAsync(Envelope envelope, DurabilitySettings settings, + CancellationToken cancellation) + { + var copy = Envelope.ForPersistedHandled(envelope, DateTimeOffset.UtcNow, settings); + try + { + await PersistIncomingAsync(copy); + envelope.WasPersistedInInbox = true; + envelope.Status = EnvelopeStatus.Handled; + return true; + } + catch (Exception) + { + return false; + } + } + + public async Task PersistOutgoingAsync(Envelope envelope) + { + var outgoing = new OutgoingMessage(envelope); + await Container.UpsertItemAsync(outgoing, new PartitionKey(outgoing.PartitionKey)); + } + + public async Task PersistOutgoingAsync(Envelope[] envelopes) + { + foreach (var envelope in envelopes) + { + await PersistOutgoingAsync(envelope); + } + } + + public async Task PersistIncomingAsync(Envelope envelope) + { + var incoming = new IncomingMessage(envelope, _store); + await Container.UpsertItemAsync(incoming, new PartitionKey(incoming.PartitionKey)); + } + + public ValueTask RollbackAsync() + { + return ValueTask.CompletedTask; + } +} diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.Admin.cs b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.Admin.cs new file mode 100644 index 000000000..0e8d9c8c7 --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.Admin.cs @@ -0,0 +1,204 @@ +using Microsoft.Azure.Cosmos; +using Wolverine.Logging; +using Wolverine.Persistence.Durability; + +namespace Wolverine.CosmosDb.Internals; + +public partial class CosmosDbMessageStore : IMessageStoreAdmin +{ + public Task DeleteAllHandledAsync() + { + throw new NotSupportedException("This function is not yet supported by CosmosDb"); + } + + public async Task ClearAllAsync() + { + await DeleteByDocTypeAsync(DocumentTypes.Incoming); + await DeleteByDocTypeAsync(DocumentTypes.Outgoing); + await DeleteByDocTypeAsync(DocumentTypes.DeadLetter); + await DeleteByDocTypeAsync(DocumentTypes.Node); + await DeleteByDocTypeAsync(DocumentTypes.AgentAssignment); + await DeleteByDocTypeAsync(DocumentTypes.Lock); + await DeleteByDocTypeAsync(DocumentTypes.NodeRecord); + await DeleteByDocTypeAsync(DocumentTypes.AgentRestriction); + await DeleteByDocTypeAsync(DocumentTypes.NodeSequence); + } + + public Task RebuildAsync() + { + return ClearAllAsync(); + } + + public async Task FetchCountsAsync() + { + var counts = new PersistedCounts(); + + counts.DeadLetter = await CountByQueryAsync( + "SELECT VALUE COUNT(1) FROM c WHERE c.docType = @docType", + DocumentTypes.DeadLetter); + + counts.Handled = await CountByQueryAsync( + "SELECT VALUE COUNT(1) FROM c WHERE c.docType = @docType AND c.status = @status", + DocumentTypes.Incoming, ("@status", EnvelopeStatus.Handled)); + + counts.Incoming = await CountByQueryAsync( + "SELECT VALUE COUNT(1) FROM c WHERE c.docType = @docType AND c.status = @status", + DocumentTypes.Incoming, ("@status", EnvelopeStatus.Incoming)); + + counts.Outgoing = await CountByQueryAsync( + "SELECT VALUE COUNT(1) FROM c WHERE c.docType = @docType", + DocumentTypes.Outgoing); + + counts.Scheduled = await CountByQueryAsync( + "SELECT VALUE COUNT(1) FROM c WHERE c.docType = @docType AND c.status = @status", + DocumentTypes.Incoming, ("@status", EnvelopeStatus.Scheduled)); + + return counts; + } + + public async Task> AllIncomingAsync() + { + var queryText = "SELECT * FROM c WHERE c.docType = @docType"; + var query = new QueryDefinition(queryText) + .WithParameter("@docType", DocumentTypes.Incoming); + + var results = new List(); + using var iterator = _container.GetItemQueryIterator(query); + + while (iterator.HasMoreResults) + { + var response = await iterator.ReadNextAsync(); + results.AddRange(response.Select(m => m.Read())); + } + + return results; + } + + public async Task> AllOutgoingAsync() + { + var queryText = "SELECT * FROM c WHERE c.docType = @docType"; + var query = new QueryDefinition(queryText) + .WithParameter("@docType", DocumentTypes.Outgoing); + + var results = new List(); + using var iterator = _container.GetItemQueryIterator(query); + + while (iterator.HasMoreResults) + { + var response = await iterator.ReadNextAsync(); + results.AddRange(response.Select(m => m.Read())); + } + + return results; + } + + public async Task ReleaseAllOwnershipAsync() + { + await ReleaseOwnershipByDocTypeAsync(DocumentTypes.Incoming); + await ReleaseOwnershipByDocTypeAsync(DocumentTypes.Outgoing); + } + + public async Task ReleaseAllOwnershipAsync(int ownerId) + { + await ReleaseOwnershipByDocTypeAsync(DocumentTypes.Incoming, ownerId); + await ReleaseOwnershipByDocTypeAsync(DocumentTypes.Outgoing, ownerId); + } + + public async Task CheckConnectivityAsync(CancellationToken token) + { + var query = new QueryDefinition("SELECT VALUE COUNT(1) FROM c WHERE c.docType = @docType") + .WithParameter("@docType", DocumentTypes.Incoming); + using var iterator = _container.GetItemQueryIterator(query); + await iterator.ReadNextAsync(token); + } + + public async Task MigrateAsync() + { + var database = _client.GetDatabase(_databaseName); + var containerProperties = new ContainerProperties(DocumentTypes.ContainerName, DocumentTypes.PartitionKeyPath); + await database.CreateContainerIfNotExistsAsync(containerProperties); + } + + private async Task CountByQueryAsync(string queryText, string docType, + params (string name, object value)[] extraParams) + { + var query = new QueryDefinition(queryText) + .WithParameter("@docType", docType); + foreach (var (name, value) in extraParams) + { + query = query.WithParameter(name, value); + } + + using var iterator = _container.GetItemQueryIterator(query); + if (iterator.HasMoreResults) + { + var response = await iterator.ReadNextAsync(); + return response.FirstOrDefault(); + } + + return 0; + } + + private async Task DeleteByDocTypeAsync(string docType) + { + var queryText = "SELECT c.id, c.partitionKey FROM c WHERE c.docType = @docType"; + var query = new QueryDefinition(queryText) + .WithParameter("@docType", docType); + + using var iterator = _container.GetItemQueryIterator(query); + + while (iterator.HasMoreResults) + { + var response = await iterator.ReadNextAsync(); + foreach (var item in response) + { + string id = item.id; + string pk = item.partitionKey; + try + { + await _container.DeleteItemAsync(id, new PartitionKey(pk)); + } + catch (CosmosException) + { + // Best effort + } + } + } + } + + private async Task ReleaseOwnershipByDocTypeAsync(string docType, int? ownerId = null) + { + var queryText = ownerId.HasValue + ? "SELECT * FROM c WHERE c.docType = @docType AND c.ownerId = @ownerId" + : "SELECT * FROM c WHERE c.docType = @docType AND c.ownerId != 0"; + + var query = new QueryDefinition(queryText) + .WithParameter("@docType", docType); + + if (ownerId.HasValue) + { + query = query.WithParameter("@ownerId", ownerId.Value); + } + + using var iterator = _container.GetItemQueryIterator(query); + + while (iterator.HasMoreResults) + { + var response = await iterator.ReadNextAsync(); + foreach (var item in response) + { + string id = item.id; + string pk = item.partitionKey; + item.ownerId = 0; + try + { + await _container.ReplaceItemAsync(item, id, new PartitionKey(pk)); + } + catch (CosmosException) + { + // Best effort + } + } + } + } +} diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.DeadLetters.cs b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.DeadLetters.cs new file mode 100644 index 000000000..4d0823a50 --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.DeadLetters.cs @@ -0,0 +1,320 @@ +using System.Net; +using JasperFx.Core; +using Microsoft.Azure.Cosmos; +using Wolverine.Persistence.Durability; +using Wolverine.Persistence.Durability.DeadLetterManagement; + +namespace Wolverine.CosmosDb.Internals; + +public partial class CosmosDbMessageStore : IDeadLetters +{ + private static string DlqId(Guid id) + { + return $"deadletter|{id}"; + } + + public async Task DeadLetterEnvelopeByIdAsync(Guid id, string? tenantId = null) + { + try + { + var response = await _container.ReadItemAsync(DlqId(id), + new PartitionKey(DocumentTypes.DeadLetterPartition)); + return response.Resource.ToEnvelope(); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + return null; + } + } + + public async Task> SummarizeAllAsync(string serviceName, TimeRange range, + CancellationToken token) + { + var conditions = new List { "c.docType = @docType" }; + + if (range.From.HasValue) + { + conditions.Add("c.sentAt >= @from"); + } + + if (range.To.HasValue) + { + conditions.Add("c.sentAt <= @to"); + } + + var queryText = $"SELECT * FROM c WHERE {string.Join(" AND ", conditions)}"; + var query = new QueryDefinition(queryText) + .WithParameter("@docType", DocumentTypes.DeadLetter); + if (range.From.HasValue) query = query.WithParameter("@from", range.From.Value); + if (range.To.HasValue) query = query.WithParameter("@to", range.To.Value); + + var messages = new List(); + using var iterator = _container.GetItemQueryIterator(query, + requestOptions: new QueryRequestOptions + { + PartitionKey = new PartitionKey(DocumentTypes.DeadLetterPartition) + }); + + while (iterator.HasMoreResults) + { + var response = await iterator.ReadNextAsync(token); + messages.AddRange(response); + } + + var grouped = messages + .GroupBy(x => new { ReceivedAt = x.ReceivedAt ?? "", x.MessageType, x.ExceptionType }) + .Select(g => new DeadLetterQueueCount( + serviceName, + g.Key.ReceivedAt.IsNotEmpty() ? new Uri(g.Key.ReceivedAt) : Uri, + g.Key.MessageType ?? "", + g.Key.ExceptionType ?? "", + Uri, + g.Count())) + .ToList(); + + return grouped; + } + + public async Task QueryAsync(DeadLetterEnvelopeQuery query, CancellationToken token) + { + if (query.MessageIds != null && query.MessageIds.Any()) + { + var envelopes = new List(); + foreach (var id in query.MessageIds) + { + try + { + var response = await _container.ReadItemAsync(DlqId(id), + new PartitionKey(DocumentTypes.DeadLetterPartition), cancellationToken: token); + envelopes.Add(response.Resource.ToEnvelope()); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + // Skip missing + } + } + + return new DeadLetterEnvelopeResults + { + PageNumber = 1, + TotalCount = envelopes.Count, + Envelopes = envelopes, + DatabaseUri = Uri + }; + } + + var conditions = new List { "c.docType = @docType" }; + + if (query.Range?.From.HasValue == true) + { + conditions.Add("c.sentAt >= @from"); + } + + if (query.Range?.To.HasValue == true) + { + conditions.Add("c.sentAt <= @to"); + } + + if (query.ExceptionType.IsNotEmpty()) + { + conditions.Add("c.exceptionType = @exceptionType"); + } + + if (query.ExceptionMessage.IsNotEmpty()) + { + conditions.Add("STARTSWITH(c.exceptionMessage, @exceptionMessage)"); + } + + if (query.MessageType.IsNotEmpty()) + { + conditions.Add("c.messageType = @messageType"); + } + + if (query.ReceivedAt.IsNotEmpty()) + { + conditions.Add("c.receivedAt = @receivedAt"); + } + + var whereClause = string.Join(" AND ", conditions); + + // Count query + var countQueryText = $"SELECT VALUE COUNT(1) FROM c WHERE {whereClause}"; + var countQuery = RebuildQueryDef(countQueryText, query); + int totalCount; + using (var countIterator = _container.GetItemQueryIterator(countQuery, + requestOptions: new QueryRequestOptions + { + PartitionKey = new PartitionKey(DocumentTypes.DeadLetterPartition) + })) + { + var countResponse = await countIterator.ReadNextAsync(token); + totalCount = countResponse.FirstOrDefault(); + } + + // Paged query + if (query.PageNumber <= 0) query.PageNumber = 1; + var skip = (query.PageNumber - 1) * query.PageSize; + + var pagedQueryText = + $"SELECT * FROM c WHERE {whereClause} ORDER BY c.sentAt OFFSET @skip LIMIT @take"; + var pagedQuery = RebuildQueryDef(pagedQueryText, query) + .WithParameter("@skip", skip) + .WithParameter("@take", query.PageSize); + + var messages = new List(); + using var pagedIterator = _container.GetItemQueryIterator(pagedQuery, + requestOptions: new QueryRequestOptions + { + PartitionKey = new PartitionKey(DocumentTypes.DeadLetterPartition) + }); + + while (pagedIterator.HasMoreResults) + { + var response = await pagedIterator.ReadNextAsync(token); + messages.AddRange(response); + } + + return new DeadLetterEnvelopeResults + { + PageNumber = query.PageNumber, + TotalCount = totalCount, + Envelopes = messages.Select(m => m.ToEnvelope()).ToList(), + DatabaseUri = Uri + }; + } + + public async Task DiscardAsync(DeadLetterEnvelopeQuery query, CancellationToken token) + { + if (query.MessageIds != null && query.MessageIds.Any()) + { + foreach (var id in query.MessageIds) + { + try + { + await _container.DeleteItemAsync(DlqId(id), + new PartitionKey(DocumentTypes.DeadLetterPartition), cancellationToken: token); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + // Already gone + } + } + + return; + } + + var messages = await LoadDeadLettersByQuery(query, token); + foreach (var message in messages) + { + try + { + await _container.DeleteItemAsync(message.Id, + new PartitionKey(DocumentTypes.DeadLetterPartition), cancellationToken: token); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + // Already gone + } + } + } + + public async Task ReplayAsync(DeadLetterEnvelopeQuery query, CancellationToken token) + { + if (query.MessageIds != null && query.MessageIds.Any()) + { + foreach (var id in query.MessageIds) + { + await MarkDeadLetterAsReplayableAsync(DlqId(id), token); + } + + return; + } + + var messages = await LoadDeadLettersByQuery(query, token); + foreach (var message in messages) + { + message.Replayable = true; + await _container.ReplaceItemAsync(message, message.Id, + new PartitionKey(DocumentTypes.DeadLetterPartition), cancellationToken: token); + } + } + + public async Task MarkDeadLetterEnvelopesAsReplayableAsync(string exceptionType = "") + { + var query = new DeadLetterEnvelopeQuery + { + Range = TimeRange.AllTime(), + ExceptionType = exceptionType.IsEmpty() ? null : exceptionType + }; + await ReplayAsync(query, CancellationToken.None); + } + + public async Task MarkDeadLetterEnvelopesAsReplayableAsync(Guid[] ids, string? tenantId = null) + { + await ReplayAsync(new DeadLetterEnvelopeQuery { MessageIds = ids }, CancellationToken.None); + } + + private async Task MarkDeadLetterAsReplayableAsync(string id, CancellationToken token) + { + try + { + var response = await _container.ReadItemAsync(id, + new PartitionKey(DocumentTypes.DeadLetterPartition), cancellationToken: token); + var message = response.Resource; + message.Replayable = true; + await _container.ReplaceItemAsync(message, id, + new PartitionKey(DocumentTypes.DeadLetterPartition), cancellationToken: token); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + // Already gone + } + } + + private async Task> LoadDeadLettersByQuery(DeadLetterEnvelopeQuery query, + CancellationToken token) + { + var conditions = new List { "c.docType = @docType" }; + + if (query.Range.From.HasValue) conditions.Add("c.sentAt >= @from"); + if (query.Range.To.HasValue) conditions.Add("c.sentAt <= @to"); + if (query.ExceptionType.IsNotEmpty()) conditions.Add("c.exceptionType = @exceptionType"); + if (query.MessageType.IsNotEmpty()) conditions.Add("c.messageType = @messageType"); + if (query.ReceivedAt.IsNotEmpty()) conditions.Add("c.receivedAt = @receivedAt"); + + var queryText = $"SELECT * FROM c WHERE {string.Join(" AND ", conditions)}"; + var queryDef = RebuildQueryDef(queryText, query); + + var messages = new List(); + using var iterator = _container.GetItemQueryIterator(queryDef, + requestOptions: new QueryRequestOptions + { + PartitionKey = new PartitionKey(DocumentTypes.DeadLetterPartition) + }); + + while (iterator.HasMoreResults) + { + var response = await iterator.ReadNextAsync(token); + messages.AddRange(response); + } + + return messages; + } + + private QueryDefinition RebuildQueryDef(string queryText, DeadLetterEnvelopeQuery query) + { + var queryDef = new QueryDefinition(queryText) + .WithParameter("@docType", DocumentTypes.DeadLetter); + + if (query.Range?.From.HasValue == true) queryDef = queryDef.WithParameter("@from", query.Range.From.Value); + if (query.Range?.To.HasValue == true) queryDef = queryDef.WithParameter("@to", query.Range.To.Value); + if (query.ExceptionType.IsNotEmpty()) + queryDef = queryDef.WithParameter("@exceptionType", query.ExceptionType); + if (query.ExceptionMessage.IsNotEmpty()) + queryDef = queryDef.WithParameter("@exceptionMessage", query.ExceptionMessage); + if (query.MessageType.IsNotEmpty()) queryDef = queryDef.WithParameter("@messageType", query.MessageType); + if (query.ReceivedAt.IsNotEmpty()) queryDef = queryDef.WithParameter("@receivedAt", query.ReceivedAt); + + return queryDef; + } +} diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.Inbox.cs b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.Inbox.cs new file mode 100644 index 000000000..a0d35c990 --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.Inbox.cs @@ -0,0 +1,183 @@ +using System.Net; +using Microsoft.Azure.Cosmos; +using Wolverine.Persistence.Durability; +using Wolverine.Transports; + +namespace Wolverine.CosmosDb.Internals; + +public partial class CosmosDbMessageStore : IMessageInbox +{ + public async Task ScheduleExecutionAsync(Envelope envelope) + { + var id = _identity(envelope); + var partitionKey = envelope.Destination?.ToString() ?? DocumentTypes.SystemPartition; + try + { + var response = + await _container.ReadItemAsync(id, new PartitionKey(partitionKey)); + var message = response.Resource; + message.ExecutionTime = envelope.ScheduledTime; + message.Status = EnvelopeStatus.Scheduled; + message.Attempts = envelope.Attempts; + message.OwnerId = 0; + await _container.ReplaceItemAsync(message, id, new PartitionKey(partitionKey)); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + // Already gone + } + } + + public async Task MoveToDeadLetterStorageAsync(Envelope envelope, Exception? exception) + { + var id = _identity(envelope); + var partitionKey = envelope.Destination?.ToString() ?? DocumentTypes.SystemPartition; + + var dlq = new DeadLetterMessage(envelope, exception); + + if (envelope.DeliverBy.HasValue) + { + dlq.ExpirationTime = envelope.DeliverBy.Value; + } + else + { + dlq.ExpirationTime = DateTimeOffset.UtcNow.Add(_options.Durability.DeadLetterQueueExpiration); + } + + await _container.UpsertItemAsync(dlq, new PartitionKey(DocumentTypes.DeadLetterPartition)); + + try + { + await _container.DeleteItemAsync(id, new PartitionKey(partitionKey)); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + // Already gone + } + } + + public async Task IncrementIncomingEnvelopeAttemptsAsync(Envelope envelope) + { + var id = _identity(envelope); + var partitionKey = envelope.Destination?.ToString() ?? DocumentTypes.SystemPartition; + try + { + var response = + await _container.ReadItemAsync(id, new PartitionKey(partitionKey)); + var message = response.Resource; + message.Attempts = envelope.Attempts; + await _container.ReplaceItemAsync(message, id, new PartitionKey(partitionKey)); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + // Already gone + } + } + + public async Task StoreIncomingAsync(Envelope envelope) + { + var incoming = new IncomingMessage(envelope, this); + try + { + await _container.CreateItemAsync(incoming, new PartitionKey(incoming.PartitionKey)); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.Conflict) + { + throw new DuplicateIncomingEnvelopeException(envelope); + } + } + + public async Task StoreIncomingAsync(IReadOnlyList envelopes) + { + foreach (var envelope in envelopes) + { + var incoming = new IncomingMessage(envelope, this); + try + { + await _container.CreateItemAsync(incoming, new PartitionKey(incoming.PartitionKey)); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.Conflict) + { + // Skip duplicates in batch mode; DurableReceiver will retry one at a time + } + } + } + + public async Task ExistsAsync(Envelope envelope, CancellationToken cancellation) + { + var id = IdentityFor(envelope); + var partitionKey = envelope.Destination?.ToString() ?? DocumentTypes.SystemPartition; + try + { + await _container.ReadItemAsync(id, new PartitionKey(partitionKey), + cancellationToken: cancellation); + return true; + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + return false; + } + } + + public Task RescheduleExistingEnvelopeForRetryAsync(Envelope envelope) + { + envelope.Status = EnvelopeStatus.Scheduled; + envelope.OwnerId = TransportConstants.AnyNode; + + return StoreIncomingAsync(envelope); + } + + public async Task MarkIncomingEnvelopeAsHandledAsync(Envelope envelope) + { + var id = _identity(envelope); + var partitionKey = envelope.Destination?.ToString() ?? DocumentTypes.SystemPartition; + try + { + var response = + await _container.ReadItemAsync(id, new PartitionKey(partitionKey)); + var message = response.Resource; + message.Status = EnvelopeStatus.Handled; + message.KeepUntil = DateTimeOffset.UtcNow.Add(_options.Durability.KeepAfterMessageHandling); + await _container.ReplaceItemAsync(message, id, new PartitionKey(partitionKey)); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + // Already gone + } + } + + public async Task MarkIncomingEnvelopeAsHandledAsync(IReadOnlyList envelopes) + { + foreach (var envelope in envelopes) + { + await MarkIncomingEnvelopeAsHandledAsync(envelope); + } + } + + public async Task ReleaseIncomingAsync(int ownerId, Uri receivedAt) + { + var partitionKey = receivedAt.ToString(); + var queryText = + "SELECT * FROM c WHERE c.docType = @docType AND c.ownerId = @ownerId AND c.receivedAt = @receivedAt"; + var query = new QueryDefinition(queryText) + .WithParameter("@docType", DocumentTypes.Incoming) + .WithParameter("@ownerId", ownerId) + .WithParameter("@receivedAt", receivedAt.ToString()); + + using var iterator = _container.GetItemQueryIterator(query, + requestOptions: new QueryRequestOptions + { + PartitionKey = new PartitionKey(partitionKey) + }); + + while (iterator.HasMoreResults) + { + var response = await iterator.ReadNextAsync(); + foreach (var message in response) + { + message.OwnerId = 0; + await _container.ReplaceItemAsync(message, message.Id, new PartitionKey(partitionKey)); + } + } + } +} diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.Locking.cs b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.Locking.cs new file mode 100644 index 000000000..59e085d48 --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.Locking.cs @@ -0,0 +1,207 @@ +using System.Net; +using Newtonsoft.Json; +using Microsoft.Azure.Cosmos; +using Wolverine.Runtime; + +namespace Wolverine.CosmosDb.Internals; + +public partial class CosmosDbMessageStore +{ + private string _leaderLockId; + private string _scheduledLockId; + private CosmosDistributedLock? _scheduledLock; + private string? _scheduledLockETag; + private IWolverineRuntime? _runtime; + + private CosmosDistributedLock? _leaderLock; + private string? _leaderLockETag; + + public bool HasLeadershipLock() + { + if (_leaderLock == null) return false; + if (_leaderLock.ExpirationTime < DateTimeOffset.UtcNow) return false; + return true; + } + + public async Task TryAttainLeadershipLockAsync(CancellationToken token) + { + var newLock = new CosmosDistributedLock + { + Id = _leaderLockId, + NodeId = _options.UniqueNodeId.ToString(), + ExpirationTime = DateTimeOffset.UtcNow.AddMinutes(5) + }; + + if (_leaderLock == null) + { + try + { + var response = await _container.CreateItemAsync(newLock, + new PartitionKey(DocumentTypes.SystemPartition), cancellationToken: token); + _leaderLock = newLock; + _leaderLockETag = response.ETag; + return true; + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.Conflict) + { + // Lock already exists, try to take it if expired + try + { + var existing = await _container.ReadItemAsync( + _leaderLockId, new PartitionKey(DocumentTypes.SystemPartition), + cancellationToken: token); + + if (existing.Resource.ExpirationTime < DateTimeOffset.UtcNow) + { + var replaceResponse = await _container.ReplaceItemAsync(newLock, + _leaderLockId, new PartitionKey(DocumentTypes.SystemPartition), + new ItemRequestOptions { IfMatchEtag = existing.ETag }, + cancellationToken: token); + _leaderLock = newLock; + _leaderLockETag = replaceResponse.ETag; + return true; + } + } + catch (CosmosException) + { + // Another node took it + } + + return false; + } + } + + try + { + var response = await _container.ReplaceItemAsync(newLock, _leaderLockId, + new PartitionKey(DocumentTypes.SystemPartition), + new ItemRequestOptions { IfMatchEtag = _leaderLockETag }, + cancellationToken: token); + _leaderLock = newLock; + _leaderLockETag = response.ETag; + return true; + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.PreconditionFailed || + e.StatusCode == HttpStatusCode.NotFound) + { + _leaderLock = null; + _leaderLockETag = null; + return false; + } + } + + public async Task ReleaseLeadershipLockAsync() + { + if (_leaderLock == null) return; + try + { + await _container.DeleteItemAsync( + _leaderLockId, new PartitionKey(DocumentTypes.SystemPartition), + new ItemRequestOptions { IfMatchEtag = _leaderLockETag }); + } + catch (CosmosException) + { + // Best effort + } + + _leaderLock = null; + _leaderLockETag = null; + } + + public async Task TryAttainScheduledJobLockAsync(CancellationToken token) + { + var newLock = new CosmosDistributedLock + { + Id = _scheduledLockId, + NodeId = _options.UniqueNodeId.ToString(), + ExpirationTime = DateTimeOffset.UtcNow.AddMinutes(5) + }; + + if (_scheduledLock == null) + { + try + { + var response = await _container.CreateItemAsync(newLock, + new PartitionKey(DocumentTypes.SystemPartition), cancellationToken: token); + _scheduledLock = newLock; + _scheduledLockETag = response.ETag; + return true; + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.Conflict) + { + try + { + var existing = await _container.ReadItemAsync( + _scheduledLockId, new PartitionKey(DocumentTypes.SystemPartition), + cancellationToken: token); + + if (existing.Resource.ExpirationTime < DateTimeOffset.UtcNow) + { + var replaceResponse = await _container.ReplaceItemAsync(newLock, + _scheduledLockId, new PartitionKey(DocumentTypes.SystemPartition), + new ItemRequestOptions { IfMatchEtag = existing.ETag }, + cancellationToken: token); + _scheduledLock = newLock; + _scheduledLockETag = replaceResponse.ETag; + return true; + } + } + catch (CosmosException) + { + // Another node took it + } + + return false; + } + } + + try + { + var response = await _container.ReplaceItemAsync(newLock, _scheduledLockId, + new PartitionKey(DocumentTypes.SystemPartition), + new ItemRequestOptions { IfMatchEtag = _scheduledLockETag }, + cancellationToken: token); + _scheduledLock = newLock; + _scheduledLockETag = response.ETag; + return true; + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.PreconditionFailed || + e.StatusCode == HttpStatusCode.NotFound) + { + _scheduledLock = null; + _scheduledLockETag = null; + return false; + } + } + + public async Task ReleaseScheduledJobLockAsync() + { + if (_scheduledLock == null) return; + try + { + await _container.DeleteItemAsync( + _scheduledLockId, new PartitionKey(DocumentTypes.SystemPartition), + new ItemRequestOptions { IfMatchEtag = _scheduledLockETag }); + } + catch (CosmosException) + { + // Best effort + } + + _scheduledLock = null; + _scheduledLockETag = null; + } +} + +public class CosmosDistributedLock +{ + [JsonProperty("id")] public string Id { get; set; } = string.Empty; + + [JsonProperty("docType")] public string DocType { get; set; } = DocumentTypes.Lock; + + [JsonProperty("partitionKey")] public string PartitionKey { get; set; } = DocumentTypes.SystemPartition; + + [JsonProperty("nodeId")] public string NodeId { get; set; } = string.Empty; + + [JsonProperty("expirationTime")] public DateTimeOffset ExpirationTime { get; set; } +} diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.NodeAgents.cs b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.NodeAgents.cs new file mode 100644 index 000000000..3d09bac88 --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.NodeAgents.cs @@ -0,0 +1,522 @@ +using System.Net; +using Newtonsoft.Json; +using Microsoft.Azure.Cosmos; +using Wolverine.Runtime.Agents; + +namespace Wolverine.CosmosDb.Internals; + +public partial class CosmosDbMessageStore : INodeAgentPersistence +{ + async Task INodeAgentPersistence.ClearAllAsync(CancellationToken cancellationToken) + { + var nodes = await Nodes.LoadAllNodesAsync(cancellationToken); + foreach (var node in nodes) + { + try + { + await _container.DeleteItemAsync( + $"node|{node.NodeId}", new PartitionKey(DocumentTypes.SystemPartition), + cancellationToken: cancellationToken); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + } + + foreach (var agent in node.ActiveAgents) + { + try + { + await _container.DeleteItemAsync( + CosmosAgentAssignment.ToId(agent), new PartitionKey(DocumentTypes.SystemPartition), + cancellationToken: cancellationToken); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + } + } + } + } + + public async Task PersistAsync(WolverineNode node, CancellationToken cancellationToken) + { + var sequenceId = $"{DocumentTypes.NodeSequence}|sequence"; + CosmosNodeSequence? sequence; + try + { + var response = await _container.ReadItemAsync( + sequenceId, new PartitionKey(DocumentTypes.SystemPartition), + cancellationToken: cancellationToken); + sequence = response.Resource; + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + sequence = new CosmosNodeSequence { Id = sequenceId }; + } + + node.AssignedNodeNumber = ++sequence.Count; + + await _container.UpsertItemAsync(sequence, new PartitionKey(DocumentTypes.SystemPartition), + cancellationToken: cancellationToken); + + var nodeDoc = new CosmosWolverineNode(node); + await _container.UpsertItemAsync(nodeDoc, new PartitionKey(DocumentTypes.SystemPartition), + cancellationToken: cancellationToken); + + return node.AssignedNodeNumber; + } + + public async Task DeleteAsync(Guid nodeId, int assignedNodeNumber) + { + try + { + await _container.DeleteItemAsync( + $"node|{nodeId}", new PartitionKey(DocumentTypes.SystemPartition)); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + } + + // Delete agent assignments for this node + var queryText = + "SELECT * FROM c WHERE c.docType = @docType AND c.nodeId = @nodeId"; + var query = new QueryDefinition(queryText) + .WithParameter("@docType", DocumentTypes.AgentAssignment) + .WithParameter("@nodeId", nodeId.ToString()); + + using var iterator = _container.GetItemQueryIterator(query, + requestOptions: new QueryRequestOptions + { + PartitionKey = new PartitionKey(DocumentTypes.SystemPartition) + }); + + while (iterator.HasMoreResults) + { + var response = await iterator.ReadNextAsync(); + foreach (var assignment in response) + { + try + { + await _container.DeleteItemAsync( + assignment.Id, new PartitionKey(DocumentTypes.SystemPartition)); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + } + } + } + + await ReleaseAllOwnershipAsync(assignedNodeNumber); + } + + public async Task> LoadAllNodesAsync(CancellationToken cancellationToken) + { + var nodeQuery = new QueryDefinition("SELECT * FROM c WHERE c.docType = @docType") + .WithParameter("@docType", DocumentTypes.Node); + + var nodes = new List(); + using (var iterator = _container.GetItemQueryIterator(nodeQuery, + requestOptions: new QueryRequestOptions + { + PartitionKey = new PartitionKey(DocumentTypes.SystemPartition) + })) + { + while (iterator.HasMoreResults) + { + var response = await iterator.ReadNextAsync(cancellationToken); + nodes.AddRange(response); + } + } + + var assignmentQuery = new QueryDefinition("SELECT * FROM c WHERE c.docType = @docType") + .WithParameter("@docType", DocumentTypes.AgentAssignment); + + var assignments = new List(); + using (var iterator = _container.GetItemQueryIterator(assignmentQuery, + requestOptions: new QueryRequestOptions + { + PartitionKey = new PartitionKey(DocumentTypes.SystemPartition) + })) + { + while (iterator.HasMoreResults) + { + var response = await iterator.ReadNextAsync(cancellationToken); + assignments.AddRange(response); + } + } + + var result = new List(); + foreach (var nodeDoc in nodes) + { + var node = nodeDoc.ToWolverineNode(); + node.ActiveAgents.Clear(); + node.ActiveAgents.AddRange(assignments.Where(x => x.NodeId == node.NodeId.ToString()) + .Select(x => new Uri(x.AgentUri))); + result.Add(node); + } + + return result; + } + + public async Task PersistAgentRestrictionsAsync(IReadOnlyList restrictions, + CancellationToken cancellationToken) + { + foreach (var restriction in restrictions) + { + if (restriction.Type == AgentRestrictionType.None) + { + try + { + await _container.DeleteItemAsync( + $"restriction|{restriction.Id}", new PartitionKey(DocumentTypes.SystemPartition), + cancellationToken: cancellationToken); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + } + } + else + { + var doc = new CosmosAgentRestriction + { + Id = $"restriction|{restriction.Id}", + AgentUri = restriction.AgentUri.ToString(), + Type = restriction.Type, + NodeNumber = restriction.NodeNumber + }; + await _container.UpsertItemAsync(doc, new PartitionKey(DocumentTypes.SystemPartition), + cancellationToken: cancellationToken); + } + } + } + + public async Task LoadNodeAgentStateAsync(CancellationToken cancellationToken) + { + var nodes = await LoadAllNodesAsync(cancellationToken); + + var restrictionQuery = new QueryDefinition("SELECT * FROM c WHERE c.docType = @docType") + .WithParameter("@docType", DocumentTypes.AgentRestriction); + + var restrictions = new List(); + using var iterator = _container.GetItemQueryIterator(restrictionQuery, + requestOptions: new QueryRequestOptions + { + PartitionKey = new PartitionKey(DocumentTypes.SystemPartition) + }); + + while (iterator.HasMoreResults) + { + var response = await iterator.ReadNextAsync(cancellationToken); + restrictions.AddRange(response); + } + + var agentRestrictions = restrictions.Select( + x => new AgentRestriction(Guid.Parse(x.Id.Replace("restriction|", "")), + new Uri(x.AgentUri), x.Type, x.NodeNumber) + ); + + return new NodeAgentState(nodes, new AgentRestrictions([.. agentRestrictions])); + } + + public async Task AssignAgentsAsync(Guid nodeId, IReadOnlyList agents, + CancellationToken cancellationToken) + { + foreach (var agent in agents) + { + var assignment = new CosmosAgentAssignment(agent, nodeId); + await _container.UpsertItemAsync(assignment, new PartitionKey(DocumentTypes.SystemPartition), + cancellationToken: cancellationToken); + } + } + + public async Task RemoveAssignmentAsync(Guid nodeId, Uri agentUri, CancellationToken cancellationToken) + { + try + { + await _container.DeleteItemAsync( + CosmosAgentAssignment.ToId(agentUri), new PartitionKey(DocumentTypes.SystemPartition), + cancellationToken: cancellationToken); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + } + } + + public async Task AddAssignmentAsync(Guid nodeId, Uri agentUri, CancellationToken cancellationToken) + { + var assignment = new CosmosAgentAssignment(agentUri, nodeId); + await _container.UpsertItemAsync(assignment, new PartitionKey(DocumentTypes.SystemPartition), + cancellationToken: cancellationToken); + } + + public async Task LoadNodeAsync(Guid nodeId, CancellationToken cancellationToken) + { + try + { + var response = await _container.ReadItemAsync( + $"node|{nodeId}", new PartitionKey(DocumentTypes.SystemPartition), + cancellationToken: cancellationToken); + + var node = response.Resource.ToWolverineNode(); + + var assignmentQuery = + new QueryDefinition( + "SELECT * FROM c WHERE c.docType = @docType AND c.nodeId = @nodeId") + .WithParameter("@docType", DocumentTypes.AgentAssignment) + .WithParameter("@nodeId", nodeId.ToString()); + + var assignments = new List(); + using var iterator = _container.GetItemQueryIterator(assignmentQuery, + requestOptions: new QueryRequestOptions + { + PartitionKey = new PartitionKey(DocumentTypes.SystemPartition) + }); + + while (iterator.HasMoreResults) + { + var resp = await iterator.ReadNextAsync(cancellationToken); + assignments.AddRange(resp); + } + + node.ActiveAgents.Clear(); + node.ActiveAgents.AddRange(assignments.OrderBy(x => x.Id).Select(x => new Uri(x.AgentUri))); + + return node; + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + return null; + } + } + + public async Task MarkHealthCheckAsync(WolverineNode node, CancellationToken cancellationToken) + { + try + { + var response = await _container.ReadItemAsync( + $"node|{node.NodeId}", new PartitionKey(DocumentTypes.SystemPartition), + cancellationToken: cancellationToken); + var doc = response.Resource; + doc.LastHealthCheck = DateTimeOffset.UtcNow; + await _container.ReplaceItemAsync(doc, doc.Id, new PartitionKey(DocumentTypes.SystemPartition), + cancellationToken: cancellationToken); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + // Node doesn't exist, create it + var doc = new CosmosWolverineNode(node) { LastHealthCheck = DateTimeOffset.UtcNow }; + await _container.UpsertItemAsync(doc, new PartitionKey(DocumentTypes.SystemPartition), + cancellationToken: cancellationToken); + } + } + + public async Task OverwriteHealthCheckTimeAsync(Guid nodeId, DateTimeOffset lastHeartbeatTime) + { + try + { + var response = await _container.ReadItemAsync( + $"node|{nodeId}", new PartitionKey(DocumentTypes.SystemPartition)); + var doc = response.Resource; + doc.LastHealthCheck = lastHeartbeatTime; + await _container.ReplaceItemAsync(doc, doc.Id, new PartitionKey(DocumentTypes.SystemPartition)); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + // Node doesn't exist + } + } + + public async Task LogRecordsAsync(params NodeRecord[] records) + { + foreach (var record in records) + { + var doc = new CosmosNodeRecord(record); + await _container.UpsertItemAsync(doc, new PartitionKey(DocumentTypes.SystemPartition)); + } + } + + public async Task> FetchRecentRecordsAsync(int count) + { + var queryText = + "SELECT * FROM c WHERE c.docType = @docType ORDER BY c.timestamp DESC OFFSET 0 LIMIT @limit"; + var query = new QueryDefinition(queryText) + .WithParameter("@docType", DocumentTypes.NodeRecord) + .WithParameter("@limit", count); + + var results = new List(); + using var iterator = _container.GetItemQueryIterator(query, + requestOptions: new QueryRequestOptions + { + PartitionKey = new PartitionKey(DocumentTypes.SystemPartition) + }); + + while (iterator.HasMoreResults) + { + var response = await iterator.ReadNextAsync(); + results.AddRange(response); + } + + results.Reverse(); + return results.Select(x => x.ToNodeRecord()).ToList(); + } +} + +// Helper document types for CosmosDB + +public class CosmosWolverineNode +{ + public CosmosWolverineNode() + { + } + + public CosmosWolverineNode(WolverineNode node) + { + Id = $"node|{node.NodeId}"; + NodeId = node.NodeId.ToString(); + Description = node.Description; + AssignedNodeNumber = node.AssignedNodeNumber; + ControlUri = node.ControlUri?.ToString(); + LastHealthCheck = node.LastHealthCheck; + Started = node.Started; + Version = node.Version?.ToString(); + Capabilities = node.Capabilities.Select(x => x.ToString()).ToList(); + } + + [JsonProperty("id")] public string Id { get; set; } = string.Empty; + + [JsonProperty("docType")] public string DocType { get; set; } = DocumentTypes.Node; + + [JsonProperty("partitionKey")] public string PartitionKey { get; set; } = DocumentTypes.SystemPartition; + + [JsonProperty("nodeId")] public string NodeId { get; set; } = string.Empty; + + [JsonProperty("description")] public string? Description { get; set; } + + [JsonProperty("assignedNodeNumber")] + public int AssignedNodeNumber { get; set; } + + [JsonProperty("controlUri")] public string? ControlUri { get; set; } + + [JsonProperty("lastHealthCheck")] public DateTimeOffset LastHealthCheck { get; set; } + + [JsonProperty("started")] public DateTimeOffset Started { get; set; } + + [JsonProperty("version")] public string? Version { get; set; } + + [JsonProperty("capabilities")] public List Capabilities { get; set; } = new(); + + public WolverineNode ToWolverineNode() + { + var node = new WolverineNode + { + NodeId = Guid.Parse(NodeId), + Description = Description ?? Environment.MachineName, + AssignedNodeNumber = AssignedNodeNumber, + ControlUri = ControlUri != null ? new Uri(ControlUri) : null, + LastHealthCheck = LastHealthCheck, + Started = Started, + Version = Version != null ? new Version(Version) : new Version(0, 0, 0, 0) + }; + node.Capabilities.AddRange(Capabilities.Select(x => new Uri(x))); + return node; + } +} + +public class CosmosAgentAssignment +{ + public static string ToId(Uri uri) + { + return "agent|" + uri.ToString().TrimEnd('/').Replace("//", "/").Replace(':', '_').Replace('/', '_'); + } + + public CosmosAgentAssignment() + { + } + + public CosmosAgentAssignment(Uri agentUri, Guid nodeId) + { + Id = ToId(agentUri); + NodeId = nodeId.ToString(); + AgentUri = agentUri.ToString(); + } + + [JsonProperty("id")] public string Id { get; set; } = string.Empty; + + [JsonProperty("docType")] public string DocType { get; set; } = DocumentTypes.AgentAssignment; + + [JsonProperty("partitionKey")] public string PartitionKey { get; set; } = DocumentTypes.SystemPartition; + + [JsonProperty("nodeId")] public string NodeId { get; set; } = string.Empty; + + [JsonProperty("agentUri")] public string AgentUri { get; set; } = string.Empty; +} + +public class CosmosNodeSequence +{ + [JsonProperty("id")] public string Id { get; set; } = $"{DocumentTypes.NodeSequence}|sequence"; + + [JsonProperty("docType")] public string DocType { get; set; } = DocumentTypes.NodeSequence; + + [JsonProperty("partitionKey")] public string PartitionKey { get; set; } = DocumentTypes.SystemPartition; + + [JsonProperty("count")] public int Count { get; set; } +} + +public class CosmosAgentRestriction +{ + [JsonProperty("id")] public string Id { get; set; } = string.Empty; + + [JsonProperty("docType")] public string DocType { get; set; } = DocumentTypes.AgentRestriction; + + [JsonProperty("partitionKey")] public string PartitionKey { get; set; } = DocumentTypes.SystemPartition; + + [JsonProperty("agentUri")] public string AgentUri { get; set; } = string.Empty; + + [JsonProperty("type")] public AgentRestrictionType Type { get; set; } + + [JsonProperty("nodeNumber")] public int NodeNumber { get; set; } +} + +public class CosmosNodeRecord +{ + public CosmosNodeRecord() + { + } + + public CosmosNodeRecord(NodeRecord record) + { + Id = $"record|{Guid.NewGuid()}"; + NodeNumber = record.NodeNumber; + RecordType = record.RecordType; + Timestamp = record.Timestamp; + Description = record.Description; + ServiceName = record.ServiceName; + } + + [JsonProperty("id")] public string Id { get; set; } = string.Empty; + + [JsonProperty("docType")] public string DocType { get; set; } = DocumentTypes.NodeRecord; + + [JsonProperty("partitionKey")] public string PartitionKey { get; set; } = DocumentTypes.SystemPartition; + + [JsonProperty("nodeNumber")] public int NodeNumber { get; set; } + + [JsonProperty("recordType")] public NodeRecordType RecordType { get; set; } + + [JsonProperty("timestamp")] public DateTimeOffset Timestamp { get; set; } + + [JsonProperty("description")] public string? Description { get; set; } + + [JsonProperty("serviceName")] public string? ServiceName { get; set; } + + public NodeRecord ToNodeRecord() + { + return new NodeRecord + { + NodeNumber = NodeNumber, + RecordType = RecordType, + Timestamp = Timestamp, + Description = Description ?? string.Empty, + ServiceName = ServiceName ?? string.Empty + }; + } +} diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.Outbox.cs b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.Outbox.cs new file mode 100644 index 000000000..bd894e4f2 --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.Outbox.cs @@ -0,0 +1,91 @@ +using System.Net; +using Microsoft.Azure.Cosmos; +using Wolverine.Persistence.Durability; + +namespace Wolverine.CosmosDb.Internals; + +public partial class CosmosDbMessageStore : IMessageOutbox +{ + public async Task> LoadOutgoingAsync(Uri destination) + { + var partitionKey = destination.ToString(); + var queryText = + "SELECT * FROM c WHERE c.docType = @docType AND c.destination = @destination"; + var query = new QueryDefinition(queryText) + .WithParameter("@docType", DocumentTypes.Outgoing) + .WithParameter("@destination", destination.ToString()); + + var results = new List(); + using var iterator = _container.GetItemQueryIterator(query, + requestOptions: new QueryRequestOptions + { + PartitionKey = new PartitionKey(partitionKey) + }); + + while (iterator.HasMoreResults) + { + var response = await iterator.ReadNextAsync(); + results.AddRange(response.Select(x => x.Read())); + } + + return results; + } + + public async Task StoreOutgoingAsync(Envelope envelope, int ownerId) + { + var outgoing = new OutgoingMessage(envelope) + { + OwnerId = ownerId + }; + + await _container.UpsertItemAsync(outgoing, new PartitionKey(outgoing.PartitionKey)); + } + + public async Task DeleteOutgoingAsync(Envelope[] envelopes) + { + foreach (var envelope in envelopes) + { + await DeleteOutgoingAsync(envelope); + } + } + + public async Task DeleteOutgoingAsync(Envelope envelope) + { + var id = $"outgoing|{envelope.Id}"; + var partitionKey = envelope.Destination?.ToString() ?? DocumentTypes.SystemPartition; + try + { + await _container.DeleteItemAsync(id, new PartitionKey(partitionKey)); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + // Already gone + } + } + + public async Task DiscardAndReassignOutgoingAsync(Envelope[] discards, Envelope[] reassigned, int nodeId) + { + foreach (var discard in discards) + { + await DeleteOutgoingAsync(discard); + } + + foreach (var envelope in reassigned) + { + var id = $"outgoing|{envelope.Id}"; + var partitionKey = envelope.Destination?.ToString() ?? DocumentTypes.SystemPartition; + try + { + var response = + await _container.ReadItemAsync(id, new PartitionKey(partitionKey)); + var message = response.Resource; + message.OwnerId = nodeId; + await _container.ReplaceItemAsync(message, id, new PartitionKey(partitionKey)); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + // Already gone + } + } + } +} diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.cs b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.cs new file mode 100644 index 000000000..29931047a --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.cs @@ -0,0 +1,158 @@ +using System.Net; +using JasperFx.Core.Reflection; +using JasperFx.Descriptors; +using Microsoft.Azure.Cosmos; +using Wolverine.CosmosDb.Internals.Durability; +using Wolverine.Persistence.Durability; +using Wolverine.Runtime; +using Wolverine.Runtime.Agents; +using Wolverine.Transports; + +namespace Wolverine.CosmosDb.Internals; + +public partial class CosmosDbMessageStore : IMessageStoreWithAgentSupport +{ + private readonly Container _container; + private readonly CosmosClient _client; + private readonly WolverineOptions _options; + private readonly string _databaseName; + private readonly Func _identity; + + public CosmosDbMessageStore(CosmosClient client, string databaseName, Container container, + WolverineOptions options) + { + _client = client; + _databaseName = databaseName; + _container = container; + _options = options; + + _identity = options.Durability.MessageIdentity == MessageIdentity.IdOnly + ? e => $"incoming|{e.Id}" + : e => + $"incoming|{e.Id}|{e.Destination?.ToString().Replace(":/", "").TrimEnd('/')}"; + + _leaderLockId = $"lock|leader"; + _scheduledLockId = $"lock|scheduled"; + } + + public MessageStoreRole Role { get; set; } = MessageStoreRole.Main; + + public List TenantIds { get; } = new(); + + public void PromoteToMain(IWolverineRuntime runtime) + { + Role = MessageStoreRole.Main; + } + + public void DemoteToAncillary() + { + Role = MessageStoreRole.Ancillary; + } + + public string Name => _databaseName; + + public Uri Uri => new("cosmosdb://durability"); + + public string IdentityFor(Envelope envelope) => _identity(envelope); + + public ValueTask DisposeAsync() + { + HasDisposed = true; + return new ValueTask(); + } + + public bool HasDisposed { get; set; } + public IMessageInbox Inbox => this; + public IMessageOutbox Outbox => this; + public INodeAgentPersistence Nodes => this; + public IMessageStoreAdmin Admin => this; + public IDeadLetters DeadLetters => this; + + public void Initialize(IWolverineRuntime runtime) + { + // Container creation is handled during setup/migration + } + + public DatabaseDescriptor Describe() + { + return new DatabaseDescriptor(this) + { + Engine = "cosmosdb", + DatabaseName = _databaseName + }; + } + + public Task DrainAsync() + { + return Task.CompletedTask; + } + + public IAgent StartScheduledJobs(IWolverineRuntime runtime) + { + _leaderLockId = $"lock|leader|{runtime.Options.ServiceName.ToLowerInvariant()}"; + _scheduledLockId = $"lock|scheduled|{runtime.Options.ServiceName.ToLowerInvariant()}"; + _runtime = runtime; + var agent = BuildAgent(runtime); + agent.As().StartTimers(); + return agent; + } + + public IAgent BuildAgent(IWolverineRuntime runtime) + { + return new CosmosDbDurabilityAgent(_container, runtime, this); + } + + public IAgentFamily? BuildAgentFamily(IWolverineRuntime runtime) + { + return null; + } + + public async Task> LoadPageOfGloballyOwnedIncomingAsync(Uri listenerAddress, int limit) + { + var partitionKey = listenerAddress.ToString(); + var queryText = + "SELECT * FROM c WHERE c.docType = @docType AND c.ownerId = @ownerId AND c.receivedAt = @receivedAt AND c.status = @status ORDER BY c.envelopeId OFFSET 0 LIMIT @limit"; + var query = new QueryDefinition(queryText) + .WithParameter("@docType", DocumentTypes.Incoming) + .WithParameter("@ownerId", TransportConstants.AnyNode) + .WithParameter("@receivedAt", listenerAddress.ToString()) + .WithParameter("@status", EnvelopeStatus.Incoming) + .WithParameter("@limit", limit); + + var results = new List(); + using var iterator = _container.GetItemQueryIterator(query, + requestOptions: new QueryRequestOptions + { + PartitionKey = new PartitionKey(partitionKey) + }); + + while (iterator.HasMoreResults) + { + var response = await iterator.ReadNextAsync(); + results.AddRange(response.Select(x => x.Read())); + } + + return results; + } + + public async Task ReassignIncomingAsync(int ownerId, IReadOnlyList incoming) + { + foreach (var envelope in incoming) + { + var id = _identity(envelope); + var partitionKey = envelope.Destination?.ToString() ?? DocumentTypes.SystemPartition; + try + { + var response = + await _container.ReadItemAsync(id, new PartitionKey(partitionKey)); + var message = response.Resource; + message.OwnerId = ownerId; + await _container.ReplaceItemAsync(message, id, new PartitionKey(partitionKey)); + } + catch (CosmosException e) when (e.StatusCode == HttpStatusCode.NotFound) + { + // Envelope was already handled/deleted, skip it + } + } + } +} diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbPersistenceFrameProvider.cs b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbPersistenceFrameProvider.cs new file mode 100644 index 000000000..63246a585 --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbPersistenceFrameProvider.cs @@ -0,0 +1,216 @@ +using JasperFx; +using JasperFx.CodeGeneration; +using JasperFx.CodeGeneration.Frames; +using JasperFx.CodeGeneration.Model; +using JasperFx.Core.Reflection; +using Microsoft.Azure.Cosmos; +using Wolverine.Configuration; +using Wolverine.Persistence; +using Wolverine.Persistence.Sagas; +using Wolverine.Runtime; +using MethodCall = JasperFx.CodeGeneration.Frames.MethodCall; + +namespace Wolverine.CosmosDb.Internals; + +public class CosmosDbPersistenceFrameProvider : IPersistenceFrameProvider +{ + public Frame[] DetermineFrameToNullOutMaybeSoftDeleted(Variable entity) => []; + + public void ApplyTransactionSupport(IChain chain, IServiceContainer container) + { + if (!chain.Middleware.OfType().Any()) + { + chain.Middleware.Add(new TransactionalFrame(chain)); + + if (chain is not SagaChain) + { + chain.Postprocessors.Add(new FlushOutgoingMessages()); + } + } + } + + public void ApplyTransactionSupport(IChain chain, IServiceContainer container, Type entityType) + { + ApplyTransactionSupport(chain, container); + } + + public bool CanApply(IChain chain, IServiceContainer container) + { + if (chain is SagaChain) + { + return true; + } + + if (chain.ReturnVariablesOfType().Any()) return true; + + var serviceDependencies = chain + .ServiceDependencies(container, new[] { typeof(Container) }.ToArray()); + return serviceDependencies.Any(x => x == typeof(Container)); + } + + public bool CanPersist(Type entityType, IServiceContainer container, out Type persistenceService) + { + persistenceService = typeof(Container); + return true; + } + + public Type DetermineSagaIdType(Type sagaType, IServiceContainer container) + { + return typeof(string); + } + + public Frame DetermineLoadFrame(IServiceContainer container, Type sagaType, Variable sagaId) + { + return new LoadDocumentFrame(sagaType, sagaId); + } + + public Frame DetermineInsertFrame(Variable saga, IServiceContainer container) + { + return new CosmosDbUpsertFrame(saga); + } + + public Frame CommitUnitOfWorkFrame(Variable saga, IServiceContainer container) + { + // CosmosDB operations are immediate, but we still flush outgoing messages + return new FlushOutgoingMessages(); + } + + public Frame DetermineUpdateFrame(Variable saga, IServiceContainer container) + { + return new CosmosDbUpsertFrame(saga); + } + + public Frame DetermineDeleteFrame(Variable sagaId, Variable saga, IServiceContainer container) + { + return new CosmosDbDeleteDocumentFrame(sagaId, saga); + } + + public Frame DetermineStoreFrame(Variable saga, IServiceContainer container) + { + return DetermineUpdateFrame(saga, container); + } + + public Frame DetermineDeleteFrame(Variable variable, IServiceContainer container) + { + return new CosmosDbDeleteByVariableFrame(variable); + } + + public Frame DetermineStorageActionFrame(Type entityType, Variable action, IServiceContainer container) + { + var method = typeof(CosmosDbStorageActionApplier).GetMethod("ApplyAction")! + .MakeGenericMethod(entityType); + + var call = new MethodCall(typeof(CosmosDbStorageActionApplier), method); + call.Arguments[1] = action; + + return call; + } +} + +public static class CosmosDbStorageActionApplier +{ + public static async Task ApplyAction(Container container, IStorageAction action) + { + if (action.Entity == null) return; + + switch (action.Action) + { + case StorageAction.Delete: + // For delete, we need the id. Use ToString() as a convention + var deleteId = action.Entity!.ToString()!; + try + { + await container.DeleteItemAsync(deleteId, PartitionKey.None); + } + catch (CosmosException) + { + // Best effort + } + + break; + case StorageAction.Insert: + case StorageAction.Store: + case StorageAction.Update: + await container.UpsertItemAsync(action.Entity); + break; + } + } +} + +internal class CosmosDbUpsertFrame : AsyncFrame +{ + private readonly Variable _document; + private Variable? _container; + + public CosmosDbUpsertFrame(Variable document) + { + _document = document; + uses.Add(_document); + } + + public override IEnumerable FindVariables(IMethodVariables chain) + { + _container = chain.FindVariable(typeof(Container)); + yield return _container; + } + + public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) + { + writer.Write( + $"await {_container!.Usage}.UpsertItemAsync({_document.Usage}).ConfigureAwait(false);"); + Next?.GenerateCode(method, writer); + } +} + +internal class CosmosDbDeleteDocumentFrame : AsyncFrame +{ + private readonly Variable _sagaId; + private readonly Variable _saga; + private Variable? _container; + + public CosmosDbDeleteDocumentFrame(Variable sagaId, Variable saga) + { + _sagaId = sagaId; + _saga = saga; + uses.Add(_sagaId); + uses.Add(_saga); + } + + public override IEnumerable FindVariables(IMethodVariables chain) + { + _container = chain.FindVariable(typeof(Container)); + yield return _container; + } + + public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) + { + writer.Write( + $"await {_container!.Usage}.DeleteItemAsync<{_saga.VariableType.FullNameInCode()}>({_sagaId.Usage}, {typeof(PartitionKey).FullNameInCode()}.None).ConfigureAwait(false);"); + Next?.GenerateCode(method, writer); + } +} + +internal class CosmosDbDeleteByVariableFrame : AsyncFrame +{ + private readonly Variable _variable; + private Variable? _container; + + public CosmosDbDeleteByVariableFrame(Variable variable) + { + _variable = variable; + uses.Add(_variable); + } + + public override IEnumerable FindVariables(IMethodVariables chain) + { + _container = chain.FindVariable(typeof(Container)); + yield return _container; + } + + public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) + { + writer.Write( + $"await {_container!.Usage}.DeleteItemAsync<{_variable.VariableType.FullNameInCode()}>({_variable.Usage}.ToString(), {typeof(PartitionKey).FullNameInCode()}.None).ConfigureAwait(false);"); + Next?.GenerateCode(method, writer); + } +} diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/DeadLetterMessage.cs b/src/Persistence/Wolverine.CosmosDb/Internals/DeadLetterMessage.cs new file mode 100644 index 000000000..006643248 --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/Internals/DeadLetterMessage.cs @@ -0,0 +1,81 @@ +using JasperFx.Core.Reflection; +using Newtonsoft.Json; +using Wolverine.Persistence.Durability; +using Wolverine.Runtime.Serialization; + +namespace Wolverine.CosmosDb.Internals; + +public class DeadLetterMessage +{ + public DeadLetterMessage() + { + } + + public DeadLetterMessage(Envelope envelope, Exception? exception) + { + Id = $"deadletter|{envelope.Id}"; + EnvelopeId = envelope.Id; + MessageType = envelope.MessageType; + ReceivedAt = envelope.Destination?.ToString(); + SentAt = envelope.SentAt; + ScheduledTime = envelope.ScheduledTime; + Source = envelope.Source; + ExceptionType = exception?.GetType().FullNameInCode(); + ExceptionMessage = exception?.Message; + Body = EnvelopeSerializer.Serialize(envelope); + PartitionKey = DocumentTypes.DeadLetterPartition; + } + + [JsonProperty("id")] + public string Id { get; set; } = string.Empty; + + [JsonProperty("docType")] + public string DocType { get; set; } = DocumentTypes.DeadLetter; + + [JsonProperty("partitionKey")] + public string PartitionKey { get; set; } = DocumentTypes.DeadLetterPartition; + + [JsonProperty("envelopeId")] + public Guid EnvelopeId { get; set; } + + [JsonProperty("messageType")] + public string? MessageType { get; set; } + + [JsonProperty("receivedAt")] + public string? ReceivedAt { get; set; } + + [JsonProperty("sentAt")] + public DateTimeOffset? SentAt { get; set; } + + [JsonProperty("scheduledTime")] + public DateTimeOffset? ScheduledTime { get; set; } + + [JsonProperty("source")] + public string? Source { get; set; } + + [JsonProperty("exceptionType")] + public string? ExceptionType { get; set; } + + [JsonProperty("exceptionMessage")] + public string? ExceptionMessage { get; set; } + + [JsonProperty("replayable")] + public bool Replayable { get; set; } + + [JsonProperty("body")] + public byte[] Body { get; set; } = []; + + [JsonProperty("expirationTime")] + public DateTimeOffset ExpirationTime { get; set; } + + [JsonProperty("_etag")] + public string? ETag { get; set; } + + public DeadLetterEnvelope ToEnvelope() + { + var envelope = EnvelopeSerializer.Deserialize(Body); + return new DeadLetterEnvelope(EnvelopeId, ScheduledTime, envelope, MessageType ?? "", + ReceivedAt ?? "", Source ?? "", ExceptionType ?? "", ExceptionMessage ?? "", + SentAt ?? DateTimeOffset.MinValue, Replayable); + } +} diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/DocumentTypes.cs b/src/Persistence/Wolverine.CosmosDb/Internals/DocumentTypes.cs new file mode 100644 index 000000000..3f6676387 --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/Internals/DocumentTypes.cs @@ -0,0 +1,20 @@ +namespace Wolverine.CosmosDb.Internals; + +public static class DocumentTypes +{ + public const string Incoming = "incoming"; + public const string Outgoing = "outgoing"; + public const string DeadLetter = "deadletter"; + public const string Node = "node"; + public const string AgentAssignment = "agent-assignment"; + public const string Lock = "lock"; + public const string NodeRecord = "node-record"; + public const string AgentRestriction = "agent-restriction"; + public const string NodeSequence = "node-sequence"; + + public const string ContainerName = "wolverine"; + public const string PartitionKeyPath = "/partitionKey"; + + public const string SystemPartition = "system"; + public const string DeadLetterPartition = "deadletter"; +} diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/Durability/CosmosDbDurabilityAgent.Incoming.cs b/src/Persistence/Wolverine.CosmosDb/Internals/Durability/CosmosDbDurabilityAgent.Incoming.cs new file mode 100644 index 000000000..5bda8499e --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/Internals/Durability/CosmosDbDurabilityAgent.Incoming.cs @@ -0,0 +1,73 @@ +using Microsoft.Azure.Cosmos; +using Microsoft.Extensions.Logging; +using Wolverine.Logging; +using Wolverine.Transports; + +namespace Wolverine.CosmosDb.Internals.Durability; + +public partial class CosmosDbDurabilityAgent +{ + private async Task tryRecoverIncomingMessages() + { + try + { + var queryText = + "SELECT DISTINCT c.receivedAt FROM c WHERE c.docType = @docType AND c.ownerId = 0"; + var query = new QueryDefinition(queryText) + .WithParameter("@docType", DocumentTypes.Incoming); + + using var iterator = _container.GetItemQueryIterator(query); + + var listeners = new List(); + while (iterator.HasMoreResults) + { + var response = await iterator.ReadNextAsync(); + foreach (var item in response) + { + string? receivedAt = item.receivedAt; + if (receivedAt != null) + { + listeners.Add(receivedAt); + } + } + } + + foreach (var listenerStr in listeners) + { + var receivedAt = new Uri(listenerStr); + var circuit = _runtime.Endpoints.FindListenerCircuit(receivedAt); + if (circuit.Status != ListeningStatus.Accepting) + { + continue; + } + + await recoverMessagesForListener(receivedAt, circuit); + } + } + catch (Exception e) + { + _logger.LogError(e, "Error trying to recover messages from the durable inbox"); + } + } + + private async Task recoverMessagesForListener(Uri listener, IListenerCircuit circuit) + { + try + { + var envelopes = await _parent.LoadPageOfGloballyOwnedIncomingAsync(listener, + _settings.RecoveryBatchSize); + await _parent.ReassignIncomingAsync(_settings.AssignedNodeNumber, envelopes); + + await circuit.EnqueueDirectlyAsync(envelopes); + _logger.RecoveredIncoming(envelopes); + + _logger.LogInformation( + "Successfully recovered {Count} messages from the inbox for listener {Listener}", + envelopes.Count, listener); + } + catch (Exception e) + { + _logger.LogError(e, "Error trying to recover messages from the inbox for listener {Uri}", listener); + } + } +} diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/Durability/CosmosDbDurabilityAgent.Outgoing.cs b/src/Persistence/Wolverine.CosmosDb/Internals/Durability/CosmosDbDurabilityAgent.Outgoing.cs new file mode 100644 index 000000000..0ab398f71 --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/Internals/Durability/CosmosDbDurabilityAgent.Outgoing.cs @@ -0,0 +1,69 @@ +using Microsoft.Azure.Cosmos; +using Microsoft.Extensions.Logging; + +namespace Wolverine.CosmosDb.Internals.Durability; + +public partial class CosmosDbDurabilityAgent +{ + private async Task tryRecoverOutgoingMessagesAsync() + { + try + { + var queryText = + "SELECT DISTINCT c.destination FROM c WHERE c.docType = @docType AND c.ownerId = 0"; + var query = new QueryDefinition(queryText) + .WithParameter("@docType", DocumentTypes.Outgoing); + + using var iterator = _container.GetItemQueryIterator(query); + + var destinations = new List(); + while (iterator.HasMoreResults) + { + var response = await iterator.ReadNextAsync(); + foreach (var item in response) + { + string? destination = item.destination; + if (destination != null) + { + destinations.Add(destination); + } + } + } + + foreach (var destinationStr in destinations) + { + await tryRecoverOutgoingMessagesToSenderAsync(new Uri(destinationStr)); + } + } + catch (Exception e) + { + _logger.LogError(e, "Error trying to recover orphaned outgoing messages"); + } + } + + private async Task tryRecoverOutgoingMessagesToSenderAsync(Uri sender) + { + try + { + var sendingAgent = _runtime.Endpoints.GetOrBuildSendingAgent(sender); + if (sendingAgent.Latched) return; + + var outgoing = await _parent.Outbox.LoadOutgoingAsync(sendingAgent.Destination); + var expiredMessages = outgoing.Where(x => x.IsExpired()).ToArray(); + var good = outgoing.Where(x => !x.IsExpired()).ToArray(); + + await _parent.Outbox.DiscardAndReassignOutgoingAsync(expiredMessages, good, + _runtime.Options.Durability.AssignedNodeNumber); + + foreach (var envelope in good) await sendingAgent.EnqueueOutgoingAsync(envelope); + + _logger.LogInformation( + "Recovered {Count} messages from outbox for destination {Destination} while discarding {ExpiredCount} expired messages", + good.Length, sendingAgent.Destination, expiredMessages.Length); + } + catch (Exception e) + { + _logger.LogError(e, "Error trying to find a sending agent for {Destination}", sender); + } + } +} diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/Durability/CosmosDbDurabilityAgent.Scheduled.cs b/src/Persistence/Wolverine.CosmosDb/Internals/Durability/CosmosDbDurabilityAgent.Scheduled.cs new file mode 100644 index 000000000..e909400b5 --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/Internals/Durability/CosmosDbDurabilityAgent.Scheduled.cs @@ -0,0 +1,85 @@ +using Microsoft.Azure.Cosmos; +using Microsoft.Extensions.Logging; + +namespace Wolverine.CosmosDb.Internals.Durability; + +public partial class CosmosDbDurabilityAgent +{ + private async Task runScheduledJobs() + { + try + { + if (!(await _parent.TryAttainScheduledJobLockAsync(_combined.Token))) + { + return; + } + } + catch (Exception e) + { + _logger.LogError(e, "Error trying to attain the scheduled job lock"); + return; + } + + try + { + var queryText = + "SELECT * FROM c WHERE c.docType = @docType AND c.status = @status AND c.executionTime <= @now ORDER BY c.executionTime OFFSET 0 LIMIT @limit"; + var query = new QueryDefinition(queryText) + .WithParameter("@docType", DocumentTypes.Incoming) + .WithParameter("@status", EnvelopeStatus.Scheduled) + .WithParameter("@now", DateTimeOffset.UtcNow) + .WithParameter("@limit", _settings.RecoveryBatchSize); + + var incoming = new List(); + using var iterator = _container.GetItemQueryIterator(query); + + while (iterator.HasMoreResults) + { + var response = await iterator.ReadNextAsync(_combined.Token); + incoming.AddRange(response); + } + + if (!incoming.Any()) + { + return; + } + + await locallyPublishScheduledMessages(incoming); + } + catch (Exception e) + { + _logger.LogError(e, "Error while trying to process scheduled messages"); + } + finally + { + try + { + await _parent.ReleaseScheduledJobLockAsync(); + } + catch (Exception e) + { + _logger.LogError(e, "Error trying to release the scheduled job lock"); + } + } + } + + private async Task locallyPublishScheduledMessages(List incoming) + { + var envelopes = incoming.Select(x => x.Read()).ToList(); + + foreach (var message in incoming) + { + message.Status = EnvelopeStatus.Incoming; + message.OwnerId = _settings.AssignedNodeNumber; + await _container.ReplaceItemAsync(message, message.Id, + new PartitionKey(message.PartitionKey)); + } + + foreach (var envelope in envelopes) + { + _logger.LogInformation("Locally enqueuing scheduled message {Id} of type {MessageType}", envelope.Id, + envelope.MessageType); + await _localQueue.EnqueueAsync(envelope); + } + } +} diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/Durability/CosmosDbDurabilityAgent.cs b/src/Persistence/Wolverine.CosmosDb/Internals/Durability/CosmosDbDurabilityAgent.cs new file mode 100644 index 000000000..f42dd7adb --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/Internals/Durability/CosmosDbDurabilityAgent.cs @@ -0,0 +1,159 @@ +using JasperFx; +using JasperFx.Core; +using JasperFx.Core.Reflection; +using Microsoft.Azure.Cosmos; +using Microsoft.Extensions.Logging; +using Wolverine.Persistence; +using Wolverine.Runtime; +using Wolverine.Runtime.Agents; +using Wolverine.Runtime.WorkerQueues; +using Wolverine.Transports; + +namespace Wolverine.CosmosDb.Internals.Durability; + +public partial class CosmosDbDurabilityAgent : IAgent +{ + private readonly Container _container; + private readonly IWolverineRuntime _runtime; + private readonly CosmosDbMessageStore _parent; + private readonly ILocalQueue _localQueue; + private readonly DurabilitySettings _settings; + private readonly ILogger _logger; + + private Task? _recoveryTask; + private Task? _scheduledJob; + + private readonly CancellationTokenSource _cancellation = new(); + private readonly CancellationTokenSource _combined; + private PersistenceMetrics? _metrics; + + public CosmosDbDurabilityAgent(Container container, IWolverineRuntime runtime, + CosmosDbMessageStore parent) + { + _container = container; + _runtime = runtime; + _parent = parent; + _localQueue = (ILocalQueue)runtime.Endpoints.AgentForLocalQueue(TransportConstants.Scheduled); + _settings = runtime.DurabilitySettings; + + Uri = new Uri("cosmosdb://durability"); + + _logger = runtime.LoggerFactory.CreateLogger(); + + _combined = CancellationTokenSource.CreateLinkedTokenSource(runtime.Cancellation, _cancellation.Token); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + StartTimers(); + return Task.CompletedTask; + } + + internal void StartTimers() + { + _metrics = new PersistenceMetrics(_runtime.Meter, _settings, null); + + if (_settings.DurabilityMetricsEnabled) + { + _metrics.StartPolling(_runtime.LoggerFactory.CreateLogger(), _parent); + } + + var recoveryStart = _settings.ScheduledJobFirstExecution.Add(new Random().Next(0, 1000).Milliseconds()); + + _recoveryTask = Task.Run(async () => + { + await Task.Delay(recoveryStart, _combined.Token); + using var timer = new PeriodicTimer(_settings.ScheduledJobPollingTime); + + while (!_combined.IsCancellationRequested) + { + var lastExpiredTime = DateTimeOffset.UtcNow; + + await tryRecoverIncomingMessages(); + await tryRecoverOutgoingMessagesAsync(); + + if (_settings.DeadLetterQueueExpirationEnabled) + { + var now = DateTimeOffset.UtcNow; + if (now > lastExpiredTime.AddHours(1)) + { + await tryDeleteExpiredDeadLetters(); + } + } + + await timer.WaitForNextTickAsync(_combined.Token); + } + }, _combined.Token); + + _scheduledJob = Task.Run(async () => + { + await Task.Delay(recoveryStart, _combined.Token); + using var timer = new PeriodicTimer(_settings.ScheduledJobPollingTime); + + while (!_combined.IsCancellationRequested) + { + await runScheduledJobs(); + await timer.WaitForNextTickAsync(_combined.Token); + } + }, _combined.Token); + } + + private async Task tryDeleteExpiredDeadLetters() + { + var now = DateTimeOffset.UtcNow; + var queryText = + "SELECT c.id, c.partitionKey FROM c WHERE c.docType = @docType AND c.expirationTime < @now"; + var query = new QueryDefinition(queryText) + .WithParameter("@docType", DocumentTypes.DeadLetter) + .WithParameter("@now", now); + + using var iterator = _container.GetItemQueryIterator(query, + requestOptions: new QueryRequestOptions + { + PartitionKey = new PartitionKey(DocumentTypes.DeadLetterPartition) + }); + + while (iterator.HasMoreResults) + { + var response = await iterator.ReadNextAsync(); + foreach (var item in response) + { + string id = item.id; + try + { + await _container.DeleteItemAsync(id, + new PartitionKey(DocumentTypes.DeadLetterPartition)); + } + catch (CosmosException) + { + // Best effort + } + } + } + } + + public Task StopAsync(CancellationToken cancellationToken) + { + _cancellation.Cancel(); + + if (_metrics != null) + { + _metrics.SafeDispose(); + } + + if (_recoveryTask != null) + { + _recoveryTask.SafeDispose(); + } + + if (_scheduledJob != null) + { + _scheduledJob.SafeDispose(); + } + + return Task.CompletedTask; + } + + public Uri Uri { get; set; } + public AgentStatus Status { get; set; } +} diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/IncomingMessage.cs b/src/Persistence/Wolverine.CosmosDb/Internals/IncomingMessage.cs new file mode 100644 index 000000000..e433a79bf --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/Internals/IncomingMessage.cs @@ -0,0 +1,90 @@ +using Newtonsoft.Json; +using Wolverine.Runtime.Serialization; + +namespace Wolverine.CosmosDb.Internals; + +public class IncomingMessage +{ + public IncomingMessage() + { + } + + public IncomingMessage(Envelope envelope, CosmosDbMessageStore store) + { + Id = store.IdentityFor(envelope); + EnvelopeId = envelope.Id; + Status = envelope.Status; + OwnerId = envelope.OwnerId; + ExecutionTime = envelope.ScheduledTime?.ToUniversalTime(); + Attempts = envelope.Attempts; + Body = envelope.Status == EnvelopeStatus.Handled ? [] : EnvelopeSerializer.Serialize(envelope); + MessageType = envelope.MessageType!; + ReceivedAt = envelope.Destination?.ToString(); + PartitionKey = envelope.Destination?.ToString() ?? DocumentTypes.SystemPartition; + } + + [JsonProperty("id")] + public string Id { get; set; } = string.Empty; + + [JsonProperty("docType")] + public string DocType { get; set; } = DocumentTypes.Incoming; + + [JsonProperty("partitionKey")] + public string PartitionKey { get; set; } = string.Empty; + + [JsonProperty("envelopeId")] + public Guid EnvelopeId { get; set; } + + [JsonProperty("status")] + public EnvelopeStatus Status { get; set; } = EnvelopeStatus.Incoming; + + [JsonProperty("ownerId")] + public int OwnerId { get; set; } + + [JsonProperty("executionTime")] + public DateTimeOffset? ExecutionTime { get; set; } + + [JsonProperty("attempts")] + public int Attempts { get; set; } + + [JsonProperty("body")] + public byte[] Body { get; set; } = []; + + [JsonProperty("messageType")] + public string MessageType { get; set; } = string.Empty; + + [JsonProperty("receivedAt")] + public string? ReceivedAt { get; set; } + + [JsonProperty("keepUntil")] + public DateTimeOffset? KeepUntil { get; set; } + + [JsonProperty("_etag")] + public string? ETag { get; set; } + + public Envelope Read() + { + Envelope envelope; + if (Body == null || Body.Length == 0) + { + envelope = new Envelope + { + Id = EnvelopeId, + MessageType = MessageType, + Destination = ReceivedAt != null ? new Uri(ReceivedAt) : null, + Data = [] + }; + } + else + { + envelope = EnvelopeSerializer.Deserialize(Body); + } + + envelope.Id = EnvelopeId; + envelope.OwnerId = OwnerId; + envelope.Status = Status; + envelope.Attempts = Attempts; + envelope.ScheduledTime = ExecutionTime; + return envelope; + } +} diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/LoadDocumentFrame.cs b/src/Persistence/Wolverine.CosmosDb/Internals/LoadDocumentFrame.cs new file mode 100644 index 000000000..ec24b08c9 --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/Internals/LoadDocumentFrame.cs @@ -0,0 +1,52 @@ +using JasperFx.CodeGeneration; +using JasperFx.CodeGeneration.Frames; +using JasperFx.CodeGeneration.Model; +using JasperFx.Core.Reflection; +using Microsoft.Azure.Cosmos; + +namespace Wolverine.CosmosDb.Internals; + +internal class LoadDocumentFrame : AsyncFrame +{ + private readonly Variable _sagaId; + private Variable? _cancellation; + private Variable? _container; + + public LoadDocumentFrame(Type sagaType, Variable sagaId) + { + _sagaId = sagaId; + Saga = new Variable(sagaType, this); + } + + public Variable Saga { get; } + + public override IEnumerable FindVariables(IMethodVariables chain) + { + _container = chain.FindVariable(typeof(Container)); + yield return _container; + + _cancellation = chain.FindVariable(typeof(CancellationToken)); + yield return _cancellation; + } + + public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) + { + writer.WriteLine(""); + writer.WriteComment("Try to load the existing saga document from CosmosDB"); + writer.Write( + $"{Saga.VariableType.FullNameInCode()} {Saga.Usage} = default;"); + writer.Write($"try"); + writer.Write($"{{"); + writer.Write( + $" var _cosmosResponse = await {_container!.Usage}.ReadItemAsync<{Saga.VariableType.FullNameInCode()}>({_sagaId.Usage}, {typeof(PartitionKey).FullNameInCode()}.None, cancellationToken: {_cancellation!.Usage}).ConfigureAwait(false);"); + writer.Write($" {Saga.Usage} = _cosmosResponse.Resource;"); + writer.Write($"}}"); + writer.Write( + $"catch ({typeof(CosmosException).FullNameInCode()} e) when (e.StatusCode == System.Net.HttpStatusCode.NotFound)"); + writer.Write($"{{"); + writer.Write($" {Saga.Usage} = default;"); + writer.Write($"}}"); + + Next?.GenerateCode(method, writer); + } +} diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/OutgoingMessage.cs b/src/Persistence/Wolverine.CosmosDb/Internals/OutgoingMessage.cs new file mode 100644 index 000000000..233d20bd0 --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/Internals/OutgoingMessage.cs @@ -0,0 +1,65 @@ +using Newtonsoft.Json; +using Wolverine.Runtime.Serialization; + +namespace Wolverine.CosmosDb.Internals; + +public class OutgoingMessage +{ + public OutgoingMessage() + { + } + + public OutgoingMessage(Envelope envelope) + { + Id = $"outgoing|{envelope.Id}"; + EnvelopeId = envelope.Id; + OwnerId = envelope.OwnerId; + Attempts = envelope.Attempts; + Body = EnvelopeSerializer.Serialize(envelope); + MessageType = envelope.MessageType!; + Destination = envelope.Destination?.ToString(); + DeliverBy = envelope.DeliverBy?.ToUniversalTime(); + PartitionKey = envelope.Destination?.ToString() ?? DocumentTypes.SystemPartition; + } + + [JsonProperty("id")] + public string Id { get; set; } = string.Empty; + + [JsonProperty("docType")] + public string DocType { get; set; } = DocumentTypes.Outgoing; + + [JsonProperty("partitionKey")] + public string PartitionKey { get; set; } = string.Empty; + + [JsonProperty("envelopeId")] + public Guid EnvelopeId { get; set; } + + [JsonProperty("ownerId")] + public int OwnerId { get; set; } + + [JsonProperty("destination")] + public string? Destination { get; set; } + + [JsonProperty("deliverBy")] + public DateTimeOffset? DeliverBy { get; set; } + + [JsonProperty("body")] + public byte[] Body { get; set; } = []; + + [JsonProperty("attempts")] + public int Attempts { get; set; } + + [JsonProperty("messageType")] + public string MessageType { get; set; } = string.Empty; + + [JsonProperty("_etag")] + public string? ETag { get; set; } + + public Envelope Read() + { + var envelope = EnvelopeSerializer.Deserialize(Body); + envelope.OwnerId = OwnerId; + envelope.Attempts = Attempts; + return envelope; + } +} diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/TransactionalFrame.cs b/src/Persistence/Wolverine.CosmosDb/Internals/TransactionalFrame.cs new file mode 100644 index 000000000..b456d99de --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/Internals/TransactionalFrame.cs @@ -0,0 +1,53 @@ +using JasperFx.CodeGeneration; +using JasperFx.CodeGeneration.Frames; +using JasperFx.CodeGeneration.Model; +using Microsoft.Azure.Cosmos; +using Wolverine.Configuration; +using Wolverine.Persistence.Sagas; +using Wolverine.Runtime; + +namespace Wolverine.CosmosDb.Internals; + +internal class TransactionalFrame : Frame +{ + private readonly IChain _chain; + private Variable? _cancellation; + private Variable? _context; + + public TransactionalFrame(IChain chain) : base(true) + { + _chain = chain; + } + + public Variable? Container { get; private set; } + + public override IEnumerable FindVariables(IMethodVariables chain) + { + _cancellation = chain.FindVariable(typeof(CancellationToken)); + yield return _cancellation; + + // Container is resolved from DI (registered by UseCosmosDbPersistence) + Container = chain.FindVariable(typeof(Container)); + yield return Container; + + _context = chain.TryFindVariable(typeof(IMessageContext), VariableSource.NotServices); + + if (_context != null) + { + yield return _context; + } + } + + public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) + { + if (_context != null) + { + writer.BlankLine(); + writer.WriteComment("Enlist in CosmosDB outbox transaction"); + writer.Write( + $"{_context.Usage}.{nameof(MessageContext.EnlistInOutbox)}(new {typeof(CosmosDbEnvelopeTransaction).FullName}({Container!.Usage}, {_context.Usage}));"); + } + + Next?.GenerateCode(method, writer); + } +} diff --git a/src/Persistence/Wolverine.CosmosDb/Wolverine.CosmosDb.csproj b/src/Persistence/Wolverine.CosmosDb/Wolverine.CosmosDb.csproj new file mode 100644 index 000000000..b80ff8e65 --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/Wolverine.CosmosDb.csproj @@ -0,0 +1,22 @@ + + + + CosmosDb Saga, Message, and Outbox storage for Wolverine applications + WolverineFx.CosmosDb + false + false + false + false + false + + + + + + + + + + + + diff --git a/src/Persistence/Wolverine.CosmosDb/WolverineCosmosDbExtensions.cs b/src/Persistence/Wolverine.CosmosDb/WolverineCosmosDbExtensions.cs new file mode 100644 index 000000000..687a99f38 --- /dev/null +++ b/src/Persistence/Wolverine.CosmosDb/WolverineCosmosDbExtensions.cs @@ -0,0 +1,39 @@ +using Microsoft.Azure.Cosmos; +using Microsoft.Extensions.DependencyInjection; +using Wolverine.CosmosDb.Internals; +using Wolverine.Persistence.Durability; +using Wolverine.Persistence.Sagas; + +namespace Wolverine.CosmosDb; + +public static class WolverineCosmosDbExtensions +{ + /// + /// Utilize CosmosDb for envelope and saga storage with this system. + /// Requires a CosmosClient and database name to be configured. + /// + /// + /// The CosmosDB database name to use + /// + public static WolverineOptions UseCosmosDbPersistence(this WolverineOptions options, string databaseName) + { + options.Services.AddSingleton(sp => + { + var client = sp.GetRequiredService(); + var container = client.GetDatabase(databaseName).GetContainer(DocumentTypes.ContainerName); + var wolverineOptions = sp.GetRequiredService(); + return new CosmosDbMessageStore(client, databaseName, container, wolverineOptions); + }); + + // Register the CosmosDB Container for use by code-generated handlers + options.Services.AddSingleton(sp => + { + var client = sp.GetRequiredService(); + return client.GetDatabase(databaseName).GetContainer(DocumentTypes.ContainerName); + }); + + options.CodeGeneration.InsertFirstPersistenceStrategy(); + options.CodeGeneration.ReferenceAssembly(typeof(WolverineCosmosDbExtensions).Assembly); + return options; + } +} diff --git a/wolverine.sln b/wolverine.sln index c9a462b9f..c001e37a4 100644 --- a/wolverine.sln +++ b/wolverine.sln @@ -108,6 +108,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".SolutionItems", ".Solution .github\workflows\http.yml = .github\workflows\http.yml .github\workflows\persistence.yml = .github\workflows\persistence.yml .github\workflows\pulsar.yml = .github\workflows\pulsar.yml + Directory.Packages.props = Directory.Packages.props EndProjectSection EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wolverine.MemoryPack", "src\Extensions\Wolverine.MemoryPack\Wolverine.MemoryPack.csproj", "{469A2C91-64B4-439B-9097-05499770FFB3}" @@ -350,6 +351,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wolverine.Nats", "src\Trans EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wolverine.Nats.Tests", "src\Transports\NATS\Wolverine.Nats.Tests\Wolverine.Nats.Tests.csproj", "{77B49C73-29B7-47A5-9475-AC290F53D76D}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "CosmosDb", "CosmosDb", "{68B94BE1-185D-D133-8A8C-EFE0C95F2BC7}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wolverine.CosmosDb", "src\Persistence\Wolverine.CosmosDb\Wolverine.CosmosDb.csproj", "{9DBA7EBE-C6E6-4F26-87E8-D87A6CDDE737}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CosmosDbTests", "src\Persistence\CosmosDbTests\CosmosDbTests.csproj", "{E0D51CAE-97CF-48A8-879E-149A4E69BEE2}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -1956,6 +1963,30 @@ Global {77B49C73-29B7-47A5-9475-AC290F53D76D}.Release|x64.Build.0 = Release|Any CPU {77B49C73-29B7-47A5-9475-AC290F53D76D}.Release|x86.ActiveCfg = Release|Any CPU {77B49C73-29B7-47A5-9475-AC290F53D76D}.Release|x86.Build.0 = Release|Any CPU + {9DBA7EBE-C6E6-4F26-87E8-D87A6CDDE737}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9DBA7EBE-C6E6-4F26-87E8-D87A6CDDE737}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9DBA7EBE-C6E6-4F26-87E8-D87A6CDDE737}.Debug|x64.ActiveCfg = Debug|Any CPU + {9DBA7EBE-C6E6-4F26-87E8-D87A6CDDE737}.Debug|x64.Build.0 = Debug|Any CPU + {9DBA7EBE-C6E6-4F26-87E8-D87A6CDDE737}.Debug|x86.ActiveCfg = Debug|Any CPU + {9DBA7EBE-C6E6-4F26-87E8-D87A6CDDE737}.Debug|x86.Build.0 = Debug|Any CPU + {9DBA7EBE-C6E6-4F26-87E8-D87A6CDDE737}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9DBA7EBE-C6E6-4F26-87E8-D87A6CDDE737}.Release|Any CPU.Build.0 = Release|Any CPU + {9DBA7EBE-C6E6-4F26-87E8-D87A6CDDE737}.Release|x64.ActiveCfg = Release|Any CPU + {9DBA7EBE-C6E6-4F26-87E8-D87A6CDDE737}.Release|x64.Build.0 = Release|Any CPU + {9DBA7EBE-C6E6-4F26-87E8-D87A6CDDE737}.Release|x86.ActiveCfg = Release|Any CPU + {9DBA7EBE-C6E6-4F26-87E8-D87A6CDDE737}.Release|x86.Build.0 = Release|Any CPU + {E0D51CAE-97CF-48A8-879E-149A4E69BEE2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E0D51CAE-97CF-48A8-879E-149A4E69BEE2}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E0D51CAE-97CF-48A8-879E-149A4E69BEE2}.Debug|x64.ActiveCfg = Debug|Any CPU + {E0D51CAE-97CF-48A8-879E-149A4E69BEE2}.Debug|x64.Build.0 = Debug|Any CPU + {E0D51CAE-97CF-48A8-879E-149A4E69BEE2}.Debug|x86.ActiveCfg = Debug|Any CPU + {E0D51CAE-97CF-48A8-879E-149A4E69BEE2}.Debug|x86.Build.0 = Debug|Any CPU + {E0D51CAE-97CF-48A8-879E-149A4E69BEE2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E0D51CAE-97CF-48A8-879E-149A4E69BEE2}.Release|Any CPU.Build.0 = Release|Any CPU + {E0D51CAE-97CF-48A8-879E-149A4E69BEE2}.Release|x64.ActiveCfg = Release|Any CPU + {E0D51CAE-97CF-48A8-879E-149A4E69BEE2}.Release|x64.Build.0 = Release|Any CPU + {E0D51CAE-97CF-48A8-879E-149A4E69BEE2}.Release|x86.ActiveCfg = Release|Any CPU + {E0D51CAE-97CF-48A8-879E-149A4E69BEE2}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -2117,6 +2148,9 @@ Global {3FE8E499-BE44-4E27-815C-39A4DB2C4EA1} = {84D32C8B-9CCE-4925-9AEC-8F445C7A2E3D} {1F04214E-E901-436A-A05F-6BB9ED375019} = {3FE8E499-BE44-4E27-815C-39A4DB2C4EA1} {77B49C73-29B7-47A5-9475-AC290F53D76D} = {3FE8E499-BE44-4E27-815C-39A4DB2C4EA1} + {68B94BE1-185D-D133-8A8C-EFE0C95F2BC7} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210} + {9DBA7EBE-C6E6-4F26-87E8-D87A6CDDE737} = {68B94BE1-185D-D133-8A8C-EFE0C95F2BC7} + {E0D51CAE-97CF-48A8-879E-149A4E69BEE2} = {68B94BE1-185D-D133-8A8C-EFE0C95F2BC7} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {30422362-0D90-4DBE-8C97-DD2B5B962768}