Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ interface IEventSource
/// </summary>
Type EventType { get; }

/// <summary>
/// Gets or sets the next event source in the stack (for LIFO ordering).
/// </summary>
IEventSource? Next { get; set; }

/// <summary>
/// Tries to set the result on tcs.
/// </summary>
Expand All @@ -33,6 +38,9 @@ class EventTaskCompletionSource<T> : TaskCompletionSource<T>, IEventSource
/// <inheritdoc/>
public Type EventType => typeof(T);

/// <inheritdoc/>
public IEventSource? Next { get; set; }

/// <inheritdoc/>
void IEventSource.TrySetResult(object result) => this.TrySetResult((T)result);
}
Expand Down
61 changes: 37 additions & 24 deletions src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ namespace Microsoft.DurableTask.Worker.Shims;
/// </summary>
sealed partial class TaskOrchestrationContextWrapper : TaskOrchestrationContext
{
readonly Dictionary<string, Queue<IEventSource>> externalEventSources = new(StringComparer.OrdinalIgnoreCase);
// We use a stack (a custom implementation using a single-linked list) to make it easier for users
// to abandon external events that they no longer care about. The common case is a Task.WhenAny in a loop.
// Events are assigned to the most recent (top of stack) waiter, which naturally avoids issues with cancelled waiters.
readonly Dictionary<string, IEventSource> externalEventSources = new(StringComparer.OrdinalIgnoreCase);
readonly NamedQueue<string> externalEventBuffer = new();
readonly OrchestrationContext innerContext;
readonly OrchestrationInvocationContext invocationContext;
Expand Down Expand Up @@ -280,29 +283,33 @@ public override async Task CreateTimer(DateTime fireAt, CancellationToken cancel
/// <inheritdoc/>
public override Task<T> WaitForExternalEvent<T>(string eventName, CancellationToken cancellationToken = default)
{
// Return immediately if this external event has already arrived.
if (this.externalEventBuffer.TryTake(eventName, out string? bufferedEventPayload))
{
return Task.FromResult(this.DataConverter.Deserialize<T>(bufferedEventPayload));
}

// Create a task completion source that will be set when the external event arrives.
EventTaskCompletionSource<T> eventSource = new();
if (this.externalEventSources.TryGetValue(eventName, out Queue<IEventSource>? existing))

// Set up the stack for listening to external events (LIFO - Last In First Out)
// New waiters are added to the top of the stack, so they get events first.
// This makes it easier for users to abandon external events they no longer care about.
// The common case is a Task.WhenAny in a loop.
if (this.externalEventSources.TryGetValue(eventName, out IEventSource? existing))
{
if (existing.Count > 0 && existing.Peek().EventType != typeof(T))
if (existing.EventType != typeof(T))
{
throw new ArgumentException("Events with the same name must have the same type argument. Expected"
+ $" {existing.Peek().GetType().FullName} but was requested {typeof(T).FullName}.");
+ $" {existing.EventType.FullName} but was requested {typeof(T).FullName}.");
}

existing.Enqueue(eventSource);
// Add new waiter to the top of the stack
eventSource.Next = existing;
}
else

// New waiter becomes the top of the stack
this.externalEventSources[eventName] = eventSource;

// Check the buffer to see if any events came in before the orchestrator was listening
if (this.externalEventBuffer.TryTake(eventName, out string? bufferedEvent))
{
Queue<IEventSource> eventSourceQueue = new();
eventSourceQueue.Enqueue(eventSource);
this.externalEventSources.Add(eventName, eventSourceQueue);
// We can complete the event right away, since we already have an event's input
this.CompleteExternalEvent(eventName, bufferedEvent);
}

// TODO: this needs to be tracked and disposed appropriately.
Expand Down Expand Up @@ -416,11 +423,23 @@ internal void ExitCriticalSectionIfNeeded()
/// <param name="rawEventPayload">The serialized event payload.</param>
internal void CompleteExternalEvent(string eventName, string rawEventPayload)
{
if (this.externalEventSources.TryGetValue(eventName, out Queue<IEventSource>? waiters))
if (this.externalEventSources.TryGetValue(eventName, out IEventSource? waiter))
{
object? value;
// Get the waiter at the top of the stack (most recent waiter)
// If we're going to raise an event we should remove it from the pending collection
// because otherwise WaitForExternalEvent() will always find one with this key and run infinitely.
IEventSource? next = waiter.Next;
if (next == null)
{
this.externalEventSources.Remove(eventName);
}
else
{
// Next waiter becomes the new top of the stack
this.externalEventSources[eventName] = next;
}

IEventSource waiter = waiters.Dequeue();
object? value;
if (waiter.EventType == typeof(OperationResult))
{
// use the framework-defined deserialization for entity responses, not the application-defined data converter,
Expand All @@ -432,12 +451,6 @@ internal void CompleteExternalEvent(string eventName, string rawEventPayload)
value = this.DataConverter.Deserialize(rawEventPayload, waiter.EventType);
}

// Events are completed in FIFO order. Remove the key if the last event was delivered.
if (waiters.Count == 0)
{
this.externalEventSources.Remove(eventName);
}

waiter.TrySetResult(value);
}
else
Expand Down
244 changes: 244 additions & 0 deletions test/Grpc.IntegrationTests/ExternalEventStackTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Tests.Logging;
using Microsoft.DurableTask.Worker;
using Xunit.Abstractions;

namespace Microsoft.DurableTask.Grpc.Tests;

/// <summary>
/// Integration tests for external event handling with Stack (LIFO) behavior.
/// These tests validate that the isolated worker correctly implements Stack-based
/// event handling consistent with the in-process model.
/// </summary>
public class ExternalEventStackTests : IntegrationTestBase
{
public ExternalEventStackTests(ITestOutputHelper output, GrpcSidecarFixture sidecarFixture)
: base(output, sidecarFixture)
{ }

/// <summary>
/// Test that validates Stack (LIFO) behavior: the most recent waiter receives events first.
/// </summary>
[Fact]
public async Task StackBehavior_LIFO_NewestWaiterReceivesEventFirst()
{
const string EventName = "TestEvent";
const string FirstEventPayload = "first-event";
const string SecondEventPayload = "second-event";
TaskName orchestratorName = nameof(StackBehavior_LIFO_NewestWaiterReceivesEventFirst);

await using HostTestLifetime server = await this.StartWorkerAsync(b =>
{
b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
{
// First waiter
Task<string> firstWaiter = ctx.WaitForExternalEvent<string>(EventName);

// Second waiter (newer, should receive event first)
Task<string> secondWaiter = ctx.WaitForExternalEvent<string>(EventName);

// Wait for both events to arrive from external client
string secondResult = await secondWaiter; // Should receive first event (stack top)
string firstResult = await firstWaiter; // Should receive second event

// Return which waiter received which event
return $"{firstResult}:{secondResult}";
}));
});

string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);

// Wait for orchestration to start and set up waiters
OrchestrationMetadata metadata = await server.Client.WaitForInstanceStartAsync(
instanceId, this.TimeoutToken);

// Send first event - should be received by second waiter (stack top)
await server.Client.RaiseEventAsync(instanceId, EventName, SecondEventPayload);

// Send second event - should be received by first waiter
await server.Client.RaiseEventAsync(instanceId, EventName, FirstEventPayload);

metadata = await server.Client.WaitForInstanceCompletionAsync(
instanceId, getInputsAndOutputs: true, this.TimeoutToken);

Assert.NotNull(metadata);
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);

// Verify the output: second waiter should have received "second-event", first waiter "first-event"
string result = metadata.ReadOutputAs<string>();
Assert.Equal($"{FirstEventPayload}:{SecondEventPayload}", result);
}

