diff --git a/CLAUDE.md b/CLAUDE.md index ace929e1d8..7c6ca59b46 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -79,8 +79,8 @@ PYTHONPATH=. uv run zensical build # docs - Two phases: **construction** (`create_app` body) wires synchronous services; **on_startup** (`_build_lifecycle.on_startup`) wires services that need a connected persistence backend. - Construction-phase ordering invariants: `agent_registry` must be built BEFORE `auto_wire_meetings`; `tunnel_provider` is wired unconditionally (not gated by `integrations.enabled`). - On-startup ordering invariants: `SettingsService` auto-wire must precede `WorkflowExecutionObserver` registration (so it picks up resolver-driven `max_subworkflow_depth` instead of the seed default); `OntologyService` wires after `persistence.connect()` via `_wire_ontology_service`. -- Worker execution service: `synthorg.workers.runtime_builder.build_worker_execution_service` selects behind the provider-present switch (`AgentEngineExecutionService` with a provider, `NoProviderExecutionService` empty-company backstop) and installs via the `AppState.worker_execution_service` seam. The boot install hook is appended FIRST after the persistence/SettingsService hooks so the once-only `set_worker_execution_service` cannot lose the race with the property's lazy `LifecycleAdvancingExecutionService` default. Empty-company also rejects task creation at the controller (`AgentRuntimeNotConfiguredError`, 4014). `swap_worker_execution_service` / `swap_provider_registry` hold a lock (synchronised against lazy reads). -- Setup completion: `post_setup_reinit()` (provider reload, agent bootstrap, AND worker-execution-service rebuild + hot-swap, defined in `src/synthorg/api/controllers/setup/agent_helpers.py`) propagates failures, and `settings_svc.set("api", "setup_complete", "true")` only runs if reinit returns clean. The whole check/validate/reinit/persist sequence is serialised under `COMPLETE_LOCK` in the same module so two concurrent `/setup/complete` requests cannot race on the flag write. A half-configured runtime presenting itself as "complete" is worse than a clear error the operator can retry after fixing the underlying provider config. +- Runtime services: `synthorg.workers.runtime_builder.build_runtime_services` selects behind ONE provider-present switch and returns a `RuntimeServices` pair (worker execution service + multi-agent coordinator) built from a SINGLE shared boot `AgentEngine`: `AgentEngineExecutionService` + a `build_coordinator(...)` coordinator with a provider, `NoProviderExecutionService` + `None` coordinator as the empty-company backstop. The `_install_runtime_services` boot hook installs both via the `AppState.worker_execution_service` and `AppState.coordinator` seams; it is appended FIRST after the persistence/SettingsService hooks so the once-only `set_worker_execution_service` / `set_coordinator` cannot lose the race with the worker property's lazy `LifecycleAdvancingExecutionService` default. Empty-company rejects task creation at the controller (`AgentRuntimeNotConfiguredError`, 4014) and `/coordinate` honestly 503s (no coordinator). `swap_worker_execution_service` / `swap_coordinator` / `swap_provider_registry` hold a lock (synchronised against lazy reads). +- Setup completion: `post_setup_reinit()` (provider reload, agent bootstrap, AND runtime-services rebuild + dual hot-swap of the worker execution service and coordinator, defined in `src/synthorg/api/controllers/setup/agent_helpers.py`) propagates failures, and `settings_svc.set("api", "setup_complete", "true")` only runs if reinit returns clean. The whole check/validate/reinit/persist sequence is serialised under `COMPLETE_LOCK` in the same module so two concurrent `/setup/complete` requests cannot race on the flag write. A half-configured runtime presenting itself as "complete" is worse than a clear error the operator can retry after fixing the underlying provider config. ## MCP / Telemetry / Resilience diff --git a/README.md b/README.md index 42b18f4ddd..9a8208b114 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ These are the capabilities that make SynthOrg an autonomous studio. They are des - **Best-in-class operate tier**: a golden-company benchmark, mission control with run replay, a cost forecast/kill-switch dial, a measurable learning curve, deterministic replay, run narratives, and an adversarial red-team. - **Agent capability layer**: a knowledge and provenance retrieval substrate, research mode, continual improvement, governed external API access, headless-browser and virtual-desktop testing, and more. -Until the agent runtime lands, multi-agent coordination, coordination metrics, autonomy/trust enforcement on a live run, and the self-improvement loop are designed and unit-tested but not exercised end to end. The design for each lives in the [Design Specification](https://synthorg.io/docs/design/). +The multi-agent coordinator runs end to end behind the provider-present switch (decompose, route, parallel execution, rollup; `/coordinate` returns a real result when a provider is configured). Coordination metrics, autonomy/trust enforcement on a live run, and the self-improvement loop are designed and unit-tested but not yet exercised end to end. The design for each lives in the [Design Specification](https://synthorg.io/docs/design/). ## Quick Start diff --git a/docs/design/coordination.md b/docs/design/coordination.md index 66bb1ce213..81c84d1482 100644 --- a/docs/design/coordination.md +++ b/docs/design/coordination.md @@ -7,7 +7,7 @@ description: Agent crash recovery, graceful shutdown protocol, concurrent worksp !!! warning "Designed behaviour; runtime in active development" - This page is the source of truth for the **designed** behaviour of this subsystem. Multi-agent coordination is not wired into a running product yet (the `/coordinate` path is not active); this is in active development (see the [Roadmap](../roadmap/index.md)). The code described here is built and unit-tested as components but not yet run by a live agent. + This page is the source of truth for the **designed** behaviour of this subsystem. The multi-agent coordinator is wired at boot behind the provider-present switch: with a provider configured, `/coordinate` runs decompose, route, parallel execution, then rollup end to end; an empty company (no provider) still returns 503. The surrounding resilience features on this page (crash recovery with checkpoint resume, graceful shutdown, the self-improvement loop) remain in active development (see the [Roadmap](../roadmap/index.md)). This page covers system-level features that span multiple agents and protect against failure: crash recovery with checkpoint resume, graceful shutdown strategies, concurrent workspace isolation (Git worktrees / virtual filesystem / per-branch), and multi-agent coordination topology (centralized, decentralized, context-dependent dispatchers). diff --git a/scripts/_ghost_wiring_manifest.txt b/scripts/_ghost_wiring_manifest.txt index 80620b8000..be62846eaa 100644 --- a/scripts/_ghost_wiring_manifest.txt +++ b/scripts/_ghost_wiring_manifest.txt @@ -24,7 +24,7 @@ # instantiated/called). Bare name match on ast.Call(func=Name|Attribute). ENFORCED AgentEngine #1956 -- runtime root; construct at boot behind the provider switch -PENDING build_coordinator #1958 -- call at boot to populate app_state.coordinator +ENFORCED build_coordinator #1958 -- called by workers.runtime_builder.build_runtime_services behind the provider switch PENDING BaselineStore #1959 -- construct at boot (window from budget.baseline_window_size) PENDING CoordinationMetricsCollector #1959 -- construct at boot, thread into execution ENFORCED IntakeEngine #1961 -- wired at boot via client/runtime_builder.build_client_simulation_runtime diff --git a/src/synthorg/api/app.py b/src/synthorg/api/app.py index 65fe388b49..1b2aa2b185 100644 --- a/src/synthorg/api/app.py +++ b/src/synthorg/api/app.py @@ -992,25 +992,31 @@ def create_app( # noqa: C901, PLR0912, PLR0913, PLR0915 effective_config=effective_config, ) - _worker_service_installed = False + _runtime_services_installed = False - async def _install_worker_execution_service() -> None: - # Installs the worker execution service behind the - # provider-present switch. Appended first (runs immediately + async def _install_runtime_services() -> None: + # Installs the worker execution service AND the multi-agent + # coordinator behind the single provider-present switch, both + # sharing one boot AgentEngine. Appended first (runs immediately # after the core startup hooks that connect persistence and # wire SettingsService / ConfigResolver), and before any other # appended hook, so the once-only ``set_worker_execution_service`` - # cannot lose a race with the property's lazy lifecycle-only - # default. With no provider this installs the empty-company - # backstop; a provider added later swaps in the live service via - # ``post_setup_reinit`` (no restart). The closure flag keeps the - # one-shot ``set_`` idempotent across a lifespan re-entry - # (shared-app test fixtures), mirroring ``_wire_chief_of_staff_chat``. - nonlocal _worker_service_installed - if _worker_service_installed: + # / ``set_coordinator`` cannot lose a race with the + # worker-service property's lazy lifecycle-only default. With no + # provider this installs the empty-company backstop and no + # coordinator (``/coordinate`` honestly 503s); a provider added + # later swaps both in via ``post_setup_reinit`` (no restart). The + # closure flag keeps the one-shot ``set_`` calls idempotent + # across a lifespan re-entry (shared-app test fixtures), + # mirroring ``_wire_chief_of_staff_chat``. + nonlocal _runtime_services_installed + if _runtime_services_installed: return + from synthorg.engine.errors import ( # noqa: PLC0415 + RuntimeServicesBuildError, + ) from synthorg.workers.runtime_builder import ( # noqa: PLC0415 - build_worker_execution_service, + build_runtime_services, ) # Pin the sandbox workspace onto the mounted data volume in an @@ -1022,7 +1028,7 @@ async def _install_worker_execution_service() -> None: app_state.set_agent_workspace_root(env_workspace_root) try: - service = await build_worker_execution_service( + services = await build_runtime_services( app_state, workspace_root=app_state.agent_workspace_root, ) @@ -1031,16 +1037,28 @@ async def _install_worker_execution_service() -> None: except Exception as exc: logger.error( API_APP_STARTUP, - service="worker_execution_service", - note="failed to build the worker execution service at boot", + service="runtime_services", + note="failed to build the runtime services at boot", + provider_present=app_state.has_active_provider, error_type=type(exc).__name__, error=safe_error_description(exc), ) - raise - app_state.set_worker_execution_service(service) - _worker_service_installed = True - - startup = [*startup, _install_worker_execution_service] + msg = "Runtime services failed to build at boot" + raise RuntimeServicesBuildError(msg) from exc + app_state.set_worker_execution_service( + services.worker_execution_service, + ) + # An explicitly injected coordinator (``create_app(coordinator=)`` + # in tests / custom DI) wins over the autowired one, matching the + # injection-over-autowire convention used across ``create_app``. + # ``set_coordinator_if_absent`` makes the check-and-set atomic in + # the seam (no boot-time check-then-act), so an injected + # coordinator is kept and the built one is a logged no-op then. + if services.coordinator is not None: + app_state.set_coordinator_if_absent(services.coordinator) + _runtime_services_installed = True + + startup = [*startup, _install_runtime_services] # Project telemetry: build collector (reads SYNTHORG_TELEMETRY_ENABLED env for # opt-in, defaults to disabled). Attach to app_state so the health diff --git a/src/synthorg/api/controllers/setup/agent_helpers.py b/src/synthorg/api/controllers/setup/agent_helpers.py index 1cfe000aba..5be959d97f 100644 --- a/src/synthorg/api/controllers/setup/agent_helpers.py +++ b/src/synthorg/api/controllers/setup/agent_helpers.py @@ -139,31 +139,61 @@ async def post_setup_reinit(app_state: AppState) -> None: ) raise - # 3. Rebuild + hot-swap the worker execution service so a provider - # added after an empty-company start wakes the agent runtime - # live, with no process restart. Raise on failure so the caller - # keeps ``setup_complete=false`` rather than presenting a - # half-configured runtime as complete. + # 3. Rebuild + hot-swap BOTH runtime services so a provider added + # after an empty-company start wakes the whole runtime live. + await _rebuild_runtime_services(app_state) + + +async def _rebuild_runtime_services(app_state: AppState) -> None: + """Rebuild and hot-swap both runtime services (worker execution + coordinator). + + Invoked after provider configuration to bring the full agent runtime + online without a process restart. Swaps the worker execution service + and the multi-agent coordinator so ``/coordinate`` stops returning + 503 and the worker-callable execute endpoint uses the new provider. + + Raises on failure (either a typed ``RuntimeServicesBuildError`` or a + wrapped exception) so :func:`post_setup_reinit` can keep the setup flag + as incomplete. A half-configured runtime reporting itself as complete is + worse than a clear error the operator can retry after fixing the + underlying provider configuration. + """ try: + from synthorg.engine.errors import ( # noqa: PLC0415 + RuntimeServicesBuildError, + ) from synthorg.workers.runtime_builder import ( # noqa: PLC0415 - build_worker_execution_service, + build_runtime_services, ) - service = await build_worker_execution_service( + services = await build_runtime_services( app_state, workspace_root=app_state.agent_workspace_root, ) - app_state.swap_worker_execution_service(service) + app_state.swap_worker_execution_service( + services.worker_execution_service, + ) + if services.coordinator is not None: + app_state.swap_coordinator(services.coordinator) except MemoryError, RecursionError: raise + except RuntimeServicesBuildError: + # Already a typed domain error (logged at its origin); re-raise + # unchanged so post_setup_reinit keeps setup_complete=false. + raise except Exception as exc: - logger.warning( + # Critical: a provider was configured but the runtime failed to + # wire. ERROR (not WARNING) so monitoring/operator dashboards + # alert; wrapped in a domain error so the /setup/complete + # controller can map it to an actionable status. + logger.error( SETUP_AGENT_BOOTSTRAP_FAILED, - context="worker_execution_service_rebuild", + context="runtime_services_rebuild", error_type=type(exc).__name__, error=safe_error_description(exc), ) - raise + msg = "Runtime services failed to rebuild after provider config" + raise RuntimeServicesBuildError(msg) from exc async def check_needs_admin( diff --git a/src/synthorg/api/state.py b/src/synthorg/api/state.py index 9a572222d9..1794dd11c8 100644 --- a/src/synthorg/api/state.py +++ b/src/synthorg/api/state.py @@ -1149,9 +1149,91 @@ def coordinator(self) -> MultiAgentCoordinator: @property def has_coordinator(self) -> bool: - """Check whether the coordinator is configured.""" + """Check whether the coordinator is configured. + + Unsynchronised by design: a single reference read is atomic + under CPython and ``swap_coordinator`` only ever reassigns one + already-set coordinator for another, so a concurrent reader sees + a consistent old-or-new instance (both non-None). The only + ``None -> set`` flip happens once at boot before HTTP traffic. + Locking this hot read (the ``/coordinate`` gate calls it per + request) would add cost for a benign snapshot. + """ return self._coordinator is not None + def set_coordinator(self, coordinator: MultiAgentCoordinator) -> None: + """Attach the multi-agent coordinator (once-only, boot only). + + Once-only: a second set raises, matching the + ``worker_execution_service`` seam. The boot runtime-services + hook uses :meth:`set_coordinator_if_absent` instead so an + explicitly injected coordinator wins; this strict variant is + retained for callers that require the once-only guarantee. + Hot-reload after setup uses :meth:`swap_coordinator`. + """ + self._set_once("_coordinator", coordinator, "Coordinator") + + def set_coordinator_if_absent( + self, + coordinator: MultiAgentCoordinator, + ) -> bool: + """Attach the coordinator only if none is configured (atomic). + + The boot runtime-services hook calls this unconditionally behind + the provider-present switch, so ``/coordinate`` stops returning + 503 once a provider is configured. An explicitly injected + coordinator (constructor ``coordinator=``) is already set and + wins: this is a logged no-op then. The check-and-set is atomic + under ``_lazy_service_lock`` so the boot install cannot race a + concurrent ``swap_coordinator`` or property read (eliminating the + former check-then-act at the call site). + + Returns: + ``True`` if this call installed the coordinator, ``False`` + if one was already configured (injected) and kept. + """ + with self._lazy_service_lock: + if self._coordinator is not None: + logger.info( + API_APP_STARTUP, + service="coordinator", + transition="skipped_injected", + ) + return False + self._coordinator = coordinator + logger.info( + API_APP_STARTUP, + service="coordinator", + transition="attached", + ) + return True + + def swap_coordinator(self, coordinator: MultiAgentCoordinator) -> None: + """Replace the coordinator (hot-reload). + + Distinct from :meth:`set_coordinator`, which is once-only: this + replaces an already-wired coordinator so a provider configured + against an empty-company start brings ``/coordinate`` online + without a restart (``post_setup_reinit``). Holds + ``_lazy_service_lock`` so the write is synchronised against + concurrent property reads, mirroring + :meth:`swap_worker_execution_service`. + """ + with self._lazy_service_lock: + previous = self._coordinator + if previous is coordinator: + transition = "noop" + elif previous is None: + transition = "attached" + else: + transition = "replaced" + self._coordinator = coordinator + logger.info( + API_APP_STARTUP, + service="coordinator", + transition=transition, + ) + @property def performance_tracker(self) -> PerformanceTracker: """Return performance tracker or raise 503.""" @@ -1160,6 +1242,11 @@ def performance_tracker(self) -> PerformanceTracker: "performance_tracker", ) + @property + def has_performance_tracker(self) -> bool: + """Check whether the performance tracker is configured.""" + return self._performance_tracker is not None + @property def agent_registry(self) -> AgentRegistryService: """Return agent registry or raise 503.""" diff --git a/src/synthorg/core/state_machine.py b/src/synthorg/core/state_machine.py index 63236559fb..355aa6d13a 100644 --- a/src/synthorg/core/state_machine.py +++ b/src/synthorg/core/state_machine.py @@ -21,6 +21,7 @@ def validate_transition(current: TaskStatus, target: TaskStatus) -> None: _MACHINE.validate(current, target) """ +from collections import deque from copy import deepcopy from types import MappingProxyType from typing import TYPE_CHECKING, Protocol @@ -198,3 +199,65 @@ def validate(self, current: S, target: S) -> None: current_state=current.value, target_state=target.value, ) + + def path_to(self, current: S, target: S) -> tuple[S, ...] | None: + """Return the shortest valid hop sequence from ``current`` to ``target``. + + Breadth-first over the transition table, so the result is a + minimal-length path. Used by callers that must drive an entity + through the lifecycle (rather than assert a single hop), e.g. + the coordinator advancing a parent task to its rollup-derived + status when it may still be several valid hops away. + + Args: + current: The current state. + target: The desired state. + + Returns: + ``()`` when ``current == target`` (nothing to do); a tuple + of the intermediate states followed by ``target`` (each hop + individually valid per the table) when a path exists; or + ``None`` when ``current`` is unknown to the table or no path + exists (e.g. ``current`` is terminal and not ``target``). + + Callers must distinguish all three cases explicitly; ``()`` is + falsy but is *not* the same as ``None`` (already-there vs + unreachable). The correct shape is:: + + path = machine.path_to(current, target) + if path is None: + ... # unreachable: surface an error + else: + for hop in path: # empty tuple -> no-op loop + ... + + A plain ``if path:`` is a bug -- it collapses "already there" + and "unreachable" into one branch. + """ + if current == target: + return () + if current not in self._transitions: + return None + # BFS; ``came_from`` maps each discovered state to its + # predecessor so the path can be reconstructed once ``target`` + # is reached. + came_from: dict[S, S] = {} + queue: deque[S] = deque((current,)) + seen: set[S] = {current} + while queue: + state = queue.popleft() + for nxt in self._transitions.get(state, frozenset()): + if nxt in seen: + continue + came_from[nxt] = state + if nxt == target: + hops: list[S] = [target] + cursor = target + while came_from[cursor] != current: + cursor = came_from[cursor] + hops.append(cursor) + hops.reverse() + return tuple(hops) + seen.add(nxt) + queue.append(nxt) + return None diff --git a/src/synthorg/core/task_transitions.py b/src/synthorg/core/task_transitions.py index b29e27d832..803630bf68 100644 --- a/src/synthorg/core/task_transitions.py +++ b/src/synthorg/core/task_transitions.py @@ -98,3 +98,22 @@ def validate_transition(current: TaskStatus, target: TaskStatus) -> None: is not in :data:`VALID_TRANSITIONS`. """ _MACHINE.validate(current, target) + + +def transition_path( + current: TaskStatus, + target: TaskStatus, +) -> tuple[TaskStatus, ...] | None: + """Return the shortest valid hop sequence from *current* to *target*. + + Args: + current: The current task status. + target: The desired task status. + + Returns: + ``()`` when already at *target*; a tuple of intermediate + statuses ending in *target* (each hop individually valid) when + a lifecycle path exists; or ``None`` when *target* is + unreachable from *current* (e.g. *current* is terminal). + """ + return _MACHINE.path_to(current, target) diff --git a/src/synthorg/engine/coordination/group_builder.py b/src/synthorg/engine/coordination/group_builder.py index 0039b9ed7d..64c445fe19 100644 --- a/src/synthorg/engine/coordination/group_builder.py +++ b/src/synthorg/engine/coordination/group_builder.py @@ -6,6 +6,7 @@ from typing import TYPE_CHECKING +from synthorg.core.enums import TaskStatus from synthorg.engine.decomposition.dag import DependencyGraph from synthorg.engine.errors import CoordinationError from synthorg.engine.parallel_models import ( @@ -131,10 +132,30 @@ def build_execution_waves( if worktree_path: resource_claims = (worktree_path,) + # The decomposition service creates every subtask in + # CREATED. Routing has now selected an agent, so promote the + # subtask to ASSIGNED bound to that agent: the execution + # engine only runs ASSIGNED/IN_PROGRESS tasks whose + # ``assigned_to`` matches the running agent, and without this + # every dispatched sub-agent would be rejected at the engine + # seam (the orchestration would run but no agent work would). + # The CREATED -> ASSIGNED transition is deliberately not + # validated here: this is an in-memory ``model_copy`` for + # dispatch, and the task-engine submit seam enforces the + # state machine when the subtask actually runs. Validating + # eagerly here would reject legitimately re-dispatched + # subtasks that are not in CREATED. + assigned_task = task.model_copy( + update={ + "status": TaskStatus.ASSIGNED, + "assigned_to": str(candidate.agent_identity.id), + } + ) + assignments.append( AgentAssignment( identity=candidate.agent_identity, - task=task, + task=assigned_task, resource_claims=resource_claims, ) ) diff --git a/src/synthorg/engine/coordination/parent_rollup.py b/src/synthorg/engine/coordination/parent_rollup.py new file mode 100644 index 0000000000..ec5b70823d --- /dev/null +++ b/src/synthorg/engine/coordination/parent_rollup.py @@ -0,0 +1,427 @@ +"""Parent-task rollup: walk the parent to its rollup-derived status. + +Extracted from the coordination service so the lifecycle-walk logic is +unit-testable in isolation and the service stays within the file/method +size budget. + +The parent task may have advanced since the coordination context was +captured, and the rollup-derived status is often several valid hops away +(a freshly CREATED parent must pass through ASSIGNED then IN_PROGRESS +before any terminal status; a fully-completed coordination must pass +through IN_REVIEW before COMPLETED). A single blind transition would be +rejected by the task state machine, so the shortest valid path is walked +hop by hop. +""" + +from typing import TYPE_CHECKING, Final, NamedTuple +from uuid import uuid4 + +from synthorg.core.enums import TaskStatus +from synthorg.core.task_transitions import transition_path +from synthorg.engine.coordination.models import CoordinationPhaseResult +from synthorg.engine.task_engine_models import TransitionTaskMutation +from synthorg.observability import get_logger, safe_error_description +from synthorg.observability.events.coordination import ( + COORDINATION_PHASE_COMPLETED, + COORDINATION_PHASE_FAILED, + COORDINATION_PHASE_STARTED, +) + +if TYPE_CHECKING: + from synthorg.core.clock import Clock + from synthorg.engine.coordination.dispatcher_types import DispatchResult + from synthorg.engine.coordination.models import CoordinationContext + from synthorg.engine.decomposition.models import ( + DecompositionResult, + SubtaskStatusRollup, + ) + from synthorg.engine.decomposition.service import DecompositionService + from synthorg.engine.task_engine import TaskEngine + +logger = get_logger(__name__) + +# Requester recorded on coordinator-driven parent-task transitions, and +# the synthetic assignee stamped on the parent when the lifecycle forces +# it through ASSIGNED (the parent is owned by the coordinating context, +# not a single agent -- subtasks carry the real per-agent assignments). +COORDINATOR_ACTOR: Final[str] = "coordinator" + + +class ParentUpdateOutcome(NamedTuple): + """Result of walking the parent task to its rollup-derived status. + + Attributes: + success: ``False`` when no valid lifecycle path exists or a hop + was rejected mid-walk. + error: Operator-readable note when ``success`` is ``False``; on a + mid-walk rejection it includes the parent's actual live + status so concurrent external finalisation is diagnosable. + hops_completed: Number of transitions that landed (``0`` for an + already-at-target no-op, which is still ``success=True``). + """ + + success: bool + error: str | None + hops_completed: int + + +def _hop_overrides(hop: TaskStatus) -> dict[str, object]: + """Per-hop mutation overrides. + + The Task model requires a non-null ``assigned_to`` for ASSIGNED (it + then persists across later hops). The coordinated parent has no + single owning agent, so stamp the coordinator sentinel on the forced + ASSIGNED hop only. + """ + if hop is TaskStatus.ASSIGNED: + return {"assigned_to": COORDINATOR_ACTOR} + return {} + + +async def _hop_failure_note( + task_engine: TaskEngine, + *, + task_id: str, + target_hop: TaskStatus, + submit_error: str | None, +) -> str: + """Build a diagnostic note for a rejected lifecycle hop. + + The task-engine submit seam validates every transition, so a + mid-walk rejection is almost always concurrent external finalisation + of the parent. The real failure (``submit_error``) is always reported; + this best-effort re-read only adds the parent's actual live status so + an operator can see why the hop was rejected. + + A re-read failure is a benign diagnostic-only step: it must not mask + the original error, so the catch is intentionally narrow (only + ``MemoryError`` and ``RecursionError`` propagate per project + convention; all other exceptions are swallowed so the base failure + note is returned unchanged). + """ + base = ( + f"Parent hop to {target_hop.value!r} rejected: " + f"{submit_error or 'unknown error'}" + ) + try: + live = await task_engine.get_task(task_id) + except MemoryError, RecursionError: + raise + except Exception: + return base + if live is None: + return f"{base} (parent no longer found)" + return f"{base} (parent now {live.status.value!r})" + + +async def advance_parent_to_rollup_status( + task_engine: TaskEngine, + *, + task_id: str, + current_status: TaskStatus, + rollup: SubtaskStatusRollup, +) -> ParentUpdateOutcome: + """Walk the parent task to ``rollup.derived_parent_status``. + + Each intermediate hop carries a lifecycle-advance reason so the + status history stays legible; only the final hop carries the rollup + summary. Returns a no-op success when the parent is already at the + derived status (empty path). On a hop rejection the parent stays at + the last successfully applied (individually valid) hop -- a logged + partial advance, not corruption, since the submit seam validates + every transition. + + Args: + task_engine: The task engine seam (validates each transition). + task_id: The parent task id. + current_status: The parent's live status (already re-read by the + caller, so the path starts from reality not a stale snapshot). + rollup: The subtask status rollup; supplies the derived parent + status and the completed/failed counts for the final reason. + + Returns: + A :class:`ParentUpdateOutcome` describing success, the failure + note (if any), and how many hops landed. + """ + target = rollup.derived_parent_status + path = transition_path(current_status, target) + if path is None: + note = ( + f"Parent status {current_status.value!r} cannot reach " + f"rollup status {target.value!r}: no valid lifecycle path " + f"(parent already terminal or externally finalised)" + ) + return ParentUpdateOutcome(success=False, error=note, hops_completed=0) + + rollup_reason = ( + f"Coordination rollup: " + f"{rollup.completed}/{rollup.total} completed, " + f"{rollup.failed}/{rollup.total} failed" + ) + completed_hops = 0 + for index, hop in enumerate(path): + # Every hop except the last carries the generic + # "Coordination lifecycle advance" reason; only the final hop + # (the one that lands the parent on the rollup-derived status) + # carries the completed/failed rollup summary. + is_final = index == len(path) - 1 + mutation = TransitionTaskMutation( + request_id=str(uuid4()), + requested_by=COORDINATOR_ACTOR, + task_id=task_id, + target_status=hop, + reason=(rollup_reason if is_final else "Coordination lifecycle advance"), + overrides=_hop_overrides(hop), + ) + result = await task_engine.submit(mutation) + if not result.success: + note = await _hop_failure_note( + task_engine, + task_id=task_id, + target_hop=hop, + submit_error=result.error, + ) + return ParentUpdateOutcome( + success=False, + error=note, + hops_completed=completed_hops, + ) + completed_hops += 1 + + return ParentUpdateOutcome( + success=True, + error=None, + hops_completed=completed_hops, + ) + + +def _collect_subtask_statuses( + dispatch_result: DispatchResult, + decomp_result: DecompositionResult, +) -> tuple[TaskStatus, ...]: + """Map execution outcomes to per-subtask statuses, including unexecuted. + + Walks the dispatch result waves and extracts a success/failure status + for each executed subtask. Subtasks missing from the waves (unroutable, + blocked by prerequisites, or skipped by fail-fast) are marked as BLOCKED + so the rollup statuses reflect the complete set of expected subtasks, + not only those that reached execution. + + Returns: + Tuple of ``TaskStatus`` values, one per expected subtask in plan + order: ``COMPLETED`` (executed successfully), ``FAILED`` (executed + but raised), or ``BLOCKED`` (never executed). + """ + statuses: list[TaskStatus] = [] + for wave in dispatch_result.waves: + if wave.execution_result is None: + statuses.extend(TaskStatus.BLOCKED for _ in wave.subtask_ids) + continue + statuses.extend( + TaskStatus.COMPLETED if outcome.is_success else TaskStatus.FAILED + for outcome in wave.execution_result.outcomes + ) + missing_count = len(decomp_result.plan.subtasks) - len(statuses) + if missing_count > 0: + statuses.extend(TaskStatus.BLOCKED for _ in range(missing_count)) + return tuple(statuses) + + +def compute_status_rollup( # noqa: PLR0913 + *, + decomposition_service: DecompositionService, + clock: Clock, + context: CoordinationContext, + dispatch_result: DispatchResult, + decomp_result: DecompositionResult, + phases: list[CoordinationPhaseResult], +) -> SubtaskStatusRollup | None: + """Compute and record the subtask status rollup phase. + + Collects subtask execution outcomes, invokes the decomposition service + to compute the rollup, and owns all ``rollup`` phase bookkeeping: + monotonic timing, structured logging (start/complete/failure events), + and accumulation of the phase result. + + Returns: + ``SubtaskStatusRollup`` on success; ``None`` on failure. A failed + computation is recorded as a failed ``CoordinationPhaseResult`` in + the ``phases`` list and a WARNING log entry (so the phase list + surfaces the failure point without re-raising). + """ + start = clock.monotonic() + phase_name = "rollup" + logger.info(COORDINATION_PHASE_STARTED, phase=phase_name) + try: + statuses = _collect_subtask_statuses(dispatch_result, decomp_result) + rollup = decomposition_service.rollup_status(context.task.id, statuses) + except MemoryError, RecursionError: + raise + except Exception as exc: + elapsed = clock.monotonic() - start + logger.warning( + COORDINATION_PHASE_FAILED, + phase=phase_name, + error_type=type(exc).__name__, + error=safe_error_description(exc), + ) + phases.append( + CoordinationPhaseResult( + phase=phase_name, + success=False, + duration_seconds=elapsed, + error=safe_error_description(exc), + ) + ) + return None + + elapsed = clock.monotonic() - start + phases.append( + CoordinationPhaseResult( + phase=phase_name, + success=True, + duration_seconds=elapsed, + ) + ) + logger.info( + COORDINATION_PHASE_COMPLETED, + phase=phase_name, + duration_seconds=elapsed, + ) + return rollup + + +def _fail_update_parent_phase( + phases: list[CoordinationPhaseResult], + *, + clock: Clock, + error: str, + start: float | None, + error_type: str | None = None, +) -> None: + """Log and append an ``update_parent`` phase failure to the result list. + + Args: + phases: Phase result accumulator (mutated in-place). + clock: Clock for duration measurement. + error: Operator-readable failure note. + start: Monotonic clock reading when the phase started, or ``None`` + if the failure occurred before phase timing began (e.g. when + the rollup computation failed). When ``None``, duration is + recorded as ``0.0``. + error_type: Optional exception type name; included in the log if + provided to distinguish the failure source. + """ + elapsed = 0.0 if start is None else clock.monotonic() - start + if error_type is None: + logger.warning(COORDINATION_PHASE_FAILED, phase="update_parent", error=error) + else: + logger.warning( + COORDINATION_PHASE_FAILED, + phase="update_parent", + error_type=error_type, + error=error, + ) + phases.append( + CoordinationPhaseResult( + phase="update_parent", + success=False, + duration_seconds=elapsed, + error=error, + ) + ) + + +def _record_update_parent_outcome( + phases: list[CoordinationPhaseResult], + *, + clock: Clock, + outcome: ParentUpdateOutcome, + start: float, +) -> None: + """Log + append the result of the parent lifecycle walk.""" + elapsed = clock.monotonic() - start + if outcome.success: + logger.info( + COORDINATION_PHASE_COMPLETED, + phase="update_parent", + duration_seconds=elapsed, + hops=outcome.hops_completed, + ) + else: + logger.warning( + COORDINATION_PHASE_FAILED, + phase="update_parent", + error=outcome.error, + hops_completed=outcome.hops_completed, + ) + phases.append( + CoordinationPhaseResult( + phase="update_parent", + success=outcome.success, + duration_seconds=elapsed, + error=outcome.error, + ) + ) + + +async def run_update_parent_phase( + *, + task_engine: TaskEngine | None, + clock: Clock, + context: CoordinationContext, + rollup: SubtaskStatusRollup | None, + phases: list[CoordinationPhaseResult], +) -> None: + """Walk the parent task to its rollup-derived status (phase wrapper). + + The ``update_parent`` phase advances the parent from its current status + to the status derived from the subtask rollup, traversing any required + intermediate lifecycle hops. No exceptions propagate: failures are + recorded as failed ``CoordinationPhaseResult`` entries so the + coordination pipeline completes even when parent update is unavailable. + + No-op when ``task_engine`` is ``None`` (empty company). Fails the phase + (not propagating) when the rollup is missing, the parent is gone, or a + lifecycle hop is rejected (usually concurrent external finalisation). + """ + if task_engine is None: + return + if rollup is None: + _fail_update_parent_phase( + phases, + clock=clock, + error="Skipped -- rollup is None (rollup phase failed)", + start=None, + ) + return + + start = clock.monotonic() + logger.info(COORDINATION_PHASE_STARTED, phase="update_parent") + try: + live_task = await task_engine.get_task(context.task.id) + if live_task is None: + _fail_update_parent_phase( + phases, + clock=clock, + error=f"Parent task {context.task.id!r} not found", + start=start, + ) + return + outcome = await advance_parent_to_rollup_status( + task_engine, + task_id=context.task.id, + current_status=live_task.status, + rollup=rollup, + ) + _record_update_parent_outcome(phases, clock=clock, outcome=outcome, start=start) + except MemoryError, RecursionError: + raise + except Exception as exc: + _fail_update_parent_phase( + phases, + clock=clock, + error=safe_error_description(exc), + start=start, + error_type=type(exc).__name__, + ) diff --git a/src/synthorg/engine/coordination/section_config.py b/src/synthorg/engine/coordination/section_config.py index 2596b191eb..dcadf5b41a 100644 --- a/src/synthorg/engine/coordination/section_config.py +++ b/src/synthorg/engine/coordination/section_config.py @@ -9,7 +9,7 @@ from pydantic import BaseModel, ConfigDict, Field, model_validator from synthorg.core.enums import CoordinationTopology -from synthorg.core.types import NotBlankStr # noqa: TC001 +from synthorg.core.types import NotBlankStr from synthorg.engine.coordination.config import CoordinationConfig from synthorg.engine.routing.models import AutoTopologyConfig from synthorg.settings.enums import SettingNamespace @@ -33,6 +33,8 @@ class CoordinationSectionConfig(BaseModel): enable_workspace_isolation: Create isolated workspaces for multi-agent execution. base_branch: Git branch to use for workspace isolation. + decomposition_model: LLM model identifier for the + coordinator's task decomposition strategy. """ model_config = ConfigDict(frozen=True, extra="forbid", allow_inf_nan=False) @@ -62,6 +64,11 @@ class CoordinationSectionConfig(BaseModel): namespace=SettingNamespace.COORDINATION, key="base_branch", ), + MirrorField( + field="decomposition_model", + namespace=SettingNamespace.COORDINATION, + key="decomposition_model", + ), ) topology: CoordinationTopology = Field( @@ -89,6 +96,17 @@ class CoordinationSectionConfig(BaseModel): default="main", description="Git branch for workspace isolation", ) + decomposition_model: NotBlankStr = Field( + default=NotBlankStr("example-medium-001"), + description=( + "LLM model identifier used by the coordinator's task " + "decomposition strategy. Resolved against the first " + "registered provider at boot. Overrideable via the " + "SYNTHORG_COORDINATION_DECOMPOSITION_MODEL environment " + "variable (precedence: DB > env > this code default), " + "applied on the next coordinator rebuild." + ), + ) @model_validator(mode="before") @classmethod diff --git a/src/synthorg/engine/coordination/service.py b/src/synthorg/engine/coordination/service.py index ecce14f3ac..82e73c3c9d 100644 --- a/src/synthorg/engine/coordination/service.py +++ b/src/synthorg/engine/coordination/service.py @@ -1,19 +1,18 @@ """Multi-agent coordination service. -Orchestrates the end-to-end pipeline: decompose → route → resolve -topology → dispatch (workspace setup → execute waves → merge) → -rollup → update parent task. +Orchestrates: decompose, route, resolve topology, dispatch, rollup, +update parent. Rollup + parent lifecycle walk live in +:mod:`synthorg.engine.coordination.parent_rollup`. """ from collections.abc import ( Callable, # noqa: TC003 -- runtime-read by typing.get_type_hints() ) from typing import TYPE_CHECKING -from uuid import uuid4 from synthorg.budget.currency import assert_currencies_match from synthorg.core.clock import Clock, SystemClock -from synthorg.core.enums import CoordinationTopology, TaskStatus +from synthorg.core.enums import CoordinationTopology from synthorg.engine.coordination.attribution import ( AgentContribution, CoordinationResultWithAttribution, @@ -24,8 +23,11 @@ CoordinationPhaseResult, CoordinationResult, ) +from synthorg.engine.coordination.parent_rollup import ( + compute_status_rollup, + run_update_parent_phase, +) from synthorg.engine.errors import CoordinationPhaseError -from synthorg.engine.task_engine_models import TransitionTaskMutation from synthorg.observability import get_logger, safe_error_description from synthorg.observability.events.coordination import ( COORDINATION_CLEANUP_FAILED, @@ -43,7 +45,6 @@ from synthorg.engine.coordination.models import CoordinationContext from synthorg.engine.decomposition.models import ( DecompositionResult, - SubtaskStatusRollup, ) from synthorg.engine.decomposition.service import DecompositionService from synthorg.engine.middleware.coordination_protocol import ( @@ -320,7 +321,14 @@ async def coordinate( # noqa: PLR0912, PLR0915, C901 phases.extend(dispatch_result.phases) # Rollup - rollup = self._phase_rollup(context, dispatch_result, decomp_result, phases) + rollup = compute_status_rollup( + decomposition_service=self._decomposition_service, + clock=self._clock, + context=context, + dispatch_result=dispatch_result, + decomp_result=decomp_result, + phases=phases, + ) # Middleware: after_rollup if mw_chain is not None: @@ -344,7 +352,13 @@ async def coordinate( # noqa: PLR0912, PLR0915, C901 rollup = mw_ctx.status_rollup # Update parent task - await self._phase_update_parent(context, rollup, phases) + await run_update_parent_phase( + task_engine=self._task_engine, + clock=self._clock, + context=context, + rollup=rollup, + phases=phases, + ) total_duration = self._clock.monotonic() - pipeline_start wave_results = tuple( @@ -676,165 +690,3 @@ async def _phase_dispatch( phase=phase_name, partial_phases=tuple(phases), ) from exc - - def _phase_rollup( - self, - context: CoordinationContext, - dispatch_result: DispatchResult, - decomp_result: DecompositionResult, - phases: list[CoordinationPhaseResult], - ) -> SubtaskStatusRollup | None: - """Compute status rollup from execution outcomes. - - Includes all expected subtasks -- those missing from waves - (unroutable, blocked by prerequisites, or skipped by - fail-fast) are counted as BLOCKED. - """ - start = self._clock.monotonic() - phase_name = "rollup" - - logger.info(COORDINATION_PHASE_STARTED, phase=phase_name) - try: - # Collect statuses from wave outcomes - statuses: list[TaskStatus] = [] - for wave in dispatch_result.waves: - if wave.execution_result is None: - statuses.extend(TaskStatus.BLOCKED for _ in wave.subtask_ids) - continue - - for outcome in wave.execution_result.outcomes: - if outcome.is_success: - statuses.append(TaskStatus.COMPLETED) - else: - statuses.append(TaskStatus.FAILED) - - # Fill missing subtasks as BLOCKED (unroutable, - # blocked prerequisites, or fail-fast skipped) - expected_count = len(decomp_result.plan.subtasks) - missing_count = expected_count - len(statuses) - if missing_count > 0: - statuses.extend(TaskStatus.BLOCKED for _ in range(missing_count)) - - rollup = self._decomposition_service.rollup_status( - context.task.id, - tuple(statuses), - ) - except MemoryError, RecursionError: - raise - except Exception as exc: - elapsed = self._clock.monotonic() - start - logger.warning( - COORDINATION_PHASE_FAILED, - phase=phase_name, - error_type=type(exc).__name__, - error=safe_error_description(exc), - ) - phases.append( - CoordinationPhaseResult( - phase=phase_name, - success=False, - duration_seconds=elapsed, - error=safe_error_description(exc), - ) - ) - return None - - elapsed = self._clock.monotonic() - start - phases.append( - CoordinationPhaseResult( - phase=phase_name, - success=True, - duration_seconds=elapsed, - ) - ) - logger.info( - COORDINATION_PHASE_COMPLETED, - phase=phase_name, - duration_seconds=elapsed, - ) - return rollup - - async def _phase_update_parent( - self, - context: CoordinationContext, - rollup: SubtaskStatusRollup | None, - phases: list[CoordinationPhaseResult], - ) -> None: - """Update parent task status via TaskEngine if available.""" - if self._task_engine is None: - return - if rollup is None: - phase_name = "update_parent" - note = "Skipped -- rollup is None (rollup phase failed)" - logger.warning( - COORDINATION_PHASE_FAILED, - phase=phase_name, - note=note, - ) - phases.append( - CoordinationPhaseResult( - phase=phase_name, - success=False, - duration_seconds=0.0, - error=note, - ) - ) - return - - start = self._clock.monotonic() - phase_name = "update_parent" - - logger.info(COORDINATION_PHASE_STARTED, phase=phase_name) - try: - mutation = TransitionTaskMutation( - request_id=str(uuid4()), - requested_by="coordinator", - task_id=context.task.id, - target_status=rollup.derived_parent_status, - reason=( - f"Coordination rollup: " - f"{rollup.completed}/{rollup.total} completed, " - f"{rollup.failed}/{rollup.total} failed" - ), - ) - result = await self._task_engine.submit(mutation) - elapsed = self._clock.monotonic() - start - - if result.success: - logger.info( - COORDINATION_PHASE_COMPLETED, - phase=phase_name, - duration_seconds=elapsed, - ) - else: - logger.warning( - COORDINATION_PHASE_FAILED, - phase=phase_name, - error=result.error, - ) - phases.append( - CoordinationPhaseResult( - phase=phase_name, - success=result.success, - duration_seconds=elapsed, - error=result.error, - ) - ) - except MemoryError, RecursionError: - raise - except Exception as exc: - elapsed = self._clock.monotonic() - start - logger.warning( - COORDINATION_PHASE_FAILED, - phase=phase_name, - error_type=type(exc).__name__, - error=safe_error_description(exc), - ) - phases.append( - CoordinationPhaseResult( - phase=phase_name, - success=False, - duration_seconds=elapsed, - error=safe_error_description(exc), - ) - ) diff --git a/src/synthorg/engine/errors.py b/src/synthorg/engine/errors.py index be679f9a63..899d2c5830 100644 --- a/src/synthorg/engine/errors.py +++ b/src/synthorg/engine/errors.py @@ -272,6 +272,17 @@ def __init__( self.partial_phases: tuple[CoordinationPhaseResult, ...] = partial_phases +class RuntimeServicesBuildError(EngineError): + """Raised when the boot/reinit runtime-services build fails. + + Wraps the underlying failure from ``build_runtime_services`` (provider + registry, tool registry, agent engine, or coordinator factory) so the + boot hook and the ``/setup/complete`` controller see a typed domain + error instead of a raw exception. The original cause is preserved via + ``raise ... from exc``. + """ + + class WorkflowExecutionError(EngineError): """Base exception for workflow execution failures.""" diff --git a/src/synthorg/settings/definitions/coordination.py b/src/synthorg/settings/definitions/coordination.py index ce9710736d..548812b795 100644 --- a/src/synthorg/settings/definitions/coordination.py +++ b/src/synthorg/settings/definitions/coordination.py @@ -53,6 +53,22 @@ ) ) +_r.register( + SettingDefinition( + namespace=SettingNamespace.COORDINATION, + key="decomposition_model", + type=SettingType.STRING, + default="example-medium-001", + description=( + "LLM model identifier the coordinator's task decomposition" + " strategy invokes against the first registered provider." + " Resolved at boot; a runtime change applies on the next" + " coordinator rebuild (provider re-init)." + ), + group="General", + ) +) + _r.register( SettingDefinition( namespace=SettingNamespace.COORDINATION, diff --git a/src/synthorg/workers/runtime_builder.py b/src/synthorg/workers/runtime_builder.py index f94a4abdce..92a13e058d 100644 --- a/src/synthorg/workers/runtime_builder.py +++ b/src/synthorg/workers/runtime_builder.py @@ -1,22 +1,37 @@ -"""Provider-present switch: build the worker execution service. +"""Provider-present switch: build the boot runtime services. This is the construction site for the agent runtime. With a provider -configured it assembles one boot-time :class:`AgentEngine` (LLM + -sandboxed tools + memory, governed by the SecOps safety spine) wrapped -in an :class:`AgentEngineExecutionService`. With no provider it returns -an :class:`NoProviderExecutionService` so the execute seam fails loudly -instead of silently walking status labels. +configured it assembles ONE boot-time :class:`AgentEngine` (LLM + +sandboxed tools + memory, governed by the SecOps safety spine) and +shares that single engine between two consumers: + +* an :class:`AgentEngineExecutionService` (the worker-callable execute + seam), and +* a :class:`~synthorg.engine.coordination.service.MultiAgentCoordinator` + built via :func:`~synthorg.engine.coordination.factory.build_coordinator`, + whose :class:`~synthorg.engine.parallel.ParallelExecutor` runs sub-agents + on the same engine. + +With no provider it returns a :class:`NoProviderExecutionService` and a +``None`` coordinator, so the execute seam fails loudly and +``/coordinate`` honestly 503s instead of silently walking status +labels. The same builder serves the boot install and the setup-reinit -rebuild, so configuring a provider brings the runtime online without -a process restart. +rebuild, so configuring a provider brings the whole runtime (worker +execution AND multi-agent coordination) online without a process +restart. """ import asyncio -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, NamedTuple from synthorg.engine.agent_engine import AgentEngine -from synthorg.observability import get_logger +from synthorg.engine.coordination.factory import build_coordinator +from synthorg.engine.routing.scorer import RoutingScorerConfig +from synthorg.engine.workspace.config import WorkspaceIsolationConfig +from synthorg.engine.workspace.git_worktree import PlannerWorktreeStrategy +from synthorg.observability import get_logger, safe_error_description from synthorg.observability.events.api import API_APP_STARTUP from synthorg.security.action_types import ActionTypeRegistry from synthorg.security.autonomy.resolver import AutonomyResolver @@ -32,6 +47,7 @@ from pathlib import Path from synthorg.api.state import AppState + from synthorg.engine.coordination.service import MultiAgentCoordinator from synthorg.providers.protocol import CompletionProvider from synthorg.providers.registry import ProviderRegistry @@ -39,6 +55,29 @@ _WEB_TIMEOUT_NS: str = "tools" _WEB_TIMEOUT_KEY: str = "web_request_timeout_seconds" +_GIT_TIMEOUT_NS: str = "tools" +_GIT_TIMEOUT_KEY: str = "git_command_timeout_seconds" +_DECOMPOSITION_NS: str = "coordination" +_DECOMPOSITION_KEY: str = "decomposition_model" + + +class RuntimeServices(NamedTuple): + """The pair of runtime services built behind the provider switch. + + INVARIANT (enforced by construction in :func:`build_runtime_services`, + not by the type): when ``coordinator`` is not ``None`` it and + ``worker_execution_service`` share the *same* boot + :class:`AgentEngine` instance, so worker tasks and coordinator + sub-agents observe one interrupt store, event-stream hub, and clock + seam. A divergent engine would split agent state silently; + ``tests/unit/workers/test_runtime_builder.py`` asserts the identity. + ``coordinator`` is ``None`` only in the empty-company (no-provider) + case, where ``worker_execution_service`` is a + :class:`NoProviderExecutionService`. + """ + + worker_execution_service: WorkerExecutionService + coordinator: MultiAgentCoordinator | None def _select_active_provider( @@ -52,7 +91,7 @@ def _select_active_provider( if not app_state.has_active_provider: logger.info( API_APP_STARTUP, - service="worker_execution_service", + service="runtime_services", mode="no_provider", note="empty company -- task execution rejected at the seam", ) @@ -63,7 +102,7 @@ def _select_active_provider( if not names: logger.info( API_APP_STARTUP, - service="worker_execution_service", + service="runtime_services", mode="no_provider", note="provider registry present but empty", ) @@ -71,7 +110,7 @@ def _select_active_provider( if len(names) > 1: logger.warning( API_APP_STARTUP, - service="worker_execution_service", + service="runtime_services", note=( "multiple providers registered; the boot AgentEngine " "runs every agent against the first provider -- " @@ -111,7 +150,12 @@ def _construct_agent_engine( registry: ProviderRegistry, tool_registry: ToolRegistry, ) -> AgentEngine: - """Assemble the boot ``AgentEngine`` from live application state.""" + """Assemble the boot ``AgentEngine`` from live application state. + + A single instance is shared by the worker execution service and the + coordinator's parallel executor so both consumers observe the same + interrupt store, event stream hub, and clock seam. + """ return AgentEngine( provider=provider, provider_registry=registry, @@ -131,12 +175,136 @@ def _construct_agent_engine( ) -async def build_worker_execution_service( +async def _build_workspace_strategy( + app_state: AppState, +) -> tuple[PlannerWorktreeStrategy, WorkspaceIsolationConfig]: + """Build the git-worktree workspace isolation strategy + config. + + The strategy operates on ``app_state.agent_workspace_root`` (the + same directory the worker runtime's sandbox tools use). Git + subprocess invocations are bounded by the operator-tuned + ``tools.git_command_timeout_seconds`` so a hung worktree command + cannot stall a coordination wave. Construction (here, at boot) never + touches git; a real repository is only required later, when a + coordination wave first invokes ``workspace_service.setup_group()`` + during dispatch, and only when ``enable_workspace_isolation`` is set + and the wave has multiple subtasks. + """ + ws_config = WorkspaceIsolationConfig() + git_timeout = await app_state.config_resolver.get_float( + _GIT_TIMEOUT_NS, + _GIT_TIMEOUT_KEY, + ) + strategy = PlannerWorktreeStrategy( + config=ws_config.planner_worktrees, + repo_root=app_state.agent_workspace_root, + cmd_timeout=git_timeout, + clock=app_state.clock, + ) + return strategy, ws_config + + +async def _resolve_routing_scorer_config( + app_state: AppState, +) -> RoutingScorerConfig | None: + """Project routing-scorer weights out of the engine bridge config. + + Fail-open: a bridge-resolution failure (missing setting, validation + error, persistence flake) or a projection failure keeps the + coordinator buildable by returning ``None`` so the factory falls + back to ``task_assignment_config.min_score``. Mirrors the fail-open + pattern used by ``auto_create_template_agents._resolve_matcher_config`` + and ``post_setup_reinit``. The resolve and projection stages are + caught separately so the log says which one failed (a persistent + config bug vs a transient resolver flake are diagnosed differently). + """ + try: + bridge = await app_state.config_resolver.get_engine_bridge_config() + except MemoryError, RecursionError: + raise + except Exception as exc: + logger.warning( + API_APP_STARTUP, + service="coordinator", + context="routing_scorer_config_resolve", + note="engine bridge config unavailable; using scorer defaults", + error_type=type(exc).__name__, + error=safe_error_description(exc), + ) + return None + try: + return RoutingScorerConfig.from_bridge_config(bridge) + except MemoryError, RecursionError: + raise + except Exception as exc: + logger.warning( + API_APP_STARTUP, + service="coordinator", + context="routing_scorer_config_projection", + note="scorer config projection failed; using scorer defaults", + error_type=type(exc).__name__, + error=safe_error_description(exc), + ) + return None + + +async def _build_runtime_coordinator( + app_state: AppState, + engine: AgentEngine, + provider: CompletionProvider, +) -> MultiAgentCoordinator: + """Build the multi-agent coordinator sharing the boot engine. + + Resolves the operator-tuned decomposition model and routing-scorer + weights, wires real git-worktree workspace isolation, then delegates + to the unit-tested :func:`build_coordinator` factory. The three + resolution steps are independent, so they run concurrently under a + ``TaskGroup`` to keep boot latency down (structured concurrency: any + failure cancels the siblings and propagates). + """ + async with asyncio.TaskGroup() as tg: + model_task = tg.create_task( + app_state.config_resolver.get_str( + _DECOMPOSITION_NS, + _DECOMPOSITION_KEY, + ) + ) + scorer_task = tg.create_task(_resolve_routing_scorer_config(app_state)) + workspace_task = tg.create_task(_build_workspace_strategy(app_state)) + decomposition_model = model_task.result() + routing_scorer_config = scorer_task.result() + workspace_strategy, workspace_config = workspace_task.result() + performance_tracker = ( + app_state.performance_tracker if app_state.has_performance_tracker else None + ) + coordinator = build_coordinator( + config=app_state.config.coordination, + engine=engine, + task_assignment_config=app_state.config.task_assignment, + provider=provider, + decomposition_model=decomposition_model, + task_engine=app_state.task_engine, + workspace_strategy=workspace_strategy, + workspace_config=workspace_config, + performance_tracker=performance_tracker, + routing_scorer_config=routing_scorer_config, + ) + logger.info( + API_APP_STARTUP, + service="coordinator", + mode="multi_agent", + decomposition_model=decomposition_model, + topology=app_state.config.coordination.topology.value, + ) + return coordinator + + +async def build_runtime_services( app_state: AppState, *, workspace_root: Path, -) -> WorkerExecutionService: - """Return the worker execution service for the current provider state. +) -> RuntimeServices: + """Return the runtime services for the current provider state. Args: app_state: Live application state (provider registry, task @@ -147,12 +315,17 @@ async def build_worker_execution_service( re-init path rebuilds against the same directory. Returns: - ``AgentEngineExecutionService`` when a provider is registered, - otherwise ``NoProviderExecutionService``. + ``RuntimeServices`` with an ``AgentEngineExecutionService`` and a + live ``MultiAgentCoordinator`` (both sharing one + ``AgentEngine``) when a provider is registered; otherwise a + ``NoProviderExecutionService`` and a ``None`` coordinator. """ selected = _select_active_provider(app_state) if selected is None: - return NoProviderExecutionService() + return RuntimeServices( + worker_execution_service=NoProviderExecutionService(), + coordinator=None, + ) registry, names = selected provider = registry.get(names[0]) @@ -170,16 +343,25 @@ async def build_worker_execution_service( registry=ActionTypeRegistry(), config=app_state.config.config.autonomy, ) + coordinator = await _build_runtime_coordinator( + app_state, + engine, + provider, + ) logger.info( API_APP_STARTUP, - service="worker_execution_service", + service="runtime_services", mode="agent_engine", provider=names[0], tool_count=tool_count, ) - return AgentEngineExecutionService( + worker_execution_service = AgentEngineExecutionService( engine=engine, task_engine=app_state.task_engine, agent_registry=app_state.agent_registry, autonomy_resolver=autonomy_resolver, ) + return RuntimeServices( + worker_execution_service=worker_execution_service, + coordinator=coordinator, + ) diff --git a/tests/e2e/test_coordinator_online_seam.py b/tests/e2e/test_coordinator_online_seam.py new file mode 100644 index 0000000000..58e9032811 --- /dev/null +++ b/tests/e2e/test_coordinator_online_seam.py @@ -0,0 +1,288 @@ +"""Acceptance: the multi-agent coordinator is online behind the switch. + +Builds the REAL coordinator through the production +``build_runtime_services`` (the exact code the boot hook runs) with a +deterministic ``ScriptedDriver``, then drives a decomposable task +through ``coordinate()`` end to end: decompose -> route -> parallel +dispatch -> rollup. The scripted provider is the simulation harness: +its branching strategy returns a valid decomposition plan when called +with the ``submit_decomposition_plan`` tool, and a plain STOP +completion for every sub-agent turn, so the whole pipeline runs without +a live LLM. ``/coordinate`` returns a real ``CoordinationResult`` with +every pipeline phase recorded rather than a 503. + +Workspace isolation is wired at boot (``PlannerWorktreeStrategy``) but +the per-run config disables it so the pipeline never touches git: this +test isolates the coordinator runtime, not the git-worktree path +(covered separately). +""" + +from collections.abc import AsyncGenerator +from datetime import date +from pathlib import Path +from uuid import uuid4 + +import pytest + +from synthorg.api.approval_store import ApprovalStore +from synthorg.api.state import AppState +from synthorg.config.schema import RootConfig +from synthorg.core.agent import AgentIdentity, ModelConfig, SkillSet +from synthorg.core.clock import SystemClock +from synthorg.core.enums import ( + AgentStatus, + Priority, + SeniorityLevel, + TaskType, +) +from synthorg.core.role import Authority, Skill +from synthorg.engine.coordination.config import CoordinationConfig +from synthorg.engine.coordination.models import CoordinationContext +from synthorg.engine.coordination.service import MultiAgentCoordinator +from synthorg.engine.decomposition.models import DecompositionContext +from synthorg.engine.task_engine import TaskEngine +from synthorg.engine.task_engine_models import CreateTaskData +from synthorg.hr.registry import AgentRegistryService +from synthorg.providers.drivers.scripted import ScriptedDriver +from synthorg.providers.enums import FinishReason +from synthorg.providers.models import ( + ChatMessage, + CompletionConfig, + CompletionResponse, + TokenUsage, + ToolCall, + ToolDefinition, +) +from synthorg.providers.registry import ProviderRegistry +from synthorg.settings.registry import get_registry +from synthorg.settings.resolver import ConfigResolver +from synthorg.settings.service import SettingsService +from synthorg.workers.runtime_builder import build_runtime_services +from tests._shared import mock_of +from tests.unit.api.fakes import FakePersistenceBackend + +pytestmark = pytest.mark.e2e + +_DECOMPOSITION_TOOL = "submit_decomposition_plan" +_RESEARCH_SKILL = "research" +_ANALYSIS_SKILL = "analysis" + + +class _DecompositionAwareStrategy: + """Scripted strategy that branches decomposition vs sub-agent turns. + + The decomposition strategy is the only caller that passes the + ``submit_decomposition_plan`` tool; it gets a valid two-subtask + plan back. Every other LLM call is a sub-agent execution turn and + gets a plain STOP completion so the agent loop terminates in one + turn without needing a live model. + """ + + def next_response( + self, + messages: list[ChatMessage], + model: str, + tools: list[ToolDefinition] | None, + config: CompletionConfig | None, + ) -> CompletionResponse: + usage = TokenUsage(input_tokens=8, output_tokens=4, cost=0.0001) + is_decomposition = tools is not None and any( + t.name == _DECOMPOSITION_TOOL for t in tools + ) + if is_decomposition: + return CompletionResponse( + content=None, + tool_calls=( + ToolCall( + id="decomp-1", + name=_DECOMPOSITION_TOOL, + arguments={ + # Two independent sub-problems with distinct + # required skills so routing assigns them to + # different agents and they run in parallel. + "task_structure": "parallel", + "coordination_topology": "centralized", + "subtasks": [ + { + "id": "sub-research", + "title": "Research the data sources", + "description": "Investigate inputs.", + "required_skills": [_RESEARCH_SKILL], + }, + { + "id": "sub-analysis", + "title": "Analyse the findings", + "description": "Synthesise results.", + "required_skills": [_ANALYSIS_SKILL], + }, + ], + }, + ), + ), + finish_reason=FinishReason.TOOL_USE, + usage=usage, + model=model, + ) + return CompletionResponse( + content="Subtask complete.", + finish_reason=FinishReason.STOP, + usage=usage, + model=model, + ) + + +def _make_agent(name: str, skill: str) -> AgentIdentity: + return AgentIdentity( + id=uuid4(), + name=name, + role="developer", + department="engineering", + level=SeniorityLevel.MID, + skills=SkillSet( + primary=(Skill(id=skill, name=skill),), + ), + authority=Authority(budget_limit=10.0), + model=ModelConfig(provider="test-provider", model_id="test-model-001"), + hiring_date=date(2026, 1, 1), + status=AgentStatus.ACTIVE, + ) + + +@pytest.fixture +async def persistence() -> AsyncGenerator[FakePersistenceBackend]: + backend = FakePersistenceBackend() + await backend.connect() + yield backend + await backend.disconnect() + + +@pytest.fixture +async def task_engine( + persistence: FakePersistenceBackend, +) -> AsyncGenerator[TaskEngine]: + engine = TaskEngine(persistence=persistence) + await engine.start() + yield engine + await engine.stop() + + +async def test_coordinator_runs_decomposable_task_end_to_end( + persistence: FakePersistenceBackend, + task_engine: TaskEngine, + tmp_path: Path, +) -> None: + provider = ScriptedDriver( + "test-provider", + strategy=_DecompositionAwareStrategy(), + ) + registry = ProviderRegistry({"test-provider": provider}) + agent_registry = AgentRegistryService() + alice = _make_agent("alice", _RESEARCH_SKILL) + bob = _make_agent("bob", _ANALYSIS_SKILL) + await agent_registry.register(alice) + await agent_registry.register(bob) + + root_config = RootConfig(company_name="coordinator-online-test") + settings_service = SettingsService( + repository=persistence.settings, + registry=get_registry(), + ) + config_resolver = ConfigResolver( + settings_service=settings_service, + config=root_config, + ) + app_state = mock_of[AppState]( + has_active_provider=True, + provider_registry=registry, + config=root_config, + config_resolver=config_resolver, + task_engine=task_engine, + agent_registry=agent_registry, + approval_store=ApprovalStore(), + clock=SystemClock(), + event_stream_hub=None, + interrupt_store=None, + agent_workspace_root=tmp_path, + has_cost_tracker=False, + has_audit_log=False, + has_memory_backend=False, + has_performance_tracker=False, + ) + + runtime = await build_runtime_services( + app_state, + workspace_root=tmp_path, + ) + coordinator = runtime.coordinator + assert isinstance(coordinator, MultiAgentCoordinator) + + created = await task_engine.create_task( + CreateTaskData( + title="Financial analysis", + description="Decompose into research and analysis.", + type=TaskType.DEVELOPMENT, + project="proj-coord", + created_by="operator", + priority=Priority.MEDIUM, + ), + requested_by="operator", + ) + + context = CoordinationContext( + task=created, + available_agents=(alice, bob), + decomposition_context=DecompositionContext(max_subtasks=4), + # Workspace isolation is wired at boot but disabled per-run so + # the pipeline never touches git in the harness. + config=CoordinationConfig( + enable_workspace_isolation=False, + fail_fast=False, + ), + ) + + attributed = await coordinator.coordinate(context) + result = attributed.result + + # /coordinate returns a real result (no 503, no CoordinationPhaseError + # bubbled out of coordinate()). + assert result.parent_task_id == created.id + # Decompose ran: the scripted plan produced two subtasks. + assert result.decomposition_result is not None + assert len(result.decomposition_result.plan.subtasks) == 2 + # Route ran: both subtasks routed across the team (alice + bob). + assert result.routing_result is not None + assert len(result.routing_result.decisions) == 2 + routed_agents = { + d.selected_candidate.agent_identity.name + for d in result.routing_result.decisions + } + assert routed_agents == {"alice", "bob"} + # Parallel dispatch ran and the sub-agents ACTUALLY executed: both + # subtasks were promoted CREATED -> ASSIGNED for their routed agent, + # so the engine ran each one and produced an AgentRunResult. A + # subtask still in CREATED would be rejected at the engine seam with + # an ExecutionStateError and a ``None`` result, so a non-None result + # is the proof the dispatch path assigned it. + assert len(result.waves) >= 1 + executed = [ + outcome + for wave in result.waves + if wave.execution_result is not None + for outcome in wave.execution_result.outcomes + ] + assert len(executed) == 2 + assert all(o.result is not None for o in executed) + # Rollup aggregated the two real per-agent outcomes across the team. + assert result.status_rollup is not None + assert result.status_rollup.total == 2 + phase_names = {p.phase for p in result.phases} + assert {"decompose", "route", "rollup"} <= phase_names + # The parent began as freshly CREATED; the coordinator walked it + # through the valid lifecycle so the rollup-derived status was + # reachable, instead of attempting one invalid hop and recording a + # failed update_parent phase. + update_parent = next((p for p in result.phases if p.phase == "update_parent"), None) + assert update_parent is not None + assert update_parent.success, update_parent.error + assert result.total_duration_seconds >= 0.0 + assert isinstance(attributed.agent_contributions, tuple) diff --git a/tests/e2e/test_runtime_online_seam.py b/tests/e2e/test_runtime_online_seam.py index 8ff5d77cba..c669bc0f99 100644 --- a/tests/e2e/test_runtime_online_seam.py +++ b/tests/e2e/test_runtime_online_seam.py @@ -1,8 +1,8 @@ """Acceptance: the agent runtime is online behind the provider switch. Drives a real task through the ``WorkerExecutionService.execute_once`` -seam built by the production ``build_worker_execution_service`` (the -exact code the boot hook runs) -- not ``AgentEngine.run`` directly. +seam built by the production ``build_runtime_services`` (the exact +code the boot hook runs) -- not ``AgentEngine.run`` directly. With a provider configured a real agent runs via the deterministic ``ScriptedDriver`` and the minimal safety spine is exercised end to end: the SecOps interceptor consults the autonomy/rule verdict on the @@ -54,7 +54,7 @@ from synthorg.settings.resolver import ConfigResolver from synthorg.settings.service import SettingsService from synthorg.workers.execution_service import AgentEngineExecutionService -from synthorg.workers.runtime_builder import build_worker_execution_service +from synthorg.workers.runtime_builder import build_runtime_services from tests._shared import mock_of from tests._shared.scripted_provider import ( make_e2e_identity, @@ -131,15 +131,18 @@ async def test_runtime_executes_task_through_seam_with_safety_spine( clock=SystemClock(), event_stream_hub=None, interrupt_store=None, + agent_workspace_root=tmp_path, has_cost_tracker=False, has_audit_log=False, has_memory_backend=False, + has_performance_tracker=False, ) - service = await build_worker_execution_service( + runtime = await build_runtime_services( app_state, workspace_root=tmp_path, ) + service = runtime.worker_execution_service assert isinstance(service, AgentEngineExecutionService) created = await task_engine.create_task( diff --git a/tests/integration/api/conftest.py b/tests/integration/api/conftest.py index 77a3ac9beb..b9a05b55ca 100644 --- a/tests/integration/api/conftest.py +++ b/tests/integration/api/conftest.py @@ -59,12 +59,14 @@ def build_runtime_app( *, with_provider: bool, company_name: str, + coordinator: Any = None, ) -> Any: """Build an app for provider-present-switch integration tests. ``with_provider`` registers one scripted provider so the company is not empty; otherwise no registry is passed and task creation is - rejected at the controller. + rejected at the controller. ``coordinator`` injects an explicit + coordinator so the injection-over-autowire convention can be tested. """ root_config = RootConfig(company_name=company_name) auth_service = AuthService( @@ -90,6 +92,7 @@ def build_runtime_app( agent_registry=AgentRegistryService(), settings_service=settings_service, provider_registry=provider_registry, + coordinator=coordinator, ) diff --git a/tests/integration/api/test_post_setup_reinit_wake.py b/tests/integration/api/test_post_setup_reinit_wake.py new file mode 100644 index 0000000000..5e555750d7 --- /dev/null +++ b/tests/integration/api/test_post_setup_reinit_wake.py @@ -0,0 +1,111 @@ +"""post_setup_reinit wakes BOTH runtime services on provider config. + +An empty company boots with no provider: the worker seam is the +``NoProviderExecutionService`` backstop and ``/coordinate`` honestly +503s (no coordinator). When the operator configures a provider and +``/setup/complete`` runs, ``post_setup_reinit`` must rebuild the +runtime services and hot-swap in BOTH the live worker execution +service AND the multi-agent coordinator, so ``/coordinate`` comes +online without a process restart. Waking only the worker seam would +leave ``/coordinate`` permanently 503 until a restart, so the rebuild +must swap both. +""" + +import pytest +from litestar.testing import AsyncTestClient + +from synthorg.api.controllers.setup.agent_helpers import post_setup_reinit +from synthorg.config.provider_schema import ProviderConfig +from synthorg.engine.coordination.service import MultiAgentCoordinator +from synthorg.engine.errors import RuntimeServicesBuildError +from synthorg.providers.registry import ProviderRegistry +from synthorg.workers.execution_service import ( + AgentEngineExecutionService, + NoProviderExecutionService, +) +from tests.integration.api.conftest import build_runtime_app +from tests.unit.api.fakes import FakeMessageBus, FakePersistenceBackend + +pytestmark = pytest.mark.integration + +_COMPANY_NAME = "reinit-wake-test" + + +async def test_reinit_wakes_worker_and_coordinator_on_provider_config( + fake_persistence: FakePersistenceBackend, + fake_message_bus: FakeMessageBus, +) -> None: + app = build_runtime_app( + fake_persistence, + fake_message_bus, + with_provider=False, + company_name=_COMPANY_NAME, + ) + async with AsyncTestClient(app=app) as client: + app_state = client.app.state["app_state"] + + # Empty company at boot: no coordinator, backstop worker seam. + coordinator_at_boot = app_state.has_coordinator + assert coordinator_at_boot is False + assert isinstance( + app_state.worker_execution_service, + NoProviderExecutionService, + ) + + # Operator configures a provider: the provider registry becomes + # populated (the state post_setup_reinit's provider-reload step + # produces), so the subsequent runtime-services rebuild must wake + # the coordinator as well as the worker seam. + app_state.swap_provider_registry( + ProviderRegistry.from_config( + {"test-provider": ProviderConfig(driver="scripted")}, + ), + ) + + await post_setup_reinit(app_state) + + # Both runtime services are now live, no restart. + active_provider = app_state.has_active_provider + coordinator_after_wake = app_state.has_coordinator + assert active_provider is True + assert coordinator_after_wake is True + coordinator = app_state.coordinator + worker = app_state.worker_execution_service + assert isinstance(coordinator, MultiAgentCoordinator) + assert isinstance(worker, AgentEngineExecutionService) + + +async def test_reinit_raises_when_coordinator_swap_fails( + fake_persistence: FakePersistenceBackend, + fake_message_bus: FakeMessageBus, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A failed coordinator swap must abort reinit (typed, re-raised). + + If the worker swap succeeds but the coordinator swap raises, the + whole rebuild must raise (a typed ``RuntimeServicesBuildError``) so + ``post_setup_reinit``'s caller keeps ``setup_complete=false`` rather + than presenting a half-configured runtime as complete. + """ + app = build_runtime_app( + fake_persistence, + fake_message_bus, + with_provider=False, + company_name=_COMPANY_NAME, + ) + async with AsyncTestClient(app=app) as client: + app_state = client.app.state["app_state"] + app_state.swap_provider_registry( + ProviderRegistry.from_config( + {"test-provider": ProviderConfig(driver="scripted")}, + ), + ) + + def _boom(_coordinator: object) -> None: + msg = "coordinator swap failed" + raise RuntimeError(msg) + + monkeypatch.setattr(app_state, "swap_coordinator", _boom) + + with pytest.raises(RuntimeServicesBuildError): + await post_setup_reinit(app_state) diff --git a/tests/integration/api/test_runtime_install_ordering.py b/tests/integration/api/test_runtime_install_ordering.py index 5ee78ce52a..63150defac 100644 --- a/tests/integration/api/test_runtime_install_ordering.py +++ b/tests/integration/api/test_runtime_install_ordering.py @@ -1,4 +1,4 @@ -"""The boot hook installs the worker execution service before any read. +"""The boot hook installs the runtime services before any read. If any startup hook read ``app_state.worker_execution_service`` before the install hook ran, the property's lazy @@ -6,17 +6,21 @@ once-only ``set_worker_execution_service`` would then raise, failing startup. A clean startup whose installed service is the builder's output (never the lifecycle-only default) is the practical proof that -the ordering invariant holds. +the ordering invariant holds. The same provider-present switch wires +the multi-agent coordinator, so ``has_coordinator`` reflects provider +presence: true with a provider, false on the empty-company backstop. """ import pytest from litestar.testing import TestClient +from synthorg.engine.coordination.service import MultiAgentCoordinator from synthorg.workers.execution_service import ( AgentEngineExecutionService, LifecycleAdvancingExecutionService, NoProviderExecutionService, ) +from tests._shared import mock_of from tests.integration.api.conftest import build_runtime_app from tests.unit.api.fakes import FakeMessageBus, FakePersistenceBackend @@ -36,12 +40,16 @@ def test_no_provider_installs_backstop_not_lazy_default( company_name=_COMPANY_NAME, ) with TestClient(app) as client: - service = client.app.state["app_state"].worker_execution_service + app_state = client.app.state["app_state"] + service = app_state.worker_execution_service + has_coordinator = app_state.has_coordinator assert isinstance(service, NoProviderExecutionService) assert not isinstance(service, LifecycleAdvancingExecutionService) + # Empty company: no coordinator, /coordinate honestly 503s. + assert has_coordinator is False -def test_provider_installs_agent_engine_service( +def test_provider_installs_agent_engine_service_and_coordinator( fake_persistence: FakePersistenceBackend, fake_message_bus: FakeMessageBus, ) -> None: @@ -52,5 +60,35 @@ def test_provider_installs_agent_engine_service( company_name=_COMPANY_NAME, ) with TestClient(app) as client: - service = client.app.state["app_state"].worker_execution_service + app_state = client.app.state["app_state"] + service = app_state.worker_execution_service + has_coordinator = app_state.has_coordinator + coordinator = app_state.coordinator assert isinstance(service, AgentEngineExecutionService) + # Same provider switch wires the coordinator behind /coordinate. + assert has_coordinator is True + assert isinstance(coordinator, MultiAgentCoordinator) + + +def test_injected_coordinator_wins_over_autowired( + fake_persistence: FakePersistenceBackend, + fake_message_bus: FakeMessageBus, +) -> None: + """An explicitly injected coordinator survives the boot hook. + + ``set_coordinator_if_absent`` keeps the constructor-injected + coordinator instead of overwriting it with the autowired one (the + injection-over-autowire convention), even with a provider present. + """ + injected = mock_of[MultiAgentCoordinator]() + app = build_runtime_app( + fake_persistence, + fake_message_bus, + with_provider=True, + company_name=_COMPANY_NAME, + coordinator=injected, + ) + with TestClient(app) as client: + app_state = client.app.state["app_state"] + coordinator = app_state.coordinator + assert coordinator is injected diff --git a/tests/unit/api/test_state.py b/tests/unit/api/test_state.py index 612dd32b4b..712051a34a 100644 --- a/tests/unit/api/test_state.py +++ b/tests/unit/api/test_state.py @@ -14,6 +14,7 @@ from synthorg.hr.training.service import TrainingService from synthorg.security.timeout.scheduler import ApprovalTimeoutScheduler from synthorg.settings.service import SettingsService +from tests._shared import mock_of from tests.unit.api.fakes import ( FakeMessageBus, FakePersistenceBackend, @@ -110,9 +111,7 @@ def test_task_engine_raises_when_none(self) -> None: _ = state.task_engine def test_task_engine_returns_when_set(self) -> None: - from unittest.mock import MagicMock - - engine = MagicMock(spec=TaskEngine) + engine = mock_of[TaskEngine]() state = _make_state(task_engine=engine) assert state.task_engine is engine @@ -121,24 +120,18 @@ def test_has_task_engine_false_when_none(self) -> None: assert state.has_task_engine is False def test_has_task_engine_true_when_set(self) -> None: - from unittest.mock import MagicMock - - engine = MagicMock(spec=TaskEngine) + engine = mock_of[TaskEngine]() state = _make_state(task_engine=engine) assert state.has_task_engine is True def test_set_task_engine_succeeds_once(self) -> None: - from unittest.mock import MagicMock - - engine = MagicMock(spec=TaskEngine) + engine = mock_of[TaskEngine]() state = _make_state() state.set_task_engine(engine) assert state.task_engine is engine def test_set_task_engine_twice_raises(self) -> None: - from unittest.mock import MagicMock - - engine = MagicMock(spec=TaskEngine) + engine = mock_of[TaskEngine]() state = _make_state(task_engine=engine) with pytest.raises(RuntimeError, match="already configured"): state.set_task_engine(engine) @@ -154,9 +147,7 @@ def test_coordinator_raises_when_none(self) -> None: _ = state.coordinator def test_coordinator_returns_when_set(self) -> None: - from unittest.mock import MagicMock - - coordinator = MagicMock(spec=MultiAgentCoordinator) + coordinator = mock_of[MultiAgentCoordinator]() state = _make_state(coordinator=coordinator) assert state.coordinator is coordinator @@ -165,12 +156,59 @@ def test_has_coordinator_false_when_none(self) -> None: assert state.has_coordinator is False def test_has_coordinator_true_when_set(self) -> None: - from unittest.mock import MagicMock - - coordinator = MagicMock(spec=MultiAgentCoordinator) + coordinator = mock_of[MultiAgentCoordinator]() state = _make_state(coordinator=coordinator) assert state.has_coordinator is True + def test_set_coordinator_attaches_when_none(self) -> None: + coordinator = mock_of[MultiAgentCoordinator]() + state = _make_state(coordinator=None) + state.set_coordinator(coordinator) + assert state.coordinator is coordinator + assert state.has_coordinator is True + + def test_set_coordinator_is_once_only(self) -> None: + first = mock_of[MultiAgentCoordinator]() + second = mock_of[MultiAgentCoordinator]() + state = _make_state(coordinator=first) + with pytest.raises(RuntimeError, match="already configured"): + state.set_coordinator(second) + + def test_set_coordinator_if_absent_installs_when_none(self) -> None: + coordinator = mock_of[MultiAgentCoordinator]() + state = _make_state(coordinator=None) + installed = state.set_coordinator_if_absent(coordinator) + assert installed is True + assert state.coordinator is coordinator + + def test_set_coordinator_if_absent_keeps_injected(self) -> None: + injected = mock_of[MultiAgentCoordinator]() + autowired = mock_of[MultiAgentCoordinator]() + state = _make_state(coordinator=injected) + installed = state.set_coordinator_if_absent(autowired) + # Injection-over-autowire: the injected one wins, no raise. + assert installed is False + assert state.coordinator is injected + + def test_swap_coordinator_attaches_when_none(self) -> None: + coordinator = mock_of[MultiAgentCoordinator]() + state = _make_state(coordinator=None) + state.swap_coordinator(coordinator) + assert state.coordinator is coordinator + + def test_swap_coordinator_replaces_existing(self) -> None: + first = mock_of[MultiAgentCoordinator]() + second = mock_of[MultiAgentCoordinator]() + state = _make_state(coordinator=first) + state.swap_coordinator(second) + assert state.coordinator is second + + def test_swap_coordinator_noop_when_identical(self) -> None: + coordinator = mock_of[MultiAgentCoordinator]() + state = _make_state(coordinator=coordinator) + state.swap_coordinator(coordinator) + assert state.coordinator is coordinator + @pytest.mark.unit class TestAppStateAgentRegistry: @@ -253,17 +291,13 @@ def test_review_gate_service_none_by_default(self) -> None: assert state.review_gate_service is None def test_set_review_gate_service_succeeds_once(self) -> None: - from unittest.mock import MagicMock - - svc = MagicMock(spec=ReviewGateService) + svc = mock_of[ReviewGateService]() state = _make_state() state.set_review_gate_service(svc) assert state.review_gate_service is svc def test_set_review_gate_service_twice_raises(self) -> None: - from unittest.mock import MagicMock - - svc = MagicMock(spec=ReviewGateService) + svc = mock_of[ReviewGateService]() state = _make_state() state.set_review_gate_service(svc) with pytest.raises(RuntimeError, match="already configured"): @@ -279,17 +313,13 @@ def test_approval_timeout_scheduler_none_by_default(self) -> None: assert state.approval_timeout_scheduler is None def test_set_approval_timeout_scheduler_succeeds_once(self) -> None: - from unittest.mock import MagicMock - - scheduler = MagicMock(spec=ApprovalTimeoutScheduler) + scheduler = mock_of[ApprovalTimeoutScheduler]() state = _make_state() state.set_approval_timeout_scheduler(scheduler) assert state.approval_timeout_scheduler is scheduler def test_set_approval_timeout_scheduler_twice_raises(self) -> None: - from unittest.mock import MagicMock - - scheduler = MagicMock(spec=ApprovalTimeoutScheduler) + scheduler = mock_of[ApprovalTimeoutScheduler]() state = _make_state() state.set_approval_timeout_scheduler(scheduler) with pytest.raises(RuntimeError, match="already configured"): diff --git a/tests/unit/core/test_state_machine.py b/tests/unit/core/test_state_machine.py index 6dc6977c95..7032fb2e61 100644 --- a/tests/unit/core/test_state_machine.py +++ b/tests/unit/core/test_state_machine.py @@ -122,3 +122,77 @@ def test_explicit_display_label_wins(self) -> None: ) with pytest.raises(ValueError, match="Invalid My Fancy Label transition"): machine.validate(_Color.RED, _Color.BLUE) + + +@pytest.mark.unit +class TestStateMachinePathTo: + """``path_to`` returns the shortest valid hop sequence (BFS).""" + + def test_same_state_is_empty_path(self) -> None: + machine = _make_machine() + assert machine.path_to(_Color.RED, _Color.RED) == () + + def test_single_hop(self) -> None: + machine = _make_machine() + assert machine.path_to(_Color.RED, _Color.GREEN) == (_Color.GREEN,) + + def test_multi_hop_is_shortest(self) -> None: + machine = _make_machine() + # RED -> GREEN -> BLUE is the only (and shortest) route. + assert machine.path_to(_Color.RED, _Color.BLUE) == ( + _Color.GREEN, + _Color.BLUE, + ) + + def test_terminal_source_is_unreachable(self) -> None: + machine = _make_machine() + assert machine.path_to(_Color.BLUE, _Color.RED) is None + + def test_unknown_source_is_none(self) -> None: + machine: StateMachine[_Color] = StateMachine( + {_Color.RED: frozenset({_Color.GREEN})}, + name="color", + invalid_event="x", + config_event="y", + ) + # GREEN has no table entry -> no path can originate from it. + assert machine.path_to(_Color.GREEN, _Color.RED) is None + + def test_every_hop_in_path_is_individually_valid(self) -> None: + machine = _make_machine() + path = machine.path_to(_Color.RED, _Color.BLUE) + assert path is not None + cursor = _Color.RED + for hop in path: + machine.validate(cursor, hop) # raises if any hop is illegal + cursor = hop + + def test_cyclic_graph_terminates_with_shortest_path(self) -> None: + """A cyclic transition table must not loop forever in BFS. + + ``seen`` guards revisits, so an explicit A<->B cycle still + terminates and yields the minimal-length path. A regression + here would hang the suite, not just fail an assertion. + """ + + class _Node(StrEnum): + A = "a" + B = "b" + C = "c" + + cyclic = StateMachine( + { + _Node.A: frozenset({_Node.B}), + _Node.B: frozenset({_Node.A, _Node.C}), + _Node.C: frozenset(), + }, + name="node", + invalid_event="test.node.invalid", + config_event="test.node.config_error", + all_states=_Node, + ) + + assert cyclic.path_to(_Node.A, _Node.C) == (_Node.B, _Node.C) + assert cyclic.path_to(_Node.B, _Node.A) == (_Node.A,) + assert cyclic.path_to(_Node.A, _Node.A) == () + assert cyclic.path_to(_Node.C, _Node.A) is None diff --git a/tests/unit/core/test_task_transitions.py b/tests/unit/core/test_task_transitions.py index 7599855521..73674d4398 100644 --- a/tests/unit/core/test_task_transitions.py +++ b/tests/unit/core/test_task_transitions.py @@ -4,7 +4,11 @@ import structlog from synthorg.core.enums import TaskStatus -from synthorg.core.task_transitions import VALID_TRANSITIONS, validate_transition +from synthorg.core.task_transitions import ( + VALID_TRANSITIONS, + transition_path, + validate_transition, +) from synthorg.observability.events.task import TASK_TRANSITION_INVALID # ── Valid Transitions ───────────────────────────────────────────── @@ -238,3 +242,45 @@ def test_validate_transition_with_missing_entry(self) -> None: ) with pytest.raises(ValueError, match="has no entry"): empty_machine.validate(TaskStatus.CREATED, TaskStatus.ASSIGNED) + + +@pytest.mark.unit +class TestTransitionPath: + """``transition_path`` walks the real task lifecycle.""" + + def test_same_status_is_empty(self) -> None: + assert transition_path(TaskStatus.IN_PROGRESS, TaskStatus.IN_PROGRESS) == () + + def test_created_to_completed_routes_through_review(self) -> None: + # A fully-completed coordination on a freshly CREATED parent: + # COMPLETED is only reachable via IN_REVIEW. + assert transition_path(TaskStatus.CREATED, TaskStatus.COMPLETED) == ( + TaskStatus.ASSIGNED, + TaskStatus.IN_PROGRESS, + TaskStatus.IN_REVIEW, + TaskStatus.COMPLETED, + ) + + def test_created_to_failed_is_shortest(self) -> None: + # ASSIGNED can fail directly, so the shortest route skips + # IN_PROGRESS: CREATED -> ASSIGNED -> FAILED. + assert transition_path(TaskStatus.CREATED, TaskStatus.FAILED) == ( + TaskStatus.ASSIGNED, + TaskStatus.FAILED, + ) + + def test_in_progress_to_failed_single_hop(self) -> None: + assert transition_path(TaskStatus.IN_PROGRESS, TaskStatus.FAILED) == ( + TaskStatus.FAILED, + ) + + def test_terminal_source_is_unreachable(self) -> None: + assert transition_path(TaskStatus.COMPLETED, TaskStatus.FAILED) is None + + def test_every_hop_is_individually_valid(self) -> None: + path = transition_path(TaskStatus.CREATED, TaskStatus.COMPLETED) + assert path is not None + cursor = TaskStatus.CREATED + for hop in path: + validate_transition(cursor, hop) # raises if any hop illegal + cursor = hop diff --git a/tests/unit/engine/test_coordination_group_builder.py b/tests/unit/engine/test_coordination_group_builder.py index c8f085fbf8..90f2568d3b 100644 --- a/tests/unit/engine/test_coordination_group_builder.py +++ b/tests/unit/engine/test_coordination_group_builder.py @@ -2,7 +2,7 @@ import pytest -from synthorg.core.enums import CoordinationTopology, TaskStructure +from synthorg.core.enums import CoordinationTopology, TaskStatus, TaskStructure from synthorg.engine.coordination.config import CoordinationConfig from synthorg.engine.coordination.group_builder import build_execution_waves from synthorg.engine.routing.models import ( @@ -59,6 +59,34 @@ def test_single_subtask_one_group(self) -> None: assert len(waves[0].assignments) == 1 assert waves[0].assignments[0].task.id == "sub-a" + @pytest.mark.unit + def test_subtask_promoted_to_assigned_for_routed_agent(self) -> None: + """Subtasks are promoted CREATED -> ASSIGNED bound to the agent. + + The execution engine only runs ASSIGNED/IN_PROGRESS tasks whose + ``assigned_to`` matches the running agent; without this + promotion every dispatched sub-agent would be rejected at the + engine seam. + """ + sub_a = make_subtask("sub-a") + decomp = make_decomposition((sub_a,)) + decision = _make_routing_decision("sub-a", "alice") + routing = RoutingResult( + parent_task_id="parent-1", + decisions=(decision,), + ) + + waves = build_execution_waves( + decomposition_result=decomp, + routing_result=routing, + config=CoordinationConfig(), + ) + + assignment = waves[0].assignments[0] + agent_id = str(decision.selected_candidate.agent_identity.id) + assert assignment.task.status is TaskStatus.ASSIGNED + assert assignment.task.assigned_to == agent_id + @pytest.mark.unit def test_two_independent_subtasks_one_wave(self) -> None: """Two independent subtasks produce 1 wave with 2 assignments.""" diff --git a/tests/unit/engine/test_coordination_section_config.py b/tests/unit/engine/test_coordination_section_config.py index 5b4dc101d9..99f4af7b2e 100644 --- a/tests/unit/engine/test_coordination_section_config.py +++ b/tests/unit/engine/test_coordination_section_config.py @@ -34,6 +34,31 @@ def test_default_base_branch_is_main(self) -> None: cfg = CoordinationSectionConfig() assert cfg.base_branch == "main" + def test_default_decomposition_model(self) -> None: + cfg = CoordinationSectionConfig() + assert cfg.decomposition_model == "example-medium-001" + + def test_custom_decomposition_model(self) -> None: + cfg = CoordinationSectionConfig(decomposition_model="example-large-001") + assert cfg.decomposition_model == "example-large-001" + + def test_decomposition_model_must_not_be_blank(self) -> None: + from pydantic import ValidationError + + with pytest.raises(ValidationError): + CoordinationSectionConfig(decomposition_model=" ") + + def test_decomposition_model_env_mirror( + self, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + monkeypatch.setenv( + "SYNTHORG_COORDINATION_DECOMPOSITION_MODEL", + "env-model-001", + ) + cfg = CoordinationSectionConfig() + assert cfg.decomposition_model == "env-model-001" + def test_default_auto_topology_rules(self) -> None: cfg = CoordinationSectionConfig() assert isinstance(cfg.auto_topology_rules, AutoTopologyConfig) diff --git a/tests/unit/engine/test_coordination_service.py b/tests/unit/engine/test_coordination_service.py index a55d89df5a..81894e7ba1 100644 --- a/tests/unit/engine/test_coordination_service.py +++ b/tests/unit/engine/test_coordination_service.py @@ -14,6 +14,7 @@ TaskStatus, TaskStructure, ) +from synthorg.core.task_transitions import transition_path from synthorg.engine.coordination.config import CoordinationConfig from synthorg.engine.coordination.models import ( CoordinationContext, @@ -345,6 +346,10 @@ async def test_task_engine_parent_update(self) -> None: agent_id = str(routing.decisions[0].selected_candidate.agent_identity.id) task_engine = AsyncMock() + # The parent is freshly CREATED, so the coordinator walks the + # full lifecycle (CREATED -> ASSIGNED -> IN_PROGRESS -> + # IN_REVIEW -> COMPLETED) to reach the COMPLETED rollup status. + task_engine.get_task.return_value = make_assignment_task(id="parent-1") task_engine.submit.return_value = TaskMutationResult( request_id="req-1", success=True, @@ -373,7 +378,14 @@ async def test_task_engine_parent_update(self) -> None: attributed = await coordinator.coordinate(ctx) assert attributed.is_success - task_engine.submit.assert_called_once() + # One submit per valid lifecycle hop to COMPLETED, in order. + expected = transition_path(TaskStatus.CREATED, TaskStatus.COMPLETED) + assert expected is not None + assert task_engine.submit.await_count == len(expected) + submitted = [ + call.args[0].target_status for call in task_engine.submit.await_args_list + ] + assert submitted == list(expected) @pytest.mark.unit async def test_no_task_engine_skips_update(self) -> None: @@ -606,6 +618,7 @@ async def test_update_parent_submit_fails(self) -> None: agent_id = str(routing.decisions[0].selected_candidate.agent_identity.id) task_engine = AsyncMock() + task_engine.get_task.return_value = make_assignment_task(id="parent-1") task_engine.submit.return_value = TaskMutationResult( request_id="req-1", success=False, @@ -643,6 +656,7 @@ async def test_update_parent_exception_captured(self) -> None: agent_id = str(routing.decisions[0].selected_candidate.agent_identity.id) task_engine = AsyncMock() + task_engine.get_task.return_value = make_assignment_task(id="parent-1") task_engine.submit.side_effect = RuntimeError("engine down") coordinator = _make_coordinator( diff --git a/tests/unit/engine/test_parent_rollup.py b/tests/unit/engine/test_parent_rollup.py new file mode 100644 index 0000000000..4b12c2dacb --- /dev/null +++ b/tests/unit/engine/test_parent_rollup.py @@ -0,0 +1,274 @@ +"""Tests for the parent-rollup lifecycle walk and phase wrapper.""" + +from unittest.mock import AsyncMock + +import pytest + +from synthorg.core.enums import TaskStatus +from synthorg.core.task_transitions import transition_path +from synthorg.engine.coordination.models import ( + CoordinationContext, + CoordinationPhaseResult, +) +from synthorg.engine.coordination.parent_rollup import ( + COORDINATOR_ACTOR, + advance_parent_to_rollup_status, + run_update_parent_phase, +) +from synthorg.engine.decomposition.models import SubtaskStatusRollup +from synthorg.engine.task_engine import TaskEngine +from synthorg.engine.task_engine_models import TaskMutationResult +from tests._shared import FakeClock, mock_of +from tests.unit.engine.conftest import ( + make_assignment_agent, + make_assignment_task, +) + +pytestmark = pytest.mark.unit + + +def _rollup( + *, + total: int, + completed: int = 0, + failed: int = 0, +) -> SubtaskStatusRollup: + """Build a rollup whose ``derived_parent_status`` is predictable.""" + return SubtaskStatusRollup( + parent_task_id="parent-1", + total=total, + completed=completed, + failed=failed, + in_progress=0, + blocked=0, + cancelled=0, + ) + + +def _ok_result() -> TaskMutationResult: + return TaskMutationResult(request_id="r", success=True, version=1) + + +def _fail_result(error: str) -> TaskMutationResult: + return TaskMutationResult( + request_id="r", + success=False, + error=error, + error_code="validation", + version=1, + ) + + +class TestAdvanceParentToRollupStatus: + """Unit coverage for ``advance_parent_to_rollup_status``.""" + + async def test_empty_path_is_noop_success(self) -> None: + """Parent already at the derived status: no submits, success.""" + task_engine = mock_of[TaskEngine](submit=AsyncMock()) + rollup = _rollup(total=1, completed=1) # derived COMPLETED + + outcome = await advance_parent_to_rollup_status( + task_engine, + task_id="parent-1", + current_status=TaskStatus.COMPLETED, + rollup=rollup, + ) + + assert outcome.success is True + assert outcome.hops_completed == 0 + assert outcome.error is None + task_engine.submit.assert_not_awaited() + + async def test_no_valid_path_returns_failure(self) -> None: + """Terminal parent that cannot reach the derived status.""" + task_engine = mock_of[TaskEngine](submit=AsyncMock()) + rollup = _rollup(total=1, failed=1) # derived FAILED + + outcome = await advance_parent_to_rollup_status( + task_engine, + task_id="parent-1", + current_status=TaskStatus.COMPLETED, + rollup=rollup, + ) + + assert outcome.success is False + assert outcome.hops_completed == 0 + assert outcome.error is not None + assert "no valid lifecycle path" in outcome.error + task_engine.submit.assert_not_awaited() + + async def test_full_lifecycle_submits_each_valid_hop(self) -> None: + """Each hop is submitted in lifecycle order with the sentinel.""" + expected = transition_path(TaskStatus.CREATED, TaskStatus.COMPLETED) + assert expected is not None + task_engine = mock_of[TaskEngine]( + submit=AsyncMock(return_value=_ok_result()), + ) + rollup = _rollup(total=1, completed=1) # derived COMPLETED + + outcome = await advance_parent_to_rollup_status( + task_engine, + task_id="parent-1", + current_status=TaskStatus.CREATED, + rollup=rollup, + ) + + assert outcome.success is True + assert outcome.hops_completed == len(expected) + mutations = [call.args[0] for call in task_engine.submit.await_args_list] + assert [m.target_status for m in mutations] == list(expected) + assert all(m.requested_by == COORDINATOR_ACTOR for m in mutations) + for mutation in mutations: + if mutation.target_status is TaskStatus.ASSIGNED: + assert mutation.overrides == {"assigned_to": COORDINATOR_ACTOR} + else: + assert mutation.overrides == {} + + async def test_partial_hop_failure_records_completed_hops(self) -> None: + """A mid-walk rejection stops, reports hops + diagnostic note.""" + task_engine = mock_of[TaskEngine]( + submit=AsyncMock( + side_effect=[ + _ok_result(), + _ok_result(), + _fail_result("transition not allowed"), + ], + ), + get_task=AsyncMock( + return_value=make_assignment_task( + id="parent-1", + status=TaskStatus.IN_PROGRESS, + assigned_to="coordinator", + ), + ), + ) + rollup = _rollup(total=1, completed=1) # derived COMPLETED + + outcome = await advance_parent_to_rollup_status( + task_engine, + task_id="parent-1", + current_status=TaskStatus.CREATED, + rollup=rollup, + ) + + assert outcome.success is False + assert outcome.hops_completed == 2 + assert outcome.error is not None + assert "transition not allowed" in outcome.error + # Diagnostic re-read surfaces the parent's actual live status. + assert "in_progress" in outcome.error + + async def test_hop_failure_note_falls_back_when_reread_raises( + self, + ) -> None: + """A re-read failure must not mask the original submit error.""" + task_engine = mock_of[TaskEngine]( + submit=AsyncMock(return_value=_fail_result("rejected")), + get_task=AsyncMock(side_effect=RuntimeError("db down")), + ) + rollup = _rollup(total=1, completed=1) + + outcome = await advance_parent_to_rollup_status( + task_engine, + task_id="parent-1", + current_status=TaskStatus.CREATED, + rollup=rollup, + ) + + assert outcome.success is False + assert outcome.error is not None + assert "rejected" in outcome.error + assert "parent now" not in outcome.error + + +class TestRunUpdateParentPhase: + """Unit coverage for the ``run_update_parent_phase`` wrapper.""" + + def _context(self) -> CoordinationContext: + return CoordinationContext( + task=make_assignment_task(id="parent-1"), + available_agents=(make_assignment_agent("alice"),), + ) + + async def test_no_task_engine_is_noop(self) -> None: + """No task engine wired: nothing recorded.""" + phases: list[CoordinationPhaseResult] = [] + await run_update_parent_phase( + task_engine=None, + clock=FakeClock(), + context=self._context(), + rollup=_rollup(total=1, completed=1), + phases=phases, + ) + assert phases == [] + + async def test_rollup_none_records_failed_phase(self) -> None: + """Missing rollup records a failed update_parent phase.""" + phases: list[CoordinationPhaseResult] = [] + await run_update_parent_phase( + task_engine=mock_of[TaskEngine](), + clock=FakeClock(), + context=self._context(), + rollup=None, + phases=phases, + ) + assert len(phases) == 1 + assert phases[0].phase == "update_parent" + assert phases[0].success is False + assert phases[0].error is not None + assert "rollup is None" in phases[0].error + + async def test_parent_not_found_records_failed_phase(self) -> None: + """A missing live parent records a failed phase, no raise.""" + phases: list[CoordinationPhaseResult] = [] + await run_update_parent_phase( + task_engine=mock_of[TaskEngine]( + get_task=AsyncMock(return_value=None), + ), + clock=FakeClock(), + context=self._context(), + rollup=_rollup(total=1, completed=1), + phases=phases, + ) + assert len(phases) == 1 + assert phases[0].success is False + assert phases[0].error is not None + assert "not found" in phases[0].error + + async def test_get_task_exception_is_captured_not_raised(self) -> None: + """A TaskEngine exception is captured as a failed phase.""" + phases: list[CoordinationPhaseResult] = [] + await run_update_parent_phase( + task_engine=mock_of[TaskEngine]( + get_task=AsyncMock(side_effect=RuntimeError("engine down")), + ), + clock=FakeClock(), + context=self._context(), + rollup=_rollup(total=1, completed=1), + phases=phases, + ) + assert len(phases) == 1 + assert phases[0].success is False + assert phases[0].error is not None + assert "engine down" in phases[0].error + + async def test_happy_path_records_success_with_hops(self) -> None: + """A successful walk records a successful phase.""" + expected = transition_path(TaskStatus.CREATED, TaskStatus.COMPLETED) + assert expected is not None + phases: list[CoordinationPhaseResult] = [] + await run_update_parent_phase( + task_engine=mock_of[TaskEngine]( + get_task=AsyncMock( + return_value=make_assignment_task(id="parent-1"), + ), + submit=AsyncMock(return_value=_ok_result()), + ), + clock=FakeClock(), + context=self._context(), + rollup=_rollup(total=1, completed=1), + phases=phases, + ) + assert len(phases) == 1 + assert phases[0].success is True + assert phases[0].error is None diff --git a/tests/unit/settings/test_coordination_decomposition_model.py b/tests/unit/settings/test_coordination_decomposition_model.py new file mode 100644 index 0000000000..b65f8e30ab --- /dev/null +++ b/tests/unit/settings/test_coordination_decomposition_model.py @@ -0,0 +1,71 @@ +"""Registry coverage for ``coordination.decomposition_model``. + +The coordinator's LLM decomposition strategy resolves its model id +from this Cat-1 setting at boot (DB > env > code default). It is a +plain mutable string entry: a runtime change applies on the next +coordinator rebuild (provider re-init). +""" + +from unittest.mock import AsyncMock + +import pytest + +from synthorg.persistence.settings_protocol import SettingsRepository +from synthorg.settings import definitions as _settings_definitions # noqa: F401 +from synthorg.settings.registry import get_registry +from synthorg.settings.service import SettingsService + +pytestmark = pytest.mark.unit + + +@pytest.fixture +def service() -> SettingsService: + repo = AsyncMock(spec=SettingsRepository) + repo.get = AsyncMock(return_value=None) + repo.get_namespace = AsyncMock(return_value=()) + repo.list_items = AsyncMock(return_value=()) + repo.save = AsyncMock(return_value=True) + return SettingsService( + repository=repo, + registry=get_registry(), + ) + + +def test_decomposition_model_registered_mutable() -> None: + defn = get_registry().get("coordination", "decomposition_model") + assert defn is not None + assert defn.default == "example-medium-001" + assert defn.read_only_post_init is False + assert defn.restart_required is False + + +async def test_decomposition_model_falls_back_to_default( + service: SettingsService, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv( + "SYNTHORG_COORDINATION_DECOMPOSITION_MODEL", + raising=False, + ) + value = await service.get("coordination", "decomposition_model") + assert value.value == "example-medium-001" + + +async def test_decomposition_model_resolves_through_env( + service: SettingsService, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv( + "SYNTHORG_COORDINATION_DECOMPOSITION_MODEL", + "env-model-001", + ) + value = await service.get("coordination", "decomposition_model") + assert value.value == "env-model-001" + + +async def test_decomposition_model_set_succeeds( + service: SettingsService, +) -> None: + repo: AsyncMock = service._repository # type: ignore[assignment] + await service.set("coordination", "decomposition_model", "example-large-001") + repo.save.assert_awaited_once() diff --git a/tests/unit/workers/test_runtime_builder.py b/tests/unit/workers/test_runtime_builder.py index 2386443f54..ae8d6a3893 100644 --- a/tests/unit/workers/test_runtime_builder.py +++ b/tests/unit/workers/test_runtime_builder.py @@ -1,6 +1,7 @@ -"""Unit tests for the provider-present worker-execution-service switch.""" +"""Unit tests for the provider-present runtime-services switch.""" from pathlib import Path +from typing import cast from unittest.mock import AsyncMock import pytest @@ -8,33 +9,86 @@ from synthorg.api.state import AppState from synthorg.config.provider_schema import ProviderConfig from synthorg.config.schema import RootConfig +from synthorg.engine.coordination.service import MultiAgentCoordinator from synthorg.engine.task_engine import TaskEngine from synthorg.hr.registry import AgentRegistryService from synthorg.providers.registry import ProviderRegistry +from synthorg.settings.bridge_configs import EngineBridgeConfig from synthorg.settings.resolver import ConfigResolver from synthorg.workers.execution_service import ( AgentEngineExecutionService, NoProviderExecutionService, ) -from synthorg.workers.runtime_builder import build_worker_execution_service +from synthorg.workers.runtime_builder import ( + RuntimeServices, + build_runtime_services, +) from tests._shared import FakeClock, mock_of pytestmark = pytest.mark.unit +def _provider_app_state( + registry: ProviderRegistry, + workspace: Path, + *, + bridge_config_error: Exception | None = None, +) -> AppState: + """Build a mocked AppState for the provider-present path. + + ``bridge_config_error`` makes ``get_engine_bridge_config`` raise, to + exercise the fail-open routing-scorer-config resolve branch. + """ + if bridge_config_error is None: + bridge_mock = AsyncMock(return_value=EngineBridgeConfig()) + else: + bridge_mock = AsyncMock(side_effect=bridge_config_error) + # ``mock_of[T](...)`` is ``Any`` by design; cast back to the spec so + # the helper keeps a precise signature for its callers. + return cast( + "AppState", + mock_of[AppState]( + has_active_provider=True, + provider_registry=registry, + config=RootConfig(company_name="test-corp"), + config_resolver=mock_of[ConfigResolver]( + get_float=AsyncMock(return_value=30.0), + get_str=AsyncMock(return_value="example-medium-001"), + get_engine_bridge_config=bridge_mock, + ), + task_engine=mock_of[TaskEngine](), + agent_registry=AgentRegistryService(), + approval_store=None, + clock=FakeClock(), + event_stream_hub=None, + interrupt_store=None, + agent_workspace_root=workspace, + has_cost_tracker=False, + has_audit_log=False, + has_memory_backend=False, + has_performance_tracker=False, + ), + ) + + class TestProviderPresentSwitch: - async def test_no_provider_returns_no_provider_service( + async def test_no_provider_returns_no_provider_runtime( self, tmp_path: Path, ) -> None: app_state = mock_of[AppState](has_active_provider=False) - service = await build_worker_execution_service( + result = await build_runtime_services( app_state, workspace_root=tmp_path, ) - assert isinstance(service, NoProviderExecutionService) + assert isinstance(result, RuntimeServices) + assert isinstance( + result.worker_execution_service, + NoProviderExecutionService, + ) + assert result.coordinator is None - async def test_empty_registry_returns_no_provider_service( + async def test_empty_registry_returns_no_provider_runtime( self, tmp_path: Path, ) -> None: @@ -42,41 +96,87 @@ async def test_empty_registry_returns_no_provider_service( has_active_provider=True, provider_registry=ProviderRegistry({}), ) - service = await build_worker_execution_service( + result = await build_runtime_services( app_state, workspace_root=tmp_path, ) - assert isinstance(service, NoProviderExecutionService) + assert isinstance( + result.worker_execution_service, + NoProviderExecutionService, + ) + assert result.coordinator is None - async def test_provider_present_returns_agent_engine_service( + async def test_provider_present_returns_runtime_pair( self, tmp_path: Path, ) -> None: registry = ProviderRegistry.from_config( {"test-provider": ProviderConfig(driver="scripted")} ) - app_state = mock_of[AppState]( - has_active_provider=True, - provider_registry=registry, - config=RootConfig(company_name="test-corp"), - config_resolver=mock_of[ConfigResolver]( - get_float=AsyncMock(return_value=30.0), - ), - task_engine=mock_of[TaskEngine](), - agent_registry=AgentRegistryService(), - approval_store=None, - clock=FakeClock(), - event_stream_hub=None, - interrupt_store=None, - has_cost_tracker=False, - has_audit_log=False, - has_memory_backend=False, + app_state = _provider_app_state(registry, tmp_path) + + result = await build_runtime_services( + app_state, + workspace_root=tmp_path, + ) + + assert isinstance( + result.worker_execution_service, + AgentEngineExecutionService, + ) + assert isinstance(result.coordinator, MultiAgentCoordinator) + + async def test_worker_and_coordinator_share_one_engine( + self, + tmp_path: Path, + ) -> None: + registry = ProviderRegistry.from_config( + {"test-provider": ProviderConfig(driver="scripted")} + ) + app_state = _provider_app_state(registry, tmp_path) + + result = await build_runtime_services( + app_state, + workspace_root=tmp_path, ) - service = await build_worker_execution_service( + + worker = result.worker_execution_service + coordinator = result.coordinator + assert isinstance(worker, AgentEngineExecutionService) + assert coordinator is not None + # The coordinator's parallel executor must run sub-agents on the + # exact same boot AgentEngine as the worker execute seam. + assert coordinator._parallel_executor._engine is worker._engine + + async def test_scorer_config_resolve_failure_is_fail_open( + self, + tmp_path: Path, + ) -> None: + """A bridge-config resolve failure must not break the build. + + ``_resolve_routing_scorer_config`` fails open (returns ``None``) + so the coordinator is still built; the factory falls back to + ``task_assignment_config.min_score``. + """ + registry = ProviderRegistry.from_config( + {"test-provider": ProviderConfig(driver="scripted")} + ) + app_state = _provider_app_state( + registry, + tmp_path, + bridge_config_error=RuntimeError("settings backend down"), + ) + + result = await build_runtime_services( app_state, workspace_root=tmp_path, ) - assert isinstance(service, AgentEngineExecutionService) + + assert isinstance( + result.worker_execution_service, + AgentEngineExecutionService, + ) + assert isinstance(result.coordinator, MultiAgentCoordinator) async def test_multiple_providers_warns_and_selects_first( self, @@ -88,30 +188,18 @@ async def test_multiple_providers_warns_and_selects_first( "test-provider-2": ProviderConfig(driver="scripted"), } ) - app_state = mock_of[AppState]( - has_active_provider=True, - provider_registry=registry, - config=RootConfig(company_name="test-corp"), - config_resolver=mock_of[ConfigResolver]( - get_float=AsyncMock(return_value=30.0), - ), - task_engine=mock_of[TaskEngine](), - agent_registry=AgentRegistryService(), - approval_store=None, - clock=FakeClock(), - event_stream_hub=None, - interrupt_store=None, - has_cost_tracker=False, - has_audit_log=False, - has_memory_backend=False, - ) + app_state = _provider_app_state(registry, tmp_path) # Exercises the >1-provider branch (warning + first-provider # selection); it must still build a live runtime, not reject. - service = await build_worker_execution_service( + result = await build_runtime_services( app_state, workspace_root=tmp_path, ) - assert isinstance(service, AgentEngineExecutionService) + assert isinstance( + result.worker_execution_service, + AgentEngineExecutionService, + ) + assert isinstance(result.coordinator, MultiAgentCoordinator) async def test_builds_nonexistent_deep_workspace_path( self, @@ -120,30 +208,18 @@ async def test_builds_nonexistent_deep_workspace_path( registry = ProviderRegistry.from_config( {"test-provider": ProviderConfig(driver="scripted")} ) - app_state = mock_of[AppState]( - has_active_provider=True, - provider_registry=registry, - config=RootConfig(company_name="test-corp"), - config_resolver=mock_of[ConfigResolver]( - get_float=AsyncMock(return_value=30.0), - ), - task_engine=mock_of[TaskEngine](), - agent_registry=AgentRegistryService(), - approval_store=None, - clock=FakeClock(), - event_stream_hub=None, - interrupt_store=None, - has_cost_tracker=False, - has_audit_log=False, - has_memory_backend=False, - ) deep = tmp_path / "missing" / "agent" / "workspace" + app_state = _provider_app_state(registry, deep) assert not deep.exists() - service = await build_worker_execution_service( + result = await build_runtime_services( app_state, workspace_root=deep, ) - assert isinstance(service, AgentEngineExecutionService) + assert isinstance( + result.worker_execution_service, + AgentEngineExecutionService, + ) + assert isinstance(result.coordinator, MultiAgentCoordinator) assert deep.is_dir()