Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ internal static ValueTask<Workflow<List<ChatMessage>>> GetWorkflowAsync(IChatCli

// Build the workflow by adding executors and connecting them
return new WorkflowBuilder(startExecutor)
.AddFanOutEdge(startExecutor, targets: [frenchAgent, englishAgent])
.AddFanInEdge(aggregationExecutor, sources: [frenchAgent, englishAgent])
.AddFanOutEdge(startExecutor, null, targets: [frenchAgent, englishAgent])
.AddFanInEdge(aggregationExecutor, null, sources: [frenchAgent, englishAgent])
.WithOutputFrom(aggregationExecutor)
.BuildAsync<List<ChatMessage>>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ private static async Task Main()

// Build the workflow by adding executors and connecting them
var workflow = new WorkflowBuilder(startExecutor)
.AddFanOutEdge(startExecutor, targets: [physicist, chemist])
.AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
.AddFanOutEdge(startExecutor, "fan-out", targets: [physicist, chemist])
.AddFanInEdge(aggregationExecutor, "fan-in", sources: [physicist, chemist])
.WithOutputFrom(aggregationExecutor)
.Build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ public static Workflow BuildWorkflow()

// Step 4: Build the concurrent workflow with fan-out/fan-in pattern
return new WorkflowBuilder(splitter)
.AddFanOutEdge(splitter, targets: [.. mappers]) // Split -> many mappers
.AddFanInEdge(shuffler, sources: [.. mappers]) // All mappers -> shuffle
.AddFanOutEdge(shuffler, targets: [.. reducers]) // Shuffle -> many reducers
.AddFanInEdge(completion, sources: [.. reducers]) // All reducers -> completion
.AddFanOutEdge(splitter, "Splitting", targets: [.. mappers]) // Split -> many mappers
.AddFanInEdge(shuffler, "fan-in", sources: [.. mappers]) // All mappers -> shuffle
.AddFanOutEdge(shuffler, "Shuffling", targets: [.. reducers]) // Shuffle -> many reducers
.AddFanInEdge(completion, "fan-in", sources: [.. reducers]) // All reducers -> completion
.WithOutputFrom(completion)
.Build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ private static async Task Main()

// Build the workflow by connecting executors sequentially
var workflow = new WorkflowBuilder(fileRead)
.AddFanOutEdge(fileRead, targets: [wordCount, paragraphCount])
.AddFanInEdge(aggregate, sources: [wordCount, paragraphCount])
.AddFanOutEdge(fileRead, null, targets: [wordCount, paragraphCount])
.AddFanInEdge(aggregate, null, sources: [wordCount, paragraphCount])
.WithOutputFrom(aggregate)
.Build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ private static async Task Main()

// Build the workflow by connecting executors sequentially
WorkflowBuilder builder = new(uppercase);
builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);
builder.AddEdge(uppercase, reverse, false, "custom label").WithOutputFrom(reverse);
var workflow = builder.Build();

