diff --git a/src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs b/src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs index 71beff3a0..f4d47f01e 100644 --- a/src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs +++ b/src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs @@ -61,8 +61,9 @@ internal sealed class WorkflowOrchestrationContext : WorkflowContext private readonly Dictionary _unmatchedCompletionsByExecutionId = new(StringComparer.Ordinal); - // Parse instance ID as GUID or generate one + // Parse execution/instance ID as GUID or derive a deterministic namespace from the ID private readonly Guid _instanceGuid; + private static readonly Guid InstanceIdNamespace = new("6f927a2e-9c7e-4a1d-9b8d-7a86f2e7f62f"); private readonly string? _appId; private int _sequenceNumber; @@ -73,13 +74,17 @@ internal sealed class WorkflowOrchestrationContext : WorkflowContext private bool _turnInitialized; public WorkflowOrchestrationContext(string name, string instanceId, DateTime currentUtcDateTime, - IWorkflowSerializer workflowSerializer, ILoggerFactory loggerFactory, WorkflowVersionTracker versionTracker, string? appId = null) + IWorkflowSerializer workflowSerializer, ILoggerFactory loggerFactory, WorkflowVersionTracker versionTracker, + string? appId = null, string? executionId = null) { _workflowSerializer = workflowSerializer; _loggerFactory = loggerFactory; _logger = loggerFactory.CreateLogger() ?? throw new ArgumentNullException(nameof(loggerFactory)); - _instanceGuid = Guid.TryParse(instanceId, out var guid) ? guid : Guid.NewGuid(); + var guidSeed = !string.IsNullOrWhiteSpace(executionId) ? executionId : instanceId; + _instanceGuid = Guid.TryParse(guidSeed, out var guid) + ? guid + : CreateGuidFromName(InstanceIdNamespace, Encoding.UTF8.GetBytes(guidSeed)); Name = name; InstanceId = instanceId; _currentUtcDateTime = currentUtcDateTime; @@ -334,9 +339,8 @@ public override void ContinueAsNew(object? newInput = null, bool preserveUnproce /// public override Guid NewGuid() { - // Create deterministic Guid based on instance ID and counter var guidCounter = _guidCounter++; - var name = $"{InstanceId}_{guidCounter}"; // Stable name + var name = $"{InstanceId}_{guidCounter}"; // Stable per execution and replay return CreateGuidFromName(_instanceGuid, Encoding.UTF8.GetBytes(name)); } diff --git a/src/Dapr.Workflow/Worker/WorkflowWorker.cs b/src/Dapr.Workflow/Worker/WorkflowWorker.cs index 9b12b2371..5069ee1c5 100644 --- a/src/Dapr.Workflow/Worker/WorkflowWorker.cs +++ b/src/Dapr.Workflow/Worker/WorkflowWorker.cs @@ -310,7 +310,7 @@ private async Task HandleOrchestratorResponseAsync(Orchest // Initialize the context with the FULL history var context = new WorkflowOrchestrationContext(workflowName, request.InstanceId, currentUtcDateTime, - _serializer, loggerFactory, versionTracker, appId); + _serializer, loggerFactory, versionTracker, appId, request.ExecutionId); // Deserialize the input object? input = string.IsNullOrEmpty(serializedInput) diff --git a/test/Dapr.IntegrationTest.Workflow/ReplaySafetyTests.cs b/test/Dapr.IntegrationTest.Workflow/ReplaySafetyTests.cs index 6a95af012..52cd7ee93 100644 --- a/test/Dapr.IntegrationTest.Workflow/ReplaySafetyTests.cs +++ b/test/Dapr.IntegrationTest.Workflow/ReplaySafetyTests.cs @@ -110,6 +110,74 @@ public async Task Workflow_ShouldUseDeterministicGuidGeneration() Assert.All(guids, g => Assert.NotEqual(Guid.Empty, g)); } + [Fact] + public async Task NewGuid_ShouldRemainStableAcrossReplays() + { + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + var workflowInstanceId = Guid.NewGuid().ToString(); + + await using var environment = await DaprTestEnvironment.CreateWithPooledNetworkAsync(needsActorState: true); + await environment.StartAsync(); + + var harness = new DaprHarnessBuilder(componentsDir) + .WithEnvironment(environment) + .BuildWorkflow(); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + configureRuntime: opt => + { + opt.RegisterWorkflow(); + }, + configureClient: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); + + await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(ReplayGuidWorkflow), workflowInstanceId); + + using var statusCts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); + var initialStatus = await WaitForGuidStatusAsync(daprWorkflowClient, workflowInstanceId, phase: 0, statusCts.Token); + var expectedGuid = initialStatus.Guid; + + await daprWorkflowClient.RaiseEventAsync(workflowInstanceId, ReplayGuidWorkflow.Event1Name, "go", statusCts.Token); + var replayStatus = await WaitForGuidStatusAsync(daprWorkflowClient, workflowInstanceId, phase: 1, statusCts.Token); + Assert.Equal(expectedGuid, replayStatus.Guid); + + await daprWorkflowClient.RaiseEventAsync(workflowInstanceId, ReplayGuidWorkflow.Event2Name, "go", statusCts.Token); + + var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId, cancellation: statusCts.Token); + Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus); + Assert.Equal(expectedGuid, result.ReadOutputAs()); + } + + private static async Task WaitForGuidStatusAsync( + DaprWorkflowClient client, + string instanceId, + int phase, + CancellationToken cancellationToken) + { + while (true) + { + cancellationToken.ThrowIfCancellationRequested(); + var state = await client.GetWorkflowStateAsync(instanceId, getInputsAndOutputs: true, cancellation: cancellationToken); + var status = state?.ReadCustomStatusAs(); + if (status is not null && status.Phase == phase && !string.IsNullOrWhiteSpace(status.Guid)) + return status; + + await Task.Delay(TimeSpan.FromMilliseconds(200), cancellationToken); + } + } + private sealed partial class LoggingWorkflow : Workflow { public override async Task RunAsync(WorkflowContext context, string input) @@ -147,6 +215,28 @@ public override Task RunAsync(WorkflowContext context, object? input) } } + private sealed class ReplayGuidWorkflow : Workflow + { + public const string Event1Name = "ReplayGuidStep1"; + public const string Event2Name = "ReplayGuidStep2"; + + public override async Task RunAsync(WorkflowContext context, object? input) + { + var guid = context.NewGuid().ToString(); + context.SetCustomStatus(new ReplayGuidStatus(0, guid)); + + await context.WaitForExternalEventAsync(Event1Name); + context.SetCustomStatus(new ReplayGuidStatus(1, guid)); + + await context.WaitForExternalEventAsync(Event2Name); + context.SetCustomStatus(new ReplayGuidStatus(2, guid)); + + return guid; + } + } + + private sealed record ReplayGuidStatus(int Phase, string Guid); + private sealed class SimpleActivity : WorkflowActivity { public override Task RunAsync(WorkflowActivityContext context, string input) diff --git a/test/Dapr.Workflow.Test/Worker/Internal/WorkflowOrchestrationContextTests.cs b/test/Dapr.Workflow.Test/Worker/Internal/WorkflowOrchestrationContextTests.cs index 3764a2440..66ad2ba32 100644 --- a/test/Dapr.Workflow.Test/Worker/Internal/WorkflowOrchestrationContextTests.cs +++ b/test/Dapr.Workflow.Test/Worker/Internal/WorkflowOrchestrationContextTests.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2025 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -957,6 +957,45 @@ public void NewGuid_ShouldBeDeterministic_ForSameInstanceIdAndTimestamp() Assert.Equal(g1, g2); } + [Fact] + public void NewGuid_ShouldVary_ForDifferentExecutionIds() + { + var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); + var tracker = new WorkflowVersionTracker([]); + + var c1 = new WorkflowOrchestrationContext("wf", "same-instance", + new DateTime(2025, 01, 01, 0, 0, 0, DateTimeKind.Utc), + serializer, NullLoggerFactory.Instance, tracker, executionId: "exec-1"); + + var c2 = new WorkflowOrchestrationContext("wf", "same-instance", + new DateTime(2025, 01, 01, 0, 0, 0, DateTimeKind.Utc), + serializer, NullLoggerFactory.Instance, tracker, executionId: "exec-2"); + + var g1 = c1.NewGuid(); + var g2 = c2.NewGuid(); + + Assert.NotEqual(g1, g2); + } + + [Fact] + public void NewGuid_ShouldBeDeterministic_ForNonGuidInstanceId() + { + var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); + var now = new DateTime(2025, 01, 01, 0, 0, 0, DateTimeKind.Utc); + var tracker = new WorkflowVersionTracker([]); + + var c1 = new WorkflowOrchestrationContext("wf", "order-123", + now, serializer, NullLoggerFactory.Instance, tracker); + + var c2 = new WorkflowOrchestrationContext("wf", "order-123", + now, serializer, NullLoggerFactory.Instance, tracker); + + var g1 = c1.NewGuid(); + var g2 = c2.NewGuid(); + + Assert.Equal(g1, g2); + } + [Fact] public async Task CallActivityAsync_ShouldThrowArgumentException_WhenNameIsNullOrWhitespace() {