Skip to content
Draft
4 changes: 4 additions & 0 deletions src/DurableTask.Core/Command/OrchestratorAction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,9 @@ internal abstract class OrchestratorAction
public int Id { get; set; }

public abstract OrchestratorActionType OrchestratorActionType { get; }

public string APIName { get; set; }

public int ActionId { get; set; }
}
}
21 changes: 21 additions & 0 deletions src/DurableTask.Core/History/HistoryEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,27 @@ protected HistoryEvent(int eventId)
[DataMember]
public virtual EventType EventType { get; private set; }

/// <summary>
/// The user-facing API name that this event is associated with.
/// For example: CallActivityWithRetry
/// </summary>
[DataMember]
public string APIName { get; set; } = "no_api_provided";

/// <summary>
/// The Action/Task ID that this event is associated with.
/// Starts at 0 and increases by 1.
/// </summary>
[DataMember]
public int ActionId { get; set; } = -1;

/// <summary>
/// Only application to OrchestrationCompleted events.
/// Stores the OOProc actions payload, to be retrieved later.
/// </summary>
[DataMember]
public string ActionString { get; set; } = "no_actions_provided";

/// <summary>
/// Implementation for <see cref="IExtensibleDataObject.ExtensionData"/>.
/// </summary>
Expand Down
47 changes: 42 additions & 5 deletions src/DurableTask.Core/OrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ namespace DurableTask.Core
/// </summary>
public abstract class OrchestrationContext
{

/// <summary>
/// TBD.
/// </summary>
public string Actions = "";

/// <summary>
/// Thread-static variable used to signal whether the calling thread is the orchestrator thread.
/// The primary use case is for detecting illegal async usage in orchestration code.
Expand Down Expand Up @@ -155,14 +161,24 @@ public virtual Task<T> ScheduleWithRetry<T>(Type taskActivityType, RetryOptions
/// <param name="name">Name of the orchestration as specified by the ObjectCreator</param>
/// <param name="version">Name of the orchestration as specified by the ObjectCreator</param>
/// <param name="retryOptions">Retry policy</param>
/// <param name="apiName"></param>
/// <param name="actionId"></param>
/// <param name="parameters">Parameters for the TaskActivity.Execute method</param>
/// <returns>Task that represents the execution of the specified TaskActivity</returns>
public virtual Task<T> ScheduleWithRetry<T>(string name, string version, RetryOptions retryOptions,
params object[] parameters)
public virtual Task<T> ScheduleWithRetry<T>(string name, string version,
string apiName, int actionId, RetryOptions retryOptions, params object[] parameters)
{
Task<T> RetryCall() => ScheduleTask<T>(name, version, parameters);
Task<T> RetryCall() => ScheduleTask<T>(name, version, apiName, actionId, parameters);
var retryInterceptor = new RetryInterceptor<T>(this, retryOptions, RetryCall);
return retryInterceptor.Invoke();
return retryInterceptor.Invoke(apiName, actionId);
}

#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
public virtual Task<T> ScheduleWithRetry<T>(string name, string version, RetryOptions retryOptions,
#pragma warning restore CS1591 // Missing XML comment for publicly visible type or member
params object[] parameters)
{
return ScheduleWithRetry<T>(name, version, "", -1, retryOptions, parameters);
}

/// <summary>
Expand Down Expand Up @@ -252,7 +268,23 @@ public virtual Task<TResult> ScheduleTask<TResult>(Type activityType, params obj
/// <param name="version">Name of the orchestration as specified by the ObjectCreator</param>
/// <param name="parameters">Parameters for the TaskActivity.Execute method</param>
/// <returns>Task that represents the execution of the specified TaskActivity</returns>
public abstract Task<TResult> ScheduleTask<TResult>(string name, string version, params object[] parameters);
public virtual Task<TResult> ScheduleTask<TResult>(string name, string version, params object[] parameters)
{
return ScheduleTask<TResult>(name, version, "", -1, parameters);
}


/// <summary>
/// Schedule a TaskActivity by name and version.
/// </summary>
/// <typeparam name="TResult">Return Type of the TaskActivity.Execute method</typeparam>
/// <param name="name">Name of the orchestration as specified by the ObjectCreator</param>
/// <param name="version">Name of the orchestration as specified by the ObjectCreator</param>
/// <param name="apiName"></param>
/// <param name="actionId"></param>
/// <param name="parameters">Parameters for the TaskActivity.Execute method</param>
/// <returns>Task that represents the execution of the specified TaskActivity</returns>
public abstract Task<TResult> ScheduleTask<TResult>(string name, string version, string apiName, int actionId, params object[] parameters);

/// <summary>
/// Create a timer that will fire at the specified time and hand back the specified state.
Expand All @@ -273,6 +305,10 @@ public virtual Task<TResult> ScheduleTask<TResult>(Type activityType, params obj
/// <returns>Task that represents the async wait on the timer</returns>
public abstract Task<T> CreateTimer<T>(DateTime fireAt, T state, CancellationToken cancelToken);

#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
public abstract Task<T> CreateTimer<T>(DateTime fireAt, T state, CancellationToken cancelToken, string apiName, int apiID);
#pragma warning restore CS1591 // Missing XML comment for publicly visible type or member

/// <summary>
/// Create a sub-orchestration of the specified type.
/// </summary>
Expand Down Expand Up @@ -371,5 +407,6 @@ public abstract Task<T> CreateSubOrchestrationInstance<T>(string name, string ve
/// the first execution of this orchestration instance.
/// </param>
public abstract void ContinueAsNew(string newVersion, object input);

}
}
9 changes: 8 additions & 1 deletion src/DurableTask.Core/OrchestrationInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,19 @@ public class OrchestrationInstance : IExtensibleDataObject
[DataMember]
public string ExecutionId { get; set; }

/// <summary>
/// TBD.
/// </summary>
[DataMember]
public string Actions { get; set; }

internal OrchestrationInstance Clone()
{
return new OrchestrationInstance
{
ExecutionId = ExecutionId,
InstanceId = InstanceId
InstanceId = InstanceId,
Actions = Actions
};
}

Expand Down
5 changes: 3 additions & 2 deletions src/DurableTask.Core/RetryInterceptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace DurableTask.Core
using System;
using System.Diagnostics;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core.Common;
using DurableTask.Core.Tracing;
Expand Down Expand Up @@ -48,7 +49,7 @@ public RetryInterceptor(OrchestrationContext context, RetryOptions retryOptions,
/// </summary>
/// <returns>The return value of the supplied retry call</returns>
/// <exception cref="Exception">The final exception encountered if the call did not succeed</exception>
public async Task<T> Invoke()
public async Task<T> Invoke(string apiName = "", int actID = -1)
{
Exception lastException = null;
DateTime firstAttempt = this.context.CurrentUtcDateTime;
Expand All @@ -71,7 +72,7 @@ public async Task<T> Invoke()
}

DateTime retryAt = this.context.CurrentUtcDateTime.Add(nextDelay);
await this.context.CreateTimer(retryAt, "Retry Attempt " + retryCount + 1);
await this.context.CreateTimer(retryAt, "Retry Attempt " + retryCount + 1, CancellationToken.None, apiName, actID);
}

ExceptionDispatchInfo.Capture(lastException).Throw();
Expand Down
6 changes: 6 additions & 0 deletions src/DurableTask.Core/TaskActivityDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,16 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ =>
{
string output = await taskActivity.RunAsync(context, scheduledEvent.Input);
eventToRespond = new TaskCompletedEvent(-1, scheduledEvent.EventId, output);
eventToRespond.ActionId = scheduledEvent.ActionId;
eventToRespond.APIName = scheduledEvent.APIName;
}
catch (TaskFailureException e)
{
TraceHelper.TraceExceptionInstance(TraceEventType.Error, "TaskActivityDispatcher-ProcessTaskFailure", taskMessage.OrchestrationInstance, e);
string details = this.IncludeDetails ? e.Details : null;
eventToRespond = new TaskFailedEvent(-1, scheduledEvent.EventId, e.Message, details);
eventToRespond.ActionId = scheduledEvent.ActionId;
eventToRespond.APIName = scheduledEvent.APIName;
this.logHelper.TaskActivityFailure(orchestrationInstance, scheduledEvent.Name, (TaskFailedEvent)eventToRespond, e);
CorrelationTraceClient.Propagate(() => CorrelationTraceClient.TrackException(e));
}
Expand All @@ -178,6 +182,8 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ =>
? $"Unhandled exception while executing task: {e}\n\t{e.StackTrace}"
: null;
eventToRespond = new TaskFailedEvent(-1, scheduledEvent.EventId, e.Message, details);
eventToRespond.ActionId = scheduledEvent.ActionId;
eventToRespond.APIName = scheduledEvent.APIName;
this.logHelper.TaskActivityFailure(orchestrationInstance, scheduledEvent.Name, (TaskFailedEvent)eventToRespond, e);
}

Expand Down
26 changes: 18 additions & 8 deletions src/DurableTask.Core/TaskOrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ internal class TaskOrchestrationContext : OrchestrationContext
{
private readonly IDictionary<int, OpenTaskInfo> openTasks;
private readonly IDictionary<int, OrchestratorAction> orchestratorActionsMap;

private OrchestrationCompleteOrchestratorAction continueAsNew;
private bool executionCompletedOrTerminated;
private int idCounter;
Expand Down Expand Up @@ -64,24 +65,22 @@ internal void ClearPendingActions()
continueAsNew = null;
}

public override async Task<TResult> ScheduleTask<TResult>(string name, string version,
params object[] parameters)
public override async Task<TResult> ScheduleTask<TResult>(string name, string version, string apiName, int actionId = 0, params object[] parameters)
{
TResult result = await ScheduleTaskToWorker<TResult>(name, version, null, parameters);

TResult result = await ScheduleTaskToWorker<TResult>(name, version, null, apiName, actionId, parameters);
return result;
}

public async Task<TResult> ScheduleTaskToWorker<TResult>(string name, string version, string taskList,
params object[] parameters)
string apiName, int actionId, params object[] parameters)
{
object result = await ScheduleTaskInternal(name, version, taskList, typeof(TResult), parameters);
object result = await ScheduleTaskInternal(name, version, taskList, typeof(TResult), apiName, actionId, parameters);

return (TResult)result;
}

public async Task<object> ScheduleTaskInternal(string name, string version, string taskList, Type resultType,
params object[] parameters)
string apiName, int actionId, params object[] parameters)
{
int id = this.idCounter++;
string serializedInput = this.MessageDataConverter.Serialize(parameters);
Expand All @@ -92,6 +91,8 @@ public async Task<object> ScheduleTaskInternal(string name, string version, stri
Version = version,
Tasklist = taskList,
Input = serializedInput,
APIName = apiName,
ActionId = actionId,
};

this.orchestratorActionsMap.Add(id, scheduleTaskTaskAction);
Expand Down Expand Up @@ -223,13 +224,20 @@ public override Task<T> CreateTimer<T>(DateTime fireAt, T state)
return CreateTimer(fireAt, state, CancellationToken.None);
}

