Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ internal sealed class LockstepRunEventStream : IRunEventStream
private int _isDisposed;

private readonly ISuperStepRunner _stepRunner;
private Activity? _sessionActivity;

public ValueTask<RunStatus> GetStatusAsync(CancellationToken cancellationToken = default) => new(this.RunStatus);

Expand All @@ -30,7 +31,12 @@ public LockstepRunEventStream(ISuperStepRunner stepRunner)

/// <summary>
/// Starts the session-level telemetry activity for this lockstep stream.
/// </summary>
/// <remarks>
/// The session activity is held in <c>_sessionActivity</c> until
/// <see cref="DisposeAsync"/> records the session-completed event and disposes it.
/// Calling this method again while a session activity is already active is a no-op.
/// </remarks>
public void Start()
{
    // A repeated call would otherwise overwrite the field and leave the
    // earlier activity undisposed, so keep the one that is already running.
    if (this._sessionActivity is not null)
    {
        return;
    }

    // Start the session-level activity that spans the entire lockstep execution lifetime.
    // Individual run-stage activities are nested within this session activity.
    Activity? sessionActivity = this._stepRunner.TelemetryContext.StartWorkflowSessionActivity();
    if (sessionActivity is null)
    {
        // The telemetry context returned no activity; there is nothing to tag or track.
        return;
    }

    sessionActivity.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId)
                   .SetTag(Tags.SessionId, this._stepRunner.SessionId);
    sessionActivity.AddEvent(new ActivityEvent(EventNames.SessionStarted));

    this._sessionActivity = sessionActivity;
}

public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPendingRequest, [EnumeratorCancellation] CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -172,6 +178,14 @@ public ValueTask DisposeAsync()
{
this._stopCancellation.Cancel();

// Stop the session activity — the session ends when the stream is disposed
if (this._sessionActivity is not null)
{
this._sessionActivity.AddEvent(new ActivityEvent(EventNames.SessionCompleted));
this._sessionActivity.Dispose();
this._sessionActivity = null;
}

this._stopCancellation.Dispose();
this._inputWaiter.Dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,18 @@ public void Start()
private async Task RunLoopAsync(CancellationToken cancellationToken)
{
using CancellationTokenSource errorSource = new();
CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(errorSource.Token, cancellationToken);
using CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(errorSource.Token, cancellationToken);

Comment thread
alliscode marked this conversation as resolved.
// Subscribe to events - they will flow directly to the channel as they're raised
this._stepRunner.OutgoingEvents.EventRaised += OnEventRaisedAsync;

using Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity();
activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.SessionId, this._stepRunner.SessionId);
// Start the session-level activity that spans the entire run loop lifetime.
// Individual run-stage activities are nested within this session activity.
Activity? sessionActivity = this._stepRunner.TelemetryContext.StartWorkflowSessionActivity();
sessionActivity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId)
.SetTag(Tags.SessionId, this._stepRunner.SessionId);

Activity? runActivity = null;

try
{
Expand All @@ -70,10 +75,16 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)
await this._inputWaiter.WaitForInputAsync(cancellationToken: linkedSource.Token).ConfigureAwait(false);

this._runStatus = RunStatus.Running;
activity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted));
sessionActivity?.AddEvent(new ActivityEvent(EventNames.SessionStarted));

while (!linkedSource.Token.IsCancellationRequested)
{
// Start a new run-stage activity for this input→processing→halt cycle
runActivity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity();
runActivity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId)
.SetTag(Tags.SessionId, this._stepRunner.SessionId);
runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted));

// Run all available supersteps continuously
// Events are streamed out in real-time as they happen via the event handler
while (this._stepRunner.HasUnprocessedMessages && !linkedSource.Token.IsCancellationRequested)
Expand All @@ -93,6 +104,15 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)
RunStatus capturedStatus = this._runStatus;
await this._eventChannel.Writer.WriteAsync(new InternalHaltSignal(currentEpoch, capturedStatus), linkedSource.Token).ConfigureAwait(false);

// Close the run-stage activity when processing halts.
// A new run activity will be created when the next input arrives.
if (runActivity is not null)
{
runActivity.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));
runActivity.Dispose();
runActivity = null;
}

// Wait for next input from the consumer
// Works for both Idle (no work) and PendingRequests (waiting for responses)
await this._inputWaiter.WaitForInputAsync(TimeSpan.FromSeconds(1), linkedSource.Token).ConfigureAwait(false);
Expand All @@ -107,14 +127,26 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)
}
catch (Exception ex)
{
if (activity != null)
// Record error on the run-stage activity if one is active
if (runActivity is not null)
{
runActivity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() {
{ Tags.ErrorType, ex.GetType().FullName },
{ Tags.BuildErrorMessage, ex.Message },
}));
runActivity.CaptureException(ex);
}

