Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,66 @@ public async Task with_inline_endpoint()
await host.StopAsync();
}

[Fact]
public async Task with_inline_endpoint_cascaded_timeout()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseAzureServiceBusTesting()
.AutoProvision().AutoPurgeOnStartup();

opts.ListenToAzureServiceBusQueue("inline1").ProcessInline();
opts.PublishAllMessages().ToAzureServiceBusQueue("inline1");
}).StartAsync();

var referenceTime = DateTimeOffset.UtcNow;
var delay = TimeSpan.FromSeconds(1);
var margin = TimeSpan.FromSeconds(2);

var session = await host.TrackActivity()
.IncludeExternalTransports()
.Timeout(30.Seconds())
.WaitForMessageToBeReceivedAt<AsbCascadedTimeout>(host)
.ExecuteAndWaitAsync(c => c.SendAsync(new AsbTriggerCascadedTimeout(delay)));

var envelope = session.Scheduled.Envelopes().Single(e => e.Message is AsbCascadedTimeout);
envelope.ShouldNotBeNull();
envelope.ScheduledTime!.Value.ShouldBeInRange(referenceTime.Add(delay - margin), referenceTime.Add(delay + margin));

await host.StopAsync();
}

[Fact]
public async Task with_inline_endpoint_explicit_scheduling()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseAzureServiceBusTesting()
.AutoProvision().AutoPurgeOnStartup();

opts.ListenToAzureServiceBusQueue("inline1").ProcessInline();
opts.PublishAllMessages().ToAzureServiceBusQueue("inline1");
}).StartAsync();

var referenceTime = DateTimeOffset.UtcNow;
var delay = TimeSpan.FromSeconds(1);
var margin = TimeSpan.FromSeconds(2);

var session = await host.TrackActivity()
.IncludeExternalTransports()
.Timeout(30.Seconds())
.WaitForMessageToBeReceivedAt<AsbExplicitScheduled>(host)
.ExecuteAndWaitAsync(c => c.SendAsync(new AsbTriggerExplicitScheduled(delay)));

var envelope = session.Scheduled.Envelopes().Single(e => e.Message is AsbExplicitScheduled);
envelope.ShouldNotBeNull();
envelope.ScheduledTime!.Value.ShouldBeInRange(referenceTime.Add(delay - margin), referenceTime.Add(delay + margin));

await host.StopAsync();
}

[Fact]
public async Task with_buffered_endpoint() // durable would have similar mechanics
{
Expand All @@ -55,4 +115,31 @@ public async Task with_buffered_endpoint() // durable would have similar mechani

await host.StopAsync();
}

public record AsbTriggerCascadedTimeout(TimeSpan Delay);
public record AsbCascadedTimeout(string Id, TimeSpan delay) : TimeoutMessage(delay);

public record AsbTriggerExplicitScheduled(TimeSpan Delay);
public record AsbExplicitScheduled(string Id);


public class ScheduledMessageHandler
{
public AsbCascadedTimeout Handle(AsbTriggerCascadedTimeout trigger)
{
return new AsbCascadedTimeout("test-timeout", trigger.Delay);
}
public static void Handle(AsbCascadedTimeout timeout)
{
}

public async Task Handle(AsbTriggerExplicitScheduled trigger, IMessageContext context)
{
await context.ScheduleAsync(new AsbExplicitScheduled("test"), trigger.Delay);
}

public static void Handle(AsbExplicitScheduled scheduled)
{
}
}
}
41 changes: 28 additions & 13 deletions src/Wolverine/Runtime/MessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,14 @@ public async Task FlushOutgoingMessagesAsync()
{
if (!envelope.Sender!.IsDurable)
{
Runtime.ScheduleLocalExecutionInMemory(envelope.ScheduledTime!.Value, envelope);
if (envelope.Sender!.SupportsNativeScheduledSend)
{
await sendEnvelopeAsync(envelope);
}
else
{
Runtime.ScheduleLocalExecutionInMemory(envelope.ScheduledTime!.Value, envelope);
}
}

// If NullMessageStore, then we're calling a different Send method that is marking the message
Expand All @@ -73,13 +80,9 @@ public async Task FlushOutgoingMessagesAsync()
Runtime.MessageTracking.Sent(envelope);
}
}
else if (ReferenceEquals(this, Transaction))
{
await envelope.StoreAndForwardAsync();
}
else
{
await envelope.QuickSendAsync();
await sendEnvelopeAsync(envelope);
}
}
catch (Exception e)
Expand All @@ -100,6 +103,18 @@ public async Task FlushOutgoingMessagesAsync()
_outstanding.Clear();

_hasFlushed = true;

async Task sendEnvelopeAsync(Envelope envelope)
{
if (ReferenceEquals(this, Transaction))
{
await envelope.StoreAndForwardAsync();
}
else
{
await envelope.QuickSendAsync();
}
}
}

private List<Envelope>? _sent;
Expand All @@ -120,8 +135,8 @@ public async Task AssertAnyRequiredResponseWasGenerated()
{
failureDescription += $"No cascading messages were created by this handler for the expected response type {Envelope.ReplyRequested}";
}
await SendFailureAcknowledgementAsync( failureDescription);

await SendFailureAcknowledgementAsync(failureDescription);
}
else
{
Expand Down Expand Up @@ -174,7 +189,7 @@ public async Task MoveToDeadLetterQueueAsync(Exception exception)
{
// Don't bother with agent commands
if (Envelope?.Message is IAgentCommand) return;

if (_channel == null || Envelope == null)
{
throw new InvalidOperationException("No Envelope is active for this context");
Expand All @@ -196,7 +211,7 @@ public async Task MoveToDeadLetterQueueAsync(Exception exception)

return;
}

if (Envelope.Batch != null)
{
foreach (var envelope in Envelope.Batch)
Expand Down Expand Up @@ -408,9 +423,9 @@ public async Task EnqueueCascadingAsync(object? message)
{
throw new InvalidOperationException(
$"Message of type {message.GetType().FullNameInCode()} implements {typeof(ISideEffect).FullNameInCode()}, and cannot be used as a cascading message. Side effects cannot be mixed in with outgoing cascaded messages.");

}

if (Envelope?.ResponseType != null && (message?.GetType() == Envelope.ResponseType ||
Envelope.ResponseType.IsInstanceOfType(message)))
{
Expand Down Expand Up @@ -485,7 +500,7 @@ internal void ReadEnvelope(Envelope? originalEnvelope, IChannelCallback channel)
Envelope = originalEnvelope ?? throw new ArgumentNullException(nameof(originalEnvelope));

originalEnvelope.MaybeCorrectReplyUri();

CorrelationId = originalEnvelope.CorrelationId;
ConversationId = originalEnvelope.Id;
_channel = channel;
Expand Down