diff --git a/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/CompleteFourHandler1230864511.cs b/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/CompleteFourHandler1230864511.cs index 168eb192f..0ad94a4f8 100644 --- a/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/CompleteFourHandler1230864511.cs +++ b/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/CompleteFourHandler1230864511.cs @@ -5,7 +5,8 @@ namespace Internal.Generated.WolverineHandlers { // START: CompleteFourHandler1230864511 - public class CompleteFourHandler1230864511 : Wolverine.Runtime.Handlers.MessageHandler + [global::System.CodeDom.Compiler.GeneratedCode("JasperFx", "1.0.0")] + public sealed class CompleteFourHandler1230864511 : Wolverine.Runtime.Handlers.MessageHandler { private readonly Raven.Client.Documents.IDocumentStore _documentStore; @@ -34,6 +35,7 @@ public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime. else { + context.SetSagaId(sagaId); // The actual message execution stringBasicWorkflow.Handle(completeFour); diff --git a/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/CompleteOneHandler1612253335.cs b/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/CompleteOneHandler1612253335.cs index 91c1c8854..de7b46725 100644 --- a/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/CompleteOneHandler1612253335.cs +++ b/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/CompleteOneHandler1612253335.cs @@ -5,7 +5,8 @@ namespace Internal.Generated.WolverineHandlers { // START: CompleteOneHandler1612253335 - public class CompleteOneHandler1612253335 : Wolverine.Runtime.Handlers.MessageHandler + [global::System.CodeDom.Compiler.GeneratedCode("JasperFx", "1.0.0")] + public sealed class CompleteOneHandler1612253335 : Wolverine.Runtime.Handlers.MessageHandler { private readonly Raven.Client.Documents.IDocumentStore _documentStore; @@ -34,6 +35,7 @@ public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime. else { + context.SetSagaId(sagaId); // The actual message execution var outgoing1 = stringBasicWorkflow.Handle(completeOne); diff --git a/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/CompleteTwoHandler402398939.cs b/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/CompleteTwoHandler402398939.cs index 40ef683cd..b2b9715c6 100644 --- a/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/CompleteTwoHandler402398939.cs +++ b/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/CompleteTwoHandler402398939.cs @@ -5,7 +5,8 @@ namespace Internal.Generated.WolverineHandlers { // START: CompleteTwoHandler402398939 - public class CompleteTwoHandler402398939 : Wolverine.Runtime.Handlers.MessageHandler + [global::System.CodeDom.Compiler.GeneratedCode("JasperFx", "1.0.0")] + public sealed class CompleteTwoHandler402398939 : Wolverine.Runtime.Handlers.MessageHandler { private readonly Raven.Client.Documents.IDocumentStore _documentStore; @@ -34,6 +35,7 @@ public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime. else { + context.SetSagaId(sagaId); // The actual message execution stringBasicWorkflow.Handle(completeTwo); diff --git a/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/FinishItAllHandler1534262635.cs b/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/FinishItAllHandler1534262635.cs index 9a7f31d26..8bdb4b3ae 100644 --- a/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/FinishItAllHandler1534262635.cs +++ b/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/FinishItAllHandler1534262635.cs @@ -5,7 +5,8 @@ namespace Internal.Generated.WolverineHandlers { // START: FinishItAllHandler1534262635 - public class FinishItAllHandler1534262635 : Wolverine.Runtime.Handlers.MessageHandler + [global::System.CodeDom.Compiler.GeneratedCode("JasperFx", "1.0.0")] + public sealed class FinishItAllHandler1534262635 : Wolverine.Runtime.Handlers.MessageHandler { private readonly Raven.Client.Documents.IDocumentStore _documentStore; @@ -34,6 +35,7 @@ public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime. else { + context.SetSagaId(sagaId); // The actual message execution stringBasicWorkflow.Handle(finishItAll); diff --git a/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/StringCompleteThreeHandler606415888.cs b/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/StringCompleteThreeHandler606415888.cs index 3c37d675f..d5b9138ca 100644 --- a/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/StringCompleteThreeHandler606415888.cs +++ b/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/StringCompleteThreeHandler606415888.cs @@ -5,7 +5,8 @@ namespace Internal.Generated.WolverineHandlers { // START: StringCompleteThreeHandler606415888 - public class StringCompleteThreeHandler606415888 : Wolverine.Runtime.Handlers.MessageHandler + [global::System.CodeDom.Compiler.GeneratedCode("JasperFx", "1.0.0")] + public sealed class StringCompleteThreeHandler606415888 : Wolverine.Runtime.Handlers.MessageHandler { private readonly Raven.Client.Documents.IDocumentStore _documentStore; @@ -22,6 +23,8 @@ public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime. // The actual message body var stringCompleteThree = (Wolverine.ComplianceTests.Sagas.StringCompleteThree)context.Envelope.Message; + // Application-specific Open Telemetry auditing + System.Diagnostics.Activity.Current?.SetTag("SagaId", stringCompleteThree.SagaId); string sagaId = context.Envelope.SagaId ?? stringCompleteThree.SagaId; if (string.IsNullOrEmpty(sagaId)) throw new Wolverine.Persistence.Sagas.IndeterminateSagaStateIdException(context.Envelope); @@ -34,6 +37,7 @@ public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime. else { + context.SetSagaId(sagaId); // The actual message execution stringBasicWorkflow.Handle(stringCompleteThree); diff --git a/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/StringDoThreeHandler1820069266.cs b/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/StringDoThreeHandler1820069266.cs index 7cae05ebb..0f1f1c81d 100644 --- a/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/StringDoThreeHandler1820069266.cs +++ b/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/StringDoThreeHandler1820069266.cs @@ -5,7 +5,8 @@ namespace Internal.Generated.WolverineHandlers { // START: StringDoThreeHandler1820069266 - public class StringDoThreeHandler1820069266 : Wolverine.Runtime.Handlers.MessageHandler + [global::System.CodeDom.Compiler.GeneratedCode("JasperFx", "1.0.0")] + public sealed class StringDoThreeHandler1820069266 : Wolverine.Runtime.Handlers.MessageHandler { private readonly Raven.Client.Documents.IDocumentStore _documentStore; @@ -22,6 +23,8 @@ public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime. // The actual message body var stringDoThree = (Wolverine.ComplianceTests.Sagas.StringDoThree)context.Envelope.Message; + // Application-specific Open Telemetry auditing + System.Diagnostics.Activity.Current?.SetTag("TheSagaId", stringDoThree.TheSagaId); string sagaId = context.Envelope.SagaId ?? stringDoThree.TheSagaId; if (string.IsNullOrEmpty(sagaId)) throw new Wolverine.Persistence.Sagas.IndeterminateSagaStateIdException(context.Envelope); @@ -34,6 +37,7 @@ public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime. else { + context.SetSagaId(sagaId); // The actual message execution stringBasicWorkflow.Handles(stringDoThree); diff --git a/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/StringStartHandler2085759971.cs b/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/StringStartHandler2085759971.cs index 081e5b1a8..01bc2f82b 100644 --- a/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/StringStartHandler2085759971.cs +++ b/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/StringStartHandler2085759971.cs @@ -5,7 +5,8 @@ namespace Internal.Generated.WolverineHandlers { // START: StringStartHandler2085759971 - public class StringStartHandler2085759971 : Wolverine.Runtime.Handlers.MessageHandler + [global::System.CodeDom.Compiler.GeneratedCode("JasperFx", "1.0.0")] + public sealed class StringStartHandler2085759971 : Wolverine.Runtime.Handlers.MessageHandler { private readonly Raven.Client.Documents.IDocumentStore _documentStore; @@ -22,11 +23,14 @@ public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime. // The actual message body var stringStart = (Wolverine.ComplianceTests.Sagas.StringStart)context.Envelope.Message; + // Application-specific Open Telemetry auditing + System.Diagnostics.Activity.Current?.SetTag("Id", stringStart.Id); var stringBasicWorkflow = new Wolverine.ComplianceTests.Sagas.StringBasicWorkflow(); // The actual message execution stringBasicWorkflow.Start(stringStart); + context.SetSagaId(stringStart.Id); if (!stringBasicWorkflow.IsCompleted()) { await asyncDocumentSession.StoreAsync(stringBasicWorkflow, cancellation).ConfigureAwait(false); diff --git a/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/WildcardStartHandler784149372.cs b/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/WildcardStartHandler784149372.cs index d5a563484..93a385f98 100644 --- a/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/WildcardStartHandler784149372.cs +++ b/src/Persistence/RavenDbTests/Internal/Generated/WolverineHandlers/WildcardStartHandler784149372.cs @@ -5,7 +5,8 @@ namespace Internal.Generated.WolverineHandlers { // START: WildcardStartHandler784149372 - public class WildcardStartHandler784149372 : Wolverine.Runtime.Handlers.MessageHandler + [global::System.CodeDom.Compiler.GeneratedCode("JasperFx", "1.0.0")] + public sealed class WildcardStartHandler784149372 : Wolverine.Runtime.Handlers.MessageHandler { private readonly Raven.Client.Documents.IDocumentStore _documentStore; @@ -22,11 +23,14 @@ public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime. // The actual message body var wildcardStart = (Wolverine.ComplianceTests.Sagas.WildcardStart)context.Envelope.Message; + // Application-specific Open Telemetry auditing + System.Diagnostics.Activity.Current?.SetTag("Id", wildcardStart.Id); var stringBasicWorkflow = new Wolverine.ComplianceTests.Sagas.StringBasicWorkflow(); // The actual message execution stringBasicWorkflow.Starts(wildcardStart); + context.SetSagaId(wildcardStart.Id); if (!stringBasicWorkflow.IsCompleted()) { await asyncDocumentSession.StoreAsync(stringBasicWorkflow, cancellation).ConfigureAwait(false); diff --git a/src/Persistence/RavenDbTests/Internals/IncomingMessage_mapping.cs b/src/Persistence/RavenDbTests/Internals/IncomingMessage_mapping.cs index eda671bf3..e0b4a25cd 100644 --- a/src/Persistence/RavenDbTests/Internals/IncomingMessage_mapping.cs +++ b/src/Persistence/RavenDbTests/Internals/IncomingMessage_mapping.cs @@ -20,7 +20,7 @@ public IncomingMessage_mapping() theEnvelope.Id = Guid.NewGuid(); theEnvelope.OwnerId = 3; theEnvelope.Attempts = 2; - theEnvelope.Status = EnvelopeStatus.Handled; + theEnvelope.Status = EnvelopeStatus.Incoming; var message = new IncomingMessage(theEnvelope, new RavenDbMessageStore(Substitute.For(), new WolverineOptions())); theMappedEnvelope = message.Read(); @@ -35,7 +35,7 @@ public void map_the_id() [Fact] public void map_the_status() { - theMappedEnvelope.Status.ShouldBe(EnvelopeStatus.Handled); + theMappedEnvelope.Status.ShouldBe(EnvelopeStatus.Incoming); } [Fact] diff --git a/src/Persistence/RavenDbTests/message_store_compliance.cs b/src/Persistence/RavenDbTests/message_store_compliance.cs index 4d33088fd..49b9908d7 100644 --- a/src/Persistence/RavenDbTests/message_store_compliance.cs +++ b/src/Persistence/RavenDbTests/message_store_compliance.cs @@ -17,10 +17,40 @@ namespace RavenDbTests; public class DatabaseFixture : RavenTestDriver { + private static bool _configured; + public IDocumentStore StartRavenStore() { + EnsureServerConfigured(); return GetDocumentStore(); } + + internal static void EnsureServerConfigured() + { + if (_configured) return; + _configured = true; + + // Configure the embedded RavenDB server. + // RavenDB.TestDriver 7.0.x requires .NET 8.0.15+ runtime. + // We try to use a brew-installed .NET 8 if available, otherwise fall back to system dotnet. + var options = new TestServerOptions + { + FrameworkVersion = null, // Use available runtime + Licensing = new ServerOptions.LicensingOptions + { + ThrowOnInvalidOrMissingLicense = false // Don't require license for tests + } + }; + + // Check for brew-installed .NET 8 which has newer runtime + var brewDotNetPath = "/opt/homebrew/opt/dotnet@8/bin/dotnet"; + if (File.Exists(brewDotNetPath)) + { + options.DotNetPath = brewDotNetPath; + } + + ConfigureServer(options); + } } [CollectionDefinition("raven")] @@ -52,13 +82,13 @@ public override async Task BuildCleanHost() { // TODO -- TEMP! opts.Durability.Mode = DurabilityMode.Solo; - + opts.UseRavenDbPersistence(); opts.Services.AddSingleton(store); opts.ListenAtPort(2345).UseDurableInbox(); }).StartAsync(); - + return host; } @@ -74,11 +104,11 @@ public async Task marks_envelope_as_having_an_expires_on_mark_handled() var incoming = await session.LoadAsync(envelope.Id.ToString()); var metadata = session.Advanced.GetMetadataFor(incoming); metadata.TryGetValue("@expires", out var raw).ShouldBeTrue(); - + var value = metadata["@expires"]; Debug.WriteLine(value); } -} \ No newline at end of file +} diff --git a/src/Persistence/RavenDbTests/ravendb_durability_end_to_end.cs b/src/Persistence/RavenDbTests/ravendb_durability_end_to_end.cs index 7febd01fb..599dc16c7 100644 --- a/src/Persistence/RavenDbTests/ravendb_durability_end_to_end.cs +++ b/src/Persistence/RavenDbTests/ravendb_durability_end_to_end.cs @@ -18,6 +18,7 @@ using Wolverine.RavenDb.Internals; using Wolverine.Transports.Tcp; using Wolverine.Util; +using RavenDbTests; public class ravendb_durability_end_to_end : RavenTestDriver, IAsyncLifetime { @@ -34,6 +35,7 @@ public async Task InitializeAsync() { _listener = new Uri($"tcp://localhost:{PortFinder.GetAvailablePort()}"); + DatabaseFixture.EnsureServerConfigured(); _receiverStore = GetDocumentStore(); _senderStore = GetDocumentStore(); @@ -53,11 +55,11 @@ public async Task InitializeAsync() // Leave it as a lambda so it doesn't get disposed opts.Services.AddSingleton(s => _receiverStore); - + opts.ListenForMessagesFrom(_listener).UseDurableInbox(); opts.Services.AddResourceSetupOnStartup(); - + opts.UseTcpForControlEndpoint(); }) .Start(); @@ -75,10 +77,10 @@ public async Task InitializeAsync() .UseDurableOutbox()); opts.UseTcpForControlEndpoint(); - + opts.CodeGeneration.InsertFirstPersistenceStrategy(); opts.Services.AddSingleton(s => new RavenDbMessageStore(_senderStore, s.GetRequiredService())); - + // Leave it as a lambda so it doesn't get disposed opts.Services.AddSingleton(s => _senderStore); @@ -108,7 +110,7 @@ public async Task DisposeAsync() } _senders.Clear(); - + _receiverStore.Dispose(); _senderStore.Dispose(); } @@ -245,4 +247,4 @@ public async Task Handle(TraceMessage message, IAsyncDocumentSession session) var traceDoc = new TraceDoc { Name = message.Name }; await session.StoreAsync(traceDoc); } -} \ No newline at end of file +} diff --git a/src/Persistence/RavenDbTests/saga_storage_compliance.cs b/src/Persistence/RavenDbTests/saga_storage_compliance.cs index 1e9af297d..8b699eb1a 100644 --- a/src/Persistence/RavenDbTests/saga_storage_compliance.cs +++ b/src/Persistence/RavenDbTests/saga_storage_compliance.cs @@ -14,24 +14,25 @@ namespace RavenDbTests; public class RavenDbSagaHost : RavenTestDriver, ISagaHost { private IDocumentStore _store; - + public IHost BuildHost() { + DatabaseFixture.EnsureServerConfigured(); _store = GetDocumentStore(); return Host.CreateDefaultBuilder() .UseWolverine(opts => { opts.Durability.Mode = DurabilityMode.Solo; - + opts.CodeGeneration.GeneratedCodeOutputPath = AppContext.BaseDirectory.ParentDirectory().ParentDirectory().ParentDirectory().AppendPath("Internal", "Generated"); opts.CodeGeneration.TypeLoadMode = TypeLoadMode.Auto; - + // Shouldn't be necessary, but apparently is. Type scanning is not working // for some reason across the compliance tests opts.Discovery.IncludeType(); opts.Discovery.IncludeAssembly(typeof(StringBasicWorkflow).Assembly); - + opts.Services.AddSingleton(_store); opts.UseRavenDbPersistence(); }).Start(); @@ -65,4 +66,4 @@ public class saga_storage_compliance : StringIdentifiedSagaComplianceSpecs { opts.Durability.Mode = DurabilityMode.Solo; - + opts.Services.AddSingleton(store); - + opts.ListenAtPort(2345).UseDurableInbox(); - + opts.UseRavenDbPersistence(); opts.Policies.AutoApplyTransactions(); + + // Include handlers from this test assembly + opts.Discovery.IncludeAssembly(typeof(transactional_middleware).Assembly); }).StartAsync(); await host.InvokeAsync(new RecordTeam("Chiefs", 1960)); diff --git a/src/Persistence/Wolverine.RavenDb/Internals/IncomingMessage.cs b/src/Persistence/Wolverine.RavenDb/Internals/IncomingMessage.cs index 8eecc2f21..dc87d7a64 100644 --- a/src/Persistence/Wolverine.RavenDb/Internals/IncomingMessage.cs +++ b/src/Persistence/Wolverine.RavenDb/Internals/IncomingMessage.cs @@ -16,7 +16,8 @@ public IncomingMessage(Envelope envelope, RavenDbMessageStore store) OwnerId = envelope.OwnerId; ExecutionTime = envelope.ScheduledTime?.ToUniversalTime(); Attempts = envelope.Attempts; - Body = EnvelopeSerializer.Serialize(envelope); + // When storing as Handled, don't persist the body - it's just for idempotency checks + Body = envelope.Status == EnvelopeStatus.Handled ? [] : EnvelopeSerializer.Serialize(envelope); MessageType = envelope.MessageType!; ReceivedAt = envelope.Destination; } @@ -35,14 +36,28 @@ public IncomingMessage(Envelope envelope, RavenDbMessageStore store) public Envelope Read() { - var envelope = EnvelopeSerializer.Deserialize(Body); + Envelope envelope; + if (Body == null || Body.Length == 0) + { + // For handled envelopes, body is not stored - create a minimal envelope + envelope = new Envelope + { + Id = EnvelopeId, + MessageType = MessageType, + Destination = ReceivedAt, + Data = [] + }; + } + else + { + envelope = EnvelopeSerializer.Deserialize(Body); + } + envelope.Id = EnvelopeId; envelope.OwnerId = OwnerId; envelope.Status = Status; envelope.Attempts = Attempts; envelope.ScheduledTime = ExecutionTime; return envelope; - - } } \ No newline at end of file diff --git a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.Admin.cs b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.Admin.cs index a0fdb2eac..2e3031973 100644 --- a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.Admin.cs +++ b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.Admin.cs @@ -32,13 +32,13 @@ public async Task FetchCountsAsync() using var session = _store.OpenAsyncSession(); return new PersistedCounts { - DeadLetter = await session.Query().CountAsync(), + DeadLetter = await session.Query().Customize(x => x.WaitForNonStaleResults()).CountAsync(), Handled = - await session.Query().Where(m => m.Status == EnvelopeStatus.Handled).CountAsync(), - Incoming = await session.Query().Where(m => m.Status == EnvelopeStatus.Incoming) + await session.Query().Customize(x => x.WaitForNonStaleResults()).Where(m => m.Status == EnvelopeStatus.Handled).CountAsync(), + Incoming = await session.Query().Customize(x => x.WaitForNonStaleResults()).Where(m => m.Status == EnvelopeStatus.Incoming) .CountAsync(), - Outgoing = await session.Query().CountAsync(), - Scheduled = await session.Query().Where(m => m.Status == EnvelopeStatus.Scheduled) + Outgoing = await session.Query().Customize(x => x.WaitForNonStaleResults()).CountAsync(), + Scheduled = await session.Query().Customize(x => x.WaitForNonStaleResults()).Where(m => m.Status == EnvelopeStatus.Scheduled) .CountAsync(), }; } diff --git a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.DeadLetters.cs b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.DeadLetters.cs index 31831b57e..ad599b58a 100644 --- a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.DeadLetters.cs +++ b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.DeadLetters.cs @@ -14,134 +14,239 @@ private static string dlqId(Guid id) { return $"dlq/{id}"; } - - // public async Task QueryDeadLetterEnvelopesAsync(DeadLetterEnvelopeQueryParameters queryParameters, string? tenantId = null) - // { - // using var session = _store.OpenAsyncSession(); - // var queryable = session.Query().Customize(x => x.WaitForNonStaleResults()); - // if (queryParameters.StartId.HasValue) - // { - // queryable = (IRavenQueryable)Queryable.Where(queryable, x => x.EnvelopeId >= queryParameters.StartId.Value); - // } - // - // if (queryParameters.MessageType.IsNotEmpty()) - // { - // queryable = (IRavenQueryable)Queryable.Where(queryable, x => x.MessageType == queryParameters.MessageType); - // } - // - // if (queryParameters.ExceptionType.IsNotEmpty()) - // { - // queryable = (IRavenQueryable)Queryable.Where(queryable, x => x.ExceptionType == queryParameters.ExceptionType); - // } - // - // if (queryParameters.ExceptionMessage.IsNotEmpty()) - // { - // queryable = (IRavenQueryable)Queryable.Where(queryable, x => x.ExceptionMessage == queryParameters.ExceptionMessage); - // } - // - // if (queryParameters.From.HasValue) - // { - // queryable = (IRavenQueryable)Queryable.Where(queryable, x => x.SentAt >= queryParameters.From.Value); - // } - // - // if (queryParameters.Until.HasValue) - // { - // queryable = (IRavenQueryable)Queryable.Where(queryable, x => x.SentAt <= queryParameters.Until.Value); - // } - // - // var messages = await queryable - // .OrderBy(x => x.SentAt) - // .Take((int)queryParameters.Limit + 1) - // .ToListAsync(); - // - // var envelopes = messages.Select(x => x.ToEnvelope()).ToList(); - // - // var next = Guid.Empty; - // if (envelopes.Count > queryParameters.Limit) - // { - // next = envelopes.Last().Envelope.Id; - // envelopes.RemoveAt(envelopes.Count - 1); - // } - // - // return new DeadLetterEnvelopesFound(envelopes, next, tenantId); - // } public async Task DeadLetterEnvelopeByIdAsync(Guid id, string? tenantId = null) { using var session = _store.OpenAsyncSession(); var message = await session.LoadAsync(dlqId(id)); if (message is null) return null; - + return message.ToEnvelope(); } - // TODO -- use this in the new admin - public async Task MarkDeadLetterEnvelopesAsReplayableAsync(string exceptionType = "") + public async Task> SummarizeAllAsync(string serviceName, TimeRange range, + CancellationToken token) { using var session = _store.OpenAsyncSession(); - var count = exceptionType.IsEmpty() - ? await session.Query().CountAsync() - : await session.Query().CountAsync(x => x.ExceptionType == exceptionType); + var queryable = session.Query().Customize(x => x.WaitForNonStaleResults()); - string command = null; - if (exceptionType.IsEmpty()) + if (range.From.HasValue) { - command = $@" -from DeadLetterMessages as m -update -{{ - m.Replayable = true -}}"; + queryable = (IRavenQueryable)queryable.Where(x => x.SentAt >= range.From.Value); + } + + if (range.To.HasValue) + { + queryable = (IRavenQueryable)queryable.Where(x => x.SentAt <= range.To.Value); + } + + var messages = await queryable.ToListAsync(token); + + // Group by ReceivedAt, MessageType, ExceptionType + var grouped = messages + .GroupBy(x => new { ReceivedAt = x.ReceivedAt?.ToString() ?? "", x.MessageType, x.ExceptionType }) + .Select(g => new DeadLetterQueueCount( + serviceName, + g.Key.ReceivedAt.IsNotEmpty() ? new Uri(g.Key.ReceivedAt) : Uri, + g.Key.MessageType ?? "", + g.Key.ExceptionType ?? "", + Uri, + g.Count())) + .ToList(); + + return grouped; + } + + public async Task QueryAsync(DeadLetterEnvelopeQuery query, CancellationToken token) + { + using var session = _store.OpenAsyncSession(); + var queryable = session.Query().Customize(x => x.WaitForNonStaleResults()); + + // If MessageIds are specified, they take precedence + if (query.MessageIds != null && query.MessageIds.Any()) + { + var ids = query.MessageIds.Select(dlqId).ToArray(); + var messages = await session.LoadAsync(ids, token); + var envelopes = messages.Values + .Where(m => m != null) + .Select(m => m!.ToEnvelope()) + .ToList(); + + return new DeadLetterEnvelopeResults + { + PageNumber = 1, + TotalCount = envelopes.Count, + Envelopes = envelopes, + DatabaseUri = Uri + }; + } + + // Apply filters + if (query.Range.From.HasValue) + { + queryable = (IRavenQueryable)queryable.Where(x => x.SentAt >= query.Range.From.Value); + } + + if (query.Range.To.HasValue) + { + queryable = (IRavenQueryable)queryable.Where(x => x.SentAt <= query.Range.To.Value); + } + + if (query.ExceptionType.IsNotEmpty()) + { + queryable = (IRavenQueryable)queryable.Where(x => x.ExceptionType == query.ExceptionType); + } + + if (query.ExceptionMessage.IsNotEmpty()) + { + queryable = (IRavenQueryable)queryable.Where(x => x.ExceptionMessage.StartsWith(query.ExceptionMessage)); + } + + if (query.MessageType.IsNotEmpty()) + { + queryable = (IRavenQueryable)queryable.Where(x => x.MessageType == query.MessageType); + } + + if (query.ReceivedAt.IsNotEmpty()) + { + var receivedAtUri = new Uri(query.ReceivedAt); + queryable = (IRavenQueryable)queryable.Where(x => x.ReceivedAt == receivedAtUri); + } + + // Get total count + var totalCount = await queryable.CountAsync(token); + + // Apply paging + if (query.PageNumber <= 0) query.PageNumber = 1; + var skip = (query.PageNumber - 1) * query.PageSize; + + var pagedMessages = await queryable + .OrderBy(x => x.SentAt) + .Skip(skip) + .Take(query.PageSize) + .ToListAsync(token); + + return new DeadLetterEnvelopeResults + { + PageNumber = query.PageNumber, + TotalCount = totalCount, + Envelopes = pagedMessages.Select(m => m.ToEnvelope()).ToList(), + DatabaseUri = Uri + }; + } + + public async Task DiscardAsync(DeadLetterEnvelopeQuery query, CancellationToken token) + { + // If MessageIds are specified, delete those specific messages + if (query.MessageIds != null && query.MessageIds.Any()) + { + using var session = _store.OpenAsyncSession(); + foreach (var id in query.MessageIds) + { + session.Delete(dlqId(id)); + } + await session.SaveChangesAsync(token); + return; } - else + + // Build delete query + var rql = BuildDeleteQuery(query); + + var op = await _store.Operations.SendAsync(new DeleteByQueryOperation(new IndexQuery + { + Query = rql, + WaitForNonStaleResults = true, + WaitForNonStaleResultsTimeout = 30.Seconds() + }), token: token); + + await op.WaitForCompletionAsync(); + } + + public async Task ReplayAsync(DeadLetterEnvelopeQuery query, CancellationToken token) + { + // If MessageIds are specified, mark those specific messages + if (query.MessageIds != null && query.MessageIds.Any()) { - command = $@" + using var session = _store.OpenAsyncSession(); + foreach (var id in query.MessageIds) + { + session.Advanced.Patch(dlqId(id), x => x.Replayable, true); + } + await session.SaveChangesAsync(token); + return; + } + + // Build update query + var whereClause = BuildWhereClause(query); + var rql = $@" from DeadLetterMessages as m -where m.ExceptionType = @exceptionType +{whereClause} update {{ m.Replayable = true }}"; - } var op = await _store.Operations.SendAsync(new PatchByQueryOperation(new IndexQuery { - Query = command, + Query = rql, WaitForNonStaleResults = true, - WaitForNonStaleResultsTimeout = 10.Seconds(), - QueryParameters = new(){{"exceptionType", exceptionType}} - })); + WaitForNonStaleResultsTimeout = 30.Seconds() + }), token: token); + await op.WaitForCompletionAsync(); } - public async Task MarkDeadLetterEnvelopesAsReplayableAsync(Guid[] ids, string? tenantId = null) + private string BuildDeleteQuery(DeadLetterEnvelopeQuery query) { - using var session = _store.OpenAsyncSession(); - foreach (var id in ids) - { - session.Advanced.Patch(dlqId(id), x => x.Replayable, true); - } - - await session.SaveChangesAsync(); + var whereClause = BuildWhereClause(query); + return $"from DeadLetterMessages as m {whereClause}"; } - public Task> SummarizeAllAsync(string serviceName, TimeRange range, CancellationToken token) + private string BuildWhereClause(DeadLetterEnvelopeQuery query) { - throw new NotImplementedException(); - } + var conditions = new List(); - public Task QueryAsync(DeadLetterEnvelopeQuery query, CancellationToken token) - { - throw new NotImplementedException(); + if (query.Range.From.HasValue) + { + conditions.Add($"m.SentAt >= '{query.Range.From.Value:o}'"); + } + + if (query.Range.To.HasValue) + { + conditions.Add($"m.SentAt <= '{query.Range.To.Value:o}'"); + } + + if (query.ExceptionType.IsNotEmpty()) + { + conditions.Add($"m.ExceptionType = '{query.ExceptionType}'"); + } + + if (query.MessageType.IsNotEmpty()) + { + conditions.Add($"m.MessageType = '{query.MessageType}'"); + } + + if (query.ReceivedAt.IsNotEmpty()) + { + conditions.Add($"m.ReceivedAt = '{query.ReceivedAt}'"); + } + + return conditions.Count > 0 ? "where " + string.Join(" and ", conditions) : ""; } - public Task DiscardAsync(DeadLetterEnvelopeQuery query, CancellationToken token) + // Legacy methods for backwards compatibility + public async Task MarkDeadLetterEnvelopesAsReplayableAsync(string exceptionType = "") { - throw new NotImplementedException(); + var query = new DeadLetterEnvelopeQuery + { + Range = TimeRange.AllTime(), + ExceptionType = exceptionType.IsEmpty() ? null : exceptionType + }; + await ReplayAsync(query, CancellationToken.None); } - public Task ReplayAsync(DeadLetterEnvelopeQuery query, CancellationToken token) + public async Task MarkDeadLetterEnvelopesAsReplayableAsync(Guid[] ids, string? tenantId = null) { - throw new NotImplementedException(); + await ReplayAsync(new DeadLetterEnvelopeQuery { MessageIds = ids }, CancellationToken.None); } -} \ No newline at end of file +} diff --git a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.Inbox.cs b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.Inbox.cs index 306590830..0f13c62ac 100644 --- a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.Inbox.cs +++ b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.Inbox.cs @@ -103,7 +103,7 @@ public async Task ExistsAsync(Envelope envelope, CancellationToken cancell { using var session = _store.OpenAsyncSession(); var identity = IdentityFor(envelope); - return (await session.LoadAsync(identity) == null); + return (await session.LoadAsync(identity, cancellation)) != null; } public Task RescheduleExistingEnvelopeForRetryAsync(Envelope envelope) diff --git a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.NodeAgents.cs b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.NodeAgents.cs index bb66f093c..9e5f86063 100644 --- a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.NodeAgents.cs +++ b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.NodeAgents.cs @@ -133,6 +133,17 @@ public async Task LoadNodeAgentStateAsync(CancellationToken canc .Customize(x => x.WaitForNonStaleResults()) .ToListAsync(token: cancellationToken); + var assignments = await session + .Query() + .Customize(x => x.WaitForNonStaleResults()) + .ToListAsync(token: cancellationToken); + + foreach (var node in nodes) + { + node.ActiveAgents.Clear(); + node.ActiveAgents.AddRange(assignments.Where(x => x.NodeId == node.NodeId).Select(x => x.AgentUri)); + } + var restrictions = await session .Query() .Customize(x => x.WaitForNonStaleResults()) @@ -214,7 +225,11 @@ public async Task LogRecordsAsync(params NodeRecord[] records) public async Task> FetchRecentRecordsAsync(int count) { using var session = _store.OpenAsyncSession(); - var list = await session.Query().OrderByDescending(x => x.Timestamp).Take(count).ToListAsync(); + var list = await session.Query() + .Customize(x => x.WaitForNonStaleResults()) + .OrderByDescending(x => x.Timestamp) + .Take(count) + .ToListAsync(); list.Reverse(); return list; }