/// <summary>
/// Test for issue #508: When the first waiter is cancelled, the second (active) waiter
/// should receive the event, not the cancelled one. With Stack behavior, the newest waiter
/// is at the top, so it will receive the event first.
/// </summary>
[Fact]
public async Task Issue508_FirstWaiterCancelled_SecondWaiterReceivesEvent()
{
const string EventName = "Event";
const string EventPayload = "test-payload";
TaskName orchestratorName = nameof(Issue508_FirstWaiterCancelled_SecondWaiterReceivesEvent);

await using HostTestLifetime server = await this.StartWorkerAsync(b =>
{
b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
{
// STEP 1: Create and immediately cancel the first waiter
using (CancellationTokenSource cts = new CancellationTokenSource())
{
cts.Cancel();
try
{
Task<string> cancelledWait = ctx.WaitForExternalEvent<string>(EventName, cts.Token);
await cancelledWait;
throw new InvalidOperationException("WaitForExternalEvent should have thrown cancellation exception");
}
catch (OperationCanceledException)
{
// Expected path
}
}

// STEP 2: Create second waiter (active, should receive the event)
// With Stack (LIFO), this new waiter is at the top, so it will receive the event
using (CancellationTokenSource cts = new CancellationTokenSource())
{
Task<string> waitForEvent = ctx.WaitForExternalEvent<string>(EventName, cts.Token);
Task timeout = ctx.CreateTimer(ctx.CurrentUtcDateTime.AddSeconds(2), CancellationToken.None);

Task winner = await Task.WhenAny(waitForEvent, timeout);
if (winner == timeout)
{
cts.Cancel();
throw new TimeoutException("Event lost: WaitForExternalEvent timed out");
}

string result = await waitForEvent;
return result;
}
}));
});

