Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ internal sealed class WorkflowOrchestrationContext : WorkflowContext
private readonly Dictionary<string, HistoryEvent> _unmatchedCompletionsByExecutionId = new(StringComparer.Ordinal);


// Parse instance ID as GUID or generate one
// Parse execution/instance ID as GUID or derive a deterministic namespace from the ID
private readonly Guid _instanceGuid;
private static readonly Guid InstanceIdNamespace = new("6f927a2e-9c7e-4a1d-9b8d-7a86f2e7f62f");

private readonly string? _appId;
private int _sequenceNumber;
Expand All @@ -73,13 +74,17 @@ internal sealed class WorkflowOrchestrationContext : WorkflowContext
private bool _turnInitialized;

public WorkflowOrchestrationContext(string name, string instanceId, DateTime currentUtcDateTime,
IWorkflowSerializer workflowSerializer, ILoggerFactory loggerFactory, WorkflowVersionTracker versionTracker, string? appId = null)
IWorkflowSerializer workflowSerializer, ILoggerFactory loggerFactory, WorkflowVersionTracker versionTracker,
string? appId = null, string? executionId = null)
{
_workflowSerializer = workflowSerializer;
_loggerFactory = loggerFactory;
_logger = loggerFactory.CreateLogger<WorkflowOrchestrationContext>() ??
throw new ArgumentNullException(nameof(loggerFactory));
_instanceGuid = Guid.TryParse(instanceId, out var guid) ? guid : Guid.NewGuid();
var guidSeed = !string.IsNullOrWhiteSpace(executionId) ? executionId : instanceId;
_instanceGuid = Guid.TryParse(guidSeed, out var guid)
? guid
: CreateGuidFromName(InstanceIdNamespace, Encoding.UTF8.GetBytes(guidSeed));
Name = name;
InstanceId = instanceId;
_currentUtcDateTime = currentUtcDateTime;
Expand Down Expand Up @@ -334,9 +339,8 @@ public override void ContinueAsNew(object? newInput = null, bool preserveUnproce
/// <inheritdoc />
public override Guid NewGuid()
{
// Create deterministic Guid based on instance ID and counter
var guidCounter = _guidCounter++;
var name = $"{InstanceId}_{guidCounter}"; // Stable name
var name = $"{InstanceId}_{guidCounter}"; // Stable per execution and replay
return CreateGuidFromName(_instanceGuid, Encoding.UTF8.GetBytes(name));
}

Expand Down
2 changes: 1 addition & 1 deletion src/Dapr.Workflow/Worker/WorkflowWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ private async Task<OrchestratorResponse> HandleOrchestratorResponseAsync(Orchest

// Initialize the context with the FULL history
var context = new WorkflowOrchestrationContext(workflowName, request.InstanceId, currentUtcDateTime,
_serializer, loggerFactory, versionTracker, appId);
_serializer, loggerFactory, versionTracker, appId, request.ExecutionId);

// Deserialize the input
object? input = string.IsNullOrEmpty(serializedInput)
Expand Down
90 changes: 90 additions & 0 deletions test/Dapr.IntegrationTest.Workflow/ReplaySafetyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,74 @@ public async Task Workflow_ShouldUseDeterministicGuidGeneration()
Assert.All(guids, g => Assert.NotEqual(Guid.Empty, g));
}

[Fact]
public async Task NewGuid_ShouldRemainStableAcrossReplays()
{
var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components");
var workflowInstanceId = Guid.NewGuid().ToString();

await using var environment = await DaprTestEnvironment.CreateWithPooledNetworkAsync(needsActorState: true);
await environment.StartAsync();

var harness = new DaprHarnessBuilder(componentsDir)
.WithEnvironment(environment)
.BuildWorkflow();
await using var testApp = await DaprHarnessBuilder.ForHarness(harness)
.ConfigureServices(builder =>
{
builder.Services.AddDaprWorkflowBuilder(
configureRuntime: opt =>
{
opt.RegisterWorkflow<ReplayGuidWorkflow>();
},
configureClient: (sp, clientBuilder) =>
{
var config = sp.GetRequiredService<IConfiguration>();
var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"];
if (!string.IsNullOrEmpty(grpcEndpoint))
clientBuilder.UseGrpcEndpoint(grpcEndpoint);
});
})
.BuildAndStartAsync();

using var scope = testApp.CreateScope();
var daprWorkflowClient = scope.ServiceProvider.GetRequiredService<DaprWorkflowClient>();

await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(ReplayGuidWorkflow), workflowInstanceId);

using var statusCts = new CancellationTokenSource(TimeSpan.FromMinutes(1));
var initialStatus = await WaitForGuidStatusAsync(daprWorkflowClient, workflowInstanceId, phase: 0, statusCts.Token);
var expectedGuid = initialStatus.Guid;

await daprWorkflowClient.RaiseEventAsync(workflowInstanceId, ReplayGuidWorkflow.Event1Name, "go", statusCts.Token);
var replayStatus = await WaitForGuidStatusAsync(daprWorkflowClient, workflowInstanceId, phase: 1, statusCts.Token);
Assert.Equal(expectedGuid, replayStatus.Guid);

await daprWorkflowClient.RaiseEventAsync(workflowInstanceId, ReplayGuidWorkflow.Event2Name, "go", statusCts.Token);

var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId, cancellation: statusCts.Token);
Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus);
Assert.Equal(expectedGuid, result.ReadOutputAs<string>());
}

