Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
3234c6e
Prototype: Add request_info API and @response_handler
TaoChenOSU Oct 14, 2025
309ceb4
Add original_request as a parameter to the response handler
TaoChenOSU Oct 15, 2025
334016f
Prototype: request interception in sub workflows
TaoChenOSU Oct 15, 2025
a4b928d
Prototype: request interception in sub workflows 2
TaoChenOSU Oct 16, 2025
598830c
WIP: Make checkpointing work
TaoChenOSU Oct 17, 2025
62e07d7
Merge branch 'main' into local-branch-python-request-response-redesign
TaoChenOSU Oct 20, 2025
2c5f595
checkpointing with sub workflow
TaoChenOSU Oct 21, 2025
4939e46
Fix function executor
TaoChenOSU Oct 21, 2025
2767ee5
Allow sub-workflow to output directly
TaoChenOSU Oct 22, 2025
bd82c02
Remove ReqeustInfoExecutor and related classes; Debugging checkpoint_…
TaoChenOSU Oct 22, 2025
ae5463a
Merge branch 'main' into local-branch-python-request-response-redesign
TaoChenOSU Oct 23, 2025
e5361b4
Merge branch 'main' into local-branch-python-request-response-redesign
TaoChenOSU Oct 24, 2025
d3e8343
Fix Handoff and sample
TaoChenOSU Oct 24, 2025
dc24afa
fix pending requests in checkpoint
TaoChenOSU Oct 24, 2025
042f610
Fix unit tests
TaoChenOSU Oct 24, 2025
fdfe83d
Fix formatting
TaoChenOSU Oct 25, 2025
cf8866d
Merge branch 'main' into local-branch-python-request-response-redesign
TaoChenOSU Oct 27, 2025
ad19795
Resolve comments
TaoChenOSU Oct 27, 2025
26c2acc
Address comment
TaoChenOSU Oct 27, 2025
6dde167
Add checkpoint tests
TaoChenOSU Oct 27, 2025
82ec03b
Add tests
TaoChenOSU Oct 27, 2025
968577f
misc
TaoChenOSU Oct 27, 2025
28d2811
fix mypy
TaoChenOSU Oct 27, 2025
ccb18c6
fix mypy
TaoChenOSU Oct 27, 2025
9830724
Use request type as part of the key
TaoChenOSU Oct 28, 2025
9caa7aa
Log warning if there is not response handler for a request
TaoChenOSU Oct 28, 2025
f6a1ccb
Update Internal edge group comments
TaoChenOSU Oct 28, 2025
4635c4d
REcord message type in executor processing span
TaoChenOSU Oct 28, 2025
b1d4b1a
Merge branch 'main' into local-branch-python-request-response-redesign
TaoChenOSU Oct 28, 2025
476b0c7
Update sample
TaoChenOSU Oct 28, 2025
3d99a86
Merge branch 'main' into local-branch-python-request-response-redesign
TaoChenOSU Oct 29, 2025
8cd0770
Improve tests
TaoChenOSU Oct 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions python/packages/core/agent_framework/_workflows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
RequestInfoMessage,
RequestResponse,
)
from ._request_info_mixin import response_handler
from ._runner import Runner
from ._runner_context import (
InProcRunnerContext,
Expand All @@ -90,7 +91,6 @@
from ._shared_state import SharedState
from ._validation import (
EdgeDuplicationError,
ExecutorDuplicationError,
GraphConnectivityError,
TypeCompatibilityError,
ValidationTypeEnum,
Expand All @@ -117,7 +117,6 @@
"EdgeDuplicationError",
"Executor",
"ExecutorCompletedEvent",
"ExecutorDuplicationError",
"ExecutorEvent",
"ExecutorFailedEvent",
"ExecutorInvokedEvent",
Expand Down Expand Up @@ -187,5 +186,6 @@
"executor",
"get_checkpoint_summary",
"handler",
"response_handler",
"validate_workflow_graph",
]
4 changes: 2 additions & 2 deletions python/packages/core/agent_framework/_workflows/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ from ._request_info_executor import (
RequestInfoMessage,
RequestResponse,
)
from ._request_info_mixin import response_handler
from ._runner import Runner
from ._runner_context import (
InProcRunnerContext,
Expand All @@ -88,7 +89,6 @@ from ._sequential import SequentialBuilder
from ._shared_state import SharedState
from ._validation import (
EdgeDuplicationError,
ExecutorDuplicationError,
GraphConnectivityError,
TypeCompatibilityError,
ValidationTypeEnum,
Expand All @@ -115,7 +115,6 @@ __all__ = [
"EdgeDuplicationError",
"Executor",
"ExecutorCompletedEvent",
"ExecutorDuplicationError",
"ExecutorEvent",
"ExecutorFailedEvent",
"ExecutorInvokedEvent",
Expand Down Expand Up @@ -185,5 +184,6 @@ __all__ = [
"executor",
"get_checkpoint_summary",
"handler",
"response_handler",
"validate_workflow_graph",
]
7 changes: 7 additions & 0 deletions python/packages/core/agent_framework/_workflows/_const.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
# Copyright (c) Microsoft. All rights reserved.

DEFAULT_MAX_ITERATIONS = 100 # Default maximum iterations for workflow execution.

INTERNAL_SOURCE_PREFIX = "internal" # Source identifier for internal workflow messages.


def INTERNAL_SOURCE_ID(executor_id: str) -> str:
"""Generate an internal source ID for a given executor."""
return f"{INTERNAL_SOURCE_PREFIX}:{executor_id}"
25 changes: 25 additions & 0 deletions python/packages/core/agent_framework/_workflows/_edge.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dataclasses import dataclass, field
from typing import Any, ClassVar

from ._const import INTERNAL_SOURCE_ID
from ._executor import Executor
from ._model_utils import DictConvertible, encode_value

Expand Down Expand Up @@ -865,3 +866,27 @@ def to_dict(self) -> dict[str, Any]:
payload = super().to_dict()
payload["cases"] = [encode_value(case) for case in self.cases]
return payload


class InternalEdgeGroup(EdgeGroup):
"""Special edge group used to route internal messages to executors.

This group is not serialized and is only used at runtime to link internal
executors that should not be exposed as part of the public workflow graph.
"""

def __init__(self, executor_id: str) -> None:
"""Create an internal edge group from the given edges.

Parameters
----------
executor_id:
Identifier of the internal executor that should receive messages.

Examples:
.. code-block:: python

edge_group = InternalEdgeGroup("executor_a")
"""
edge = Edge(source_id=INTERNAL_SOURCE_ID(executor_id), target_id=executor_id)
super().__init__([edge])
28 changes: 19 additions & 9 deletions python/packages/core/agent_framework/_workflows/_edge_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,15 @@
from typing import Any, cast

from ..observability import EdgeGroupDeliveryStatus, OtelAttr, create_edge_group_processing_span
from ._edge import Edge, EdgeGroup, FanInEdgeGroup, FanOutEdgeGroup, SingleEdgeGroup, SwitchCaseEdgeGroup
from ._edge import (
Edge,
EdgeGroup,
FanInEdgeGroup,
FanOutEdgeGroup,
InternalEdgeGroup,
SingleEdgeGroup,
SwitchCaseEdgeGroup,
)
from ._executor import Executor
from ._runner_context import Message, RunnerContext
from ._shared_state import SharedState
Expand Down Expand Up @@ -44,11 +52,11 @@ async def send_message(self, message: Message, shared_state: SharedState, ctx: R
"""
raise NotImplementedError

def _can_handle(self, executor_id: str, message_data: Any) -> bool:
def _can_handle(self, executor_id: str, message: Message) -> bool:
"""Check if an executor can handle the given message data."""
if executor_id not in self._executors:
return False
return self._executors[executor_id].can_handle(message_data)
return self._executors[executor_id].can_handle(message)

async def _execute_on_target(
self,
Expand All @@ -66,7 +74,7 @@ async def _execute_on_target(

# Execute with trace context parameters
await target_executor.execute(
message.data,
message,
source_ids, # source_executor_ids
shared_state, # shared_state
ctx, # runner_context
Expand Down Expand Up @@ -103,7 +111,7 @@ async def send_message(self, message: Message, shared_state: SharedState, ctx: R
})
return False

if self._can_handle(self._edge.target_id, message.data):
if self._can_handle(self._edge.target_id, message):
if self._edge.should_route(message.data):
span.set_attributes({
OtelAttr.EDGE_GROUP_DELIVERED: True,
Expand Down Expand Up @@ -183,7 +191,7 @@ async def send_message(self, message: Message, shared_state: SharedState, ctx: R
# If the target ID is specified and the selection result contains it, send the message to that edge
if message.target_id in selection_results:
edge = self._target_map.get(message.target_id)
if edge and self._can_handle(edge.target_id, message.data):
if edge and self._can_handle(edge.target_id, message):
if edge.should_route(message.data):
span.set_attributes({
OtelAttr.EDGE_GROUP_DELIVERED: True,
Expand Down Expand Up @@ -215,7 +223,7 @@ async def send_message(self, message: Message, shared_state: SharedState, ctx: R
# If no target ID, send the message to the selected targets
for target_id in selection_results:
edge = self._target_map[target_id]
if self._can_handle(edge.target_id, message.data) and edge.should_route(message.data):
if self._can_handle(edge.target_id, message) and edge.should_route(message.data):
deliverable_edges.append(edge)

if len(deliverable_edges) > 0:
Expand Down Expand Up @@ -291,7 +299,9 @@ async def send_message(self, message: Message, shared_state: SharedState, ctx: R
return False

# Check if target can handle list of message data (fan-in aggregates multiple messages)
if self._can_handle(self._edges[0].target_id, [message.data]):
if self._can_handle(
self._edges[0].target_id, Message(data=[message.data], source_id=message.source_id)
):
# If the edge can handle the data, buffer the message
self._buffer[message.source_id].append(message)
span.set_attributes({
Expand Down Expand Up @@ -374,7 +384,7 @@ def create_edge_runner(edge_group: EdgeGroup, executors: dict[str, Executor]) ->
Returns:
The appropriate EdgeRunner instance.
"""
if isinstance(edge_group, SingleEdgeGroup):
if isinstance(edge_group, (SingleEdgeGroup, InternalEdgeGroup)):
return SingleEdgeRunner(edge_group, executors)
if isinstance(edge_group, SwitchCaseEdgeGroup):
return SwitchCaseEdgeRunner(edge_group, executors)
Expand Down
6 changes: 5 additions & 1 deletion python/packages/core/agent_framework/_workflows/_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ def __init__(
source_executor_id: str,
request_type: type,
request_data: "RequestInfoMessage",
response_type: type,
):
"""Initialize the request info event.

Expand All @@ -220,11 +221,13 @@ def __init__(
source_executor_id: ID of the executor that made the request.
request_type: Type of the request (e.g., a specific data type).
request_data: The data associated with the request.
response_type: Expected type of the response.
"""
super().__init__(request_data)
self.request_id = request_id
self.source_executor_id = source_executor_id
self.request_type = request_type
self.response_type = response_type

def __repr__(self) -> str:
"""Return a string representation of the request info event."""
Expand All @@ -233,7 +236,8 @@ def __repr__(self) -> str:
f"request_id={self.request_id}, "
f"source_executor_id={self.source_executor_id}, "
f"request_type={self.request_type.__name__}, "
f"data={self.data})"
f"data={self.data}, "
f"response_type={self.response_type.__name__})"
)


Expand Down
36 changes: 20 additions & 16 deletions python/packages/core/agent_framework/_workflows/_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
_framework_event_origin, # type: ignore[reportPrivateUsage]
)
from ._model_utils import DictConvertible
from ._runner_context import Message, RunnerContext # type: ignore
from ._request_info_mixin import RequestInfoMixin
from ._runner_context import Message, MessageType, RunnerContext
from ._shared_state import SharedState
from ._typing_utils import is_instance_of
from ._workflow_context import WorkflowContext, validate_function_signature
Expand All @@ -25,7 +26,7 @@


# region Executor
class Executor(DictConvertible):
class Executor(RequestInfoMixin, DictConvertible):
"""Base class for all workflow executors that process messages and perform computations.

## Overview
Expand Down Expand Up @@ -204,6 +205,9 @@ def __init__(
"Please define at least one handler using the @handler decorator."
)

# Initialize RequestInfoMixin to discover response handlers
self._discover_response_handlers()

async def execute(
self,
message: Any,
Expand All @@ -229,12 +233,17 @@ async def execute(
Returns:
An awaitable that resolves to the result of the execution.
"""
# Create processing span for tracing (gracefully handles disabled tracing)
# Default to find handler in regular handlers
target_handlers = self._handlers

# Handle case where Message wrapper is passed instead of raw data
if isinstance(message, Message):
if message.message_type == MessageType.RESPONSE:
# Switch to response handlers if message is a response
target_handlers = self._response_handlers
message = message.data

# Create processing span for tracing (gracefully handles disabled tracing)
with create_processing_span(
self.id,
self.__class__.__name__,
Expand All @@ -244,15 +253,9 @@ async def execute(
):
# Find the handler and handler spec that matches the message type.
handler: Callable[[Any, WorkflowContext[Any, Any]], Awaitable[None]] | None = None
ctx_annotation = None
for message_type in self._handlers:
for message_type in target_handlers:
if is_instance_of(message, message_type):
handler = self._handlers[message_type]
# Find the corresponding handler spec for context annotation
for spec in self._handler_specs:
if spec.get("message_type") == message_type:
ctx_annotation = spec.get("ctx_annotation")
break
handler = target_handlers[message_type]
break

if handler is None:
Expand All @@ -263,7 +266,6 @@ async def execute(
source_executor_ids=source_executor_ids,
shared_state=shared_state,
runner_context=runner_context,
ctx_annotation=ctx_annotation,
trace_contexts=trace_contexts,
source_span_ids=source_span_ids,
)
Expand All @@ -289,7 +291,6 @@ def _create_context_for_handler(
source_executor_ids: list[str],
shared_state: SharedState,
runner_context: RunnerContext,
ctx_annotation: Any,
trace_contexts: list[dict[str, str]] | None = None,
source_span_ids: list[str] | None = None,
) -> WorkflowContext[Any]:
Expand All @@ -299,7 +300,6 @@ def _create_context_for_handler(
source_executor_ids: The IDs of the source executors that sent messages to this executor.
shared_state: The shared state for the workflow.
runner_context: The runner context that provides methods to send messages and events.
ctx_annotation: The context annotation from the handler spec to determine which context type to create.
trace_contexts: Optional trace contexts from multiple sources for OpenTelemetry propagation.
source_span_ids: Optional source span IDs from multiple sources for linking.

Expand Down Expand Up @@ -350,7 +350,7 @@ def _discover_handlers(self) -> None:
# Skip attributes that may not be accessible
continue

def can_handle(self, message: Any) -> bool:
def can_handle(self, message: Message) -> bool:
"""Check if the executor can handle a given message type.

Args:
Expand All @@ -359,7 +359,11 @@ def can_handle(self, message: Any) -> bool:
Returns:
True if the executor can handle the message type, False otherwise.
"""
return any(is_instance_of(message, message_type) for message_type in self._handlers)
if message.message_type == MessageType.REGULAR:
return any(is_instance_of(message.data, message_type) for message_type in self._handlers)
if message.message_type == MessageType.RESPONSE:
return any(is_instance_of(message.data, message_type) for message_type in self._response_handlers)
return False

def _register_instance_handler(
self,
Expand Down
Loading
Loading