diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/using_native_scheduling.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/using_native_scheduling.cs index 4fc0c63d8..7be5bfd2b 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/using_native_scheduling.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/using_native_scheduling.cs @@ -32,6 +32,66 @@ public async Task with_inline_endpoint() await host.StopAsync(); } + [Fact] + public async Task with_inline_endpoint_cascaded_timeout() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseAzureServiceBusTesting() + .AutoProvision().AutoPurgeOnStartup(); + + opts.ListenToAzureServiceBusQueue("inline1").ProcessInline(); + opts.PublishAllMessages().ToAzureServiceBusQueue("inline1"); + }).StartAsync(); + + var referenceTime = DateTimeOffset.UtcNow; + var delay = TimeSpan.FromSeconds(1); + var margin = TimeSpan.FromSeconds(2); + + var session = await host.TrackActivity() + .IncludeExternalTransports() + .Timeout(30.Seconds()) + .WaitForMessageToBeReceivedAt(host) + .ExecuteAndWaitAsync(c => c.SendAsync(new AsbTriggerCascadedTimeout(delay))); + + var envelope = session.Scheduled.Envelopes().Single(e => e.Message is AsbCascadedTimeout); + envelope.ShouldNotBeNull(); + envelope.ScheduledTime!.Value.ShouldBeInRange(referenceTime.Add(delay - margin), referenceTime.Add(delay + margin)); + + await host.StopAsync(); + } + + [Fact] + public async Task with_inline_endpoint_explicit_scheduling() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseAzureServiceBusTesting() + .AutoProvision().AutoPurgeOnStartup(); + + opts.ListenToAzureServiceBusQueue("inline1").ProcessInline(); + opts.PublishAllMessages().ToAzureServiceBusQueue("inline1"); + }).StartAsync(); + + var referenceTime = DateTimeOffset.UtcNow; + var delay = TimeSpan.FromSeconds(1); + var margin = TimeSpan.FromSeconds(2); + + var session = await host.TrackActivity() + .IncludeExternalTransports() + .Timeout(30.Seconds()) + .WaitForMessageToBeReceivedAt(host) + .ExecuteAndWaitAsync(c => c.SendAsync(new AsbTriggerExplicitScheduled(delay))); + + var envelope = session.Scheduled.Envelopes().Single(e => e.Message is AsbExplicitScheduled); + envelope.ShouldNotBeNull(); + envelope.ScheduledTime!.Value.ShouldBeInRange(referenceTime.Add(delay - margin), referenceTime.Add(delay + margin)); + + await host.StopAsync(); + } + [Fact] public async Task with_buffered_endpoint() // durable would have similar mechanics { @@ -55,4 +115,31 @@ public async Task with_buffered_endpoint() // durable would have similar mechani await host.StopAsync(); } + + public record AsbTriggerCascadedTimeout(TimeSpan Delay); + public record AsbCascadedTimeout(string Id, TimeSpan delay) : TimeoutMessage(delay); + + public record AsbTriggerExplicitScheduled(TimeSpan Delay); + public record AsbExplicitScheduled(string Id); + + + public class ScheduledMessageHandler + { + public AsbCascadedTimeout Handle(AsbTriggerCascadedTimeout trigger) + { + return new AsbCascadedTimeout("test-timeout", trigger.Delay); + } + public static void Handle(AsbCascadedTimeout timeout) + { + } + + public async Task Handle(AsbTriggerExplicitScheduled trigger, IMessageContext context) + { + await context.ScheduleAsync(new AsbExplicitScheduled("test"), trigger.Delay); + } + + public static void Handle(AsbExplicitScheduled scheduled) + { + } + } } \ No newline at end of file diff --git a/src/Wolverine/Runtime/MessageContext.cs b/src/Wolverine/Runtime/MessageContext.cs index f06017cc2..9533210bc 100644 --- a/src/Wolverine/Runtime/MessageContext.cs +++ b/src/Wolverine/Runtime/MessageContext.cs @@ -63,7 +63,14 @@ public async Task FlushOutgoingMessagesAsync() { if (!envelope.Sender!.IsDurable) { - Runtime.ScheduleLocalExecutionInMemory(envelope.ScheduledTime!.Value, envelope); + if (envelope.Sender!.SupportsNativeScheduledSend) + { + await sendEnvelopeAsync(envelope); + } + else + { + Runtime.ScheduleLocalExecutionInMemory(envelope.ScheduledTime!.Value, envelope); + } } // If NullMessageStore, then we're calling a different Send method that is marking the message @@ -73,13 +80,9 @@ public async Task FlushOutgoingMessagesAsync() Runtime.MessageTracking.Sent(envelope); } } - else if (ReferenceEquals(this, Transaction)) - { - await envelope.StoreAndForwardAsync(); - } else { - await envelope.QuickSendAsync(); + await sendEnvelopeAsync(envelope); } } catch (Exception e) @@ -100,6 +103,18 @@ public async Task FlushOutgoingMessagesAsync() _outstanding.Clear(); _hasFlushed = true; + + async Task sendEnvelopeAsync(Envelope envelope) + { + if (ReferenceEquals(this, Transaction)) + { + await envelope.StoreAndForwardAsync(); + } + else + { + await envelope.QuickSendAsync(); + } + } } private List? _sent; @@ -120,8 +135,8 @@ public async Task AssertAnyRequiredResponseWasGenerated() { failureDescription += $"No cascading messages were created by this handler for the expected response type {Envelope.ReplyRequested}"; } - - await SendFailureAcknowledgementAsync( failureDescription); + + await SendFailureAcknowledgementAsync(failureDescription); } else { @@ -174,7 +189,7 @@ public async Task MoveToDeadLetterQueueAsync(Exception exception) { // Don't bother with agent commands if (Envelope?.Message is IAgentCommand) return; - + if (_channel == null || Envelope == null) { throw new InvalidOperationException("No Envelope is active for this context"); @@ -196,7 +211,7 @@ public async Task MoveToDeadLetterQueueAsync(Exception exception) return; } - + if (Envelope.Batch != null) { foreach (var envelope in Envelope.Batch) @@ -408,9 +423,9 @@ public async Task EnqueueCascadingAsync(object? message) { throw new InvalidOperationException( $"Message of type {message.GetType().FullNameInCode()} implements {typeof(ISideEffect).FullNameInCode()}, and cannot be used as a cascading message. Side effects cannot be mixed in with outgoing cascaded messages."); - + } - + if (Envelope?.ResponseType != null && (message?.GetType() == Envelope.ResponseType || Envelope.ResponseType.IsInstanceOfType(message))) { @@ -485,7 +500,7 @@ internal void ReadEnvelope(Envelope? originalEnvelope, IChannelCallback channel) Envelope = originalEnvelope ?? throw new ArgumentNullException(nameof(originalEnvelope)); originalEnvelope.MaybeCorrectReplyUri(); - + CorrelationId = originalEnvelope.CorrelationId; ConversationId = originalEnvelope.Id; _channel = channel;