string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);

// Wait for orchestration to start and cancel first waiter
await server.Client.WaitForInstanceStartAsync(
instanceId, this.TimeoutToken);

// Send event - should be received by second waiter (stack top), not the cancelled first waiter
await server.Client.RaiseEventAsync(instanceId, EventName, EventPayload);

OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
instanceId, getInputsAndOutputs: true, this.TimeoutToken);

Assert.NotNull(metadata);
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);

string result = metadata.ReadOutputAs<string>();
Assert.Equal(EventPayload, result);
}

/// <summary>
/// Test multiple events with multiple waiters: validates that events are delivered
/// in LIFO order to waiters.
/// </summary>
[Fact]
public async Task MultipleEvents_MultipleWaiters_LIFOOrder()
{
const string EventName = "Event";
TaskName orchestratorName = nameof(MultipleEvents_MultipleWaiters_LIFOOrder);

await using HostTestLifetime server = await this.StartWorkerAsync(b =>
{
b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
{
// Create three waiters
Task<string> waiter1 = ctx.WaitForExternalEvent<string>(EventName);
Task<string> waiter2 = ctx.WaitForExternalEvent<string>(EventName);
Task<string> waiter3 = ctx.WaitForExternalEvent<string>(EventName);

// Wait for all events
string result3 = await waiter3; // Should get first event (stack top)
string result2 = await waiter2; // Should get second event
string result1 = await waiter1; // Should get third event (oldest)

// Return as comma-separated string for easier assertion
return $"{result1},{result2},{result3}";
}));
});

string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);

// Wait for orchestration to start and set up waiters
await server.Client.WaitForInstanceStartAsync(
instanceId, this.TimeoutToken);

// Send three events - should be received in LIFO order
await server.Client.RaiseEventAsync(instanceId, EventName, "event-1");
await server.Client.RaiseEventAsync(instanceId, EventName, "event-2");
await server.Client.RaiseEventAsync(instanceId, EventName, "event-3");

OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
instanceId, getInputsAndOutputs: true, this.TimeoutToken);

Assert.NotNull(metadata);
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);

// Verify LIFO order: waiter3 (newest) gets event-1, waiter2 gets event-2, waiter1 gets event-3
string result = metadata.ReadOutputAs<string>();
Assert.Equal("event-3,event-2,event-1", result);
}

/// <summary>
/// Test event buffering: when an event arrives before any waiters, it should be buffered
/// and delivered to the first waiter that arrives.
/// </summary>
[Fact]
public async Task EventBuffering_EventArrivesBeforeWaiter_FirstWaiterReceivesBufferedEvent()
{
const string EventName = "Event";
const string EventPayload = "buffered-event";
TaskName orchestratorName = nameof(EventBuffering_EventArrivesBeforeWaiter_FirstWaiterReceivesBufferedEvent);

await using HostTestLifetime server = await this.StartWorkerAsync(b =>
{
b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
{
// Small delay to allow external event to arrive first
await ctx.CreateTimer(ctx.CurrentUtcDateTime.AddMilliseconds(100), CancellationToken.None);

// Now create waiter - should immediately receive the buffered event
Task<string> waiter = ctx.WaitForExternalEvent<string>(EventName);
Task timeout = ctx.CreateTimer(ctx.CurrentUtcDateTime.AddSeconds(1), CancellationToken.None);

Task winner = await Task.WhenAny(waiter, timeout);
if (winner == timeout)
{
throw new TimeoutException("Buffered event was not received");
}

string result = await waiter;
return result;
}));
});

string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);

// Send event before waiter is created
await server.Client.RaiseEventAsync(instanceId, EventName, EventPayload);

OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
instanceId, getInputsAndOutputs: true, this.TimeoutToken);

Assert.NotNull(metadata);
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);

string result = metadata.ReadOutputAs<string>();
Assert.Equal(EventPayload, result);
}
}
5 changes: 4 additions & 1 deletion test/Grpc.IntegrationTests/OrchestrationPatterns.cs
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,10 @@ public async Task ExternalEventsInParallel(int eventCount)
Assert.NotNull(metadata);
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);

int[] expected = Enumerable.Range(0, eventCount).ToArray();
// With Stack (LIFO) behavior, the most recent waiter receives events first.
// So if we create waiters in order [0, 1, 2, 3, 4] and send events [0, 1, 2, 3, 4],
// the waiters will receive them in reverse order: [4, 3, 2, 1, 0]
int[] expected = Enumerable.Range(0, eventCount).Reverse().ToArray();
Assert.Equal<int>(expected, metadata.ReadOutputAs<int[]>());
}

Expand Down
Loading