Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/Dapr.Workflow/Logging.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ internal static partial class Logging
[LoggerMessage(LogLevel.Error, "Workflow '{WorkflowName}' not found in registry")]
public static partial void LogWorkerWorkflowHandleOrchestratorRequestNotInRegistry(this ILogger logger, string workflowName);

[LoggerMessage(LogLevel.Error, "Workflow '{WorkflowName}' failed to activate")]
public static partial void LogWorkerWorkflowHandleOrchestratorRequestActivationFailed(this ILogger logger, Exception ex, string workflowName);

[LoggerMessage(LogLevel.Information, "Workflow execution completed: Name='{WorkflowName}', InstanceId='{InstanceId}'")]
public static partial void LogWorkerWorkflowHandleOrchestratorRequestCompleted(this ILogger logger, string workflowName, string instanceId);

Expand All @@ -37,6 +40,9 @@ internal static partial class Logging
[LoggerMessage(LogLevel.Error, "Activity '{ActivityName}' not found in registry")]
public static partial void LogWorkerWorkflowHandleActivityRequestNotInRegistry(this ILogger logger, string activityName);

[LoggerMessage(LogLevel.Error, "Activity '{ActivityName}' failed to activate")]
public static partial void LogWorkerWorkflowHandleActivityRequestActivationFailed(this ILogger logger, Exception ex, string activityName);

[LoggerMessage(LogLevel.Debug, "Activity execution completed: Name='{ActivityName}', TaskId='{TasKId}'")]
public static partial void LogWorkerWorkflowHandleActivityRequestCompleted(this ILogger logger, string activityName, int taskId);

