Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/InProcessTestHost/InProcessTestHost.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<RootNamespace>Microsoft.DurableTask.Testing</RootNamespace>
<AssemblyName>Microsoft.DurableTask.InProcessTestHost</AssemblyName>
<PackageId>Microsoft.DurableTask.InProcessTestHost</PackageId>
<Version>0.2.2-preview.1</Version>
<Version>0.2.3-preview.1</Version>

<!-- Suppress CA1848: Use LoggerMessage delegates for high-performance logging scenarios -->
<NoWarn>$(NoWarn);CA1848</NoWarn>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ protected override async Task ExecuteWorkItemAsync(TaskOrchestrationWorkItem wor
{
throw new ArgumentException($"Could not find an orchestration instance ID in the work item's runtime state.", nameof(workItem));
}
var activityMessages = new List<TaskMessage>();
var timerMessages = new List<TaskMessage>();
var orchestratorMessages = new List<TaskMessage>();
// We loop for as long as the orchestrator does a ContinueAsNew

var activityMessages = new List<TaskMessage>();
var timerMessages = new List<TaskMessage>();
var orchestratorMessages = new List<TaskMessage>();

// We loop for as long as the orchestrator does a ContinueAsNew
while (true)
{
if (this.log.IsEnabled(LogLevel.Debug))
Expand Down Expand Up @@ -149,6 +149,16 @@ protected override async Task ExecuteWorkItemAsync(TaskOrchestrationWorkItem wor
out bool continueAsNew);
if (continueAsNew)
{
// The previous execution is being replaced by a new one. Clear any
// accumulated messages from the old execution so they are not
// re-enqueued when the work item completes. Without this, stale
// activity/timer/orchestrator messages from prior iterations would
// be committed alongside the new execution, causing duplicate
// activities, stale timer fires, and ultimately stuck instances.
activityMessages.Clear();
timerMessages.Clear();
orchestratorMessages.Clear();

// Continue running the orchestration with a new history.
// Renew the lock if we're getting close to its expiration.
if (workItem.LockedUntilUtc != default && DateTime.UtcNow.AddMinutes(1) > workItem.LockedUntilUtc)
Expand Down Expand Up @@ -365,11 +375,11 @@ void ApplyOrchestratorActions(
Input = sendEventAction.EventData,
};

runtimeState.AddEvent(sendEvent);
EventRaisedEvent eventRaisedEvent = new(-1, sendEventAction.EventData)
{
Name = sendEventAction.EventName
runtimeState.AddEvent(sendEvent);

EventRaisedEvent eventRaisedEvent = new(-1, sendEventAction.EventData)
{
Name = sendEventAction.EventName
};

orchestratorMessages.Add(new TaskMessage
Expand Down
16 changes: 8 additions & 8 deletions src/InProcessTestHost/Sidecar/Dispatcher/WorkItemDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,10 @@ async Task WaitForAllClear(CancellationToken cancellationToken)
{
TimeSpan logInterval = TimeSpan.FromMinutes(1);

// IMPORTANT: This logic assumes only a single logical "thread" is executing the receive loop,
// and that there's no possible race condition when comparing work-item counts.
// IMPORTANT: The receive loop is expected to have a single logical execution path, but the
// work-item count can still change concurrently and must be read using thread-safe access.
DateTime nextLogTime = DateTime.MinValue;
while (this.currentWorkItems >= this.MaxWorkItems)
while (Volatile.Read(ref this.currentWorkItems) >= this.MaxWorkItems)
{
// Periodically log that we're waiting for available concurrency.
// No need to use UTC for this. Local time is a bit easier to debug.
Expand All @@ -169,7 +169,7 @@ async Task WaitForAllClear(CancellationToken cancellationToken)
this.log.FetchingThrottled(
dispatcher: this.name,
details: "The current active work-item count has reached the allowed maximum.",
this.currentWorkItems,
Volatile.Read(ref this.currentWorkItems),
this.MaxWorkItems);
nextLogTime = now.Add(logInterval);
}
Expand All @@ -192,14 +192,14 @@ async Task WaitForAllClear(CancellationToken cancellationToken)
async Task WaitForOutstandingWorkItems(CancellationToken cancellationToken)
{
DateTime nextLogTime = DateTime.MinValue;
while (this.currentWorkItems > 0)
while (Volatile.Read(ref this.currentWorkItems) > 0)
{
// Periodically log that we're waiting for outstanding work items to complete.
// No need to use UTC for this. Local time is a bit easier to debug.
DateTime now = DateTime.Now;
if (now >= nextLogTime)
{
this.log.DispatcherStopping(this.name, this.currentWorkItems);
this.log.DispatcherStopping(this.name, Volatile.Read(ref this.currentWorkItems));
nextLogTime = now.AddMinutes(1);
}

Expand Down Expand Up @@ -265,7 +265,7 @@ async Task FetchAndExecuteLoop(CancellationToken cancellationToken)

if (workItem != null)
{
this.currentWorkItems++;
Interlocked.Increment(ref this.currentWorkItems);
this.log.FetchWorkItemCompleted(
this.name,
this.GetWorkItemId(workItem),
Expand Down Expand Up @@ -334,7 +334,7 @@ async Task ExecuteWorkItem(T workItem)
details: ex.ToString());
}

this.currentWorkItems--;
Interlocked.Decrement(ref this.currentWorkItems);
}
}
}
Expand Down
15 changes: 9 additions & 6 deletions src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ public void AddMessage(TaskMessage message)
SerializedInstanceState state = this.store.GetOrAdd(instanceId, id => new SerializedInstanceState(id, executionId));
lock (state)
{
bool isRestart = state.ExecutionId != null && state.ExecutionId != executionId;
bool isRestart = executionId != null && state.ExecutionId != null && state.ExecutionId != executionId;

if (message.Event is ExecutionStartedEvent startEvent)
{
Expand Down Expand Up @@ -684,11 +684,14 @@ public void AddMessage(TaskMessage message)
else if (state.IsCompleted)
{
// Drop the message since we're completed
// GOOD: The user-provided the instanceId
// logger.LogWarning(
// "Dropped {eventType} message for instance '{instanceId}' because the orchestration has already completed.",
// message.Event.EventType,
// instanceId);
return;
}
else if (isRestart)
{
// Drop messages that belong to a previous execution (stale activity
// completions, timer fires, etc.). These can arrive when a
// ContinueAsNew created a new execution but work items from the
// old execution are still in flight.
return;
}

Expand Down
Loading
Loading