public override async Task<T> CreateTimer<T>(DateTime fireAt, T state, CancellationToken cancelToken)
public override Task<T> CreateTimer<T>(DateTime fireAt, T state, CancellationToken cancelToken)
{
return CreateTimer(fireAt, state, CancellationToken.None, "TBD", -1);
}

public override async Task<T> CreateTimer<T>(DateTime fireAt, T state, CancellationToken cancelToken, string apiName, int actionId)
{
int id = this.idCounter++;
var createTimerOrchestratorAction = new CreateTimerOrchestratorAction
{
Id = id,
FireAt = fireAt,
APIName = apiName,
ActionId = actionId,
};

this.orchestratorActionsMap.Add(id, createTimerOrchestratorAction);
Expand Down Expand Up @@ -285,6 +293,7 @@ public void HandleTaskScheduledEvent(TaskScheduledEvent scheduledEvent)
}

this.orchestratorActionsMap.Remove(taskId);

}

public void HandleTimerCreatedEvent(TimerCreatedEvent timerCreatedEvent)
Expand Down Expand Up @@ -405,6 +414,7 @@ public void HandleTaskCompletedEvent(TaskCompletedEvent completedEvent)
{
LogDuplicateEvent("TaskCompleted", completedEvent, taskId);
}

}

public void HandleTaskFailedEvent(TaskFailedEvent failedEvent)
Expand Down
24 changes: 18 additions & 6 deletions src/DurableTask.Core/TaskOrchestrationDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,9 @@ protected async Task<bool> OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
// finish up processing of the work item
if (!continuedAsNew && runtimeState.Events.Last().EventType != EventType.OrchestratorCompleted)
{
runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
var orchCompletedEvent = new OrchestratorCompletedEvent(-1);
orchCompletedEvent.ActionString = runtimeState.OrchestrationInstance.Actions;
runtimeState.AddEvent(orchCompletedEvent);
}