Expand Down
17 changes: 13 additions & 4 deletions src/Dapr.Workflow/Worker/IWorkflowsFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,26 @@ public void RegisterActivity<TInput, TOutput>(string name,
/// </summary>
/// <param name="identifier">The identifier of the workflow.</param>
/// <param name="serviceProvider">The service provider for dependency injection.</param>
/// <param name="workflow">The created workflow, or null if not found.</param>
/// <param name="workflow">The created workflow, or null if not found or activation failed.</param>
/// <param name="activationException">
/// The exception thrown during activation, or null if the workflow was not found in the registry.
/// This allows callers to distinguish between "not registered" and "registered but failed to activate".
/// </param>
/// <returns>True if the workflow was created; otherwise false.</returns>
bool TryCreateWorkflow(TaskIdentifier identifier, IServiceProvider serviceProvider, out IWorkflow? workflow);
bool TryCreateWorkflow(TaskIdentifier identifier, IServiceProvider serviceProvider, out IWorkflow? workflow,
out Exception? activationException);

/// <summary>
/// Tries to create an activity instance.
/// </summary>
/// <param name="identifier">The identifier of the activity.</param>
/// <param name="serviceProvider">The service provider for dependency injection.</param>
/// <param name="activity">The created activity, or null if not found.</param>
/// <param name="activity">The created activity, or null if not found or activation failed.</param>
/// <param name="activationException">
/// The exception thrown during activation, or null if the activity was not found in the registry.
/// This allows callers to distinguish between "not registered" and "registered but failed to activate".
/// </param>
/// <returns>True if the activity was created; otherwise false.</returns>
bool TryCreateActivity(TaskIdentifier identifier, IServiceProvider serviceProvider,
out IWorkflowActivity? activity);
out IWorkflowActivity? activity, out Exception? activationException);
}
50 changes: 48 additions & 2 deletions src/Dapr.Workflow/Worker/WorkflowWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,36 @@ private async Task<OrchestratorResponse> HandleOrchestratorResponseAsync(Orchest

// Try to get the workflow from the factory
var workflowIdentifier = new TaskIdentifier(workflowName);
if (!_workflowsFactory.TryCreateWorkflow(workflowIdentifier, scope.ServiceProvider, out var workflow))
if (!_workflowsFactory.TryCreateWorkflow(workflowIdentifier, scope.ServiceProvider, out var workflow, out var workflowActivationException))
{
if (workflowActivationException != null)
{
_logger.LogWorkerWorkflowHandleOrchestratorRequestActivationFailed(workflowActivationException, workflowName);

return new OrchestratorResponse
{
InstanceId = request.InstanceId,
CompletionToken = completionToken,
Actions =
{
new OrchestratorAction
{
CompleteOrchestration = new CompleteOrchestrationAction
{
OrchestrationStatus = OrchestrationStatus.Failed,
FailureDetails = new()
{
IsNonRetriable = true,
ErrorType = workflowActivationException.GetType().FullName ?? "WorkflowActivationFailed",
ErrorMessage = $"Workflow '{workflowName}' failed to activate: {workflowActivationException.Message}",
StackTrace = workflowActivationException.StackTrace ?? string.Empty
}
}
}
}
};
}

_logger.LogWorkerWorkflowHandleOrchestratorRequestNotInRegistry(workflowName);

return new OrchestratorResponse
Expand Down Expand Up @@ -489,8 +517,26 @@ private async Task<ActivityResponse> HandleActivityResponseAsync(ActivityRequest

// Try to get the activity from the factory
var activityIdentifier = new TaskIdentifier(request.Name);
if (!_workflowsFactory.TryCreateActivity(activityIdentifier, scope.ServiceProvider, out var activity))
if (!_workflowsFactory.TryCreateActivity(activityIdentifier, scope.ServiceProvider, out var activity, out var activityActivationException))
{
if (activityActivationException != null)
{
_logger.LogWorkerWorkflowHandleActivityRequestActivationFailed(activityActivationException, request.Name);

return new ActivityResponse
{
InstanceId = request.OrchestrationInstance?.InstanceId ?? string.Empty,
TaskId = request.TaskId,
CompletionToken = completionToken,
FailureDetails = new()
{
ErrorType = activityActivationException.GetType().FullName ?? "ActivityActivationFailed",
ErrorMessage = $"Activity '{request.Name}' failed to activate: {activityActivationException.Message}",
StackTrace = activityActivationException.StackTrace ?? string.Empty
}
};
}

_logger.LogWorkerWorkflowHandleActivityRequestNotInRegistry(request.Name);

return new ActivityResponse
Expand Down
12 changes: 10 additions & 2 deletions src/Dapr.Workflow/Worker/WorkflowsFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,49 +96,57 @@ public void RegisterActivity<TInput, TOutput>(string name,
}

/// <inheritdoc />
public bool TryCreateWorkflow(TaskIdentifier identifier, IServiceProvider serviceProvider, out IWorkflow? workflow)
public bool TryCreateWorkflow(TaskIdentifier identifier, IServiceProvider serviceProvider, out IWorkflow? workflow,
out Exception? activationException)
{
if (_workflowFactories.TryGetValue(identifier.Name, out var factory))
{
try
{
workflow = factory(serviceProvider);
logger.LogCreateWorkflowInstanceSuccess(identifier.Name);
activationException = null;
return true;
}
catch (Exception ex)
{
logger.LogCreateWorkflowFailure(ex, identifier.Name);
activationException = ex;
workflow = null;
return false;
}
}

logger.LogCreateWorkflowNotFoundInRegistry(identifier.Name);
activationException = null;
workflow = null;
return false;
}

/// <inheritdoc />
public bool TryCreateActivity(TaskIdentifier identifier, IServiceProvider serviceProvider, out IWorkflowActivity? activity)
public bool TryCreateActivity(TaskIdentifier identifier, IServiceProvider serviceProvider, out IWorkflowActivity? activity,
out Exception? activationException)
{
if (_activityFactories.TryGetValue(identifier.Name, out var factory))
{
try
{
activity = factory(serviceProvider);
logger.LogCreateActivityInstanceSuccess(identifier.Name);
activationException = null;
return true;
}
catch (Exception ex)
{
logger.LogCreateActivityFailure(ex, identifier.Name);
activationException = ex;
activity = null;
return false;
}
}

logger.LogCreateActivityNotFoundInRegistry(identifier.Name);
activationException = null;
activity = null;
return false;
}
Expand Down
110 changes: 106 additions & 4 deletions test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,48 @@ public async Task HandleOrchestratorResponseAsync_ShouldReturnEmptyActions_WhenW
var action = Assert.Single(response.Actions);
Assert.NotNull(action.CompleteOrchestration);
Assert.Equal(OrchestrationStatus.Failed, action.CompleteOrchestration.OrchestrationStatus);
Assert.Equal("WorkflowNotFound", action.CompleteOrchestration.FailureDetails.ErrorType);
}

[Fact]
public async Task HandleOrchestratorResponseAsync_ShouldReturnActivationFailure_WhenWorkflowActivationFails()
{
var sp = new ServiceCollection().BuildServiceProvider();
var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web));
var options = new WorkflowRuntimeOptions();

var factory = new StubWorkflowsFactory();
factory.AddWorkflowActivationError("wf", new InvalidOperationException("No service for type 'IMyService' has been registered."));

var worker = new WorkflowWorker(
CreateGrpcClientMock().Object,
factory,
NullLoggerFactory.Instance,
serializer,
sp,
options);

var request = new OrchestratorRequest
{
InstanceId = "i",
PastEvents =
{
new HistoryEvent
{
ExecutionStarted = new ExecutionStartedEvent { Name = "wf", Input = "123" }
}
}
};

var response = await InvokeHandleOrchestratorResponseAsync(worker, request);

Assert.Equal("i", response.InstanceId);
var activationAction = Assert.Single(response.Actions);
Assert.NotNull(activationAction.CompleteOrchestration);
Assert.Equal(OrchestrationStatus.Failed, activationAction.CompleteOrchestration.OrchestrationStatus);
Assert.NotEqual("WorkflowNotFound", activationAction.CompleteOrchestration.FailureDetails.ErrorType);
Assert.Contains("failed to activate", activationAction.CompleteOrchestration.FailureDetails.ErrorMessage);
Assert.Contains("IMyService", activationAction.CompleteOrchestration.FailureDetails.ErrorMessage);
}

[Fact]
Expand Down Expand Up @@ -1002,6 +1044,42 @@ public async Task HandleActivityResponseAsync_ShouldReturnNotFoundFailure_WhenAc
Assert.Contains("Activity 'act' not found", response.FailureDetails.ErrorMessage);
}

