Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ internal static ValueTask<Workflow<List<ChatMessage>>> GetWorkflowAsync(IChatCli

// Build the workflow by adding executors and connecting them
return new WorkflowBuilder(startExecutor)
.AddFanOutEdge(startExecutor, targets: [frenchAgent, englishAgent])
.AddFanInEdge(aggregationExecutor, sources: [frenchAgent, englishAgent])
.AddFanOutEdge(startExecutor, null, targets: [frenchAgent, englishAgent])
.AddFanInEdge(aggregationExecutor, null, sources: [frenchAgent, englishAgent])
.WithOutputFrom(aggregationExecutor)
.BuildAsync<List<ChatMessage>>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ private static async Task Main()

// Build the workflow by adding executors and connecting them
var workflow = new WorkflowBuilder(startExecutor)
.AddFanOutEdge(startExecutor, targets: [physicist, chemist])
.AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
.AddFanOutEdge(startExecutor, "fan-out", targets: [physicist, chemist])
.AddFanInEdge(aggregationExecutor, "fan-in", sources: [physicist, chemist])
.WithOutputFrom(aggregationExecutor)
.Build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ public static Workflow BuildWorkflow()

// Step 4: Build the concurrent workflow with fan-out/fan-in pattern
return new WorkflowBuilder(splitter)
.AddFanOutEdge(splitter, targets: [.. mappers]) // Split -> many mappers
.AddFanInEdge(shuffler, sources: [.. mappers]) // All mappers -> shuffle
.AddFanOutEdge(shuffler, targets: [.. reducers]) // Shuffle -> many reducers
.AddFanInEdge(completion, sources: [.. reducers]) // All reducers -> completion
.AddFanOutEdge(splitter, "Splitting", targets: [.. mappers]) // Split -> many mappers
.AddFanInEdge(shuffler, "fan-in", sources: [.. mappers]) // All mappers -> shuffle
.AddFanOutEdge(shuffler, "Shuffling", targets: [.. reducers]) // Shuffle -> many reducers
.AddFanInEdge(completion, "fan-in", sources: [.. reducers]) // All reducers -> completion
.WithOutputFrom(completion)
.Build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ private static async Task Main()

// Build the workflow by connecting executors sequentially
var workflow = new WorkflowBuilder(fileRead)
.AddFanOutEdge(fileRead, targets: [wordCount, paragraphCount])
.AddFanInEdge(aggregate, sources: [wordCount, paragraphCount])
.AddFanOutEdge(fileRead, null, targets: [wordCount, paragraphCount])
.AddFanInEdge(aggregate, null, sources: [wordCount, paragraphCount])
.WithOutputFrom(aggregate)
.Build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ private static async Task Main()

// Build the workflow by connecting executors sequentially
WorkflowBuilder builder = new(uppercase);
builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);
builder.AddEdge(uppercase, reverse, false, "custom label").WithOutputFrom(reverse);
var workflow = builder.Build();

