Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 75 additions & 12 deletions python/packages/core/agent_framework/_workflows/_handoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,57 @@ def _clone_chat_agent(agent: ChatAgent) -> ChatAgent:

@dataclass
class HandoffUserInputRequest:
"""Request message emitted when the workflow needs fresh user input."""
"""Request message emitted when the workflow needs fresh user input.

Note: The conversation field is intentionally excluded from checkpoint serialization
to prevent duplication. The conversation is preserved in the coordinator's state
and will be reconstructed on restore. See issue #2667.
"""

conversation: list[ChatMessage]
awaiting_agent_id: str
prompt: str
source_executor_id: str

def to_dict(self) -> dict[str, Any]:
"""Serialize to dict, excluding conversation to prevent checkpoint duplication.

The conversation is already preserved in the workflow coordinator's state.
Including it here would cause duplicate messages when restoring from checkpoint.
"""
return {
"awaiting_agent_id": self.awaiting_agent_id,
"prompt": self.prompt,
"source_executor_id": self.source_executor_id,
}

@classmethod
def from_dict(cls, data: dict[str, Any]) -> "HandoffUserInputRequest":
"""Deserialize from dict, initializing conversation as empty.

The conversation will be reconstructed from the coordinator's state on restore.
"""
return cls(
conversation=[],
awaiting_agent_id=data["awaiting_agent_id"],
prompt=data["prompt"],
source_executor_id=data["source_executor_id"],
)


@dataclass
class _ConversationWithUserInput:
"""Internal message carrying full conversation + new user messages from gateway to coordinator."""
"""Internal message carrying full conversation + new user messages from gateway to coordinator.

Attributes:
full_conversation: The conversation messages to process.
is_post_restore: If True, indicates this message was created after a checkpoint restore.
The coordinator should append these messages to its existing conversation rather
than replacing it. This prevents duplicate messages (see issue #2667).
"""

full_conversation: list[ChatMessage] = field(default_factory=lambda: []) # type: ignore[misc]
is_post_restore: bool = False


@dataclass
Expand Down Expand Up @@ -439,9 +477,25 @@ async def handle_user_input(
message: _ConversationWithUserInput,
ctx: WorkflowContext[AgentExecutorRequest, list[ChatMessage]],
) -> None:
"""Receive full conversation with new user input from gateway, update history, trim for agent."""
# Update authoritative conversation
self._conversation = list(message.full_conversation)
"""Receive user input from gateway, update history, and route to agent.

The message.full_conversation may contain:
- Full conversation history + new user messages (normal flow)
- Only new user messages (post-checkpoint-restore flow, see issue #2667)

The gateway sets message.is_post_restore=True when resuming after a checkpoint
restore. In that case, we append the new messages to the existing conversation
rather than replacing it.
"""
incoming = message.full_conversation

if message.is_post_restore and self._conversation:
# Post-restore: append new user messages to existing conversation
# The coordinator already has its conversation restored from checkpoint
self._conversation.extend(incoming)
else:
# Normal flow: replace with full conversation
self._conversation = list(incoming) if incoming else self._conversation

# Reset autonomous turn counter on new user input
self._autonomous_turns = 0
Expand Down Expand Up @@ -626,15 +680,24 @@ async def resume_from_user(
response: object,
ctx: WorkflowContext[_ConversationWithUserInput],
) -> None:
"""Convert user input responses back into chat messages and resume the workflow."""
# Reconstruct full conversation with new user input
conversation = list(original_request.conversation)
"""Convert user input responses back into chat messages and resume the workflow.

After checkpoint restore, original_request.conversation will be empty (not serialized
to prevent duplication - see issue #2667). In this case, we send only the new user
messages and let the coordinator append them to its already-restored conversation.
"""
user_messages = _as_user_messages(response)
conversation.extend(user_messages)

# Send full conversation back to coordinator (not trimmed)
# Coordinator will update its authoritative history and trim for agent
message = _ConversationWithUserInput(full_conversation=conversation)
if original_request.conversation:
# Normal flow: have conversation history from the original request
conversation = list(original_request.conversation)
conversation.extend(user_messages)
message = _ConversationWithUserInput(full_conversation=conversation, is_post_restore=False)
else:
# Post-restore flow: conversation was not serialized, send only new user messages
# The coordinator will append these to its already-restored conversation
message = _ConversationWithUserInput(full_conversation=user_messages, is_post_restore=True)

await ctx.send_message(message, target_id="handoff-coordinator")


Expand Down
Loading
Loading