diff --git a/README.md b/README.md index 157ae0c596..6fb4a680f0 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ The framework is provider-agnostic (any LLM via LiteLLM), configuration-driven ( **Agent Orchestration** -Define agents with roles, models, and tools. The engine handles task decomposition, routing, execution loops (ReAct, Plan-and-Execute, auto-selection by complexity), crash recovery (checkpoint resume), and multi-agent coordination. +Define agents with roles, models, and tools. The engine handles task decomposition, routing, execution loops (ReAct, Plan-and-Execute, Hybrid, auto-selection by complexity), crash recovery (checkpoint resume), and multi-agent coordination. diff --git a/docs/design/engine.md b/docs/design/engine.md index 92ebc0c2a7..d142ccd028 100644 --- a/docs/design/engine.md +++ b/docs/design/engine.md @@ -442,6 +442,9 @@ composes the execution loop with prompt construction, context management, tool invocation, and cost tracking into a single `run()` call. When an `auto_loop_config` is provided (mutually exclusive with `execution_loop`), the engine dynamically selects the loop per task via `_resolve_loop()`. +Optional `plan_execute_config`, `hybrid_loop_config`, and +`compaction_callback` are forwarded to the auto-selected loop so it +receives the same configuration as a statically configured loop. The engine also exposes an optional ``coordinate()`` method that delegates to a ``MultiAgentCoordinator`` when one is configured (see :doc:`coordination`). @@ -486,7 +489,10 @@ async run( Budget-aware downgrade: hybrid is downgraded to plan_execute when utilization >= threshold. Optional hybrid fallback applies when `hybrid_fallback` is configured. When no auto config is set, uses - the statically configured loop. + the statically configured loop. The auto-selected loop receives the + engine's `compaction_callback`, `plan_execute_config` (for + plan-execute), and `hybrid_loop_config` (for hybrid), along with the + approval gate and stagnation detector. 9. **Delegate to loop** -- calls `ExecutionLoop.execute()` with context, provider, tool invoker, budget checker, and completion config. If `timeout_seconds` is set, wraps the call in `asyncio.wait`; on expiry diff --git a/src/synthorg/engine/agent_engine.py b/src/synthorg/engine/agent_engine.py index 8ec418a1e1..eb04b628b0 100644 --- a/src/synthorg/engine/agent_engine.py +++ b/src/synthorg/engine/agent_engine.py @@ -96,6 +96,7 @@ from synthorg.budget.tracker import CostTracker from synthorg.core.agent import AgentIdentity from synthorg.core.task import Task + from synthorg.engine.compaction import CompactionCallback from synthorg.engine.coordination.models import ( CoordinationContext, CoordinationResult, @@ -107,6 +108,7 @@ ExecutionLoop, ShutdownChecker, ) + from synthorg.engine.plan_models import PlanExecuteConfig from synthorg.engine.stagnation.protocol import StagnationDetector from synthorg.engine.task_engine import TaskEngine from synthorg.persistence.repositories import ( @@ -152,11 +154,24 @@ class AgentEngine: enhanced in-flight budget checking. security_config: Optional security subsystem configuration. approval_store: Optional approval queue store. + parked_context_repo: Optional repository for parking + execution contexts during approval escalation. task_engine: Optional centralized task engine for real-time status sync (incremental transitions at each lifecycle point, best-effort). + checkpoint_repo: Optional checkpoint repository for + persisting execution state at turn boundaries. + Must be paired with ``heartbeat_repo``. + heartbeat_repo: Optional heartbeat repository for + crash detection during execution. Must be paired + with ``checkpoint_repo``. + checkpoint_config: Checkpoint tuning (interval, max size). + Defaults to ``CheckpointConfig()``. coordinator: Optional multi-agent coordinator for delegated coordination via :meth:`coordinate`. + stagnation_detector: Optional detector for repetitive + tool-call patterns. Wired into the execution loop + when using auto-selection or the default loop. auto_loop_config: Optional auto-loop selection configuration. Selects the execution loop per-task based on complexity and budget state. Mutually exclusive with @@ -164,6 +179,15 @@ class AgentEngine: hybrid_loop_config: Optional configuration for the hybrid plan+ReAct loop. Passed to ``build_execution_loop`` when auto-selection picks ``"hybrid"``. + compaction_callback: Optional async callback invoked at turn + boundaries to compress older conversation turns. Passed + to the execution loop (both static default and + auto-selected). When ``execution_loop`` is provided + directly, the caller is responsible for wiring this + callback into the loop. + plan_execute_config: Optional configuration for the + plan-execute loop. Passed to ``build_execution_loop`` + when auto-selection picks ``"plan_execute"``. """ def __init__( # noqa: PLR0913 @@ -188,6 +212,8 @@ def __init__( # noqa: PLR0913 stagnation_detector: StagnationDetector | None = None, auto_loop_config: AutoLoopConfig | None = None, hybrid_loop_config: HybridLoopConfig | None = None, + compaction_callback: CompactionCallback | None = None, + plan_execute_config: PlanExecuteConfig | None = None, ) -> None: if execution_loop is not None and auto_loop_config is not None: msg = "execution_loop and auto_loop_config are mutually exclusive" @@ -202,17 +228,22 @@ def __init__( # noqa: PLR0913 self._stagnation_detector = stagnation_detector self._auto_loop_config = auto_loop_config self._hybrid_loop_config = hybrid_loop_config + self._compaction_callback = compaction_callback + self._plan_execute_config = plan_execute_config self._approval_gate = self._make_approval_gate() if execution_loop is not None and ( - self._approval_gate is not None or self._stagnation_detector is not None + self._approval_gate is not None + or self._stagnation_detector is not None + or self._compaction_callback is not None ): logger.warning( APPROVAL_GATE_LOOP_WIRING_WARNING, note=( - "execution_loop provided externally — approval_gate " - "and stagnation_detector will NOT be wired " - "automatically. Configure the loop with " - "approval_gate= and stagnation_detector= explicitly." + "execution_loop provided externally -- approval_gate, " + "stagnation_detector, and compaction_callback will NOT " + "be wired automatically. Configure the loop with " + "approval_gate=, stagnation_detector=, and " + "compaction_callback= explicitly." ), ) self._loop: ExecutionLoop = execution_loop or self._make_default_loop() @@ -259,6 +290,9 @@ def __init__( # noqa: PLR0913 has_cost_tracker=self._cost_tracker is not None, has_budget_enforcer=self._budget_enforcer is not None, has_coordinator=self._coordinator is not None, + has_compaction_callback=self._compaction_callback is not None, + has_plan_execute_config=self._plan_execute_config is not None, + has_hybrid_loop_config=self._hybrid_loop_config is not None, ) @property @@ -1002,6 +1036,7 @@ def _make_default_loop(self) -> ReactLoop: return ReactLoop( approval_gate=self._approval_gate, stagnation_detector=self._stagnation_detector, + compaction_callback=self._compaction_callback, ) async def _resolve_loop( @@ -1015,11 +1050,6 @@ async def _resolve_loop( When ``auto_loop_config`` is set, selects the loop based on task complexity and optional budget state. Otherwise returns the statically configured loop (``self._loop``). - - Note: auto-selected loops use default ``PlanExecuteConfig`` - and do not receive a compaction callback. Provide an - ``execution_loop`` directly for custom plan-execute config - or compaction. """ if self._auto_loop_config is None: return self._loop @@ -1070,6 +1100,8 @@ async def _resolve_loop( loop_type, approval_gate=self._approval_gate, stagnation_detector=self._stagnation_detector, + compaction_callback=self._compaction_callback, + plan_execute_config=self._plan_execute_config, hybrid_loop_config=self._hybrid_loop_config, ) diff --git a/src/synthorg/engine/checkpoint/resume.py b/src/synthorg/engine/checkpoint/resume.py index d333baa680..a733cc49d4 100644 --- a/src/synthorg/engine/checkpoint/resume.py +++ b/src/synthorg/engine/checkpoint/resume.py @@ -11,6 +11,7 @@ from synthorg.engine.checkpoint.callback_factory import make_checkpoint_callback from synthorg.engine.checkpoint.models import CheckpointConfig # noqa: TC001 from synthorg.engine.context import AgentContext +from synthorg.engine.hybrid_loop import HybridLoop from synthorg.engine.plan_execute_loop import PlanExecuteLoop from synthorg.engine.react_loop import ReactLoop from synthorg.observability import get_logger @@ -136,6 +137,14 @@ def make_loop_with_callback( # noqa: PLR0913 stagnation_detector=loop.stagnation_detector, compaction_callback=loop.compaction_callback, ) + if isinstance(loop, HybridLoop): + return HybridLoop( + config=loop.config, + checkpoint_callback=callback, + approval_gate=loop.approval_gate, + stagnation_detector=loop.stagnation_detector, + compaction_callback=loop.compaction_callback, + ) logger.warning( CHECKPOINT_UNSUPPORTED_LOOP, loop_type=type(loop).__name__, diff --git a/tests/unit/engine/test_agent_engine_auto_loop.py b/tests/unit/engine/test_agent_engine_auto_loop.py index 67f2e5069a..c3337a5202 100644 --- a/tests/unit/engine/test_agent_engine_auto_loop.py +++ b/tests/unit/engine/test_agent_engine_auto_loop.py @@ -14,13 +14,18 @@ from synthorg.core.task import Task from synthorg.engine.agent_engine import AgentEngine from synthorg.engine.context import AgentContext +from synthorg.engine.hybrid_loop import HybridLoop +from synthorg.engine.hybrid_models import HybridLoopConfig from synthorg.engine.loop_selector import AutoLoopConfig +from synthorg.engine.plan_execute_loop import PlanExecuteLoop +from synthorg.engine.plan_models import PlanExecuteConfig from synthorg.engine.react_loop import ReactLoop from synthorg.engine.run_result import AgentRunResult from synthorg.observability.events.execution import ( EXECUTION_LOOP_AUTO_SELECTED, EXECUTION_LOOP_BUDGET_UNAVAILABLE, ) +from synthorg.providers.models import CompletionResponse if TYPE_CHECKING: from .conftest import MockCompletionProvider @@ -37,10 +42,11 @@ def _make_task_with_complexity( *, complexity: Complexity, agent_id: str, + task_id: str = "task-auto-001", ) -> Task: """Build a task with specific complexity for auto-loop tests.""" return Task( - id="task-auto-001", + id=task_id, title="Auto-loop test task", description="A task for testing auto-loop selection.", type=TaskType.DEVELOPMENT, @@ -52,6 +58,29 @@ def _make_task_with_complexity( ) +def _make_plan_exec_responses() -> list[CompletionResponse]: + """Build provider responses for a plan-execute loop run.""" + return [ + _make_completion_response( + content="1. Implement the feature\nExpected: Feature works correctly", + ), + _make_completion_response(content="Done."), + ] + + +def _make_hybrid_responses() -> list[CompletionResponse]: + """Build provider responses for a hybrid loop run.""" + return [ + _make_completion_response( + content="1. Implement the feature\nExpected: Feature works correctly", + ), + _make_completion_response(content="Done."), + _make_completion_response( + content='{"summary": "Done", "replan": false}', + ), + ] + + def _make_budget_enforcer() -> BudgetEnforcer: """Build a BudgetEnforcer with standard test config. @@ -108,12 +137,7 @@ async def test_medium_task_uses_plan_execute( sample_agent_with_personality: AgentIdentity, mock_provider_factory: type[MockCompletionProvider], ) -> None: - # Plan-execute needs: 1 planning response + 1 execution response - plan_response = _make_completion_response( - content=("1. Implement the feature\nExpected: Feature works correctly"), - ) - exec_response = _make_completion_response(content="Done.") - provider = mock_provider_factory([plan_response, exec_response]) + provider = mock_provider_factory(_make_plan_exec_responses()) engine = AgentEngine( provider=provider, auto_loop_config=AutoLoopConfig(), @@ -170,11 +194,7 @@ async def test_complex_tight_budget_uses_plan_execute( mock_provider_factory: type[MockCompletionProvider], ) -> None: """Complex + tight budget => plan_execute (not hybrid).""" - plan_response = _make_completion_response( - content=("1. Implement the feature\nExpected: Feature works correctly"), - ) - exec_response = _make_completion_response(content="Done.") - provider = mock_provider_factory([plan_response, exec_response]) + provider = mock_provider_factory(_make_plan_exec_responses()) enforcer = _make_budget_enforcer() @@ -217,16 +237,7 @@ async def test_complex_ok_budget_uses_hybrid( mock_provider_factory: type[MockCompletionProvider], ) -> None: """Complex + OK budget => hybrid loop selected.""" - plan_response = _make_completion_response( - content=("1. Implement the feature\nExpected: Feature works correctly"), - ) - exec_response = _make_completion_response(content="Done.") - summary_response = _make_completion_response( - content='{"summary": "Done", "replan": false}', - ) - provider = mock_provider_factory( - [plan_response, exec_response, summary_response], - ) + provider = mock_provider_factory(_make_hybrid_responses()) enforcer = _make_budget_enforcer() @@ -277,16 +288,7 @@ async def test_budget_unavailable_still_selects_loop( mock_provider_factory: type[MockCompletionProvider], ) -> None: """Budget utilization unknown => proceeds without downgrade.""" - plan_response = _make_completion_response( - content=("1. Implement the feature\nExpected: Feature works correctly"), - ) - exec_response = _make_completion_response(content="Done.") - summary_response = _make_completion_response( - content='{"summary": "Done", "replan": false}', - ) - provider = mock_provider_factory( - [plan_response, exec_response, summary_response], - ) + provider = mock_provider_factory(_make_hybrid_responses()) enforcer = _make_budget_enforcer() @@ -376,10 +378,211 @@ async def test_execute_resumed_loop_calls_resolve_loop( str(task.id), ) - # _resolve_loop was called with the checkpoint's task + # _resolve_loop was called with the checkpoint's task + IDs resolve_mock.assert_awaited_once() - call_task = resolve_mock.call_args[0][0] + call_args = resolve_mock.call_args + call_task = call_args[0][0] assert call_task.estimated_complexity == Complexity.MEDIUM + assert call_args[0][1] == str(sample_agent_with_personality.id) + assert call_args[0][2] == str(task.id) # The resolved loop instance was actually executed resolved_loop.execute.assert_awaited_once() + + +# -- Config wiring through auto-selection path ------------------- + + +@pytest.mark.unit +class TestAutoLoopConfigWiring: + """compaction_callback and plan_execute_config are wired through.""" + + async def test_compaction_callback_wired_to_react_via_auto_selection( + self, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """SIMPLE task -> ReactLoop receives compaction_callback.""" + provider = mock_provider_factory([]) + compact_cb = AsyncMock() + engine = AgentEngine( + provider=provider, + auto_loop_config=AutoLoopConfig(), + compaction_callback=compact_cb, + ) + task = _make_task_with_complexity( + complexity=Complexity.SIMPLE, + agent_id="agent-wire-001", + task_id="task-wire-001", + ) + loop = await engine._resolve_loop(task, "agent-wire-001", task.id) + assert isinstance(loop, ReactLoop) + assert loop.compaction_callback is compact_cb + + async def test_compaction_callback_wired_to_plan_execute_via_auto_selection( + self, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """MEDIUM task -> PlanExecuteLoop receives compaction_callback.""" + provider = mock_provider_factory([]) + compact_cb = AsyncMock() + engine = AgentEngine( + provider=provider, + auto_loop_config=AutoLoopConfig(), + compaction_callback=compact_cb, + ) + task = _make_task_with_complexity( + complexity=Complexity.MEDIUM, + agent_id="agent-wire-002", + task_id="task-wire-002", + ) + loop = await engine._resolve_loop(task, "agent-wire-002", task.id) + assert isinstance(loop, PlanExecuteLoop) + assert loop.compaction_callback is compact_cb + + async def test_compaction_callback_wired_to_hybrid_via_auto_selection( + self, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """COMPLEX task + OK budget -> HybridLoop receives compaction_callback.""" + provider = mock_provider_factory([]) + compact_cb = AsyncMock() + enforcer = _make_budget_enforcer() + engine = AgentEngine( + provider=provider, + auto_loop_config=AutoLoopConfig(), + compaction_callback=compact_cb, + budget_enforcer=enforcer, + ) + task = _make_task_with_complexity( + complexity=Complexity.COMPLEX, + agent_id="agent-wire-003", + task_id="task-wire-003", + ) + with patch.object( + enforcer, + "get_budget_utilization_pct", + new_callable=AsyncMock, + return_value=30.0, + ): + loop = await engine._resolve_loop(task, "agent-wire-003", task.id) + assert isinstance(loop, HybridLoop) + assert loop.compaction_callback is compact_cb + + async def test_plan_execute_config_wired_via_auto_selection( + self, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """MEDIUM task -> PlanExecuteLoop receives plan_execute_config.""" + provider = mock_provider_factory([]) + pe_config = PlanExecuteConfig(max_replans=7) + engine = AgentEngine( + provider=provider, + auto_loop_config=AutoLoopConfig(), + plan_execute_config=pe_config, + ) + task = _make_task_with_complexity( + complexity=Complexity.MEDIUM, + agent_id="agent-wire-004", + task_id="task-wire-004", + ) + loop = await engine._resolve_loop(task, "agent-wire-004", task.id) + assert isinstance(loop, PlanExecuteLoop) + assert loop.config.max_replans == 7 + + def test_compaction_callback_wired_to_default_loop( + self, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """Without auto_loop_config, default ReactLoop receives callback.""" + provider = mock_provider_factory([]) + compact_cb = MagicMock() + engine = AgentEngine( + provider=provider, + compaction_callback=compact_cb, + ) + assert isinstance(engine._loop, ReactLoop) + assert engine._loop.compaction_callback is compact_cb + + def test_compaction_callback_defaults_to_none( + self, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """Omitting compaction_callback leaves loop attribute None.""" + provider = mock_provider_factory([]) + engine = AgentEngine(provider=provider) + assert isinstance(engine._loop, ReactLoop) + assert engine._loop.compaction_callback is None + + async def test_hybrid_loop_config_wired_via_auto_selection( + self, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """COMPLEX task + OK budget -> HybridLoop receives hybrid_loop_config.""" + provider = mock_provider_factory([]) + hl_config = HybridLoopConfig(max_plan_steps=3, max_turns_per_step=8) + enforcer = _make_budget_enforcer() + engine = AgentEngine( + provider=provider, + auto_loop_config=AutoLoopConfig(), + hybrid_loop_config=hl_config, + budget_enforcer=enforcer, + ) + task = _make_task_with_complexity( + complexity=Complexity.COMPLEX, + agent_id="agent-wire-005", + task_id="task-wire-005", + ) + with patch.object( + enforcer, + "get_budget_utilization_pct", + new_callable=AsyncMock, + return_value=30.0, + ): + loop = await engine._resolve_loop(task, "agent-wire-005", task.id) + assert isinstance(loop, HybridLoop) + assert loop.config.max_plan_steps == 3 + assert loop.config.max_turns_per_step == 8 + + async def test_plan_execute_config_defaults_when_none( + self, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """Omitting plan_execute_config uses default PlanExecuteConfig.""" + provider = mock_provider_factory([]) + engine = AgentEngine( + provider=provider, + auto_loop_config=AutoLoopConfig(), + ) + task = _make_task_with_complexity( + complexity=Complexity.MEDIUM, + agent_id="agent-wire-006", + task_id="task-wire-006", + ) + loop = await engine._resolve_loop(task, "agent-wire-006", task.id) + assert isinstance(loop, PlanExecuteLoop) + default_config = PlanExecuteConfig() + assert loop.config.max_replans == default_config.max_replans + + async def test_both_compaction_and_plan_config_wired_simultaneously( + self, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """Both compaction_callback and plan_execute_config wired together.""" + provider = mock_provider_factory([]) + compact_cb = AsyncMock() + pe_config = PlanExecuteConfig(max_replans=5) + engine = AgentEngine( + provider=provider, + auto_loop_config=AutoLoopConfig(), + compaction_callback=compact_cb, + plan_execute_config=pe_config, + ) + task = _make_task_with_complexity( + complexity=Complexity.MEDIUM, + agent_id="agent-wire-007", + task_id="task-wire-007", + ) + loop = await engine._resolve_loop(task, "agent-wire-007", task.id) + assert isinstance(loop, PlanExecuteLoop) + assert loop.compaction_callback is compact_cb + assert loop.config.max_replans == 5 diff --git a/tests/unit/engine/test_loop_selector.py b/tests/unit/engine/test_loop_selector.py index b57108ef41..9e45b75f81 100644 --- a/tests/unit/engine/test_loop_selector.py +++ b/tests/unit/engine/test_loop_selector.py @@ -1,5 +1,7 @@ """Unit tests for execution loop auto-selection.""" +from unittest.mock import MagicMock + import pytest import structlog.testing from pydantic import ValidationError @@ -379,8 +381,6 @@ def test_build_plan_execute(self) -> None: assert loop.get_loop_type() == "plan_execute" def test_build_react_with_gates(self) -> None: - from unittest.mock import MagicMock - gate = MagicMock() detector = MagicMock() loop = build_execution_loop( @@ -421,8 +421,6 @@ def test_build_hybrid_with_config(self) -> None: assert loop.config.max_turns_per_step == 10 def test_build_hybrid_with_gates(self) -> None: - from unittest.mock import MagicMock - gate = MagicMock() detector = MagicMock() ckpt_cb = MagicMock() @@ -440,6 +438,26 @@ def test_build_hybrid_with_gates(self) -> None: assert loop._checkpoint_callback is ckpt_cb assert loop.compaction_callback is compact_cb + def test_build_react_with_compaction_callback(self) -> None: + """ReactLoop receives compaction_callback when provided.""" + compact_cb = MagicMock() + loop = build_execution_loop( + "react", + compaction_callback=compact_cb, + ) + assert isinstance(loop, ReactLoop) + assert loop.compaction_callback is compact_cb + + def test_build_plan_execute_with_compaction_callback(self) -> None: + """PlanExecuteLoop receives compaction_callback when provided.""" + compact_cb = MagicMock() + loop = build_execution_loop( + "plan_execute", + compaction_callback=compact_cb, + ) + assert isinstance(loop, PlanExecuteLoop) + assert loop.compaction_callback is compact_cb + def test_unknown_type_raises(self) -> None: with pytest.raises(ValueError, match="Unknown loop type"): build_execution_loop("nonexistent")