private static async Task<ReplayGuidStatus> WaitForGuidStatusAsync(
DaprWorkflowClient client,
string instanceId,
int phase,
CancellationToken cancellationToken)
{
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
var state = await client.GetWorkflowStateAsync(instanceId, getInputsAndOutputs: true, cancellation: cancellationToken);
var status = state?.ReadCustomStatusAs<ReplayGuidStatus>();
if (status is not null && status.Phase == phase && !string.IsNullOrWhiteSpace(status.Guid))
return status;

await Task.Delay(TimeSpan.FromMilliseconds(200), cancellationToken);
}
}

private sealed partial class LoggingWorkflow : Workflow<string, string>
{
public override async Task<string> RunAsync(WorkflowContext context, string input)
Expand Down Expand Up @@ -147,6 +215,28 @@ public override Task<Guid[]> RunAsync(WorkflowContext context, object? input)
}
}

private sealed class ReplayGuidWorkflow : Workflow<object?, string>
{
public const string Event1Name = "ReplayGuidStep1";
public const string Event2Name = "ReplayGuidStep2";

public override async Task<string> RunAsync(WorkflowContext context, object? input)
{
var guid = context.NewGuid().ToString();
context.SetCustomStatus(new ReplayGuidStatus(0, guid));

await context.WaitForExternalEventAsync<string>(Event1Name);
context.SetCustomStatus(new ReplayGuidStatus(1, guid));

await context.WaitForExternalEventAsync<string>(Event2Name);
context.SetCustomStatus(new ReplayGuidStatus(2, guid));

return guid;
}
}

private sealed record ReplayGuidStatus(int Phase, string Guid);

private sealed class SimpleActivity : WorkflowActivity<string, string>
{
public override Task<string> RunAsync(WorkflowActivityContext context, string input)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -957,6 +957,45 @@ public void NewGuid_ShouldBeDeterministic_ForSameInstanceIdAndTimestamp()
Assert.Equal(g1, g2);
}

[Fact]
public void NewGuid_ShouldVary_ForDifferentExecutionIds()
{
var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web));
var tracker = new WorkflowVersionTracker([]);

var c1 = new WorkflowOrchestrationContext("wf", "same-instance",
new DateTime(2025, 01, 01, 0, 0, 0, DateTimeKind.Utc),
serializer, NullLoggerFactory.Instance, tracker, executionId: "exec-1");

var c2 = new WorkflowOrchestrationContext("wf", "same-instance",
new DateTime(2025, 01, 01, 0, 0, 0, DateTimeKind.Utc),
serializer, NullLoggerFactory.Instance, tracker, executionId: "exec-2");

var g1 = c1.NewGuid();
var g2 = c2.NewGuid();

Assert.NotEqual(g1, g2);
}

[Fact]
public void NewGuid_ShouldBeDeterministic_ForNonGuidInstanceId()
{
var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web));
var now = new DateTime(2025, 01, 01, 0, 0, 0, DateTimeKind.Utc);
var tracker = new WorkflowVersionTracker([]);

var c1 = new WorkflowOrchestrationContext("wf", "order-123",
now, serializer, NullLoggerFactory.Instance, tracker);

var c2 = new WorkflowOrchestrationContext("wf", "order-123",
now, serializer, NullLoggerFactory.Instance, tracker);

var g1 = c1.NewGuid();
var g2 = c2.NewGuid();

Assert.Equal(g1, g2);
}

[Fact]
public async Task CallActivityAsync_ShouldThrowArgumentException_WhenNameIsNullOrWhitespace()
{
Expand Down
Loading