Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 16 additions & 18 deletions python/packages/core/agent_framework/_workflows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,29 +52,29 @@
handler,
)
from ._function_executor import FunctionExecutor, executor
from ._group_chat import (
DEFAULT_MANAGER_INSTRUCTIONS,
DEFAULT_MANAGER_STRUCTURED_OUTPUT_PROMPT,
GroupChatBuilder,
GroupChatDirective,
GroupChatStateSnapshot,
ManagerDirectiveModel,
)
from ._handoff import HandoffBuilder, HandoffUserInputRequest
from ._magentic import (
MagenticAgentDeltaEvent,
MagenticAgentExecutor,
MagenticAgentMessageEvent,
MagenticBuilder,
MagenticCallbackEvent,
MagenticCallbackMode,
MagenticContext,
MagenticFinalResultEvent,
MagenticManagerBase,
MagenticOrchestratorExecutor,
MagenticOrchestratorMessageEvent,
MagenticPlanReviewDecision,
MagenticPlanReviewReply,
MagenticPlanReviewRequest,
MagenticProgressLedger,
MagenticProgressLedgerItem,
MagenticRequestMessage,
MagenticResponseMessage,
MagenticStartMessage,
StandardMagenticManager,
)
from ._orchestration_state import OrchestrationState
from ._request_info_executor import (
PendingRequestDetails,
RequestInfoExecutor,
Expand Down Expand Up @@ -105,6 +105,8 @@
from ._workflow_executor import WorkflowExecutor

__all__ = [
"DEFAULT_MANAGER_INSTRUCTIONS",
"DEFAULT_MANAGER_STRUCTURED_OUTPUT_PROMPT",
"DEFAULT_MAX_ITERATIONS",
"AgentExecutor",
"AgentExecutorRequest",
Expand All @@ -128,30 +130,26 @@
"FileCheckpointStorage",
"FunctionExecutor",
"GraphConnectivityError",
"GroupChatBuilder",
"GroupChatDirective",
"GroupChatStateSnapshot",
"HandoffBuilder",
"HandoffUserInputRequest",
"InMemoryCheckpointStorage",
"InProcRunnerContext",
"MagenticAgentDeltaEvent",
"MagenticAgentExecutor",
"MagenticAgentMessageEvent",
"MagenticBuilder",
"MagenticCallbackEvent",
"MagenticCallbackMode",
"MagenticContext",
"MagenticFinalResultEvent",
"MagenticManagerBase",
"MagenticOrchestratorExecutor",
"MagenticOrchestratorMessageEvent",
"MagenticPlanReviewDecision",
"MagenticPlanReviewReply",
"MagenticPlanReviewRequest",
"MagenticProgressLedger",
"MagenticProgressLedgerItem",
"MagenticRequestMessage",
"MagenticResponseMessage",
"MagenticStartMessage",
"ManagerDirectiveModel",
"Message",
"OrchestrationState",
"PendingRequestDetails",
"RequestInfoEvent",
"RequestInfoExecutor",
Expand Down
32 changes: 14 additions & 18 deletions python/packages/core/agent_framework/_workflows/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -50,29 +50,28 @@ from ._executor import (
handler,
)
from ._function_executor import FunctionExecutor, executor
from ._group_chat import (
DEFAULT_MANAGER_INSTRUCTIONS,
DEFAULT_MANAGER_STRUCTURED_OUTPUT_PROMPT,
GroupChatBuilder,
GroupChatDirective,
GroupChatStateSnapshot,
)
from ._handoff import HandoffBuilder, HandoffUserInputRequest
from ._magentic import (
MagenticAgentDeltaEvent,
MagenticAgentExecutor,
MagenticAgentMessageEvent,
MagenticBuilder,
MagenticCallbackEvent,
MagenticCallbackMode,
MagenticContext,
MagenticFinalResultEvent,
MagenticManagerBase,
MagenticOrchestratorExecutor,
MagenticOrchestratorMessageEvent,
MagenticPlanReviewDecision,
MagenticPlanReviewReply,
MagenticPlanReviewRequest,
MagenticProgressLedger,
MagenticProgressLedgerItem,
MagenticRequestMessage,
MagenticResponseMessage,
MagenticStartMessage,
StandardMagenticManager,
)
from ._orchestration_state import OrchestrationState
from ._request_info_executor import (
PendingRequestDetails,
RequestInfoExecutor,
Expand Down Expand Up @@ -103,6 +102,8 @@ from ._workflow_context import WorkflowContext
from ._workflow_executor import WorkflowExecutor

__all__ = [
"DEFAULT_MANAGER_INSTRUCTIONS",
"DEFAULT_MANAGER_STRUCTURED_OUTPUT_PROMPT",
"DEFAULT_MAX_ITERATIONS",
"AgentExecutor",
"AgentExecutorRequest",
Expand All @@ -126,30 +127,25 @@ __all__ = [
"FileCheckpointStorage",
"FunctionExecutor",
"GraphConnectivityError",
"GroupChatBuilder",
"GroupChatDirective",
"GroupChatStateSnapshot",
"HandoffBuilder",
"HandoffUserInputRequest",
"InMemoryCheckpointStorage",
"InProcRunnerContext",
"MagenticAgentDeltaEvent",
"MagenticAgentExecutor",
"MagenticAgentMessageEvent",
"MagenticBuilder",
"MagenticCallbackEvent",
"MagenticCallbackMode",
"MagenticContext",
"MagenticFinalResultEvent",
"MagenticManagerBase",
"MagenticOrchestratorExecutor",
"MagenticOrchestratorMessageEvent",
"MagenticPlanReviewDecision",
"MagenticPlanReviewReply",
"MagenticPlanReviewRequest",
"MagenticProgressLedger",
"MagenticProgressLedgerItem",
"MagenticRequestMessage",
"MagenticResponseMessage",
"MagenticStartMessage",
"Message",
"OrchestrationState",
"PendingRequestDetails",
"RequestInfoEvent",
"RequestInfoExecutor",
Expand Down
30 changes: 4 additions & 26 deletions python/packages/core/agent_framework/_workflows/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json
import logging
import uuid
from collections.abc import AsyncIterable, Sequence
from collections.abc import AsyncIterable
from dataclasses import dataclass
from datetime import datetime
from typing import TYPE_CHECKING, Any, ClassVar, TypedDict, cast
Expand All @@ -19,7 +19,6 @@
FunctionCallContent,
FunctionResultContent,
Role,
TextContent,
UsageDetails,
)

Expand All @@ -29,6 +28,7 @@
RequestInfoEvent,
WorkflowEvent,
)
from ._message_utils import normalize_messages_input

if TYPE_CHECKING:
from ._workflow import Workflow
Expand Down Expand Up @@ -131,7 +131,7 @@ async def run(
"""
# Collect all streaming updates
response_updates: list[AgentRunResponseUpdate] = []
input_messages = self._normalize_messages(messages)
input_messages = normalize_messages_input(messages)
thread = thread or self.get_new_thread()
response_id = str(uuid.uuid4())

Expand Down Expand Up @@ -165,7 +165,7 @@ async def run_stream(
Yields:
AgentRunResponseUpdate objects representing the workflow execution progress.
"""
input_messages = self._normalize_messages(messages)
input_messages = normalize_messages_input(messages)
thread = thread or self.get_new_thread()
response_updates: list[AgentRunResponseUpdate] = []
response_id = str(uuid.uuid4())
Expand Down Expand Up @@ -225,28 +225,6 @@ async def _run_stream_impl(
if update:
yield update

def _normalize_messages(
self,
messages: str | ChatMessage | Sequence[str] | Sequence[ChatMessage] | None = None,
) -> list[ChatMessage]:
"""Normalize input messages to a list of ChatMessage objects."""
if messages is None:
return []

if isinstance(messages, str):
return [ChatMessage(role=Role.USER, contents=[TextContent(text=messages)])]

if isinstance(messages, ChatMessage):
return [messages]

normalized: list[ChatMessage] = []
for msg in messages:
if isinstance(msg, str):
normalized.append(ChatMessage(role=Role.USER, contents=[TextContent(text=msg)]))
elif isinstance(msg, ChatMessage):
normalized.append(msg)
return normalized

def _convert_workflow_event_to_agent_update(
self,
response_id: str,
Expand Down
46 changes: 41 additions & 5 deletions python/packages/core/agent_framework/_workflows/_agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
AgentRunUpdateEvent, # type: ignore[reportPrivateUsage]
)
from ._executor import Executor, handler
from ._message_utils import normalize_messages_input
from ._workflow_context import WorkflowContext

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -167,7 +168,7 @@ async def from_response(
@handler
async def from_str(self, text: str, ctx: WorkflowContext[AgentExecutorResponse, AgentRunResponse]) -> None:
"""Accept a raw user prompt string and run the agent (one-shot)."""
self._cache = [ChatMessage(role="user", text=text)] # type: ignore[arg-type]
self._cache = normalize_messages_input(text)
await self._run_agent_and_emit(ctx)

@handler
Expand All @@ -177,15 +178,50 @@ async def from_message(
ctx: WorkflowContext[AgentExecutorResponse, AgentRunResponse],
) -> None:
"""Accept a single ChatMessage as input."""
self._cache = [message]
self._cache = normalize_messages_input(message)
await self._run_agent_and_emit(ctx)

@handler
async def from_messages(
self,
messages: list[ChatMessage],
messages: list[str | ChatMessage],
ctx: WorkflowContext[AgentExecutorResponse, AgentRunResponse],
) -> None:
"""Accept a list of ChatMessage objects as conversation context."""
self._cache = list(messages)
"""Accept a list of chat inputs (strings or ChatMessage) as conversation context."""
self._cache = normalize_messages_input(messages)
await self._run_agent_and_emit(ctx)

def snapshot_state(self) -> dict[str, Any]:
"""Capture current executor state for checkpointing.

Returns:
Dict containing serialized cache state
"""
from ._conversation_state import encode_chat_messages

return {
"cache": encode_chat_messages(self._cache),
}

def restore_state(self, state: dict[str, Any]) -> None:
"""Restore executor state from checkpoint.

Args:
state: Checkpoint data dict
"""
from ._conversation_state import decode_chat_messages

cache_payload = state.get("cache")
if cache_payload:
try:
self._cache = decode_chat_messages(cache_payload)
except Exception as exc:
logger.warning("Failed to restore cache: %s", exc)
self._cache = []
else:
self._cache = []

def reset(self) -> None:
"""Reset the internal cache of the executor."""
logger.debug("AgentExecutor %s: Resetting cache", self.id)
self._cache.clear()
Loading