Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions python/packages/core/agent_framework/_workflows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,26 @@
handler,
)
from ._function_executor import FunctionExecutor, executor
from ._group_chat import (
DEFAULT_MANAGER_INSTRUCTIONS,
GroupChatBuilder,
GroupChatDirective,
GroupChatManagerFn,
GroupChatOrchestratorExecutor,
GroupChatParticipantPipeline,
GroupChatParticipantSpec,
GroupChatRequestMessage,
GroupChatResponseMessage,
GroupChatStateSnapshot,
GroupChatTurn,
GroupChatWiring,
StandardGroupChatManager,
)
from ._magentic import (
MagenticAgentDeltaEvent,
MagenticAgentExecutor,
MagenticAgentMessageEvent,
MagenticBuilder,
MagenticCallbackEvent,
MagenticCallbackMode,
MagenticContext,
MagenticFinalResultEvent,
MagenticManagerBase,
Expand Down Expand Up @@ -104,6 +117,7 @@
from ._workflow_executor import WorkflowExecutor

__all__ = [
"DEFAULT_MANAGER_INSTRUCTIONS",
"DEFAULT_MAX_ITERATIONS",
"AgentExecutor",
"AgentExecutorRequest",
Expand All @@ -127,14 +141,23 @@
"FileCheckpointStorage",
"FunctionExecutor",
"GraphConnectivityError",
"GroupChatBuilder",
"GroupChatDirective",
"GroupChatManagerFn",
"GroupChatOrchestratorExecutor",
"GroupChatParticipantPipeline",
"GroupChatParticipantSpec",
"GroupChatRequestMessage",
"GroupChatResponseMessage",
"GroupChatStateSnapshot",
"GroupChatTurn",
"GroupChatWiring",
"InMemoryCheckpointStorage",
"InProcRunnerContext",
"MagenticAgentDeltaEvent",
"MagenticAgentExecutor",
"MagenticAgentMessageEvent",
"MagenticBuilder",
"MagenticCallbackEvent",
"MagenticCallbackMode",
"MagenticContext",
"MagenticFinalResultEvent",
"MagenticManagerBase",
Expand All @@ -159,6 +182,7 @@
"SequentialBuilder",
"SharedState",
"SingleEdgeGroup",
"StandardGroupChatManager",
"StandardMagenticManager",
"SwitchCaseEdgeGroup",
"SwitchCaseEdgeGroupCase",
Expand Down
32 changes: 28 additions & 4 deletions python/packages/core/agent_framework/_workflows/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,26 @@ from ._executor import (
handler,
)
from ._function_executor import FunctionExecutor, executor
from ._group_chat import (
DEFAULT_MANAGER_INSTRUCTIONS,
GroupChatBuilder,
GroupChatDirective,
GroupChatManagerFn,
GroupChatOrchestratorExecutor,
GroupChatParticipantPipeline,
GroupChatParticipantSpec,
GroupChatRequestMessage,
GroupChatResponseMessage,
GroupChatStateSnapshot,
GroupChatTurn,
GroupChatWiring,
StandardGroupChatManager,
)
from ._magentic import (
MagenticAgentDeltaEvent,
MagenticAgentExecutor,
MagenticAgentMessageEvent,
MagenticBuilder,
MagenticCallbackEvent,
MagenticCallbackMode,
MagenticContext,
MagenticFinalResultEvent,
MagenticManagerBase,
Expand Down Expand Up @@ -102,6 +115,7 @@ from ._workflow_context import WorkflowContext
from ._workflow_executor import WorkflowExecutor

__all__ = [
"DEFAULT_MANAGER_INSTRUCTIONS",
"DEFAULT_MAX_ITERATIONS",
"AgentExecutor",
"AgentExecutorRequest",
Expand All @@ -125,14 +139,23 @@ __all__ = [
"FileCheckpointStorage",
"FunctionExecutor",
"GraphConnectivityError",
"GroupChatBuilder",
"GroupChatDirective",
"GroupChatManagerFn",
"GroupChatOrchestratorExecutor",
"GroupChatParticipantPipeline",
"GroupChatParticipantSpec",
"GroupChatRequestMessage",
"GroupChatResponseMessage",
"GroupChatStateSnapshot",
"GroupChatTurn",
"GroupChatWiring",
"InMemoryCheckpointStorage",
"InProcRunnerContext",
"MagenticAgentDeltaEvent",
"MagenticAgentExecutor",
"MagenticAgentMessageEvent",
"MagenticBuilder",
"MagenticCallbackEvent",
"MagenticCallbackMode",
"MagenticContext",
"MagenticFinalResultEvent",
"MagenticManagerBase",
Expand All @@ -157,6 +180,7 @@ __all__ = [
"SequentialBuilder",
"SharedState",
"SingleEdgeGroup",
"StandardGroupChatManager",
"StandardMagenticManager",
"SwitchCaseEdgeGroup",
"SwitchCaseEdgeGroupCase",
Expand Down
30 changes: 4 additions & 26 deletions python/packages/core/agent_framework/_workflows/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json
import logging
import uuid
from collections.abc import AsyncIterable, Sequence
from collections.abc import AsyncIterable
from dataclasses import dataclass
from datetime import datetime
from typing import TYPE_CHECKING, Any, ClassVar, TypedDict, cast
Expand All @@ -17,7 +17,6 @@
FunctionCallContent,
FunctionResultContent,
Role,
TextContent,
UsageDetails,
)

Expand All @@ -27,6 +26,7 @@
RequestInfoEvent,
WorkflowEvent,
)
from ._message_utils import normalize_messages_input

if TYPE_CHECKING:
from ._workflow import Workflow
Expand Down Expand Up @@ -129,7 +129,7 @@ async def run(
"""
# Collect all streaming updates
response_updates: list[AgentRunResponseUpdate] = []
input_messages = self._normalize_messages(messages)
input_messages = normalize_messages_input(messages)
thread = thread or self.get_new_thread()
response_id = str(uuid.uuid4())

Expand Down Expand Up @@ -163,7 +163,7 @@ async def run_stream(
Yields:
AgentRunResponseUpdate objects representing the workflow execution progress.
"""
input_messages = self._normalize_messages(messages)
input_messages = normalize_messages_input(messages)
thread = thread or self.get_new_thread()
response_updates: list[AgentRunResponseUpdate] = []
response_id = str(uuid.uuid4())
Expand Down Expand Up @@ -223,28 +223,6 @@ async def _run_stream_impl(
if update:
yield update

def _normalize_messages(
self,
messages: str | ChatMessage | Sequence[str] | Sequence[ChatMessage] | None = None,
) -> list[ChatMessage]:
"""Normalize input messages to a list of ChatMessage objects."""
if messages is None:
return []

if isinstance(messages, str):
return [ChatMessage(role=Role.USER, contents=[TextContent(text=messages)])]

if isinstance(messages, ChatMessage):
return [messages]

normalized: list[ChatMessage] = []
for msg in messages:
if isinstance(msg, str):
normalized.append(ChatMessage(role=Role.USER, contents=[TextContent(text=msg)]))
elif isinstance(msg, ChatMessage):
normalized.append(msg)
return normalized

def _convert_workflow_event_to_agent_update(
self,
response_id: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
AgentRunUpdateEvent, # type: ignore[reportPrivateUsage]
)
from ._executor import Executor, handler
from ._message_utils import normalize_messages_input
from ._workflow_context import WorkflowContext

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -167,7 +168,7 @@ async def from_response(
@handler
async def from_str(self, text: str, ctx: WorkflowContext[AgentExecutorResponse, AgentRunResponse]) -> None:
"""Accept a raw user prompt string and run the agent (one-shot)."""
self._cache = [ChatMessage(role="user", text=text)] # type: ignore[arg-type]
self._cache = normalize_messages_input(text)
await self._run_agent_and_emit(ctx)

@handler
Expand All @@ -177,15 +178,15 @@ async def from_message(
ctx: WorkflowContext[AgentExecutorResponse, AgentRunResponse],
) -> None:
"""Accept a single ChatMessage as input."""
self._cache = [message]
self._cache = normalize_messages_input(message)
await self._run_agent_and_emit(ctx)

@handler
async def from_messages(
self,
messages: list[ChatMessage],
messages: list[str | ChatMessage],
ctx: WorkflowContext[AgentExecutorResponse, AgentRunResponse],
) -> None:
"""Accept a list of ChatMessage objects as conversation context."""
self._cache = list(messages)
"""Accept a list of chat inputs (strings or ChatMessage) as conversation context."""
self._cache = normalize_messages_input(messages)
await self._run_agent_and_emit(ctx)
15 changes: 10 additions & 5 deletions python/packages/core/agent_framework/_workflows/_concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from ._agent_executor import AgentExecutorRequest, AgentExecutorResponse
from ._checkpoint import CheckpointStorage
from ._executor import Executor, handler
from ._message_utils import normalize_messages_input
from ._workflow import Workflow
from ._workflow_builder import WorkflowBuilder
from ._workflow_context import WorkflowContext
Expand Down Expand Up @@ -50,17 +51,21 @@ async def from_request(self, request: AgentExecutorRequest, ctx: WorkflowContext

@handler
async def from_str(self, prompt: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
request = AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=prompt)], should_respond=True)
request = AgentExecutorRequest(messages=normalize_messages_input(prompt), should_respond=True)
await ctx.send_message(request)

@handler
async def from_message(self, message: ChatMessage, ctx: WorkflowContext[AgentExecutorRequest]) -> None: # type: ignore[name-defined]
request = AgentExecutorRequest(messages=[message], should_respond=True)
async def from_message(self, message: ChatMessage, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
request = AgentExecutorRequest(messages=normalize_messages_input(message), should_respond=True)
await ctx.send_message(request)

@handler
async def from_messages(self, messages: list[ChatMessage], ctx: WorkflowContext[AgentExecutorRequest]) -> None: # type: ignore[name-defined]
request = AgentExecutorRequest(messages=list(messages), should_respond=True)
async def from_messages(
self,
messages: list[str | ChatMessage],
ctx: WorkflowContext[AgentExecutorRequest],
) -> None:
request = AgentExecutorRequest(messages=normalize_messages_input(messages), should_respond=True)
await ctx.send_message(request)


Expand Down
8 changes: 1 addition & 7 deletions python/packages/core/agent_framework/_workflows/_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,13 +450,7 @@ def to_dict(self) -> dict[str, Any]:

def handler(
func: Callable[[ExecutorT, Any, ContextT], Awaitable[Any]],
) -> (
Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]
| Callable[
[Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]],
Callable[[ExecutorT, Any, ContextT], Awaitable[Any]],
]
):
) -> Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]:
"""Decorator to register a handler for an executor.

Args:
Expand Down
Loading