[Fact]
public async Task HandleActivityResponseAsync_ShouldReturnActivationFailure_WhenActivityActivationFails()
{
var sp = new ServiceCollection().BuildServiceProvider();
var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web));
var options = new WorkflowRuntimeOptions();

var factory = new StubWorkflowsFactory();
factory.AddActivityActivationError("act", new InvalidOperationException("No service for type 'IEmailSender' has been registered."));

var worker = new WorkflowWorker(
CreateGrpcClientMock().Object,
factory,
NullLoggerFactory.Instance,
serializer,
sp,
options);

var request = new ActivityRequest
{
Name = "act",
TaskId = 7,
OrchestrationInstance = new OrchestrationInstance { InstanceId = "i" },
Input = "1"
};

var response = await InvokeHandleActivityResponseAsync(worker, request);

Assert.Equal("i", response.InstanceId);
Assert.Equal(7, response.TaskId);
Assert.NotNull(response.FailureDetails);
Assert.NotEqual("ActivityNotFoundException", response.FailureDetails.ErrorType);
Assert.Contains("failed to activate", response.FailureDetails.ErrorMessage);
Assert.Contains("IEmailSender", response.FailureDetails.ErrorMessage);
}

[Fact]
public async Task HandleActivityResponseAsync_ShouldExecuteActivity_AndSerializeResult()
{
Expand Down Expand Up @@ -1625,20 +1703,44 @@ private sealed class StubWorkflowsFactory : IWorkflowsFactory
{
private readonly Dictionary<string, IWorkflow> _workflows = new(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<string, IWorkflowActivity> _activities = new(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<string, Exception> _workflowActivationErrors = new(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<string, Exception> _activityActivationErrors = new(StringComparer.OrdinalIgnoreCase);

public void AddWorkflow(string name, IWorkflow wf) => _workflows[name] = wf;
public void AddActivity(string name, IWorkflowActivity act) => _activities[name] = act;
public void AddWorkflowActivationError(string name, Exception ex) => _workflowActivationErrors[name] = ex;
public void AddActivityActivationError(string name, Exception ex) => _activityActivationErrors[name] = ex;

public void RegisterWorkflow<TWorkflow>(string? name = null) where TWorkflow : class, IWorkflow => throw new NotSupportedException();
public void RegisterWorkflow<TInput, TOutput>(string name, Func<WorkflowContext, TInput, Task<TOutput>> implementation) => throw new NotSupportedException();
public void RegisterActivity<TActivity>(string? name = null) where TActivity : class, IWorkflowActivity => throw new NotSupportedException();
public void RegisterActivity<TInput, TOutput>(string name, Func<WorkflowActivityContext, TInput, Task<TOutput>> implementation) => throw new NotSupportedException();

public bool TryCreateWorkflow(TaskIdentifier identifier, IServiceProvider serviceProvider, out IWorkflow? workflow)
=> _workflows.TryGetValue(identifier.Name, out workflow);
public bool TryCreateWorkflow(TaskIdentifier identifier, IServiceProvider serviceProvider, out IWorkflow? workflow,
out Exception? activationException)
{
if (_workflowActivationErrors.TryGetValue(identifier.Name, out var ex))
{
activationException = ex;
workflow = null;
return false;
}
activationException = null;
return _workflows.TryGetValue(identifier.Name, out workflow);
}

public bool TryCreateActivity(TaskIdentifier identifier, IServiceProvider serviceProvider, out IWorkflowActivity? activity)
=> _activities.TryGetValue(identifier.Name, out activity);
public bool TryCreateActivity(TaskIdentifier identifier, IServiceProvider serviceProvider, out IWorkflowActivity? activity,
out Exception? activationException)
{
if (_activityActivationErrors.TryGetValue(identifier.Name, out var ex))
{
activationException = ex;
activity = null;
return false;
}
activationException = null;
return _activities.TryGetValue(identifier.Name, out activity);
}
}

private sealed class InlineWorkflow(Type inputType, Func<WorkflowContext, object?, Task<object?>> run) : IWorkflow
Expand Down
Loading
Loading