// Execute the workflow with input data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private static Workflow BuildConcurrentCore(
// provenance tracking exposed in the workflow context passed to a handler.
ExecutorIsh[] agentExecutors = (from agent in agents select (ExecutorIsh)new AgentRunStreamingExecutor(agent, includeInputInOutput: false)).ToArray();
ExecutorIsh[] accumulators = [.. from agent in agentExecutors select (ExecutorIsh)new CollectChatMessagesExecutor($"Batcher/{agent.Id}")];
builder.AddFanOutEdge(start, targets: agentExecutors);
builder.AddFanOutEdge(start, null, targets: agentExecutors);
for (int i = 0; i < agentExecutors.Length; i++)
{
builder.AddEdge(agentExecutors[i], accumulators[i]);
Expand Down
8 changes: 7 additions & 1 deletion dotnet/src/Microsoft.Agents.AI.Workflows/DirectEdgeData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ namespace Microsoft.Agents.AI.Workflows;
/// </summary>
public sealed class DirectEdgeData : EdgeData
{
internal DirectEdgeData(string sourceId, string sinkId, EdgeId id, PredicateT? condition = null) : base(id)
internal DirectEdgeData(string sourceId, string sinkId, EdgeId id, PredicateT? condition = null, string? label = null) : base(id)
{
this.SourceId = sourceId;
this.SinkId = sinkId;
this.Condition = condition;
this.Label = label;
this.Connection = new([sourceId], [sinkId]);
}

Expand All @@ -35,6 +36,11 @@ internal DirectEdgeData(string sourceId, string sinkId, EdgeId id, PredicateT? c
/// </summary>
public PredicateT? Condition { get; }

/// <summary>
/// An optional label for the edge, allowing for arbitrary metadata to be associated with it.
/// </summary>
public string? Label { get; }

/// <inheritdoc />
internal override EdgeConnection Connection { get; }
}
8 changes: 7 additions & 1 deletion dotnet/src/Microsoft.Agents.AI.Workflows/FanInEdgeData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ namespace Microsoft.Agents.AI.Workflows;
/// </summary>
internal sealed class FanInEdgeData : EdgeData
{
internal FanInEdgeData(List<string> sourceIds, string sinkId, EdgeId id) : base(id)
internal FanInEdgeData(List<string> sourceIds, string sinkId, EdgeId id, string? label) : base(id)
{
this.SourceIds = sourceIds;
this.SinkId = sinkId;
this.Connection = new(sourceIds, [sinkId]);
this.Label = label;
}

/// <summary>
Expand All @@ -27,6 +28,11 @@ internal FanInEdgeData(List<string> sourceIds, string sinkId, EdgeId id) : base(
/// </summary>
public string SinkId { get; }

/// <summary>
/// Optional label for the edge.
/// </summary>
public string? Label { get; init; }

/// <inheritdoc />
internal override EdgeConnection Connection { get; }
}
8 changes: 7 additions & 1 deletion dotnet/src/Microsoft.Agents.AI.Workflows/FanOutEdgeData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ namespace Microsoft.Agents.AI.Workflows;
/// </summary>
internal sealed class FanOutEdgeData : EdgeData
{
internal FanOutEdgeData(string sourceId, List<string> sinkIds, EdgeId edgeId, AssignerF? assigner = null) : base(edgeId)
internal FanOutEdgeData(string sourceId, List<string> sinkIds, EdgeId edgeId, AssignerF? assigner = null, string? label = null) : base(edgeId)
{
this.SourceId = sourceId;
this.SinkIds = sinkIds;
this.EdgeAssigner = assigner;
this.Connection = new([sourceId], sinkIds);
this.Label = label;
}

/// <summary>
Expand All @@ -37,6 +38,11 @@ internal FanOutEdgeData(string sourceId, List<string> sinkIds, EdgeId edgeId, As
/// </summary>
public AssignerF? EdgeAssigner { get; }

/// <summary>
/// An optional label for the edge.
/// </summary>
public string? Label { get; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Minor] This should probably be on EdgeData, rather than the specific EdgeDatas.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I move it? I was unsure if you wanted to keep EdgeData pure, but I agree to the suggestion. happy to move this to EdgeData...
Do I proceed? :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's unify them


/// <inheritdoc />
internal override EdgeConnection Connection { get; }
}
2 changes: 1 addition & 1 deletion dotnet/src/Microsoft.Agents.AI.Workflows/SwitchBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ internal WorkflowBuilder ReduceToFanOut(WorkflowBuilder builder, ExecutorIsh sou
List<(Func<object?, bool> Predicate, HashSet<int> OutgoingIndicies)> caseMap = this._caseMap;
HashSet<int> defaultIndicies = this._defaultIndicies;

return builder.AddFanOutEdge<object>(source, CasePartitioner, [.. this._executors]);
return builder.AddFanOutEdge<object>(source, CasePartitioner, null, [.. this._executors]);

IEnumerable<int> CasePartitioner(object? input, int targetCount)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,26 @@ private static void EmitWorkflowDigraph(Workflow workflow, List<string> lines, s
}

// Emit normal edges
foreach (var (src, target, isConditional) in ComputeNormalEdges(workflow))
foreach (var (src, target, isConditional, label) in ComputeNormalEdges(workflow))
{
var edgeAttr = isConditional ? " [style=dashed, label=\"conditional\"]" : "";
lines.Add($"{indent}\"{MapId(src)}\" -> \"{MapId(target)}\"{edgeAttr};");
// Build edge attributes
var attributes = new List<string>();

// Add style for conditional edges
if (isConditional)
{
attributes.Add("style=dashed");
}

// Add label (custom label or default "conditional" for conditional edges)
if (label != null)
{
attributes.Add($"label=\"{EscapeDotLabel(label)}\"");
}

// Combine attributes
var attrString = attributes.Count > 0 ? $" [{string.Join(", ", attributes)}]" : "";
lines.Add($"{indent}\"{MapId(src)}\" -> \"{MapId(target)}\"{attrString};");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do the ids need to be put through EscapeDotXYZ()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, but I thought it would be nice in case we find text in chinese, korean or using other special characters, to scape them for compatibility.

Can remove those escape functions though in aras to simplicity, becoming responsibility of the user then.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep it consistent, but I do not have a strong opinion as to which of the two.

}
}

Expand Down Expand Up @@ -175,14 +191,21 @@ string sanitize(string input)
}

// Emit normal edges
foreach (var (src, target, isConditional) in ComputeNormalEdges(workflow))
foreach (var (src, target, isConditional, label) in ComputeNormalEdges(workflow))
{
if (isConditional)
if (label != null)
{
// Regular edge with label
lines.Add($"{indent}{MapId(src)} -->|{label}| {MapId(target)};");
}
else if (isConditional)
{
lines.Add($"{indent}{MapId(src)} -. conditional .--> {MapId(target)};");
// Conditional edge with default label
lines.Add($"{indent}{MapId(src)} -->|conditional| {MapId(target)};");
}
else
{
// Regular edge without label
lines.Add($"{indent}{MapId(src)} --> {MapId(target)};");
}
}
Expand Down Expand Up @@ -214,9 +237,9 @@ string sanitize(string input)
return result;
}

private static List<(string Source, string Target, bool IsConditional)> ComputeNormalEdges(Workflow workflow)
private static List<(string Source, string Target, bool IsConditional, string? Label)> ComputeNormalEdges(Workflow workflow)
{
var edges = new List<(string, string, bool)>();
var edges = new List<(string, string, bool, string?)>();
foreach (var edgeGroup in workflow.Edges.Values.SelectMany(x => x))
{
if (edgeGroup.Kind == EdgeKind.FanIn)
Expand All @@ -229,14 +252,15 @@ string sanitize(string input)
case EdgeKind.Direct when edgeGroup.DirectEdgeData != null:
var directData = edgeGroup.DirectEdgeData;
var isConditional = directData.Condition != null;
edges.Add((directData.SourceId, directData.SinkId, isConditional));
var label = directData.Label ?? (isConditional ? "conditional" : null);
edges.Add((directData.SourceId, directData.SinkId, isConditional, label));
break;

case EdgeKind.FanOut when edgeGroup.FanOutEdgeData != null:
var fanOutData = edgeGroup.FanOutEdgeData;
foreach (var sinkId in fanOutData.SinkIds)
{
edges.Add((fanOutData.SourceId, sinkId, false));
edges.Add((fanOutData.SourceId, sinkId, false, fanOutData.Label));
}
break;
}
Expand Down Expand Up @@ -276,5 +300,11 @@ private static bool TryGetNestedWorkflow(ExecutorRegistration registration, [Not
return false;
}

// Helper method to escape special characters in DOT labels
private static string EscapeDotLabel(string label)
{
return label.Replace("\"", "\\\"").Replace("\n", "\\n");
}

#endregion
}
27 changes: 17 additions & 10 deletions dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,12 @@ private HashSet<Edge> EnsureEdgesFor(string sourceId)
/// <param name="target">The executor that acts as the target node of the edge. Cannot be null.</param>
/// <param name="idempotent">If set to <see langword="true"/>, adding the same edge multiple times will be a NoOp,
/// rather than an error.</param>
/// <param name="label"></param>
/// <returns>The current instance of <see cref="WorkflowBuilder"/>.</returns>
/// <exception cref="InvalidOperationException">Thrown if an unconditional edge between the specified source and target
/// executors already exists.</exception>
public WorkflowBuilder AddEdge(ExecutorIsh source, ExecutorIsh target, bool idempotent = false)
=> this.AddEdge<object>(source, target, null, idempotent);
public WorkflowBuilder AddEdge(ExecutorIsh source, ExecutorIsh target, bool idempotent = false, string? label = null)
=> this.AddEdge<object>(source, target, null, idempotent, label);

internal static Func<object?, bool>? CreateConditionFunc<T>(Func<T?, bool>? condition)
{
Expand Down Expand Up @@ -234,10 +235,11 @@ public WorkflowBuilder AddEdge(ExecutorIsh source, ExecutorIsh target, bool idem
/// <param name="idempotent">If set to <see langword="true"/>, adding the same edge multiple times will be a NoOp,
/// rather than an error.</param>
/// If null, the edge is always activated when the source sends a message.</param>
/// <param name="label"></param>
/// <returns>The current instance of <see cref="WorkflowBuilder"/>.</returns>
/// <exception cref="InvalidOperationException">Thrown if an unconditional edge between the specified source and target
/// executors already exists.</exception>
public WorkflowBuilder AddEdge<T>(ExecutorIsh source, ExecutorIsh target, Func<T?, bool>? condition = null, bool idempotent = false)
public WorkflowBuilder AddEdge<T>(ExecutorIsh source, ExecutorIsh target, Func<T?, bool>? condition = null, bool idempotent = false, string? label = null)
{
// Add an edge from source to target with an optional condition.
// This is a low-level builder method that does not enforce any specific executor type.
Expand All @@ -258,7 +260,7 @@ public WorkflowBuilder AddEdge<T>(ExecutorIsh source, ExecutorIsh target, Func<T
"You cannot add another edge without a condition for the same source and target.");
}

DirectEdgeData directEdge = new(this.Track(source).Id, this.Track(target).Id, this.TakeEdgeId(), CreateConditionFunc(condition));
DirectEdgeData directEdge = new(this.Track(source).Id, this.Track(target).Id, this.TakeEdgeId(), CreateConditionFunc(condition), label);

this.EnsureEdgesFor(source.Id).Add(new(directEdge));

Expand All @@ -272,10 +274,11 @@ public WorkflowBuilder AddEdge<T>(ExecutorIsh source, ExecutorIsh target, Func<T
/// <remarks>If a partitioner function is provided, it will be used to distribute input across the target
/// executors. The order of targets determines their mapping in the partitioning process.</remarks>
/// <param name="source">The source executor from which the fan-out edge originates. Cannot be null.</param>
/// <param name="label"></param>
/// <param name="targets">One or more target executors that will receive the fan-out edge. Cannot be null or empty.</param>
/// <returns>The current instance of <see cref="WorkflowBuilder"/>.</returns>
public WorkflowBuilder AddFanOutEdge(ExecutorIsh source, params IEnumerable<ExecutorIsh> targets)
=> this.AddFanOutEdge<object>(source, null, targets);
public WorkflowBuilder AddFanOutEdge(ExecutorIsh source, string? label, params IEnumerable<ExecutorIsh> targets)
=> this.AddFanOutEdge<object>(source, null, label, targets);

internal static Func<object?, int, IEnumerable<int>>? CreateEdgeAssignerFunc<T>(Func<T?, int, IEnumerable<int>>? partitioner)
{
Expand Down Expand Up @@ -304,9 +307,10 @@ public WorkflowBuilder AddFanOutEdge(ExecutorIsh source, params IEnumerable<Exec
/// <param name="source">The source executor from which the fan-out edge originates. Cannot be null.</param>
/// <param name="partitioner">An optional function that determines how input is partitioned among the target executors.
/// If null, messages will route to all targets.</param>
/// <param name="label"></param>
/// <param name="targets">One or more target executors that will receive the fan-out edge. Cannot be null or empty.</param>
/// <returns>The current instance of <see cref="WorkflowBuilder"/>.</returns>
public WorkflowBuilder AddFanOutEdge<T>(ExecutorIsh source, Func<T?, int, IEnumerable<int>>? partitioner = null, params IEnumerable<ExecutorIsh> targets)
public WorkflowBuilder AddFanOutEdge<T>(ExecutorIsh source, Func<T?, int, IEnumerable<int>>? partitioner = null, string? label = null, params IEnumerable<ExecutorIsh> targets)
{
Throw.IfNull(source);
Throw.IfNull(targets);
Expand All @@ -323,7 +327,8 @@ public WorkflowBuilder AddFanOutEdge<T>(ExecutorIsh source, Func<T?, int, IEnume
this.Track(source).Id,
sinkIds,
this.TakeEdgeId(),
CreateEdgeAssignerFunc(partitioner));
CreateEdgeAssignerFunc(partitioner),
label);

this.EnsureEdgesFor(source.Id).Add(new(fanOutEdge));

Expand All @@ -338,9 +343,10 @@ public WorkflowBuilder AddFanOutEdge<T>(ExecutorIsh source, Func<T?, int, IEnume
/// based on the completion or state of multiple sources. The trigger parameter can be used to customize activation
/// behavior.</remarks>
/// <param name="target">The target executor that receives input from the specified source executors. Cannot be null.</param>
/// <param name="label"></param>
/// <param name="sources">One or more source executors that provide input to the target. Cannot be null or empty.</param>
/// <returns>The current instance of <see cref="WorkflowBuilder"/>.</returns>
public WorkflowBuilder AddFanInEdge(ExecutorIsh target, params IEnumerable<ExecutorIsh> sources)
public WorkflowBuilder AddFanInEdge(ExecutorIsh target, string? label = null, params IEnumerable<ExecutorIsh> sources)
{
Throw.IfNull(target);
Throw.IfNull(sources);
Expand All @@ -356,7 +362,8 @@ public WorkflowBuilder AddFanInEdge(ExecutorIsh target, params IEnumerable<Execu
FanInEdgeData edgeData = new(
sourceIds,
this.Track(target).Id,
this.TakeEdgeId());
this.TakeEdgeId(),
label);

foreach (string sourceId in edgeData.SourceIds)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public async Task Test_EdgeMap_MaintainsFanInEdgeStateAsync()

Dictionary<string, HashSet<Edge>> workflowEdges = [];

FanInEdgeData edgeData = new(["executor1", "executor2"], "executor3", new EdgeId(0));
FanInEdgeData edgeData = new(["executor1", "executor2"], "executor3", new EdgeId(0), null);
Edge fanInEdge = new(edgeData);

workflowEdges["executor1"] = [fanInEdge];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public async Task Test_FanInEdgeRunnerAsync()
runContext.Executors["executor2"] = new ForwardMessageExecutor<string>("executor2");
runContext.Executors["executor3"] = new ForwardMessageExecutor<string>("executor3");

FanInEdgeData edgeData = new(["executor1", "executor2"], "executor3", new EdgeId(0));
FanInEdgeData edgeData = new(["executor1", "executor2"], "executor3", new EdgeId(0), "custom label");
FanInEdgeRunner runner = new(runContext, edgeData);

// Step 1: Send message from executor1, should not forward yet.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public async Task InProcessRun_StateShouldError_TwoExecutorsAsync()

Workflow workflow =
new WorkflowBuilder(forward)
.AddFanOutEdge(forward, targets: [testExecutor, testExecutor2])
.AddFanOutEdge(forward, null, targets: [testExecutor, testExecutor2])
.Build();

Run runWithFailure = await InProcessExecution.RunAsync(workflow, new TurnToken());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void Test_FanOutEdgeInfo_JsonRoundtrip()
RunJsonRoundtrip(TestFanOutEdgeInfo_Assigner, predicate: TestFanOutEdgeInfo_Assigner.CreateValidator());
}

private static FanInEdgeData TestFanInEdgeData => new(["SourceExecutor1", "SourceExecutor2"], "TargetExecutor", TakeEdgeId());
private static FanInEdgeData TestFanInEdgeData => new(["SourceExecutor1", "SourceExecutor2"], "TargetExecutor", TakeEdgeId(), null);
private static FanInEdgeInfo TestFanInEdgeInfo => new(TestFanInEdgeData);

[Fact]
Expand Down
Loading