// Record error on the session activity
if (sessionActivity is not null)
{
activity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() {
sessionActivity.AddEvent(new ActivityEvent(EventNames.SessionError, tags: new() {
{ Tags.ErrorType, ex.GetType().FullName },
{ Tags.BuildErrorMessage, ex.Message },
}));
Comment thread
alliscode marked this conversation as resolved.
activity.CaptureException(ex);
sessionActivity.CaptureException(ex);
}

await this._eventChannel.Writer.WriteAsync(new WorkflowErrorEvent(ex), linkedSource.Token).ConfigureAwait(false);
}
finally
Expand All @@ -124,7 +156,20 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)

// Mark as ended when run loop exits
this._runStatus = RunStatus.Ended;
activity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));

// Safety net: stop the run-stage activity if not already stopped (e.g. on cancellation or error)
if (runActivity is not null)
{
runActivity.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));
runActivity.Dispose();
}

// Stop the session activity — the session always ends when the run loop exits
if (sessionActivity is not null)
{
sessionActivity.AddEvent(new ActivityEvent(EventNames.SessionCompleted));
sessionActivity.Dispose();
}
}

async ValueTask OnEventRaisedAsync(object? sender, WorkflowEvent e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace Microsoft.Agents.AI.Workflows.Observability;
internal static class ActivityNames
{
public const string WorkflowBuild = "workflow.build";
public const string WorkflowSession = "workflow.session";
public const string WorkflowRun = "workflow_invoke";
public const string MessageSend = "message.send";
Comment thread
alliscode marked this conversation as resolved.
public const string ExecutorProcess = "executor.process";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ internal static class EventNames
public const string BuildValidationCompleted = "build.validation_completed";
public const string BuildCompleted = "build.completed";
public const string BuildError = "build.error";
public const string SessionStarted = "session.started";
public const string SessionCompleted = "session.completed";
public const string SessionError = "session.error";
public const string WorkflowStarted = "workflow.started";
public const string WorkflowCompleted = "workflow.completed";
public const string WorkflowError = "workflow.error";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,24 @@ public WorkflowTelemetryContext(WorkflowTelemetryOptions options, ActivitySource
}

/// <summary>
/// Starts a workflow session activity if enabled. This span represents the
/// entire lifetime of a workflow execution (from start until stop,
/// cancellation, or error). Individual run stages are nested within it.
/// </summary>
/// <returns>An activity if workflow run telemetry is enabled, otherwise null.</returns>
public Activity? StartWorkflowSessionActivity()
{
    // The session span is gated by the same switches as the run span:
    // telemetry must be enabled and workflow-run telemetry must not be disabled.
    bool sessionTelemetryEnabled = this.IsEnabled && !this.Options.DisableWorkflowRun;

    return sessionTelemetryEnabled
        ? this.ActivitySource.StartActivity(ActivityNames.WorkflowSession)
        : null;
}

/// <summary>
/// Starts a workflow run activity if enabled. This represents a single
/// input-to-halt cycle within a workflow session.
/// </summary>
/// <returns>An activity if workflow run telemetry is enabled, otherwise null.</returns>
public Activity? StartWorkflowRunActivity()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,16 @@ public ObservabilityTests()
/// Create a sample workflow for testing.
/// </summary>
/// <remarks>
/// This workflow is expected to create 9 activities that will be captured by the tests
/// - ActivityNames.WorkflowBuild
/// - ActivityNames.WorkflowSession
/// -- ActivityNames.WorkflowRun
/// --- ActivityNames.EdgeGroupProcess
/// --- ActivityNames.ExecutorProcess (UppercaseExecutor)
/// ---- ActivityNames.MessageSend
/// ----- ActivityNames.EdgeGroupProcess
/// --- ActivityNames.ExecutorProcess (ReverseTextExecutor)
/// ---- ActivityNames.MessageSend
/// </remarks>
/// <returns>The created workflow.</returns>
private static Workflow CreateWorkflow()
Expand All @@ -74,6 +75,7 @@ private static Dictionary<string, int> GetExpectedActivityNameCounts() =>
new()
{
{ ActivityNames.WorkflowBuild, 1 },
{ ActivityNames.WorkflowSession, 1 },
{ ActivityNames.WorkflowRun, 1 },
{ ActivityNames.EdgeGroupProcess, 2 },
{ ActivityNames.ExecutorProcess, 2 },
Expand Down Expand Up @@ -113,7 +115,7 @@ private async Task TestWorkflowEndToEndActivitiesAsync(string executionEnvironme

// Assert
var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList();
capturedActivities.Should().HaveCount(8, "Exactly 8 activities should be created.");
capturedActivities.Should().HaveCount(9, "Exactly 9 activities should be created.");

// Make sure all expected activities exist and have the correct count
foreach (var kvp in GetExpectedActivityNameCounts())
Expand Down
Loading