// Execute the workflow with input data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private static Workflow BuildConcurrentCore(
// provenance tracking exposed in the workflow context passed to a handler.
ExecutorIsh[] agentExecutors = (from agent in agents select (ExecutorIsh)new AgentRunStreamingExecutor(agent, includeInputInOutput: false)).ToArray();
ExecutorIsh[] accumulators = [.. from agent in agentExecutors select (ExecutorIsh)new CollectChatMessagesExecutor($"Batcher/{agent.Id}")];
builder.AddFanOutEdge(start, targets: agentExecutors);
builder.AddFanOutEdge(start, null, targets: agentExecutors);
for (int i = 0; i < agentExecutors.Length; i++)
{
builder.AddEdge(agentExecutors[i], accumulators[i]);
Expand Down
8 changes: 7 additions & 1 deletion dotnet/src/Microsoft.Agents.AI.Workflows/DirectEdgeData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ namespace Microsoft.Agents.AI.Workflows;
/// </summary>
public sealed class DirectEdgeData : EdgeData
{
internal DirectEdgeData(string sourceId, string sinkId, EdgeId id, PredicateT? condition = null) : base(id)
internal DirectEdgeData(string sourceId, string sinkId, EdgeId id, PredicateT? condition = null, string? label = null) : base(id)
{
this.SourceId = sourceId;
this.SinkId = sinkId;
this.Condition = condition;
this.Label = label;
this.Connection = new([sourceId], [sinkId]);
}

Expand All @@ -35,6 +36,11 @@ internal DirectEdgeData(string sourceId, string sinkId, EdgeId id, PredicateT? c
/// </summary>
public PredicateT? Condition { get; }

/// <summary>
/// An optional label for the edge, allowing for arbitrary metadata to be associated with it.
/// </summary>
public string? Label { get; }

/// <inheritdoc />
internal override EdgeConnection Connection { get; }
}
8 changes: 7 additions & 1 deletion dotnet/src/Microsoft.Agents.AI.Workflows/FanInEdgeData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ namespace Microsoft.Agents.AI.Workflows;
/// </summary>
internal sealed class FanInEdgeData : EdgeData
{
internal FanInEdgeData(List<string> sourceIds, string sinkId, EdgeId id) : base(id)
internal FanInEdgeData(List<string> sourceIds, string sinkId, EdgeId id, string? label) : base(id)
{
this.SourceIds = sourceIds;
this.SinkId = sinkId;
this.Connection = new(sourceIds, [sinkId]);
this.Label = label;
}

/// <summary>
Expand All @@ -27,6 +28,11 @@ internal FanInEdgeData(List<string> sourceIds, string sinkId, EdgeId id) : base(
/// </summary>
public string SinkId { get; }

/// <summary>
/// Optional label for the edge.
/// </summary>
public string? Label { get; init; }

/// <inheritdoc />
internal override EdgeConnection Connection { get; }
}
8 changes: 7 additions & 1 deletion dotnet/src/Microsoft.Agents.AI.Workflows/FanOutEdgeData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ namespace Microsoft.Agents.AI.Workflows;
/// </summary>
internal sealed class FanOutEdgeData : EdgeData
{
internal FanOutEdgeData(string sourceId, List<string> sinkIds, EdgeId edgeId, AssignerF? assigner = null) : base(edgeId)
internal FanOutEdgeData(string sourceId, List<string> sinkIds, EdgeId edgeId, AssignerF? assigner = null, string? label = null) : base(edgeId)
{
this.SourceId = sourceId;
this.SinkIds = sinkIds;
this.EdgeAssigner = assigner;
this.Connection = new([sourceId], sinkIds);
this.Label = label;
}

/// <summary>
Expand All @@ -37,6 +38,11 @@ internal FanOutEdgeData(string sourceId, List<string> sinkIds, EdgeId edgeId, As
/// </summary>
public AssignerF? EdgeAssigner { get; }

/// <summary>
/// An optional label for the edge.
/// </summary>
public string? Label { get; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Minor] This should probably be on EdgeData, rather than the specific EdgeDatas.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I move it? I was unsure if you wanted to keep EdgeData pure, but I agree to the suggestion. happy to move this to EdgeData...
Do I proceed? :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's unify them


/// <inheritdoc />
internal override EdgeConnection Connection { get; }
}
2 changes: 1 addition & 1 deletion dotnet/src/Microsoft.Agents.AI.Workflows/SwitchBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ internal WorkflowBuilder ReduceToFanOut(WorkflowBuilder builder, ExecutorIsh sou
List<(Func<object?, bool> Predicate, HashSet<int> OutgoingIndicies)> caseMap = this._caseMap;
HashSet<int> defaultIndicies = this._defaultIndicies;

return builder.AddFanOutEdge<object>(source, CasePartitioner, [.. this._executors]);
return builder.AddFanOutEdge<object>(source, CasePartitioner, null, [.. this._executors]);

IEnumerable<int> CasePartitioner(object? input, int targetCount)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,26 @@ private static void EmitWorkflowDigraph(Workflow workflow, List<string> lines, s
}

// Emit normal edges
foreach (var (src, target, isConditional) in ComputeNormalEdges(workflow))
foreach (var (src, target, isConditional, label) in ComputeNormalEdges(workflow))
{
var edgeAttr = isConditional ? " [style=dashed, label=\"conditional\"]" : "";
lines.Add($"{indent}\"{MapId(src)}\" -> \"{MapId(target)}\"{edgeAttr};");
// Build edge attributes
var attributes = new List<string>();

// Add style for conditional edges
if (isConditional)
{
attributes.Add("style=dashed");
}

// Add label (custom label or default "conditional" for conditional edges)
if (label != null)
{
attributes.Add($"label=\"{EscapeDotLabel(label)}\"");
}

// Combine attributes
var attrString = attributes.Count > 0 ? $" [{string.Join(", ", attributes)}]" : "";
lines.Add($"{indent}\"{MapId(src)}\" -> \"{MapId(target)}\"{attrString};");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do the ids need to be put through EscapeDotXYZ()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, but I thought it would be nice in case we find text in chinese, korean or using other special characters, to scape them for compatibility.

Can remove those escape functions though in aras to simplicity, becoming responsibility of the user then.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep it consistent, but I do not have a strong opinion as to which of the two.

}
}

Expand Down Expand Up @@ -175,14 +191,21 @@ string sanitize(string input)
}

// Emit normal edges
foreach (var (src, target, isConditional) in ComputeNormalEdges(workflow))
foreach (var (src, target, isConditional, label) in ComputeNormalEdges(workflow))
{
if (isConditional)
if (label != null)
{
lines.Add($"{indent}{MapId(src)} -. conditional .--> {MapId(target)};");
// Regular edge with label
lines.Add($"{indent}{MapId(src)} -->|{EscapeMermaidLabel(label)}| {MapId(target)};");
}
else if (isConditional)
{
// Conditional edge with default label (no escaping needed for literal)
lines.Add($"{indent}{MapId(src)} -->|conditional| {MapId(target)};");
}
else
{
// Regular edge without label
lines.Add($"{indent}{MapId(src)} --> {MapId(target)};");
}
}
Expand Down Expand Up @@ -214,9 +237,9 @@ string sanitize(string input)
return result;
}

private static List<(string Source, string Target, bool IsConditional)> ComputeNormalEdges(Workflow workflow)
private static List<(string Source, string Target, bool IsConditional, string? Label)> ComputeNormalEdges(Workflow workflow)
{
var edges = new List<(string, string, bool)>();
var edges = new List<(string, string, bool, string?)>();
foreach (var edgeGroup in workflow.Edges.Values.SelectMany(x => x))
{
if (edgeGroup.Kind == EdgeKind.FanIn)
Expand All @@ -229,14 +252,15 @@ string sanitize(string input)
case EdgeKind.Direct when edgeGroup.DirectEdgeData != null:
var directData = edgeGroup.DirectEdgeData;
var isConditional = directData.Condition != null;
edges.Add((directData.SourceId, directData.SinkId, isConditional));
var label = directData.Label;
edges.Add((directData.SourceId, directData.SinkId, isConditional, label));
break;

case EdgeKind.FanOut when edgeGroup.FanOutEdgeData != null:
var fanOutData = edgeGroup.FanOutEdgeData;
foreach (var sinkId in fanOutData.SinkIds)
{
edges.Add((fanOutData.SourceId, sinkId, false));
edges.Add((fanOutData.SourceId, sinkId, false, fanOutData.Label));
}
break;
}
Expand Down Expand Up @@ -276,5 +300,24 @@ private static bool TryGetNestedWorkflow(ExecutorRegistration registration, [Not
return false;
}

// Helper method to escape special characters in DOT labels
private static string EscapeDotLabel(string label)
{
return label.Replace("\"", "\\\"").Replace("\n", "\\n");
}

// Helper method to escape special characters in Mermaid labels
private static string EscapeMermaidLabel(string label)
{
return label
.Replace("&", "&amp;") // Must be first to avoid double-escaping
.Replace("|", "&#124;") // Pipe breaks Mermaid delimiter syntax
.Replace("\"", "&quot;") // Quote character
.Replace("<", "&lt;") // Less than
.Replace(">", "&gt;") // Greater than
.Replace("\n", "<br/>") // Newline to HTML break
.Replace("\r", ""); // Remove carriage return
}

#endregion
}
Loading