Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/InProcessTestHost/InProcessTestHost.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<RootNamespace>Microsoft.DurableTask.Testing</RootNamespace>
<AssemblyName>Microsoft.DurableTask.InProcessTestHost</AssemblyName>
<PackageId>Microsoft.DurableTask.InProcessTestHost</PackageId>
<Version>0.2.0-preview.1</Version>
<Version>0.2.1-preview.1</Version>

<!-- Suppress CA1848: Use LoggerMessage delegates for high-performance logging scenarios -->
<NoWarn>$(NoWarn);CA1848</NoWarn>
Expand Down
9 changes: 3 additions & 6 deletions src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ public PurgeResult PurgeInstanceState(PurgeInstanceFilter purgeInstanceFilter)
class ReadyToRunQueue
{
readonly Channel<SerializedInstanceState> readyToRunQueue = Channel.CreateUnbounded<SerializedInstanceState>();
readonly Dictionary<string, object> readyInstances = new(StringComparer.OrdinalIgnoreCase);
readonly ConcurrentDictionary<string, SerializedInstanceState> readyInstances = new(StringComparer.OrdinalIgnoreCase);

public void Reset()
{
Expand All @@ -893,7 +893,7 @@ public async ValueTask<SerializedInstanceState> TakeNextAsync(CancellationToken
SerializedInstanceState state = await this.readyToRunQueue.Reader.ReadAsync(ct);
lock (state)
{
if (this.readyInstances.Remove(state.InstanceId))
if (this.readyInstances.TryRemove(state.InstanceId, out _))
{
if (state.IsLoaded)
{
Expand All @@ -909,12 +909,9 @@ public async ValueTask<SerializedInstanceState> TakeNextAsync(CancellationToken

public void Schedule(SerializedInstanceState state)
{
// TODO: There is a race condition here. If another thread is calling TakeNextAsync
// and removed the queue item before updating the dictionary, then we'll fail
// to update the readyToRunQueue and the orchestration will get stuck.
if (this.readyInstances.TryAdd(state.InstanceId, state))
{
if (!this.readyToRunQueue.Writer.TryWrite(state))
if (!this.readyToRunQueue.Writer.TryWrite(state))
{
throw new InvalidOperationException($"unable to write to queue for {state.InstanceId}");
}
Expand Down
104 changes: 104 additions & 0 deletions test/InProcessTestHost.Tests/ConcurrentTimerTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Testing;
using Microsoft.DurableTask.Worker;
using Xunit;

namespace InProcessTestHost.Tests;

/// <summary>
/// Tests to verify that multiple orchestrations with identical timer FireAt timestamps
/// all complete correctly without any being dropped.
/// </summary>
public class ConcurrentTimerTests
{
[Fact]
// Test that multi orchestrations with the same timer that fire at the same time
Comment thread
nytian marked this conversation as resolved.
Outdated
// can all complete correctly.
public async Task MultipleOrchestrations_WithSameTimerFireAt_AllComplete()
Comment thread
nytian marked this conversation as resolved.
{
const int orchestrationCount = 10;
const string orchestratorName = "TimerOrchestrator";

await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(tasks =>
{
tasks.AddOrchestratorFunc<DateTime, string>(orchestratorName, async (ctx, fireAt) =>
{
await ctx.CreateTimer(fireAt, CancellationToken.None);
return $"done:{ctx.InstanceId}";
});
});

DateTime sharedFireAt = DateTime.UtcNow.AddSeconds(10);
Comment thread
nytian marked this conversation as resolved.
Outdated

string[] instanceIds = new string[orchestrationCount];
for (int i = 0; i < orchestrationCount; i++)
{
instanceIds[i] = await host.Client.ScheduleNewOrchestrationInstanceAsync(
orchestratorName, sharedFireAt);
}

using CancellationTokenSource cts = new(TimeSpan.FromSeconds(30));

Task<OrchestrationMetadata>[] waitTasks = instanceIds
.Select(id => host.Client.WaitForInstanceCompletionAsync(
id, getInputsAndOutputs: true, cts.Token))
.ToArray();

OrchestrationMetadata[] results = await Task.WhenAll(waitTasks);

for (int i = 0; i < orchestrationCount; i++)
{
Assert.NotNull(results[i]);
Assert.Equal(OrchestrationRuntimeStatus.Completed, results[i].RuntimeStatus);
string output = results[i].ReadOutputAs<string>()!;
Assert.Equal($"done:{instanceIds[i]}", output);
}
}

[Fact]
// Test that fan-out sub-orchestrations with the same timer fire at time
Comment thread
nytian marked this conversation as resolved.
Outdated
// can all complete correctly.
public async Task SubOrchestrations_WithIdenticalTimers_AllComplete()
Comment thread
nytian marked this conversation as resolved.
{
const int subOrchestrationCount = 10;
const string parentName = "ParentOrchestrator";
const string childName = "ChildTimerOrchestrator";

await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(tasks =>
{
tasks.AddOrchestratorFunc<int>(parentName, async ctx =>
{
DateTime sharedFireAt = ctx.CurrentUtcDateTime.AddSeconds(2);

// A parent orchestration will schedule 10 sub-orchestrations which has a timer
// fires at the same time.
Task<string>[] childTasks = Enumerable.Range(0, subOrchestrationCount)
.Select(i => ctx.CallSubOrchestratorAsync<string>(childName, sharedFireAt))
.ToArray();

string[] results = await Task.WhenAll(childTasks);
return results.Length;
});

tasks.AddOrchestratorFunc<DateTime, string>(childName, async (ctx, fireAt) =>
{
await ctx.CreateTimer(fireAt, CancellationToken.None);
return $"child-done:{ctx.InstanceId}";
});
});

string instanceId = await host.Client.ScheduleNewOrchestrationInstanceAsync(parentName);

using CancellationTokenSource cts = new(TimeSpan.FromSeconds(60));
OrchestrationMetadata metadata = await host.Client.WaitForInstanceCompletionAsync(
instanceId, getInputsAndOutputs: true, cts.Token);

Assert.NotNull(metadata);
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
Assert.Equal(subOrchestrationCount, metadata.ReadOutputAs<int>());
}
}
Loading