Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 43 additions & 4 deletions python/packages/core/agent_framework/_workflows/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,19 @@ async def run(
checkpoint_storage: Runtime checkpoint storage. When provided with checkpoint_id,
used to load and restore the checkpoint. When provided without checkpoint_id,
enables checkpointing for this run.
**kwargs: Additional keyword arguments.
**kwargs: Additional keyword arguments to pass through to agent invocations.
These are stored in SharedState and accessible in @ai_function tools
via the **kwargs parameter.

With custom context for ai_functions:

.. code-block:: python

result = await workflow_agent.run(
"analyze data",
custom_data={"endpoint": "https://api.example.com"},
user_token={"user": "alice"},
)
Comment on lines +148 to +156
Copy link

Copilot AI Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code example is incorrectly placed within the "Keyword Args:" section. Following the pattern established in Workflow.run_stream() (lines 480-520 in _workflow.py), examples should be in a separate "Examples:" section after the "Returns:" section. The example text "With custom context for ai_functions:" should be a subsection heading within an "Examples:" section, not nested under "Keyword Args:". Please restructure the docstring to match the Workflow.run_stream() documentation style.

Copilot uses AI. Check for mistakes.

Returns:
The final workflow response as an AgentRunResponse.
Expand All @@ -153,7 +165,12 @@ async def run(
response_id = str(uuid.uuid4())

async for update in self._run_stream_impl(
input_messages, response_id, thread, checkpoint_id, checkpoint_storage
input_messages,
response_id,
thread,
checkpoint_id,
checkpoint_storage,
run_kwargs=kwargs if kwargs else None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason we couldn't use idiomatic Python **kwargs forwarding?

Comment on lines 165 to +173
Copy link

Copilot AI Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new functionality of passing kwargs through WorkflowAgent lacks test coverage. While the test file test_workflow_kwargs.py has comprehensive tests for kwargs flowing through Workflow and SequentialBuilder to agents, there are no tests verifying that kwargs passed to WorkflowAgent.run() and WorkflowAgent.run_stream() properly flow through to the underlying workflow and become accessible to @ai_function tools. Consider adding tests similar to test_sequential_kwargs_flow_to_agent but for WorkflowAgent specifically.

Copilot uses AI. Check for mistakes.
):
response_updates.append(update)

Expand Down Expand Up @@ -187,7 +204,20 @@ async def run_stream(
checkpoint_storage: Runtime checkpoint storage. When provided with checkpoint_id,
used to load and restore the checkpoint. When provided without checkpoint_id,
enables checkpointing for this run.
**kwargs: Additional keyword arguments.
**kwargs: Additional keyword arguments to pass through to agent invocations.
These are stored in SharedState and accessible in @ai_function tools
via the **kwargs parameter.

With custom context for ai_functions:

.. code-block:: python

async for event in workflow_agent.run_stream(
"analyze data",
custom_data={"endpoint": "https://api.example.com"},
user_token={"user": "alice"},
):
process(event)

Yields:
AgentRunResponseUpdate objects representing the workflow execution progress.
Comment on lines +211 to 223
Copy link

Copilot AI Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code example is incorrectly placed within the "Keyword Args:" section. Following the pattern established in Workflow.run_stream() (lines 480-520 in _workflow.py), examples should be in a separate "Examples:" section after the "Yields:" section. The example text "With custom context for ai_functions:" should be a subsection heading within an "Examples:" section, not nested under "Keyword Args:". Please restructure the docstring to match the Workflow.run_stream() documentation style.

Suggested change
With custom context for ai_functions:
.. code-block:: python
async for event in workflow_agent.run_stream(
"analyze data",
custom_data={"endpoint": "https://api.example.com"},
user_token={"user": "alice"},
):
process(event)
Yields:
AgentRunResponseUpdate objects representing the workflow execution progress.
Yields:
AgentRunResponseUpdate objects representing the workflow execution progress.
Examples:
With custom context for ai_functions:
.. code-block:: python
async for event in workflow_agent.run_stream(
"analyze data",
custom_data={"endpoint": "https://api.example.com"},
user_token={"user": "alice"},
):
process(event)

Copilot uses AI. Check for mistakes.
Expand All @@ -198,7 +228,12 @@ async def run_stream(
response_id = str(uuid.uuid4())

async for update in self._run_stream_impl(
input_messages, response_id, thread, checkpoint_id, checkpoint_storage
input_messages,
response_id,
thread,
checkpoint_id,
checkpoint_storage,
run_kwargs=kwargs if kwargs else None,
):
response_updates.append(update)
yield update
Expand All @@ -216,6 +251,7 @@ async def _run_stream_impl(
thread: AgentThread,
checkpoint_id: str | None = None,
checkpoint_storage: CheckpointStorage | None = None,
run_kwargs: dict[str, Any] | None = None,
) -> AsyncIterable[AgentRunResponseUpdate]:
"""Internal implementation of streaming execution.

Expand All @@ -225,6 +261,7 @@ async def _run_stream_impl(
thread: The conversation thread containing message history.
checkpoint_id: ID of checkpoint to restore from.
checkpoint_storage: Runtime checkpoint storage.
run_kwargs: Optional kwargs to store in SharedState for agent invocations

Yields:
AgentRunResponseUpdate objects representing the workflow execution progress.
Expand Down Expand Up @@ -255,6 +292,7 @@ async def _run_stream_impl(
message=None,
checkpoint_id=checkpoint_id,
checkpoint_storage=checkpoint_storage,
**(run_kwargs if run_kwargs else {}),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like this adds some unnecessary complexity - could we simply propagate **kwargs as-is?

)
else:
# Execute workflow with streaming (initial run or no function responses)
Expand All @@ -268,6 +306,7 @@ async def _run_stream_impl(
event_stream = self.workflow.run_stream(
message=conversation_messages,
checkpoint_storage=checkpoint_storage,
**(run_kwargs if run_kwargs else {}),
)

# Process events from the stream
Expand Down