Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 56 additions & 18 deletions src/Dapr.Workflow/Client/WorkflowGrpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -245,14 +245,22 @@ public override async Task<WorkflowInstancePage> ListInstanceIdsAsync(
request.PageSize = (uint)pageSize.Value;
}

var grpcCallOptions = CreateCallOptions(cancellationToken);
var response = await grpcClient.ListInstanceIDsAsync(request, grpcCallOptions);
try
{
var grpcCallOptions = CreateCallOptions(cancellationToken);
var response = await grpcClient.ListInstanceIDsAsync(request, grpcCallOptions);

logger.LogListInstanceIds(response.InstanceIds.Count);
logger.LogListInstanceIds(response.InstanceIds.Count);

return new WorkflowInstancePage(
response.InstanceIds.ToList().AsReadOnly(),
response.HasContinuationToken ? response.ContinuationToken : null);
return new WorkflowInstancePage(
response.InstanceIds.ToList().AsReadOnly(),
response.HasContinuationToken ? response.ContinuationToken : null);
}
catch (RpcException ex) when (IsRpcMethodNotSupportedByRuntime(ex))
{
throw new NotSupportedException(
"ListInstanceIDs is not supported by the current Dapr runtime version. Please upgrade to a newer Dapr release.", ex);
}
}

/// <inheritdoc />
Expand All @@ -267,17 +275,25 @@ public override async Task<IReadOnlyList<WorkflowHistoryEvent>> GetInstanceHisto
InstanceId = instanceId
};

var grpcCallOptions = CreateCallOptions(cancellationToken);
var response = await grpcClient.GetInstanceHistoryAsync(request, grpcCallOptions);
try
{
var grpcCallOptions = CreateCallOptions(cancellationToken);
var response = await grpcClient.GetInstanceHistoryAsync(request, grpcCallOptions);

var events = response.Events
.Select(ProtoConverters.ToWorkflowHistoryEvent)
.ToList()
.AsReadOnly();
var events = response.Events
.Select(ProtoConverters.ToWorkflowHistoryEvent)
.ToList()
.AsReadOnly();

logger.LogGetInstanceHistory(instanceId, events.Count);
logger.LogGetInstanceHistory(instanceId, events.Count);

return events;
return events;
}
catch (RpcException ex) when (IsRpcMethodNotSupportedByRuntime(ex))
{
throw new NotSupportedException(
"GetInstanceHistory is not supported by the current Dapr runtime version. Please upgrade to a newer Dapr release.", ex);
}
}

/// <inheritdoc />
Expand Down Expand Up @@ -313,12 +329,20 @@ public override async Task<string> RerunWorkflowFromEventAsync(
request.Input = SerializeToJson(options.Input);
}

var grpcCallOptions = CreateCallOptions(cancellationToken);
var response = await grpcClient.RerunWorkflowFromEventAsync(request, grpcCallOptions);
try
{
var grpcCallOptions = CreateCallOptions(cancellationToken);
var response = await grpcClient.RerunWorkflowFromEventAsync(request, grpcCallOptions);

logger.LogRerunWorkflowFromEvent(sourceInstanceId, eventId, response.NewInstanceID);
logger.LogRerunWorkflowFromEvent(sourceInstanceId, eventId, response.NewInstanceID);

return response.NewInstanceID;
return response.NewInstanceID;
}
catch (RpcException ex) when (IsRpcMethodNotSupportedByRuntime(ex))
{
throw new NotSupportedException(
"RerunWorkflowFromEvent is not supported by the current Dapr runtime version. Please upgrade to a newer Dapr release.", ex);
}
}

/// <inheritdoc />
Expand All @@ -337,4 +361,18 @@ private static bool IsTerminalStatus(WorkflowRuntimeStatus status) =>
status is WorkflowRuntimeStatus.Completed
or WorkflowRuntimeStatus.Failed
or WorkflowRuntimeStatus.Terminated;

/// <summary>
/// Returns <c>true</c> when the <see cref="RpcException"/> indicates that the Dapr sidecar does not
/// implement the requested gRPC method and fell back to its service-invocation proxy, which then
/// failed because the workflow client never sends a <c>dapr-app-id</c> header. This pattern
/// occurs on older Dapr runtime versions that pre-date the <c>GetInstanceHistory</c>,
/// <c>ListInstanceIDs</c>, and <c>RerunWorkflowFromEvent</c> RPCs.
/// The sidecar emits "failed to proxy request: required metadata dapr-callee-app-id or dapr-app-id not found"
/// in this case.
/// </summary>
private static bool IsRpcMethodNotSupportedByRuntime(RpcException ex) =>
ex.StatusCode == StatusCode.Unknown &&
ex.Status.Detail.Contains("required metadata", StringComparison.OrdinalIgnoreCase) &&
ex.Status.Detail.Contains("dapr-app-id", StringComparison.OrdinalIgnoreCase);
}
37 changes: 27 additions & 10 deletions src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ internal sealed class WorkflowOrchestrationContext : WorkflowContext
private bool _isReplaying;
private bool _turnInitialized;
private bool _preserveUnprocessedEvents;

