diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs index 51e59732c..9e2e9c6ab 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.EventSource.cs @@ -21,6 +21,11 @@ interface IEventSource /// Type EventType { get; } + /// + /// Gets or sets the next event source in the stack (for LIFO ordering). + /// + IEventSource? Next { get; set; } + /// /// Tries to set the result on tcs. /// @@ -33,6 +38,9 @@ class EventTaskCompletionSource : TaskCompletionSource, IEventSource /// public Type EventType => typeof(T); + /// + public IEventSource? Next { get; set; } + /// void IEventSource.TrySetResult(object result) => this.TrySetResult((T)result); } diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index c7090e0b8..44430610e 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -18,7 +18,10 @@ namespace Microsoft.DurableTask.Worker.Shims; /// sealed partial class TaskOrchestrationContextWrapper : TaskOrchestrationContext { - readonly Dictionary> externalEventSources = new(StringComparer.OrdinalIgnoreCase); + // We use a stack (a custom implementation using a single-linked list) to make it easier for users + // to abandon external events that they no longer care about. The common case is a Task.WhenAny in a loop. + // Events are assigned to the most recent (top of stack) waiter, which naturally avoids issues with cancelled waiters. + readonly Dictionary externalEventSources = new(StringComparer.OrdinalIgnoreCase); readonly NamedQueue externalEventBuffer = new(); readonly OrchestrationContext innerContext; readonly OrchestrationInvocationContext invocationContext; @@ -280,29 +283,33 @@ public override async Task CreateTimer(DateTime fireAt, CancellationToken cancel /// public override Task WaitForExternalEvent(string eventName, CancellationToken cancellationToken = default) { - // Return immediately if this external event has already arrived. - if (this.externalEventBuffer.TryTake(eventName, out string? bufferedEventPayload)) - { - return Task.FromResult(this.DataConverter.Deserialize(bufferedEventPayload)); - } - // Create a task completion source that will be set when the external event arrives. EventTaskCompletionSource eventSource = new(); - if (this.externalEventSources.TryGetValue(eventName, out Queue? existing)) + + // Set up the stack for listening to external events (LIFO - Last In First Out) + // New waiters are added to the top of the stack, so they get events first. + // This makes it easier for users to abandon external events they no longer care about. + // The common case is a Task.WhenAny in a loop. + if (this.externalEventSources.TryGetValue(eventName, out IEventSource? existing)) { - if (existing.Count > 0 && existing.Peek().EventType != typeof(T)) + if (existing.EventType != typeof(T)) { throw new ArgumentException("Events with the same name must have the same type argument. Expected" - + $" {existing.Peek().GetType().FullName} but was requested {typeof(T).FullName}."); + + $" {existing.EventType.FullName} but was requested {typeof(T).FullName}."); } - existing.Enqueue(eventSource); + // Add new waiter to the top of the stack + eventSource.Next = existing; } - else + + // New waiter becomes the top of the stack + this.externalEventSources[eventName] = eventSource; + + // Check the buffer to see if any events came in before the orchestrator was listening + if (this.externalEventBuffer.TryTake(eventName, out string? bufferedEvent)) { - Queue eventSourceQueue = new(); - eventSourceQueue.Enqueue(eventSource); - this.externalEventSources.Add(eventName, eventSourceQueue); + // We can complete the event right away, since we already have an event's input + this.CompleteExternalEvent(eventName, bufferedEvent); } // TODO: this needs to be tracked and disposed appropriately. @@ -416,11 +423,23 @@ internal void ExitCriticalSectionIfNeeded() /// The serialized event payload. internal void CompleteExternalEvent(string eventName, string rawEventPayload) { - if (this.externalEventSources.TryGetValue(eventName, out Queue? waiters)) + if (this.externalEventSources.TryGetValue(eventName, out IEventSource? waiter)) { - object? value; + // Get the waiter at the top of the stack (most recent waiter) + // If we're going to raise an event we should remove it from the pending collection + // because otherwise WaitForExternalEvent() will always find one with this key and run infinitely. + IEventSource? next = waiter.Next; + if (next == null) + { + this.externalEventSources.Remove(eventName); + } + else + { + // Next waiter becomes the new top of the stack + this.externalEventSources[eventName] = next; + } - IEventSource waiter = waiters.Dequeue(); + object? value; if (waiter.EventType == typeof(OperationResult)) { // use the framework-defined deserialization for entity responses, not the application-defined data converter, @@ -432,12 +451,6 @@ internal void CompleteExternalEvent(string eventName, string rawEventPayload) value = this.DataConverter.Deserialize(rawEventPayload, waiter.EventType); } - // Events are completed in FIFO order. Remove the key if the last event was delivered. - if (waiters.Count == 0) - { - this.externalEventSources.Remove(eventName); - } - waiter.TrySetResult(value); } else diff --git a/test/Grpc.IntegrationTests/ExternalEventStackTests.cs b/test/Grpc.IntegrationTests/ExternalEventStackTests.cs new file mode 100644 index 000000000..dcd79b65d --- /dev/null +++ b/test/Grpc.IntegrationTests/ExternalEventStackTests.cs @@ -0,0 +1,244 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Tests.Logging; +using Microsoft.DurableTask.Worker; +using Xunit.Abstractions; + +namespace Microsoft.DurableTask.Grpc.Tests; + +/// +/// Integration tests for external event handling with Stack (LIFO) behavior. +/// These tests validate that the isolated worker correctly implements Stack-based +/// event handling consistent with the in-process model. +/// +public class ExternalEventStackTests : IntegrationTestBase +{ + public ExternalEventStackTests(ITestOutputHelper output, GrpcSidecarFixture sidecarFixture) + : base(output, sidecarFixture) + { } + + /// + /// Test that validates Stack (LIFO) behavior: the most recent waiter receives events first. + /// + [Fact] + public async Task StackBehavior_LIFO_NewestWaiterReceivesEventFirst() + { + const string EventName = "TestEvent"; + const string FirstEventPayload = "first-event"; + const string SecondEventPayload = "second-event"; + TaskName orchestratorName = nameof(StackBehavior_LIFO_NewestWaiterReceivesEventFirst); + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx => + { + // First waiter + Task firstWaiter = ctx.WaitForExternalEvent(EventName); + + // Second waiter (newer, should receive event first) + Task secondWaiter = ctx.WaitForExternalEvent(EventName); + + // Wait for both events to arrive from external client + string secondResult = await secondWaiter; // Should receive first event (stack top) + string firstResult = await firstWaiter; // Should receive second event + + // Return which waiter received which event + return $"{firstResult}:{secondResult}"; + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + + // Wait for orchestration to start and set up waiters + OrchestrationMetadata metadata = await server.Client.WaitForInstanceStartAsync( + instanceId, this.TimeoutToken); + + // Send first event - should be received by second waiter (stack top) + await server.Client.RaiseEventAsync(instanceId, EventName, SecondEventPayload); + + // Send second event - should be received by first waiter + await server.Client.RaiseEventAsync(instanceId, EventName, FirstEventPayload); + + metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + + // Verify the output: second waiter should have received "second-event", first waiter "first-event" + string result = metadata.ReadOutputAs(); + Assert.Equal($"{FirstEventPayload}:{SecondEventPayload}", result); + } + + /// + /// Test for issue #508: When the first waiter is cancelled, the second (active) waiter + /// should receive the event, not the cancelled one. With Stack behavior, the newest waiter + /// is at the top, so it will receive the event first. + /// + [Fact] + public async Task Issue508_FirstWaiterCancelled_SecondWaiterReceivesEvent() + { + const string EventName = "Event"; + const string EventPayload = "test-payload"; + TaskName orchestratorName = nameof(Issue508_FirstWaiterCancelled_SecondWaiterReceivesEvent); + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx => + { + // STEP 1: Create and immediately cancel the first waiter + using (CancellationTokenSource cts = new CancellationTokenSource()) + { + cts.Cancel(); + try + { + Task cancelledWait = ctx.WaitForExternalEvent(EventName, cts.Token); + await cancelledWait; + throw new InvalidOperationException("WaitForExternalEvent should have thrown cancellation exception"); + } + catch (OperationCanceledException) + { + // Expected path + } + } + + // STEP 2: Create second waiter (active, should receive the event) + // With Stack (LIFO), this new waiter is at the top, so it will receive the event + using (CancellationTokenSource cts = new CancellationTokenSource()) + { + Task waitForEvent = ctx.WaitForExternalEvent(EventName, cts.Token); + Task timeout = ctx.CreateTimer(ctx.CurrentUtcDateTime.AddSeconds(2), CancellationToken.None); + + Task winner = await Task.WhenAny(waitForEvent, timeout); + if (winner == timeout) + { + cts.Cancel(); + throw new TimeoutException("Event lost: WaitForExternalEvent timed out"); + } + + string result = await waitForEvent; + return result; + } + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + + // Wait for orchestration to start and cancel first waiter + await server.Client.WaitForInstanceStartAsync( + instanceId, this.TimeoutToken); + + // Send event - should be received by second waiter (stack top), not the cancelled first waiter + await server.Client.RaiseEventAsync(instanceId, EventName, EventPayload); + + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + + string result = metadata.ReadOutputAs(); + Assert.Equal(EventPayload, result); + } + + /// + /// Test multiple events with multiple waiters: validates that events are delivered + /// in LIFO order to waiters. + /// + [Fact] + public async Task MultipleEvents_MultipleWaiters_LIFOOrder() + { + const string EventName = "Event"; + TaskName orchestratorName = nameof(MultipleEvents_MultipleWaiters_LIFOOrder); + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx => + { + // Create three waiters + Task waiter1 = ctx.WaitForExternalEvent(EventName); + Task waiter2 = ctx.WaitForExternalEvent(EventName); + Task waiter3 = ctx.WaitForExternalEvent(EventName); + + // Wait for all events + string result3 = await waiter3; // Should get first event (stack top) + string result2 = await waiter2; // Should get second event + string result1 = await waiter1; // Should get third event (oldest) + + // Return as comma-separated string for easier assertion + return $"{result1},{result2},{result3}"; + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + + // Wait for orchestration to start and set up waiters + await server.Client.WaitForInstanceStartAsync( + instanceId, this.TimeoutToken); + + // Send three events - should be received in LIFO order + await server.Client.RaiseEventAsync(instanceId, EventName, "event-1"); + await server.Client.RaiseEventAsync(instanceId, EventName, "event-2"); + await server.Client.RaiseEventAsync(instanceId, EventName, "event-3"); + + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + + // Verify LIFO order: waiter3 (newest) gets event-1, waiter2 gets event-2, waiter1 gets event-3 + string result = metadata.ReadOutputAs(); + Assert.Equal("event-3,event-2,event-1", result); + } + + /// + /// Test event buffering: when an event arrives before any waiters, it should be buffered + /// and delivered to the first waiter that arrives. + /// + [Fact] + public async Task EventBuffering_EventArrivesBeforeWaiter_FirstWaiterReceivesBufferedEvent() + { + const string EventName = "Event"; + const string EventPayload = "buffered-event"; + TaskName orchestratorName = nameof(EventBuffering_EventArrivesBeforeWaiter_FirstWaiterReceivesBufferedEvent); + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx => + { + // Small delay to allow external event to arrive first + await ctx.CreateTimer(ctx.CurrentUtcDateTime.AddMilliseconds(100), CancellationToken.None); + + // Now create waiter - should immediately receive the buffered event + Task waiter = ctx.WaitForExternalEvent(EventName); + Task timeout = ctx.CreateTimer(ctx.CurrentUtcDateTime.AddSeconds(1), CancellationToken.None); + + Task winner = await Task.WhenAny(waiter, timeout); + if (winner == timeout) + { + throw new TimeoutException("Buffered event was not received"); + } + + string result = await waiter; + return result; + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + + // Send event before waiter is created + await server.Client.RaiseEventAsync(instanceId, EventName, EventPayload); + + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + + string result = metadata.ReadOutputAs(); + Assert.Equal(EventPayload, result); + } +} diff --git a/test/Grpc.IntegrationTests/OrchestrationPatterns.cs b/test/Grpc.IntegrationTests/OrchestrationPatterns.cs index 45fbd4903..1315eeff9 100644 --- a/test/Grpc.IntegrationTests/OrchestrationPatterns.cs +++ b/test/Grpc.IntegrationTests/OrchestrationPatterns.cs @@ -414,7 +414,10 @@ public async Task ExternalEventsInParallel(int eventCount) Assert.NotNull(metadata); Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - int[] expected = Enumerable.Range(0, eventCount).ToArray(); + // With Stack (LIFO) behavior, the most recent waiter receives events first. + // So if we create waiters in order [0, 1, 2, 3, 4] and send events [0, 1, 2, 3, 4], + // the waiters will receive them in reverse order: [4, 3, 2, 1, 0] + int[] expected = Enumerable.Range(0, eventCount).Reverse().ToArray(); Assert.Equal(expected, metadata.ReadOutputAs()); }