if (isCompleted)
Expand Down Expand Up @@ -512,7 +514,9 @@ protected async Task<bool> OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
}
}

runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
var orchCompletedEvent = new OrchestratorCompletedEvent(-1);
orchCompletedEvent.ActionString = runtimeState.OrchestrationInstance.Actions;
runtimeState.AddEvent(orchCompletedEvent);
workItem.OrchestrationRuntimeState = runtimeState;

TaskOrchestration newOrchestration = this.objectManager.GetObject(
Expand Down Expand Up @@ -796,7 +800,9 @@ TaskMessage ProcessScheduleTaskDecision(
{
Name = scheduleTaskOrchestratorAction.Name,
Version = scheduleTaskOrchestratorAction.Version,
Input = scheduleTaskOrchestratorAction.Input
Input = scheduleTaskOrchestratorAction.Input,
APIName = scheduleTaskOrchestratorAction.APIName,
ActionId = scheduleTaskOrchestratorAction.ActionId,
};

taskMessage.Event = scheduledEvent;
Expand All @@ -807,7 +813,9 @@ TaskMessage ProcessScheduleTaskDecision(
scheduledEvent = new TaskScheduledEvent(scheduleTaskOrchestratorAction.Id)
{
Name = scheduleTaskOrchestratorAction.Name,
Version = scheduleTaskOrchestratorAction.Version
Version = scheduleTaskOrchestratorAction.Version,
APIName = scheduleTaskOrchestratorAction.APIName,
ActionId = scheduleTaskOrchestratorAction.ActionId,
};
}

Expand All @@ -828,15 +836,19 @@ TaskMessage ProcessCreateTimerDecision(

var timerCreatedEvent = new TimerCreatedEvent(createTimerOrchestratorAction.Id)
{
FireAt = createTimerOrchestratorAction.FireAt
FireAt = createTimerOrchestratorAction.FireAt,
APIName = createTimerOrchestratorAction.APIName,
ActionId = createTimerOrchestratorAction.ActionId,
};

runtimeState.AddEvent(timerCreatedEvent);

taskMessage.Event = new TimerFiredEvent(-1)
{
TimerId = createTimerOrchestratorAction.Id,
FireAt = createTimerOrchestratorAction.FireAt
FireAt = createTimerOrchestratorAction.FireAt,
APIName = createTimerOrchestratorAction.APIName,
ActionId = createTimerOrchestratorAction.ActionId,
};

this.logHelper.CreatingTimer(
Expand Down