public WorkflowOrchestrationContext(string name, string instanceId, DateTime currentUtcDateTime,
IWorkflowSerializer workflowSerializer, ILoggerFactory loggerFactory, WorkflowVersionTracker versionTracker,
string? appId = null, string? executionId = null)
Expand Down Expand Up @@ -360,20 +359,25 @@ public override void ContinueAsNew(object? newInput = null, bool preserveUnproce
}
};

// Do NOT snapshot _externalEventBuffer here. ContinueAsNew is called from within
// workflow execution, which happens during ProcessEvents. Events arriving later in
// the same NewEvents batch will be buffered AFTER this point and would be missed.
// FinalizeCarryoverEvents() is called after all ProcessEvents calls are complete.
// The Dapr sidecar's CarryoverEvents field strips the Input when re-delivering events,
// causing all carryover payloads to deserialize as default(T). Instead, we re-queue
// unprocessed events as SendEvent actions to self in FinalizeCarryoverEvents(), which
// preserves the original Input values and avoids double-delivery.
_preserveUnprocessedEvents = preserveUnprocessedEvents;
_pendingActions.Add(action.Id, action);
}

/// <summary>
/// Populates <c>CarryoverEvents</c> on any pending <c>ContinuedAsNew</c> action using the
/// final state of <c>_externalEventBuffer</c>. Must be called after all <c>ProcessEvents</c>
/// calls for the current turn are complete, so that events arriving later in the same
/// <c>NewEvents</c> batch are included.
/// Re-queues unprocessed external events as <c>SendEvent</c> actions to this instance
/// so that the new execution (after <c>ContinuedAsNew</c>) receives them with their
/// original input values intact.
/// </summary>
/// <remarks>
/// Using the gRPC <c>CarryoverEvents</c> field causes the Dapr sidecar to strip the
/// <c>Input</c> of each re-delivered event, so all payloads deserialize as
/// <c>default(T)</c>. Emitting <c>SendEvent</c> actions to self avoids that stripping
/// because the event data travels through the sidecar's normal event queue path.
/// </remarks>
internal void FinalizeCarryoverEvents()
{
if (!_preserveUnprocessedEvents || _externalEventBuffer.Count == 0)
Expand All @@ -383,7 +387,20 @@ internal void FinalizeCarryoverEvents()
{
if (action.CompleteOrchestration?.OrchestrationStatus == OrchestrationStatus.ContinuedAsNew)
{
action.CompleteOrchestration.CarryoverEvents.AddRange(_externalEventBuffer);
foreach (var bufferedEvent in _externalEventBuffer)
{
var sendId = _sequenceNumber++;
_pendingActions.Add(sendId, new OrchestratorAction
{
Id = sendId,
SendEvent = new SendEventAction
{
Instance = new OrchestrationInstance { InstanceId = InstanceId },
Name = bufferedEvent.EventRaised.Name,
Data = bufferedEvent.EventRaised.Input
}
});
}
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Dapr.Testcontainers.Common.Options;
using Dapr.Testcontainers.Common.Testing;
using Dapr.Testcontainers.Harnesses;
using Dapr.Testcontainers.Xunit.Attributes;
using Dapr.Workflow;
using Dapr.Workflow.Versioning;
using Grpc.Core;
Expand All @@ -29,7 +30,7 @@ public sealed class CombinedVersioningIntegrationTests
private const string CanonicalWorkflowName = "CombinedVersionedWorkflow";
private const string ResumeEventName = "resume";

[Fact]
[MinimumDaprRuntimeFact("1.17")]
public async Task ShouldCombinePatchAndNameBasedVersioning()
{
var instanceIdV1 = Guid.NewGuid().ToString("N");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,17 @@
/// into a single NewEvents delivery, the pre-fix code lost every signal after the first.
/// After the fix the full buffer is captured once all events are processed, so every
/// signal survives as a carryover event and the workflow counts down to zero.
///
/// The signal count is intentionally small (15) so that all signals can be fired
/// simultaneously — maximising the chance the sidecar batches several of them into
/// a single NewEvents delivery — while keeping the total wall-clock time well under
/// 30 seconds. Each ContinueAsNew iteration requires one sidecar round-trip; a larger
/// count (e.g. 250) makes the test take 2+ minutes and risks CI timeouts.
/// </summary>
[Fact]

Check failure on line 44 in test/Dapr.IntegrationTest.Workflow/ContinueAsNewCarryoverEventsTests.cs

View workflow job for this annotation

GitHub Actions / net8-1.17.3-Workflow

Dapr.IntegrationTest.Workflow.ContinueAsNewCarryoverEventsTests.ContinueAsNew_ShouldCarryOverEvents_WhenMultipleSignalsArriveTogether

System.Threading.Tasks.TaskCanceledException : A task was canceled.

Check failure on line 44 in test/Dapr.IntegrationTest.Workflow/ContinueAsNewCarryoverEventsTests.cs

View workflow job for this annotation

GitHub Actions / net10-1.17.4-Workflow

Dapr.IntegrationTest.Workflow.ContinueAsNewCarryoverEventsTests.ContinueAsNew_ShouldCarryOverEvents_WhenMultipleSignalsArriveTogether

System.Threading.Tasks.TaskCanceledException : A task was canceled.

Check failure on line 44 in test/Dapr.IntegrationTest.Workflow/ContinueAsNewCarryoverEventsTests.cs

View workflow job for this annotation

GitHub Actions / net8-1.17.4-Workflow

Dapr.IntegrationTest.Workflow.ContinueAsNewCarryoverEventsTests.ContinueAsNew_ShouldCarryOverEvents_WhenMultipleSignalsArriveTogether

System.Threading.Tasks.TaskCanceledException : A task was canceled.

Check failure on line 44 in test/Dapr.IntegrationTest.Workflow/ContinueAsNewCarryoverEventsTests.cs

View workflow job for this annotation

GitHub Actions / net10-1.17.3-Workflow

Dapr.IntegrationTest.Workflow.ContinueAsNewCarryoverEventsTests.ContinueAsNew_ShouldCarryOverEvents_WhenMultipleSignalsArriveTogether

System.Threading.Tasks.TaskCanceledException : A task was canceled.

Check failure on line 44 in test/Dapr.IntegrationTest.Workflow/ContinueAsNewCarryoverEventsTests.cs

View workflow job for this annotation

GitHub Actions / net9-1.17.4-Workflow

Dapr.IntegrationTest.Workflow.ContinueAsNewCarryoverEventsTests.ContinueAsNew_ShouldCarryOverEvents_WhenMultipleSignalsArriveTogether

System.Threading.Tasks.TaskCanceledException : A task was canceled.

Check failure on line 44 in test/Dapr.IntegrationTest.Workflow/ContinueAsNewCarryoverEventsTests.cs

View workflow job for this annotation

GitHub Actions / net9-1.17.3-Workflow

Dapr.IntegrationTest.Workflow.ContinueAsNewCarryoverEventsTests.ContinueAsNew_ShouldCarryOverEvents_WhenMultipleSignalsArriveTogether

System.Threading.Tasks.TaskCanceledException : A task was canceled.

Check failure on line 44 in test/Dapr.IntegrationTest.Workflow/ContinueAsNewCarryoverEventsTests.cs

View workflow job for this annotation

GitHub Actions / net9-1.16.13-rc.1-Workflow

Dapr.IntegrationTest.Workflow.ContinueAsNewCarryoverEventsTests.ContinueAsNew_ShouldCarryOverEvents_WhenMultipleSignalsArriveTogether

System.Threading.Tasks.TaskCanceledException : A task was canceled.

Check failure on line 44 in test/Dapr.IntegrationTest.Workflow/ContinueAsNewCarryoverEventsTests.cs

View workflow job for this annotation

GitHub Actions / net10-1.16.13-rc.1-Workflow

Dapr.IntegrationTest.Workflow.ContinueAsNewCarryoverEventsTests.ContinueAsNew_ShouldCarryOverEvents_WhenMultipleSignalsArriveTogether

System.Threading.Tasks.TaskCanceledException : A task was canceled.

Check failure on line 44 in test/Dapr.IntegrationTest.Workflow/ContinueAsNewCarryoverEventsTests.cs

View workflow job for this annotation

GitHub Actions / net8-1.16.13-rc.1-Workflow

Dapr.IntegrationTest.Workflow.ContinueAsNewCarryoverEventsTests.ContinueAsNew_ShouldCarryOverEvents_WhenMultipleSignalsArriveTogether

System.Threading.Tasks.TaskCanceledException : A task was canceled.
public async Task ContinueAsNew_ShouldCarryOverEvents_WhenMultipleSignalsArriveTogether()
{
const int signalCount = 250;
const int signalCount = 15;
var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components");
var workflowInstanceId = Guid.NewGuid().ToString();

Expand Down Expand Up @@ -86,16 +92,19 @@
// All signals must be consumed via carryover before the workflow completes.
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(
TestContext.Current.CancellationToken);
timeoutCts.CancelAfter(TimeSpan.FromMinutes(2));
timeoutCts.CancelAfter(TimeSpan.FromSeconds(60));

var result = await client.WaitForWorkflowCompletionAsync(
workflowInstanceId, cancellation: timeoutCts.Token);

Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus);

// Every index in [0, signalCount) must appear exactly once in the output — no drops, no duplicates.
// Every index in [0, signalCount) must appear exactly once — no drops, no duplicates.
// Order() sorts the received values; comparing against Range ensures complete coverage
// with the correct payload for each signal (not default(int)=0 from Input stripping).
var receivedIndexes = result.ReadOutputAs<List<int>>();
Assert.NotNull(receivedIndexes);
Assert.Equal(signalCount, receivedIndexes.Count);
Assert.Equal(Enumerable.Range(0, signalCount), receivedIndexes.Order());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public sealed class ExternalEventCancellationSequentialTests
public async Task ExternalEvents_ShouldComplete_WhenRaisedSequentially_WithDelay()
{
await ExternalEventCancellationTestHarness.RunAsync(
workflowCount: 1000,
workflowCount: 50,
raiseEventsInParallel: false,
perEventDelay: TimeSpan.FromMilliseconds(75),
initialWaitTimeout: TimeSpan.FromMilliseconds(200));
Expand All @@ -38,7 +38,7 @@ public sealed class ExternalEventCancellationParallelTests
public async Task ExternalEvents_ShouldComplete_WhenRaisedInParallel_MinimalDelay()
{
await ExternalEventCancellationTestHarness.RunAsync(
workflowCount: 1000,
workflowCount: 50,
raiseEventsInParallel: true,
perEventDelay: TimeSpan.Zero,
initialWaitTimeout: TimeSpan.FromMilliseconds(200));
Expand Down Expand Up @@ -93,7 +93,7 @@ public static async Task RunAsync(
await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(CanceledWaitWorkflow), workflowId, initialWaitTimeout);
}

using var waitCts = new CancellationTokenSource(TimeSpan.FromMinutes(3));
using var waitCts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
await Task.WhenAll(workflowIds.Select(id =>
WaitForCustomStatusAsync(daprWorkflowClient, id, WaitingAfterTimeoutStatus, waitCts.Token)));

Expand All @@ -113,7 +113,7 @@ await Task.WhenAll(workflowIds.Select(id =>
}
}

using var completionCts = new CancellationTokenSource(TimeSpan.FromMinutes(3));
using var completionCts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
var results = await Task.WhenAll(workflowIds.Select(id =>
daprWorkflowClient.WaitForWorkflowCompletionAsync(id, cancellation: completionCts.Token)));

Expand Down
5 changes: 3 additions & 2 deletions test/Dapr.IntegrationTest.Workflow/WorkflowRpcTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

using Dapr.Testcontainers.Common;
using Dapr.Testcontainers.Harnesses;
using Dapr.Testcontainers.Xunit.Attributes;
using Dapr.Workflow;
using Dapr.Workflow.Client;
using Microsoft.Extensions.Configuration;
Expand All @@ -22,7 +23,7 @@ namespace Dapr.IntegrationTest.Workflow;

public sealed class WorkflowRpcTests
{
[Fact]
[MinimumDaprRuntimeFact("1.17.0")]
public async Task ListInstanceIds_ShouldReturnScheduledWorkflowInstances()
{
var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components");
Expand Down Expand Up @@ -63,7 +64,7 @@ public async Task ListInstanceIds_ShouldReturnScheduledWorkflowInstances()
Assert.Contains(instanceId, page.InstanceIds);
}

[Fact]
[MinimumDaprRuntimeFact("1.17.0")]
public async Task GetInstanceHistory_ShouldReturnHistoryForCompletedWorkflow()
{
var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components");
Expand Down
Loading
Loading