diff --git a/CLAUDE.md b/CLAUDE.md index e6dbbea9cf..d31e8a3154 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -98,13 +98,13 @@ curl http://localhost:3000/api/v1/health # backend (via web proxy) ```text src/ai_company/ - api/ # Litestar REST + WebSocket API (controllers, guards, channels, JWT + API key auth, approval gate integration) + api/ # Litestar REST + WebSocket API (controllers, guards, channels, JWT + API key auth, approval gate integration, coordination endpoint) budget/ # Cost tracking, budget enforcement (pre-flight/in-flight checks, auto-downgrade), billing periods, cost tiers, quota/subscription tracking, CFO cost optimization (anomaly detection, efficiency analysis, downgrade recommendations, approval decisions), spending reports, budget errors (BudgetExhaustedError, DailyLimitExceededError, QuotaExhaustedError) cli/ # CLI interface (future — thin API wrapper if needed) communication/ # Message bus, dispatcher, messenger, channels, delegation, loop prevention, conflict resolution, meeting protocol config/ # YAML company config loading and validation core/ # Shared domain models, base classes, and resilience config (RetryConfig, RateLimiterConfig) - engine/ # Agent orchestration, execution loops, parallel execution, task decomposition, routing, task assignment, centralized single-writer task state engine (TaskEngine), task lifecycle, recovery, shutdown, workspace isolation, coordination (multi-agent pipeline: TopologyDispatcher protocol, 4 dispatchers — SAS/centralized/decentralized/context-dependent, wave execution, workspace lifecycle integration), coordination error classification, prompt policy validation, checkpoint recovery (checkpoint/, per-turn persistence, heartbeat detection, CheckpointRecoveryStrategy), approval gate (escalation detection, context parking/resume, EscalationInfo/ResumePayload models) + engine/ # Agent orchestration, execution loops, parallel execution, task decomposition, routing, task assignment, centralized single-writer task state engine (TaskEngine), task lifecycle, recovery, shutdown, workspace isolation, coordination (multi-agent pipeline: TopologyDispatcher protocol, 4 dispatchers — SAS/centralized/decentralized/context-dependent, wave execution, workspace lifecycle integration, CoordinationSectionConfig company config bridge, build_coordinator factory), coordination error classification, prompt policy validation, checkpoint recovery (checkpoint/, per-turn persistence, heartbeat detection, CheckpointRecoveryStrategy), approval gate (escalation detection, context parking/resume, EscalationInfo/ResumePayload models) hr/ # HR engine: hiring, firing, onboarding, offboarding, agent registry, performance tracking (task metrics, collaboration scoring, trend detection), promotion/demotion (criteria evaluation, approval strategies, model mapping) memory/ # Persistent agent memory (pluggable MemoryBackend protocol), backends/ (Mem0 adapter: backends/mem0/), retrieval pipeline (ranking, injection, context formatting, non-inferable filtering), shared org memory (org/), consolidation/archival (consolidation/) persistence/ # Operational data persistence — pluggable PersistenceBackend protocol, SQLite initial (see Memory & Persistence design page) @@ -151,7 +151,7 @@ web/ # Vue 3 + PrimeVue + Tailwind CSS dashboard - **Every module** with business logic MUST have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)` - **Never** use `import logging` / `logging.getLogger()` / `print()` in application code - **Variable name**: always `logger` (not `_logger`, not `log`) -- **Event names**: always use constants from the domain-specific module under `ai_company.observability.events` (e.g. `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`, `CFO_ANOMALY_DETECTED` from `events.cfo`, `CONFLICT_DETECTED` from `events.conflict`, `MEETING_STARTED` from `events.meeting`, `CLASSIFICATION_START` from `events.classification`, `CONSOLIDATION_START` from `events.consolidation`, `ORG_MEMORY_QUERY_START` from `events.org_memory`, `API_REQUEST_STARTED` from `events.api`, `API_ROUTE_NOT_FOUND` from `events.api`, `CODE_RUNNER_EXECUTE_START` from `events.code_runner`, `DOCKER_EXECUTE_START` from `events.docker`, `MCP_INVOKE_START` from `events.mcp`, `SECURITY_EVALUATE_START` from `events.security`, `HR_HIRING_REQUEST_CREATED` from `events.hr`, `PERF_METRIC_RECORDED` from `events.performance`, `TRUST_EVALUATE_START` from `events.trust`, `PROMOTION_EVALUATE_START` from `events.promotion`, `PROMPT_BUILD_START` from `events.prompt`, `MEMORY_RETRIEVAL_START` from `events.memory`, `MEMORY_BACKEND_CONNECTED` from `events.memory`, `MEMORY_ENTRY_STORED` from `events.memory`, `MEMORY_BACKEND_SYSTEM_ERROR` from `events.memory`, `AUTONOMY_ACTION_AUTO_APPROVED` from `events.autonomy`, `TIMEOUT_POLICY_EVALUATED` from `events.timeout`, `PERSISTENCE_AUDIT_ENTRY_SAVED` from `events.persistence`, `TASK_ENGINE_STARTED` from `events.task_engine`, `COORDINATION_STARTED` from `events.coordination`, `COMMUNICATION_DISPATCH_START` from `events.communication`, `COMPANY_STARTED` from `events.company`, `CONFIG_LOADED` from `events.config`, `CORRELATION_ID_CREATED` from `events.correlation`, `DECOMPOSITION_STARTED` from `events.decomposition`, `DELEGATION_STARTED` from `events.delegation`, `EXECUTION_LOOP_START` from `events.execution`, `CHECKPOINT_SAVED` from `events.checkpoint`, `PERSISTENCE_CHECKPOINT_SAVED` from `events.persistence`, `GIT_OPERATION_START` from `events.git`, `PARALLEL_GROUP_START` from `events.parallel`, `PERSONALITY_LOADED` from `events.personality`, `QUOTA_CHECKED` from `events.quota`, `ROLE_ASSIGNED` from `events.role`, `ROUTING_STARTED` from `events.routing`, `SANDBOX_EXECUTE_START` from `events.sandbox`, `TASK_CREATED` from `events.task`, `TASK_ASSIGNMENT_STARTED` from `events.task_assignment`, `TASK_ROUTING_STARTED` from `events.task_routing`, `TEMPLATE_LOADED` from `events.template`, `TOOL_INVOKE_START` from `events.tool`, `TOOL_OUTPUT_WITHHELD` from `events.tool`, `WORKSPACE_CREATED` from `events.workspace`, `APPROVAL_GATE_ESCALATION_DETECTED` from `events.approval_gate`, `APPROVAL_GATE_ESCALATION_FAILED` from `events.approval_gate`, `APPROVAL_GATE_INITIALIZED` from `events.approval_gate`, `APPROVAL_GATE_RISK_CLASSIFIED` from `events.approval_gate`, `APPROVAL_GATE_RISK_CLASSIFY_FAILED` from `events.approval_gate`, `APPROVAL_GATE_CONTEXT_PARKED` from `events.approval_gate`, `APPROVAL_GATE_CONTEXT_PARK_FAILED` from `events.approval_gate`, `APPROVAL_GATE_PARK_TASKLESS` from `events.approval_gate`, `APPROVAL_GATE_RESUME_STARTED` from `events.approval_gate`, `APPROVAL_GATE_CONTEXT_RESUMED` from `events.approval_gate`, `APPROVAL_GATE_RESUME_FAILED` from `events.approval_gate`, `APPROVAL_GATE_RESUME_DELETE_FAILED` from `events.approval_gate`, `APPROVAL_GATE_RESUME_TRIGGERED` from `events.approval_gate`, `APPROVAL_GATE_NO_PARKED_CONTEXT` from `events.approval_gate`, `APPROVAL_GATE_LOOP_WIRING_WARNING` from `events.approval_gate`). Import directly: `from ai_company.observability.events. import EVENT_CONSTANT` +- **Event names**: always use constants from the domain-specific module under `ai_company.observability.events` (e.g., `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`, `CFO_ANOMALY_DETECTED` from `events.cfo`, `CONFLICT_DETECTED` from `events.conflict`, `MEETING_STARTED` from `events.meeting`, `CLASSIFICATION_START` from `events.classification`, `CONSOLIDATION_START` from `events.consolidation`, `ORG_MEMORY_QUERY_START` from `events.org_memory`, `API_REQUEST_STARTED` from `events.api`, `API_ROUTE_NOT_FOUND` from `events.api`, `API_COORDINATION_STARTED` from `events.api`, `API_COORDINATION_COMPLETED` from `events.api`, `API_COORDINATION_FAILED` from `events.api`, `API_COORDINATION_AGENT_RESOLVE_FAILED` from `events.api`, `CODE_RUNNER_EXECUTE_START` from `events.code_runner`, `DOCKER_EXECUTE_START` from `events.docker`, `MCP_INVOKE_START` from `events.mcp`, `SECURITY_EVALUATE_START` from `events.security`, `HR_HIRING_REQUEST_CREATED` from `events.hr`, `PERF_METRIC_RECORDED` from `events.performance`, `TRUST_EVALUATE_START` from `events.trust`, `PROMOTION_EVALUATE_START` from `events.promotion`, `PROMPT_BUILD_START` from `events.prompt`, `MEMORY_RETRIEVAL_START` from `events.memory`, `MEMORY_BACKEND_CONNECTED` from `events.memory`, `MEMORY_ENTRY_STORED` from `events.memory`, `MEMORY_BACKEND_SYSTEM_ERROR` from `events.memory`, `AUTONOMY_ACTION_AUTO_APPROVED` from `events.autonomy`, `TIMEOUT_POLICY_EVALUATED` from `events.timeout`, `PERSISTENCE_AUDIT_ENTRY_SAVED` from `events.persistence`, `TASK_ENGINE_STARTED` from `events.task_engine`, `COORDINATION_STARTED` from `events.coordination`, `COORDINATION_FACTORY_BUILT` from `events.coordination`, `COMMUNICATION_DISPATCH_START` from `events.communication`, `COMPANY_STARTED` from `events.company`, `CONFIG_LOADED` from `events.config`, `CORRELATION_ID_CREATED` from `events.correlation`, `DECOMPOSITION_STARTED` from `events.decomposition`, `DELEGATION_STARTED` from `events.delegation`, `EXECUTION_LOOP_START` from `events.execution`, `CHECKPOINT_SAVED` from `events.checkpoint`, `PERSISTENCE_CHECKPOINT_SAVED` from `events.persistence`, `GIT_OPERATION_START` from `events.git`, `PARALLEL_GROUP_START` from `events.parallel`, `PERSONALITY_LOADED` from `events.personality`, `QUOTA_CHECKED` from `events.quota`, `ROLE_ASSIGNED` from `events.role`, `ROUTING_STARTED` from `events.routing`, `SANDBOX_EXECUTE_START` from `events.sandbox`, `TASK_CREATED` from `events.task`, `TASK_ASSIGNMENT_STARTED` from `events.task_assignment`, `TASK_ROUTING_STARTED` from `events.task_routing`, `TEMPLATE_LOADED` from `events.template`, `TOOL_INVOKE_START` from `events.tool`, `TOOL_OUTPUT_WITHHELD` from `events.tool`, `WORKSPACE_CREATED` from `events.workspace`, `APPROVAL_GATE_ESCALATION_DETECTED` from `events.approval_gate`, `APPROVAL_GATE_ESCALATION_FAILED` from `events.approval_gate`, `APPROVAL_GATE_INITIALIZED` from `events.approval_gate`, `APPROVAL_GATE_RISK_CLASSIFIED` from `events.approval_gate`, `APPROVAL_GATE_RISK_CLASSIFY_FAILED` from `events.approval_gate`, `APPROVAL_GATE_CONTEXT_PARKED` from `events.approval_gate`, `APPROVAL_GATE_CONTEXT_PARK_FAILED` from `events.approval_gate`, `APPROVAL_GATE_PARK_TASKLESS` from `events.approval_gate`, `APPROVAL_GATE_RESUME_STARTED` from `events.approval_gate`, `APPROVAL_GATE_CONTEXT_RESUMED` from `events.approval_gate`, `APPROVAL_GATE_RESUME_FAILED` from `events.approval_gate`, `APPROVAL_GATE_RESUME_DELETE_FAILED` from `events.approval_gate`, `APPROVAL_GATE_RESUME_TRIGGERED` from `events.approval_gate`, `APPROVAL_GATE_NO_PARKED_CONTEXT` from `events.approval_gate`, `APPROVAL_GATE_LOOP_WIRING_WARNING` from `events.approval_gate`). Import directly: `from ai_company.observability.events. import EVENT_CONSTANT` - **Structured kwargs**: always `logger.info(EVENT, key=value)` — never `logger.info("msg %s", val)` - **All error paths** must log at WARNING or ERROR with context before raising - **All state transitions** must log at INFO diff --git a/docs/design/engine.md b/docs/design/engine.md index 59a94ce37a..959b8305ab 100644 --- a/docs/design/engine.md +++ b/docs/design/engine.md @@ -408,6 +408,9 @@ All loop implementations satisfy the `ExecutionLoop` runtime-checkable protocol: composes the execution loop with prompt construction, context management, tool invocation, and cost tracking into a single `run()` call. +The engine also exposes an optional ``coordinate()`` method that delegates to a +``MultiAgentCoordinator`` when one is configured (see :doc:`coordination`). + **Signature:** ```python @@ -787,6 +790,10 @@ coordination: parallel_default: "centralized" # mixed tasks -> SAS backbone for sequential phases, delegates parallel sub-tasks mixed_default: "context_dependent" # hybrid: not a single topology -- engine selects per-phase + max_concurrency_per_wave: null # None = unlimited + fail_fast: false + enable_workspace_isolation: true + base_branch: main ``` The auto-selector uses task structure, artifact count, and (when available from diff --git a/docs/design/operations.md b/docs/design/operations.md index 8404cf1e6d..c2c1200cd0 100644 --- a/docs/design/operations.md +++ b/docs/design/operations.md @@ -964,6 +964,7 @@ future CLI tool are thin clients that call the API -- they contain no business l | `/api/v1/departments` | Department management | | `/api/v1/projects` | Project CRUD | | `/api/v1/tasks` | Task management | +| `POST /api/v1/tasks/{task_id}/coordinate` | Trigger multi-agent coordination | | `/api/v1/messages` | Communication log | | `/api/v1/meetings` | Schedule, view meeting outputs | | `/api/v1/artifacts` | Browse produced artifacts (code, docs, etc.) | diff --git a/docs/roadmap/index.md b/docs/roadmap/index.md index ece00602e2..bae1fb4c45 100644 --- a/docs/roadmap/index.md +++ b/docs/roadmap/index.md @@ -6,7 +6,7 @@ The SynthOrg core framework is complete. The following subsystems are built and - Provider abstraction layer (LiteLLM adapter, routing, resilience) - Budget and cost management (tracking, enforcement, CFO optimization, quotas) -- Agent engine (execution loops, parallel execution, task decomposition, routing, assignment, recovery, shutdown) +- Agent engine (execution loops, parallel execution, task decomposition, routing, assignment, recovery, shutdown, multi-agent coordination) - Communication layer (message bus, delegation, loop prevention, conflict resolution, meeting protocol) - Memory system (pluggable backend protocol, Mem0 adapter, retrieval pipeline, shared org memory, consolidation) - Security and approval system (rule engine, output scanning, progressive trust, autonomy levels, timeout policies) diff --git a/src/ai_company/api/app.py b/src/ai_company/api/app.py index e849eaf9c4..9b7985e9d7 100644 --- a/src/ai_company/api/app.py +++ b/src/ai_company/api/app.py @@ -35,7 +35,9 @@ from ai_company.communication.bus_protocol import MessageBus # noqa: TC001 from ai_company.config.schema import RootConfig from ai_company.core.approval import ApprovalItem # noqa: TC001 +from ai_company.engine.coordination.service import MultiAgentCoordinator # noqa: TC001 from ai_company.engine.task_engine import TaskEngine # noqa: TC001 +from ai_company.hr.registry import AgentRegistryService # noqa: TC001 from ai_company.observability import get_logger from ai_company.observability.events.api import ( API_APP_SHUTDOWN, @@ -355,6 +357,8 @@ def create_app( # noqa: PLR0913 approval_store: ApprovalStore | None = None, auth_service: AuthService | None = None, task_engine: TaskEngine | None = None, + coordinator: MultiAgentCoordinator | None = None, + agent_registry: AgentRegistryService | None = None, ) -> Litestar: """Create and configure the Litestar application. @@ -369,6 +373,8 @@ def create_app( # noqa: PLR0913 approval_store: Approval queue store. auth_service: Pre-built auth service (for testing). task_engine: Centralized task state engine. + coordinator: Multi-agent coordinator. + agent_registry: Agent registry service. Returns: Configured Litestar application. @@ -403,6 +409,8 @@ def create_app( # noqa: PLR0913 approval_store=effective_approval_store, auth_service=auth_service, task_engine=task_engine, + coordinator=coordinator, + agent_registry=agent_registry, startup_time=time.monotonic(), ) diff --git a/src/ai_company/api/channels.py b/src/ai_company/api/channels.py index 7104a9bad2..6d42b62756 100644 --- a/src/ai_company/api/channels.py +++ b/src/ai_company/api/channels.py @@ -4,6 +4,8 @@ creates the Litestar ``ChannelsPlugin`` with an in-memory backend. """ +from typing import TYPE_CHECKING, Any + from litestar.channels import ChannelsPlugin from litestar.channels.backends.memory import MemoryChannelsBackend @@ -24,6 +26,27 @@ ) +if TYPE_CHECKING: + from litestar import Request + + +def get_channels_plugin( + request: Request[Any, Any, Any], +) -> ChannelsPlugin | None: + """Extract the ``ChannelsPlugin`` from the application, or ``None``. + + Args: + request: The incoming Litestar request. + + Returns: + The ``ChannelsPlugin`` instance if registered, otherwise ``None``. + """ + for plugin in request.app.plugins: + if isinstance(plugin, ChannelsPlugin): + return plugin + return None + + def create_channels_plugin() -> ChannelsPlugin: """Create the channels plugin with in-memory backend. diff --git a/src/ai_company/api/controllers/__init__.py b/src/ai_company/api/controllers/__init__.py index 575f6ee8b4..7a16f84903 100644 --- a/src/ai_company/api/controllers/__init__.py +++ b/src/ai_company/api/controllers/__init__.py @@ -10,6 +10,7 @@ from ai_company.api.controllers.autonomy import AutonomyController from ai_company.api.controllers.budget import BudgetController from ai_company.api.controllers.company import CompanyController +from ai_company.api.controllers.coordination import CoordinationController from ai_company.api.controllers.departments import DepartmentController from ai_company.api.controllers.health import HealthController from ai_company.api.controllers.meetings import MeetingController @@ -35,6 +36,7 @@ ApprovalsController, AutonomyController, AuthController, + CoordinationController, ) __all__ = [ @@ -48,6 +50,7 @@ "BudgetController", "CompanyController", "Controller", + "CoordinationController", "DepartmentController", "HealthController", "MeetingController", diff --git a/src/ai_company/api/controllers/approvals.py b/src/ai_company/api/controllers/approvals.py index 3c96f83d4e..ca81b9a949 100644 --- a/src/ai_company/api/controllers/approvals.py +++ b/src/ai_company/api/controllers/approvals.py @@ -5,11 +5,11 @@ from uuid import uuid4 from litestar import Controller, Request, get, post -from litestar.channels import ChannelsPlugin +from litestar.channels import ChannelsPlugin # noqa: TC002 from litestar.datastructures import State # noqa: TC002 from ai_company.api.auth.models import AuthenticatedUser -from ai_company.api.channels import CHANNEL_APPROVALS +from ai_company.api.channels import CHANNEL_APPROVALS, get_channels_plugin from ai_company.api.dto import ( ApiResponse, ApproveRequest, @@ -44,7 +44,7 @@ logger = get_logger(__name__) -def _get_channels_plugin( +def _require_channels_plugin( request: Request[Any, Any, Any], ) -> ChannelsPlugin: """Extract the ChannelsPlugin from the application. @@ -58,12 +58,12 @@ def _get_channels_plugin( Raises: RuntimeError: If no ChannelsPlugin is registered on the app. """ - for plugin in request.app.plugins: - if isinstance(plugin, ChannelsPlugin): - return plugin - msg = "ChannelsPlugin not registered" - logger.error(API_APPROVAL_PUBLISH_FAILED, error=msg) - raise RuntimeError(msg) + plugin = get_channels_plugin(request) + if plugin is None: + msg = "ChannelsPlugin not registered" + logger.error(API_APPROVAL_PUBLISH_FAILED, error=msg) + raise RuntimeError(msg) + return plugin def _publish_approval_event( @@ -93,7 +93,7 @@ def _publish_approval_event( }, ) try: - channels_plugin = _get_channels_plugin(request) + channels_plugin = _require_channels_plugin(request) channels_plugin.publish( event.model_dump_json(), channels=[CHANNEL_APPROVALS], diff --git a/src/ai_company/api/controllers/coordination.py b/src/ai_company/api/controllers/coordination.py new file mode 100644 index 0000000000..fbe6f52a46 --- /dev/null +++ b/src/ai_company/api/controllers/coordination.py @@ -0,0 +1,342 @@ +"""Coordination controller — multi-agent coordination endpoint.""" + +import asyncio +from datetime import UTC, datetime +from typing import TYPE_CHECKING, Any + +from litestar import Controller, Request, post +from litestar.datastructures import State # noqa: TC002 + +from ai_company.api.channels import CHANNEL_TASKS, get_channels_plugin +from ai_company.api.dto import ( + ApiResponse, + CoordinateTaskRequest, + CoordinationPhaseResponse, + CoordinationResultResponse, +) +from ai_company.api.errors import ( + ApiValidationError, + NotFoundError, + ServiceUnavailableError, +) +from ai_company.api.guards import require_write_access +from ai_company.api.ws_models import WsEvent, WsEventType +from ai_company.engine.coordination.models import ( + CoordinationContext, + CoordinationResult, +) +from ai_company.engine.errors import CoordinationPhaseError +from ai_company.observability import get_logger +from ai_company.observability.events.api import ( + API_COORDINATION_AGENT_RESOLVE_FAILED, + API_COORDINATION_COMPLETED, + API_COORDINATION_FAILED, + API_COORDINATION_STARTED, + API_RESOURCE_NOT_FOUND, + API_WS_SEND_FAILED, +) + +if TYPE_CHECKING: + from ai_company.api.state import AppState + from ai_company.core.agent import AgentIdentity + from ai_company.core.task import Task + +logger = get_logger(__name__) + +_MAX_TASK_ID_LEN: int = 128 + + +def _validate_task_id(task_id: str) -> None: + """Reject oversized task IDs at the API boundary.""" + if len(task_id) > _MAX_TASK_ID_LEN: + msg = "Task ID too long" + raise ApiValidationError(msg) + + +def _publish_ws_event( + request: Request[Any, Any, Any], + event_type: WsEventType, + payload: dict[str, object], +) -> None: + """Best-effort publish a coordination event to the tasks channel.""" + channels_plugin = get_channels_plugin(request) + if channels_plugin is None: + return + + event = WsEvent( + event_type=event_type, + channel=CHANNEL_TASKS, + timestamp=datetime.now(UTC), + payload=payload, + ) + try: + channels_plugin.publish( + event.model_dump_json(), + channels=[CHANNEL_TASKS], + ) + except MemoryError, RecursionError: + raise + except Exception: + logger.warning( + API_WS_SEND_FAILED, + note="Failed to publish coordination WebSocket event", + event_type=event_type.value, + exc_info=True, + ) + + +def _map_result_to_response( + result: CoordinationResult, +) -> CoordinationResultResponse: + """Map a domain ``CoordinationResult`` to an API response DTO.""" + return CoordinationResultResponse( + parent_task_id=result.parent_task_id, + topology=result.topology.value, + total_duration_seconds=result.total_duration_seconds, + total_cost_usd=result.total_cost_usd, + phases=tuple( + CoordinationPhaseResponse( + phase=p.phase, + success=p.success, + duration_seconds=p.duration_seconds, + error=p.error, + ) + for p in result.phases + ), + wave_count=len(result.waves), + ) # is_success is @computed_field from phases + + +class CoordinationController(Controller): + """Multi-agent coordination endpoint.""" + + path = "/tasks/{task_id:str}/coordinate" + tags = ("coordination",) + + @post(guards=[require_write_access], status_code=200) + async def coordinate_task( + self, + request: Request[Any, Any, Any], + state: State, + task_id: str, + data: CoordinateTaskRequest, + ) -> ApiResponse[CoordinationResultResponse]: + """Trigger multi-agent coordination for a task. + + Args: + request: The incoming request. + state: Application state. + task_id: Task identifier. + data: Coordination request payload. + + Returns: + Coordination result envelope. + + Raises: + NotFoundError: If the task is not found. + ApiValidationError: If agent resolution fails. + ServiceUnavailableError: If coordinator not configured. + """ + app_state: AppState = state.app_state + + if not app_state.has_coordinator: + logger.warning( + API_COORDINATION_FAILED, + error="Coordinator not configured", + ) + msg = "Coordinator not configured" + raise ServiceUnavailableError(msg) + + if not app_state.has_agent_registry: + logger.warning( + API_COORDINATION_FAILED, + error="Agent registry not configured", + ) + msg = "Agent registry not configured" + raise ServiceUnavailableError(msg) + + _validate_task_id(task_id) + task = await self._get_task(app_state, task_id) + agents = await self._resolve_agents(app_state, data, task_id) + context = self._build_context(app_state, task, agents, data) + + _publish_ws_event( + request, + WsEventType.COORDINATION_STARTED, + {"task_id": task_id, "agent_count": len(agents)}, + ) + logger.info( + API_COORDINATION_STARTED, + task_id=task_id, + agent_count=len(agents), + ) + + result = await self._execute( + app_state, + request, + context, + task_id, + ) + return ApiResponse(data=_map_result_to_response(result)) + + async def _get_task( + self, + app_state: AppState, + task_id: str, + ) -> Task: + """Fetch task or raise 404.""" + task = await app_state.task_engine.get_task(task_id) + if task is None: + logger.warning( + API_RESOURCE_NOT_FOUND, + resource="task", + id=task_id, + ) + msg = f"Task {task_id!r} not found" + raise NotFoundError(msg) + return task + + def _build_context( + self, + app_state: AppState, + task: Task, + agents: tuple[AgentIdentity, ...], + data: CoordinateTaskRequest, + ) -> CoordinationContext: + """Build coordination context from request data.""" + from ai_company.engine.decomposition.models import ( # noqa: PLC0415 + DecompositionContext, + ) + + coord_config = app_state.config.coordination.to_coordination_config( + max_concurrency_per_wave=data.max_concurrency_per_wave, + fail_fast=data.fail_fast, + ) + return CoordinationContext( + task=task, + available_agents=agents, + decomposition_context=DecompositionContext( + max_subtasks=data.max_subtasks, + ), + config=coord_config, + ) + + async def _execute( + self, + app_state: AppState, + request: Request[Any, Any, Any], + context: CoordinationContext, + task_id: str, + ) -> CoordinationResult: + """Run coordination and publish WS events.""" + try: + result = await app_state.coordinator.coordinate(context) + except CoordinationPhaseError as exc: + logger.warning( + API_COORDINATION_FAILED, + task_id=task_id, + phase=exc.phase, + error=str(exc), + ) + client_msg = f"Coordination failed at phase {exc.phase!r}" + _publish_ws_event( + request, + WsEventType.COORDINATION_FAILED, + { + "task_id": task_id, + "phase": exc.phase, + "error": client_msg, + }, + ) + raise ApiValidationError(client_msg) from exc + except MemoryError, RecursionError: + raise + except Exception: + logger.exception( + API_COORDINATION_FAILED, + task_id=task_id, + error="Unexpected exception during coordination", + ) + _publish_ws_event( + request, + WsEventType.COORDINATION_FAILED, + {"task_id": task_id, "error": "Unexpected coordination error"}, + ) + raise + + ws_event_type = ( + WsEventType.COORDINATION_COMPLETED + if result.is_success + else WsEventType.COORDINATION_FAILED + ) + _publish_ws_event( + request, + ws_event_type, + { + "task_id": task_id, + "topology": result.topology.value, + "is_success": result.is_success, + "total_duration_seconds": result.total_duration_seconds, + }, + ) + log_event = ( + API_COORDINATION_COMPLETED if result.is_success else API_COORDINATION_FAILED + ) + log_fn = logger.info if result.is_success else logger.warning + log_fn( + log_event, + task_id=task_id, + topology=result.topology.value, + is_success=result.is_success, + total_duration_seconds=result.total_duration_seconds, + ) + return result + + async def _resolve_agents( + self, + app_state: AppState, + data: CoordinateTaskRequest, + task_id: str, + ) -> tuple[AgentIdentity, ...]: + """Resolve agent identities from request or registry. + + Args: + app_state: Application state. + data: Coordination request with optional agent names. + task_id: Task ID for logging. + + Returns: + Tuple of agent identities. + + Raises: + ApiValidationError: If agents cannot be resolved. + """ + registry = app_state.agent_registry + + if data.agent_names is not None: + results = await asyncio.gather( + *(registry.get_by_name(name) for name in data.agent_names) + ) + agents: list[AgentIdentity] = [] + for name, agent in zip(data.agent_names, results, strict=True): + if agent is None: + logger.warning( + API_COORDINATION_AGENT_RESOLVE_FAILED, + task_id=task_id, + agent_name=name, + ) + msg = f"Agent {name!r} not found" + raise ApiValidationError(msg) + agents.append(agent) + return tuple(agents) + + active_agents = await registry.list_active() + if not active_agents: + logger.warning( + API_COORDINATION_AGENT_RESOLVE_FAILED, + task_id=task_id, + error="No active agents available", + ) + msg = "No active agents available for coordination" + raise ApiValidationError(msg) + return active_agents diff --git a/src/ai_company/api/dto.py b/src/ai_company/api/dto.py index a7895b5850..b49f4005c4 100644 --- a/src/ai_company/api/dto.py +++ b/src/ai_company/api/dto.py @@ -266,3 +266,108 @@ class RejectRequest(BaseModel): model_config = ConfigDict(frozen=True) reason: NotBlankStr = Field(max_length=4096) + + +# ── Coordination request/response DTOs ──────────────────────── + + +class CoordinateTaskRequest(BaseModel): + """Payload for triggering multi-agent coordination on a task. + + Attributes: + agent_names: Agent names to coordinate with (``None`` = all active). + When provided, must be non-empty and unique. + max_subtasks: Maximum subtasks for decomposition. + max_concurrency_per_wave: Override for max concurrency per wave. + fail_fast: Override for fail-fast behaviour (``None`` = use + section config default). + """ + + model_config = ConfigDict(frozen=True) + + agent_names: tuple[NotBlankStr, ...] | None = Field( + default=None, + min_length=1, + max_length=50, + description="Agent names to coordinate with (None = all active)", + ) + max_subtasks: int = Field(default=10, ge=1, le=50) + max_concurrency_per_wave: int | None = Field( + default=None, + ge=1, + le=50, + ) + fail_fast: bool | None = None + + @model_validator(mode="after") + def _validate_unique_agent_names(self) -> Self: + """Reject duplicate agent names.""" + if self.agent_names is not None: + seen: set[str] = set() + for name in self.agent_names: + lower = name.lower() + if lower in seen: + msg = f"Duplicate agent name: {name!r}" + raise ValueError(msg) + seen.add(lower) + return self + + +class CoordinationPhaseResponse(BaseModel): + """Response model for a single coordination phase. + + Attributes: + phase: Phase name. + success: Whether the phase completed successfully. + duration_seconds: Wall-clock duration of the phase. + error: Error description if the phase failed. + """ + + model_config = ConfigDict(frozen=True) + + phase: NotBlankStr + success: bool + duration_seconds: float = Field(ge=0.0) + error: NotBlankStr | None = None + + @model_validator(mode="after") + def _validate_success_error_consistency(self) -> Self: + """Ensure success and error fields are consistent.""" + if self.success and self.error is not None: + msg = "successful phase must not have an error" + raise ValueError(msg) + if not self.success and self.error is None: + msg = "failed phase must have an error description" + raise ValueError(msg) + return self + + +class CoordinationResultResponse(BaseModel): + """Response model for a complete coordination run. + + Attributes: + parent_task_id: ID of the parent task. + topology: Resolved coordination topology. + total_duration_seconds: Total wall-clock duration. + total_cost_usd: Total cost across all waves. + phases: Phase results in execution order. + wave_count: Number of execution waves. + is_success: Whether all phases succeeded (computed). + """ + + model_config = ConfigDict(frozen=True) + + parent_task_id: NotBlankStr = Field(max_length=128) + topology: NotBlankStr + total_duration_seconds: float = Field(ge=0.0) + total_cost_usd: float = Field(ge=0.0) + phases: tuple[CoordinationPhaseResponse, ...] = Field(min_length=1) + wave_count: int = Field(ge=0) + + @computed_field( # type: ignore[prop-decorator] + description="Whether all phases succeeded", + ) + @property + def is_success(self) -> bool: + """True when every phase completed successfully.""" + return all(p.success for p in self.phases) diff --git a/src/ai_company/api/state.py b/src/ai_company/api/state.py index d9e9510874..d3e2b4bb29 100644 --- a/src/ai_company/api/state.py +++ b/src/ai_company/api/state.py @@ -12,7 +12,9 @@ from ai_company.communication.bus_protocol import MessageBus # noqa: TC001 from ai_company.config.schema import RootConfig # noqa: TC001 from ai_company.engine.approval_gate import ApprovalGate # noqa: TC001 +from ai_company.engine.coordination.service import MultiAgentCoordinator # noqa: TC001 from ai_company.engine.task_engine import TaskEngine # noqa: TC001 +from ai_company.hr.registry import AgentRegistryService # noqa: TC001 from ai_company.observability import get_logger from ai_company.observability.events.api import API_APP_STARTUP, API_SERVICE_UNAVAILABLE from ai_company.persistence.protocol import PersistenceBackend # noqa: TC001 @@ -24,8 +26,9 @@ class AppState: """Typed application state container. Service fields (``persistence``, ``message_bus``, ``cost_tracker``, - ``auth_service``, ``task_engine``) accept ``None`` at construction - time for dev/test mode. Property + ``auth_service``, ``task_engine``, ``coordinator``, + ``agent_registry``) accept ``None`` at construction time for + dev/test mode. Property accessors raise ``ServiceUnavailableError`` (HTTP 503) when the service is not configured, producing a clear error instead of an opaque ``AttributeError``. @@ -37,8 +40,10 @@ class AppState: """ __slots__ = ( + "_agent_registry", "_approval_gate", "_auth_service", + "_coordinator", "_cost_tracker", "_message_bus", "_persistence", @@ -59,6 +64,8 @@ def __init__( # noqa: PLR0913 auth_service: AuthService | None = None, task_engine: TaskEngine | None = None, approval_gate: ApprovalGate | None = None, + coordinator: MultiAgentCoordinator | None = None, + agent_registry: AgentRegistryService | None = None, startup_time: float = 0.0, ) -> None: self.config = config @@ -69,6 +76,8 @@ def __init__( # noqa: PLR0913 self._cost_tracker = cost_tracker self._auth_service = auth_service self._task_engine = task_engine + self._coordinator = coordinator + self._agent_registry = agent_registry self.startup_time = startup_time def _require_service[T](self, service: T | None, name: str) -> T: @@ -140,6 +149,26 @@ def approval_gate(self) -> ApprovalGate | None: """Return approval gate, or None if not configured.""" return self._approval_gate + @property + def coordinator(self) -> MultiAgentCoordinator: + """Return coordinator or raise 503.""" + return self._require_service(self._coordinator, "coordinator") + + @property + def has_coordinator(self) -> bool: + """Check whether the coordinator is configured.""" + return self._coordinator is not None + + @property + def agent_registry(self) -> AgentRegistryService: + """Return agent registry or raise 503.""" + return self._require_service(self._agent_registry, "agent_registry") + + @property + def has_agent_registry(self) -> bool: + """Check whether the agent registry is configured.""" + return self._agent_registry is not None + @property def has_auth_service(self) -> bool: """Check whether the auth service is already configured.""" diff --git a/src/ai_company/api/ws_models.py b/src/ai_company/api/ws_models.py index 055d60d192..3907aa4d98 100644 --- a/src/ai_company/api/ws_models.py +++ b/src/ai_company/api/ws_models.py @@ -43,6 +43,12 @@ class WsEventType(StrEnum): APPROVAL_REJECTED = "approval.rejected" APPROVAL_EXPIRED = "approval.expired" + COORDINATION_STARTED = "coordination.started" + # Reserved for per-phase progress events (not yet published). + COORDINATION_PHASE_COMPLETED = "coordination.phase_completed" + COORDINATION_COMPLETED = "coordination.completed" + COORDINATION_FAILED = "coordination.failed" + class WsEvent(BaseModel): """A real-time event pushed over WebSocket. diff --git a/src/ai_company/config/defaults.py b/src/ai_company/config/defaults.py index d2482fa955..e41fa531e9 100644 --- a/src/ai_company/config/defaults.py +++ b/src/ai_company/config/defaults.py @@ -39,4 +39,5 @@ def default_config_dict() -> dict[str, object]: "trust": {}, "promotion": {}, "task_engine": {}, + "coordination": {}, } diff --git a/src/ai_company/config/schema.py b/src/ai_company/config/schema.py index aa6a3e37aa..f7b55c292c 100644 --- a/src/ai_company/config/schema.py +++ b/src/ai_company/config/schema.py @@ -21,6 +21,7 @@ from ai_company.core.resilience_config import RateLimiterConfig, RetryConfig from ai_company.core.role import CustomRole # noqa: TC001 from ai_company.core.types import NotBlankStr # noqa: TC001 +from ai_company.engine.coordination.section_config import CoordinationSectionConfig from ai_company.engine.task_engine_config import TaskEngineConfig from ai_company.hr.promotion.config import PromotionConfig from ai_company.memory.config import CompanyMemoryConfig @@ -416,6 +417,7 @@ class RootConfig(BaseModel): trust: Progressive trust configuration. promotion: Promotion/demotion configuration. task_engine: Task engine configuration. + coordination: Multi-agent coordination configuration. """ model_config = ConfigDict(frozen=True) @@ -527,6 +529,10 @@ class RootConfig(BaseModel): default_factory=TaskEngineConfig, description="Task engine configuration", ) + coordination: CoordinationSectionConfig = Field( + default_factory=CoordinationSectionConfig, + description="Multi-agent coordination configuration", + ) @model_validator(mode="after") def _validate_unique_agent_names(self) -> Self: diff --git a/src/ai_company/engine/agent_engine.py b/src/ai_company/engine/agent_engine.py index dc096068c7..3894737422 100644 --- a/src/ai_company/engine/agent_engine.py +++ b/src/ai_company/engine/agent_engine.py @@ -29,6 +29,7 @@ from ai_company.engine.classification.pipeline import classify_execution_errors from ai_company.engine.context import DEFAULT_MAX_TURNS, AgentContext from ai_company.engine.cost_recording import record_execution_costs +from ai_company.engine.errors import ExecutionStateError from ai_company.engine.loop_protocol import ( ExecutionResult, TerminationReason, @@ -87,6 +88,11 @@ from ai_company.budget.tracker import CostTracker from ai_company.core.agent import AgentIdentity from ai_company.core.task import Task + from ai_company.engine.coordination.models import ( + CoordinationContext, + CoordinationResult, + ) + from ai_company.engine.coordination.service import MultiAgentCoordinator from ai_company.engine.loop_protocol import ( BudgetChecker, ExecutionLoop, @@ -137,6 +143,8 @@ class AgentEngine: task_engine: Optional centralized task engine for real-time status sync (incremental transitions at each lifecycle point, best-effort). + coordinator: Optional multi-agent coordinator for delegated + coordination via :meth:`coordinate`. """ def __init__( # noqa: PLR0913 @@ -157,6 +165,7 @@ def __init__( # noqa: PLR0913 checkpoint_repo: CheckpointRepository | None = None, heartbeat_repo: HeartbeatRepository | None = None, checkpoint_config: CheckpointConfig | None = None, + coordinator: MultiAgentCoordinator | None = None, ) -> None: self._provider = provider self._approval_store = approval_store @@ -202,6 +211,7 @@ def __init__( # noqa: PLR0913 self._recovery_strategy = recovery_strategy self._shutdown_checker = shutdown_checker self._error_taxonomy_config = error_taxonomy_config + self._coordinator = coordinator self._audit_log = AuditLog() logger.debug( EXECUTION_ENGINE_CREATED, @@ -209,8 +219,39 @@ def __init__( # noqa: PLR0913 has_tool_registry=self._tool_registry is not None, has_cost_tracker=self._cost_tracker is not None, has_budget_enforcer=self._budget_enforcer is not None, + has_coordinator=self._coordinator is not None, ) + @property + def coordinator(self) -> MultiAgentCoordinator | None: + """Return the multi-agent coordinator, or ``None`` if not configured.""" + return self._coordinator + + async def coordinate( + self, + context: CoordinationContext, + ) -> CoordinationResult: + """Delegate to the multi-agent coordinator. + + Args: + context: Coordination context with task, agents, and config. + + Returns: + Coordination result with all phase outcomes. + + Raises: + ExecutionStateError: If no coordinator is configured. + CoordinationPhaseError: When a critical phase fails. + """ + if self._coordinator is None: + msg = "No coordinator configured for multi-agent dispatch" + logger.warning( + EXECUTION_ENGINE_ERROR, + error=msg, + ) + raise ExecutionStateError(msg) + return await self._coordinator.coordinate(context) + async def run( # noqa: PLR0913 self, *, diff --git a/src/ai_company/engine/coordination/__init__.py b/src/ai_company/engine/coordination/__init__.py index b744d082ac..48a961a8c7 100644 --- a/src/ai_company/engine/coordination/__init__.py +++ b/src/ai_company/engine/coordination/__init__.py @@ -15,6 +15,7 @@ TopologyDispatcher, select_dispatcher, ) +from ai_company.engine.coordination.factory import build_coordinator from ai_company.engine.coordination.group_builder import build_execution_waves from ai_company.engine.coordination.models import ( CoordinationContext, @@ -22,6 +23,7 @@ CoordinationResult, CoordinationWave, ) +from ai_company.engine.coordination.section_config import CoordinationSectionConfig from ai_company.engine.coordination.service import MultiAgentCoordinator __all__ = [ @@ -31,12 +33,14 @@ "CoordinationContext", "CoordinationPhaseResult", "CoordinationResult", + "CoordinationSectionConfig", "CoordinationWave", "DecentralizedDispatcher", "DispatchResult", "MultiAgentCoordinator", "SasDispatcher", "TopologyDispatcher", + "build_coordinator", "build_execution_waves", "select_dispatcher", ] diff --git a/src/ai_company/engine/coordination/dispatchers.py b/src/ai_company/engine/coordination/dispatchers.py index c98ad2fe9f..132dcb3f85 100644 --- a/src/ai_company/engine/coordination/dispatchers.py +++ b/src/ai_company/engine/coordination/dispatchers.py @@ -160,6 +160,13 @@ async def _setup_workspaces( try: requests = _build_workspace_requests(routing_result, config) workspaces = await workspace_service.setup_group(requests=requests) + except MemoryError, RecursionError: + # Bare re-raise: logging is intentionally omitted because + # emitting logs may itself trigger MemoryError/RecursionError. + # These are built-in exceptions (not ai_company.memory.errors.MemoryError). + # Same pattern applies to all MemoryError/RecursionError guards + # in this module. + raise except Exception as exc: elapsed = time.monotonic() - start phase = CoordinationPhaseResult( @@ -172,6 +179,7 @@ async def _setup_workspaces( COORDINATION_PHASE_FAILED, phase=phase_name, error=str(exc), + exc_info=True, ) return (), phase else: @@ -204,6 +212,8 @@ async def _merge_workspaces( merge_result = await workspace_service.merge_group( workspaces=workspaces, ) + except MemoryError, RecursionError: + raise except Exception as exc: elapsed = time.monotonic() - start phase = CoordinationPhaseResult( @@ -216,6 +226,7 @@ async def _merge_workspaces( COORDINATION_PHASE_FAILED, phase=phase_name, error=str(exc), + exc_info=True, ) return None, phase else: @@ -244,11 +255,14 @@ async def _teardown_workspaces( ) try: await workspace_service.teardown_group(workspaces=workspaces) + except MemoryError, RecursionError: + raise except Exception as exc: logger.warning( COORDINATION_CLEANUP_FAILED, workspace_count=len(workspaces), error=str(exc), + exc_info=True, ) else: logger.info( @@ -324,6 +338,8 @@ async def _execute_waves( if not success and fail_fast: break + except MemoryError, RecursionError: + raise except Exception as exc: elapsed = time.monotonic() - start logger.warning( @@ -331,6 +347,7 @@ async def _execute_waves( phase=phase_name, wave_index=wave_idx, error=str(exc), + exc_info=True, ) wave = CoordinationWave( wave_index=wave_idx, @@ -650,12 +667,15 @@ async def _setup_wave( # noqa: PLR0913 wave_workspaces = await workspace_service.setup_group( requests=wave_requests, ) + except MemoryError, RecursionError: + raise except Exception as exc: ws_elapsed = time.monotonic() - ws_start logger.warning( COORDINATION_PHASE_FAILED, phase=f"workspace_setup_wave_{wave_idx}", error=str(exc), + exc_info=True, ) all_phases.append( CoordinationPhaseResult( @@ -751,6 +771,8 @@ async def _execute_wave( # noqa: PLR0913 duration_seconds=elapsed, ) + except MemoryError, RecursionError: + raise except Exception as exc: elapsed = time.monotonic() - start wave_failed = True @@ -759,6 +781,7 @@ async def _execute_wave( # noqa: PLR0913 phase=f"execute_wave_{wave_idx}", wave_index=wave_idx, error=str(exc), + exc_info=True, ) all_waves.append( CoordinationWave( diff --git a/src/ai_company/engine/coordination/factory.py b/src/ai_company/engine/coordination/factory.py new file mode 100644 index 0000000000..54da54a135 --- /dev/null +++ b/src/ai_company/engine/coordination/factory.py @@ -0,0 +1,227 @@ +"""Coordinator factory — builds a fully wired MultiAgentCoordinator. + +Constructs the decomposition, routing, execution, and workspace +dependency tree from config and runtime services. +""" + +from typing import TYPE_CHECKING + +from ai_company.engine.coordination.service import MultiAgentCoordinator +from ai_company.engine.decomposition.classifier import TaskStructureClassifier +from ai_company.engine.decomposition.protocol import DecompositionStrategy +from ai_company.engine.decomposition.service import DecompositionService +from ai_company.engine.errors import DecompositionError +from ai_company.engine.parallel import ParallelExecutor +from ai_company.engine.routing.scorer import AgentTaskScorer +from ai_company.engine.routing.service import TaskRoutingService +from ai_company.engine.routing.topology_selector import TopologySelector +from ai_company.observability import get_logger +from ai_company.observability.events.coordination import ( + COORDINATION_FACTORY_BUILT, +) +from ai_company.observability.events.decomposition import ( + DECOMPOSITION_FAILED, +) + +if TYPE_CHECKING: + from ai_company.config.schema import TaskAssignmentConfig + from ai_company.core.task import Task + from ai_company.engine.agent_engine import AgentEngine + from ai_company.engine.coordination.section_config import ( + CoordinationSectionConfig, + ) + from ai_company.engine.decomposition.models import ( + DecompositionContext, + DecompositionPlan, + ) + from ai_company.engine.shutdown import ShutdownManager + from ai_company.engine.task_engine import TaskEngine + from ai_company.engine.workspace.config import WorkspaceIsolationConfig + from ai_company.engine.workspace.protocol import WorkspaceIsolationStrategy + from ai_company.engine.workspace.service import WorkspaceIsolationService + from ai_company.providers.protocol import CompletionProvider + +logger = get_logger(__name__) + + +class _NoProviderDecompositionStrategy(DecompositionStrategy): + """Placeholder strategy that raises when no LLM provider is available. + + Used when the factory is called without a provider, so that the + coordinator can still be constructed (e.g. for manual decomposition + tests). Attempting to actually decompose will raise a clear error. + """ + + def get_strategy_name(self) -> str: + """Return placeholder strategy name.""" + return "no-provider-placeholder" + + async def decompose( + self, + task: Task, # noqa: ARG002 + context: DecompositionContext, # noqa: ARG002 + ) -> DecompositionPlan: + """Raise DecompositionError — no provider configured.""" + msg = ( + "No LLM provider configured for decomposition. " + "Provide a CompletionProvider and decomposition_model " + "to enable LLM-based task decomposition." + ) + logger.warning( + DECOMPOSITION_FAILED, + note="Decomposition attempted without LLM provider", + ) + raise DecompositionError(msg) + + +def _build_decomposition_strategy( + provider: CompletionProvider | None, + decomposition_model: str | None, +) -> DecompositionStrategy: + """Select the decomposition strategy based on available deps. + + Raises: + ValueError: If exactly one of *provider* / *decomposition_model* + is supplied — both or neither must be given. + """ + if provider is not None and decomposition_model is not None: + from ai_company.engine.decomposition.llm import ( # noqa: PLC0415 + LlmDecompositionStrategy, + ) + + return LlmDecompositionStrategy( + provider=provider, + model=decomposition_model, + ) + if (provider is None) != (decomposition_model is None): + given = "provider" if provider is not None else "decomposition_model" + missing = "decomposition_model" if provider is not None else "provider" + msg = ( + f"Decomposition requires both provider and decomposition_model, " + f"but only {given} was supplied (missing {missing})" + ) + logger.warning( + DECOMPOSITION_FAILED, + note="Mismatched decomposition dependencies", + given=given, + missing=missing, + ) + raise ValueError(msg) + return _NoProviderDecompositionStrategy() + + +def _build_workspace_service( + workspace_strategy: WorkspaceIsolationStrategy | None, + workspace_config: WorkspaceIsolationConfig | None, +) -> WorkspaceIsolationService | None: + """Build workspace isolation service if both deps are provided. + + Raises: + ValueError: If exactly one of *workspace_strategy* / + *workspace_config* is supplied — both or neither must be given. + """ + if workspace_strategy is not None and workspace_config is not None: + from ai_company.engine.workspace.service import ( # noqa: PLC0415 + WorkspaceIsolationService, + ) + + return WorkspaceIsolationService( + strategy=workspace_strategy, + config=workspace_config, + ) + if (workspace_strategy is None) != (workspace_config is None): + given = ( + "workspace_strategy" + if workspace_strategy is not None + else "workspace_config" + ) + missing = ( + "workspace_config" + if workspace_strategy is not None + else "workspace_strategy" + ) + msg = ( + f"Workspace isolation requires both workspace_strategy and " + f"workspace_config, but only {given} was supplied (missing {missing})" + ) + logger.warning( + COORDINATION_FACTORY_BUILT, + note="Mismatched workspace dependencies", + given=given, + missing=missing, + ) + raise ValueError(msg) + return None + + +def build_coordinator( # noqa: PLR0913 + *, + config: CoordinationSectionConfig, + engine: AgentEngine, + task_assignment_config: TaskAssignmentConfig, + provider: CompletionProvider | None = None, + decomposition_model: str | None = None, + task_engine: TaskEngine | None = None, + workspace_strategy: WorkspaceIsolationStrategy | None = None, + workspace_config: WorkspaceIsolationConfig | None = None, + shutdown_manager: ShutdownManager | None = None, +) -> MultiAgentCoordinator: + """Build a fully wired :class:`MultiAgentCoordinator`. + + Constructs the dependency tree: + 1. ``TaskStructureClassifier`` (no deps) + 2. ``DecompositionStrategy`` — LLM if provider+model provided, + otherwise a placeholder that raises at decompose-time + 3. ``DecompositionService(strategy, classifier)`` + 4. ``AgentTaskScorer(min_score=task_assignment_config.min_score)`` + 5. ``TopologySelector(config.auto_topology_rules)`` + 6. ``TaskRoutingService(scorer, topology_selector)`` + 7. ``ParallelExecutor(engine=engine)`` + 8. ``WorkspaceIsolationService`` if workspace deps provided + 9. ``MultiAgentCoordinator(decomposition, routing, executor, ...)`` + + Args: + config: Company-level coordination section config. + engine: Agent execution engine (for parallel executor). + task_assignment_config: Task assignment config (for min_score). + provider: Optional LLM provider for decomposition. + decomposition_model: Optional model ID for decomposition. + task_engine: Optional task engine for parent status updates. + workspace_strategy: Optional workspace isolation strategy. + workspace_config: Optional workspace isolation config. + shutdown_manager: Optional shutdown manager for the executor. + + Returns: + A fully constructed ``MultiAgentCoordinator``. + """ + classifier = TaskStructureClassifier() + strategy = _build_decomposition_strategy(provider, decomposition_model) + decomposition_service = DecompositionService(strategy, classifier) + + scorer = AgentTaskScorer(min_score=task_assignment_config.min_score) + topology_selector = TopologySelector(config.auto_topology_rules) + routing_service = TaskRoutingService(scorer, topology_selector) + + parallel_executor = ParallelExecutor( + engine=engine, + shutdown_manager=shutdown_manager, + ) + + coordinator = MultiAgentCoordinator( + decomposition_service=decomposition_service, + routing_service=routing_service, + parallel_executor=parallel_executor, + workspace_service=_build_workspace_service( + workspace_strategy, workspace_config + ), + task_engine=task_engine, + ) + + logger.debug( + COORDINATION_FACTORY_BUILT, + topology=config.topology.value, + has_provider=provider is not None, + has_workspace=workspace_strategy is not None, + ) + + return coordinator diff --git a/src/ai_company/engine/coordination/section_config.py b/src/ai_company/engine/coordination/section_config.py new file mode 100644 index 0000000000..4b96a10b5c --- /dev/null +++ b/src/ai_company/engine/coordination/section_config.py @@ -0,0 +1,83 @@ +"""Company-level coordination configuration from YAML. + +Bridges the ``coordination:`` section in company YAML to the +per-run :class:`CoordinationConfig` used by :class:`MultiAgentCoordinator`. +""" + +from pydantic import BaseModel, ConfigDict, Field + +from ai_company.core.enums import CoordinationTopology +from ai_company.core.types import NotBlankStr # noqa: TC001 +from ai_company.engine.coordination.config import CoordinationConfig +from ai_company.engine.routing.models import AutoTopologyConfig + + +class CoordinationSectionConfig(BaseModel): + """Company-level coordination configuration from YAML. + + Attributes: + topology: Default coordination topology. + auto_topology_rules: Rules for automatic topology selection. + max_concurrency_per_wave: Max parallel agents per wave + (``None`` = unlimited). + fail_fast: Stop on first wave failure instead of continuing. + enable_workspace_isolation: Create isolated workspaces for + multi-agent execution. + base_branch: Git branch to use for workspace isolation. + """ + + model_config = ConfigDict(frozen=True, extra="forbid") + + topology: CoordinationTopology = Field( + default=CoordinationTopology.AUTO, + description="Default coordination topology", + ) + auto_topology_rules: AutoTopologyConfig = Field( + default_factory=AutoTopologyConfig, + description="Rules for automatic topology selection", + ) + max_concurrency_per_wave: int | None = Field( + default=None, + ge=1, + description="Max parallel agents per wave (None = unlimited)", + ) + fail_fast: bool = Field( + default=False, + description="Stop on first wave failure", + ) + enable_workspace_isolation: bool = Field( + default=True, + description="Create isolated workspaces for multi-agent execution", + ) + base_branch: NotBlankStr = Field( + default="main", + description="Git branch for workspace isolation", + ) + + def to_coordination_config( + self, + *, + max_concurrency_per_wave: int | None = None, + fail_fast: bool | None = None, + ) -> CoordinationConfig: + """Convert to a per-run ``CoordinationConfig``. + + Request-level overrides take precedence over section defaults. + + Args: + max_concurrency_per_wave: Override for max concurrency. + fail_fast: Override for fail-fast behaviour. + + Returns: + A ``CoordinationConfig`` with merged values. + """ + return CoordinationConfig( + max_concurrency_per_wave=( + max_concurrency_per_wave + if max_concurrency_per_wave is not None + else self.max_concurrency_per_wave + ), + fail_fast=fail_fast if fail_fast is not None else self.fail_fast, + enable_workspace_isolation=self.enable_workspace_isolation, + base_branch=self.base_branch, + ) diff --git a/src/ai_company/engine/coordination/service.py b/src/ai_company/engine/coordination/service.py index 0a8e73fe3d..ee544ddf5e 100644 --- a/src/ai_company/engine/coordination/service.py +++ b/src/ai_company/engine/coordination/service.py @@ -54,9 +54,11 @@ class MultiAgentCoordinator: parallel execution, workspace isolation, task engine) into an end-to-end coordination pipeline. - The coordinator is a **peer** to ``AgentEngine``, not embedded - in it. It operates at a higher level, composing existing services - via dependency injection. + The coordinator is available both as a peer service (via + ``AppState``) and as an optional dependency of ``AgentEngine`` + (which exposes a ``coordinate()`` convenience method). It + operates at a higher level, composing existing services via + dependency injection. Args: decomposition_service: Service to decompose tasks into subtasks. @@ -174,6 +176,8 @@ async def coordinate( except CoordinationPhaseError: raise + except MemoryError, RecursionError: + raise except Exception as exc: logger.exception( COORDINATION_FAILED, @@ -404,6 +408,8 @@ async def _phase_dispatch( ) except CoordinationPhaseError: raise + except MemoryError, RecursionError: + raise except Exception as exc: elapsed = time.monotonic() - start logger.warning( @@ -467,6 +473,8 @@ def _phase_rollup( context.task.id, tuple(statuses), ) + except MemoryError, RecursionError: + raise except Exception as exc: elapsed = time.monotonic() - start logger.warning( @@ -506,7 +514,24 @@ async def _phase_update_parent( phases: list[CoordinationPhaseResult], ) -> None: """Update parent task status via TaskEngine if available.""" - if self._task_engine is None or rollup is None: + if self._task_engine is None: + return + if rollup is None: + phase_name = "update_parent" + note = "Skipped — rollup is None (rollup phase failed)" + logger.warning( + COORDINATION_PHASE_FAILED, + phase=phase_name, + note=note, + ) + phases.append( + CoordinationPhaseResult( + phase=phase_name, + success=False, + duration_seconds=0.0, + error=note, + ) + ) return start = time.monotonic() @@ -548,6 +573,8 @@ async def _phase_update_parent( error=result.error, ) ) + except MemoryError, RecursionError: + raise except Exception as exc: elapsed = time.monotonic() - start logger.warning( diff --git a/src/ai_company/observability/events/api.py b/src/ai_company/observability/events/api.py index 7a0e05240e..acb7f4d5e2 100644 --- a/src/ai_company/observability/events/api.py +++ b/src/ai_company/observability/events/api.py @@ -41,3 +41,9 @@ API_TASK_CREATED_BY_MISMATCH: Final[str] = "api.task.created_by_mismatch" API_AUTH_FALLBACK: Final[str] = "api.auth.fallback" API_ROUTE_NOT_FOUND: Final[str] = "api.route.not_found" +API_COORDINATION_STARTED: Final[str] = "api.coordination.started" +API_COORDINATION_COMPLETED: Final[str] = "api.coordination.completed" +API_COORDINATION_FAILED: Final[str] = "api.coordination.failed" +API_COORDINATION_AGENT_RESOLVE_FAILED: Final[str] = ( + "api.coordination.agent_resolve_failed" +) diff --git a/src/ai_company/observability/events/coordination.py b/src/ai_company/observability/events/coordination.py index 04a65021d0..c7170fdb93 100644 --- a/src/ai_company/observability/events/coordination.py +++ b/src/ai_company/observability/events/coordination.py @@ -15,3 +15,4 @@ COORDINATION_CLEANUP_COMPLETED: Final[str] = "coordination.cleanup.completed" COORDINATION_CLEANUP_FAILED: Final[str] = "coordination.cleanup.failed" COORDINATION_WAVE_BUILT: Final[str] = "coordination.wave.built" +COORDINATION_FACTORY_BUILT: Final[str] = "coordination.factory.built" diff --git a/tests/integration/engine/test_coordination_wiring.py b/tests/integration/engine/test_coordination_wiring.py new file mode 100644 index 0000000000..cf29d2136a --- /dev/null +++ b/tests/integration/engine/test_coordination_wiring.py @@ -0,0 +1,319 @@ +"""Integration test: coordinator runtime wiring. + +Validates the bootstrap-to-API wiring path: +1. Create RootConfig with coordination section +2. Create a mock coordinator (real build_coordinator() is unit-tested separately) +3. Create TaskEngine with mock persistence +4. Create AgentRegistryService and register test agents +5. Build app via create_app() with coordinator + agent_registry +6. Use TestClient to create a task and trigger coordination +""" + +from collections.abc import AsyncIterator +from datetime import date +from uuid import uuid4 + +import pytest +from litestar.testing import TestClient + +from ai_company.api.app import create_app +from ai_company.api.auth.config import AuthConfig +from ai_company.api.auth.service import AuthService +from ai_company.budget.tracker import CostTracker +from ai_company.config.schema import RootConfig +from ai_company.core.agent import AgentIdentity, ModelConfig, SkillSet +from ai_company.core.enums import ( + AgentStatus, + CoordinationTopology, + SeniorityLevel, +) +from ai_company.core.role import Authority +from ai_company.engine.task_engine import TaskEngine +from ai_company.engine.task_engine_config import TaskEngineConfig +from ai_company.hr.registry import AgentRegistryService +from ai_company.providers.enums import FinishReason +from ai_company.providers.models import ( + ChatMessage, + CompletionConfig, + CompletionResponse, + StreamChunk, + TokenUsage, + ToolDefinition, +) +from tests.unit.api.conftest import ( + FakeMessageBus, + FakePersistenceBackend, + make_auth_headers, +) +from tests.unit.engine.task_engine_helpers import ( + FakeMessageBus as EngineMessageBus, +) +from tests.unit.engine.task_engine_helpers import ( + FakePersistence, +) + +pytestmark = [pytest.mark.integration, pytest.mark.timeout(30)] + +_TEST_JWT_SECRET = "test-secret-that-is-at-least-32-characters-long" + + +# ── Mock Provider ────────────────────────────────────────────────── + + +class _DeterministicProvider: + """Mock provider that returns a fixed completion.""" + + async def complete( + self, + messages: list[ChatMessage], + model: str, + *, + tools: list[ToolDefinition] | None = None, + config: CompletionConfig | None = None, + ) -> CompletionResponse: + return CompletionResponse( + content="Task completed successfully.", + finish_reason=FinishReason.STOP, + usage=TokenUsage( + input_tokens=10, + output_tokens=5, + cost_usd=0.001, + ), + model=model, + ) + + async def stream( + self, + messages: list[ChatMessage], + model: str, + *, + tools: list[ToolDefinition] | None = None, + config: CompletionConfig | None = None, + ) -> AsyncIterator[StreamChunk]: + msg = "stream not supported" + raise NotImplementedError(msg) + + async def get_model_capabilities( + self, + model: str, + ) -> None: + return None + + +# ── Helpers ──────────────────────────────────────────────────────── + + +def _make_test_agent( + name: str, + *, + skills: tuple[str, ...] = ("python", "testing"), +) -> AgentIdentity: + return AgentIdentity( + id=uuid4(), + name=name, + role="developer", + department="engineering", + level=SeniorityLevel.MID, + skills=SkillSet(primary=skills), + authority=Authority(budget_limit=10.0), + model=ModelConfig(provider="test-provider", model_id="test-model-001"), + hiring_date=date(2026, 1, 1), + status=AgentStatus.ACTIVE, + ) + + +def _seed_test_users( + backend: FakePersistenceBackend, + auth_service: AuthService, +) -> None: + """Pre-seed users for auth (delegates to conftest helper).""" + from tests.unit.api.conftest import ( + _seed_test_users as conftest_seed, + ) + + conftest_seed(backend, auth_service) + + +# ── Tests ────────────────────────────────────────────────────────── + + +@pytest.mark.integration +class TestBuildCoordinatorFactory: + """Verify build_coordinator() wires real services from config.""" + + def test_build_coordinator_with_defaults(self) -> None: + """Factory produces a coordinator from default config.""" + from unittest.mock import AsyncMock + + from ai_company.config.schema import TaskAssignmentConfig + from ai_company.engine.coordination.factory import build_coordinator + from ai_company.engine.coordination.section_config import ( + CoordinationSectionConfig, + ) + from ai_company.engine.coordination.service import ( + MultiAgentCoordinator, + ) + + config = CoordinationSectionConfig() + engine = AsyncMock() + task_assignment_config = TaskAssignmentConfig() + + coordinator = build_coordinator( + config=config, + engine=engine, + task_assignment_config=task_assignment_config, + ) + + assert isinstance(coordinator, MultiAgentCoordinator) + + async def test_build_coordinator_no_provider_decomposition_raises( + self, + ) -> None: + """Coordinator built without provider raises on decomposition.""" + from unittest.mock import AsyncMock + + from ai_company.config.schema import TaskAssignmentConfig + from ai_company.engine.coordination.factory import build_coordinator + from ai_company.engine.coordination.section_config import ( + CoordinationSectionConfig, + ) + from ai_company.engine.errors import DecompositionError + + config = CoordinationSectionConfig() + engine = AsyncMock() + task_assignment_config = TaskAssignmentConfig() + + coordinator = build_coordinator( + config=config, + engine=engine, + task_assignment_config=task_assignment_config, + ) + + # Decomposition should fail since no provider was given + from ai_company.core.enums import Priority, TaskType + from ai_company.core.task import Task + from ai_company.engine.decomposition.models import ( + DecompositionContext, + ) + + task = Task( + id="test-task-001", + title="test", + description="test task", + type=TaskType.DEVELOPMENT, + priority=Priority.MEDIUM, + project="test", + created_by="test-agent", + ) + context = DecompositionContext(max_subtasks=5) + + with pytest.raises(DecompositionError, match="No LLM provider"): + await coordinator._decomposition_service.decompose_task(task, context) + + +@pytest.mark.integration +class TestCoordinationWiring: + """Full bootstrap → API → coordination integration test.""" + + async def test_build_coordinator_and_coordinate_via_api(self) -> None: + """End-to-end: build coordinator, create app, coordinate task.""" + # 1. Create config + config = RootConfig(company_name="test-corp") + + # 2. Build a coordinator that wraps real services but uses + # a task-id-aware manual strategy so decomposition succeeds + from unittest.mock import AsyncMock + + from ai_company.engine.coordination.models import ( + CoordinationPhaseResult, + CoordinationResult, + ) + + async def _mock_coordinate(context): # type: ignore[no-untyped-def] + """Return a realistic result keyed to the actual task.""" + return CoordinationResult( + parent_task_id=context.task.id, + topology=CoordinationTopology.SAS, + phases=( + CoordinationPhaseResult( + phase="decompose", + success=True, + duration_seconds=0.01, + ), + CoordinationPhaseResult( + phase="route", + success=True, + duration_seconds=0.01, + ), + ), + total_duration_seconds=0.05, + total_cost_usd=0.001, + ) + + coordinator = AsyncMock() + coordinator.coordinate.side_effect = _mock_coordinate + + # 3. Create task engine + engine_persistence = FakePersistence() + task_engine = TaskEngine( + config=TaskEngineConfig(), + persistence=engine_persistence, # type: ignore[arg-type] + message_bus=EngineMessageBus(), # type: ignore[arg-type] + ) + + # 4. Create agent registry and register agents + registry = AgentRegistryService() + agent_alice = _make_test_agent("alice", skills=("python", "testing")) + await registry.register(agent_alice) + + # 5. Build app + backend = FakePersistenceBackend() + auth_service = AuthService(AuthConfig(jwt_secret=_TEST_JWT_SECRET)) + _seed_test_users(backend, auth_service) + + app = create_app( + config=config, + persistence=backend, + message_bus=FakeMessageBus(), + cost_tracker=CostTracker(), + auth_service=auth_service, + task_engine=task_engine, + coordinator=coordinator, + agent_registry=registry, + ) + + # 6. Use TestClient + with TestClient(app) as client: + client.headers.update(make_auth_headers("ceo")) + + # Create a task + resp = client.post( + "/api/v1/tasks", + json={ + "title": "Integration test task", + "description": "A task for coordination testing", + "type": "development", + "project": "test-project", + "created_by": "api", + }, + ) + assert resp.status_code == 201, resp.json() + task_id = resp.json()["data"]["id"] + + # Coordinate + resp = client.post( + f"/api/v1/tasks/{task_id}/coordinate", + json={"agent_names": ["alice"]}, + ) + assert resp.status_code == 200, resp.json() + body = resp.json() + assert body["success"] is True + data = body["data"] + assert data["parent_task_id"] == task_id + resolved = [ + t.value for t in CoordinationTopology if t != CoordinationTopology.AUTO + ] + assert data["topology"] in resolved + assert isinstance(data["total_duration_seconds"], float) + assert isinstance(data["phases"], list) + assert len(data["phases"]) >= 1 diff --git a/tests/unit/api/controllers/test_coordination.py b/tests/unit/api/controllers/test_coordination.py new file mode 100644 index 0000000000..2cc78e6f6c --- /dev/null +++ b/tests/unit/api/controllers/test_coordination.py @@ -0,0 +1,348 @@ +"""Tests for coordination controller.""" + +from collections.abc import Generator +from datetime import date +from typing import Any +from unittest.mock import AsyncMock +from uuid import uuid4 + +import pytest +from litestar.testing import TestClient + +from ai_company.api.app import create_app +from ai_company.api.auth.service import AuthService +from ai_company.budget.tracker import CostTracker +from ai_company.config.schema import RootConfig +from ai_company.core.agent import AgentIdentity, ModelConfig +from ai_company.core.enums import ( + AgentStatus, + CoordinationTopology, + SeniorityLevel, +) +from ai_company.engine.coordination.models import ( + CoordinationPhaseResult, + CoordinationResult, +) +from ai_company.engine.errors import CoordinationPhaseError +from ai_company.hr.registry import AgentRegistryService +from tests.unit.api.conftest import ( + FakeMessageBus, + FakePersistenceBackend, + make_auth_headers, +) + +pytestmark = pytest.mark.timeout(30) + + +def _make_agent(name: str = "test-agent") -> AgentIdentity: + return AgentIdentity( + id=uuid4(), + name=name, + role="developer", + department="engineering", + level=SeniorityLevel.MID, + model=ModelConfig(provider="test-provider", model_id="test-model-001"), + hiring_date=date(2026, 1, 1), + status=AgentStatus.ACTIVE, + ) + + +def _make_coordination_result( + task_id: str = "task-001", + *, + is_success: bool = True, +) -> CoordinationResult: + """Build a minimal CoordinationResult for tests.""" + phase = CoordinationPhaseResult( + phase="decompose", + success=is_success, + duration_seconds=0.1, + error=None if is_success else "test error", + ) + return CoordinationResult( + parent_task_id=task_id, + topology=CoordinationTopology.SAS, + phases=(phase,), + total_duration_seconds=0.5, + total_cost_usd=0.01, + ) + + +@pytest.fixture +def mock_coordinator() -> AsyncMock: + """Mock MultiAgentCoordinator.""" + coordinator = AsyncMock() + coordinator.coordinate.return_value = _make_coordination_result() + return coordinator + + +@pytest.fixture +def agent_registry() -> AgentRegistryService: + return AgentRegistryService() + + +@pytest.fixture +def coordination_client( + fake_persistence: FakePersistenceBackend, + fake_message_bus: FakeMessageBus, + auth_service: AuthService, + mock_coordinator: AsyncMock, + agent_registry: AgentRegistryService, +) -> Generator[TestClient[Any]]: + """Test client with coordinator and agent registry configured.""" + from tests.unit.api.conftest import _seed_test_users + + _seed_test_users(fake_persistence, auth_service) + + from ai_company.engine.task_engine import TaskEngine + from ai_company.engine.task_engine_config import ( + TaskEngineConfig, + ) + from tests.unit.engine.task_engine_helpers import ( + FakeMessageBus as EngineMessageBus, + ) + from tests.unit.engine.task_engine_helpers import ( + FakePersistence, + ) + + task_engine = TaskEngine( + config=TaskEngineConfig(), + persistence=FakePersistence(), # type: ignore[arg-type] + message_bus=EngineMessageBus(), # type: ignore[arg-type] + ) + + app = create_app( + config=RootConfig(company_name="test"), + persistence=fake_persistence, + message_bus=fake_message_bus, + cost_tracker=CostTracker(), + auth_service=auth_service, + task_engine=task_engine, + coordinator=mock_coordinator, + agent_registry=agent_registry, + ) + with TestClient(app) as client: + client.headers.update(make_auth_headers("ceo")) + yield client + + +@pytest.mark.unit +class TestCoordinationControllerHappyPath: + async def test_coordinate_task_success( + self, + coordination_client: TestClient[Any], + mock_coordinator: AsyncMock, + agent_registry: AgentRegistryService, + ) -> None: + agent = _make_agent() + await agent_registry.register(agent) + + resp = coordination_client.post( + "/api/v1/tasks", + json={ + "title": "Test task", + "description": "A test task for coordination", + "type": "development", + "project": "proj-1", + "created_by": "api", + }, + ) + assert resp.status_code == 201 + task_id = resp.json()["data"]["id"] + + mock_coordinator.coordinate.return_value = _make_coordination_result(task_id) + + resp = coordination_client.post( + f"/api/v1/tasks/{task_id}/coordinate", + json={}, + ) + assert resp.status_code == 200 + body = resp.json() + assert body["success"] is True + assert body["data"]["parent_task_id"] == task_id + assert body["data"]["topology"] == "sas" + assert body["data"]["is_success"] is True + assert body["data"]["wave_count"] == 0 + assert len(body["data"]["phases"]) == 1 + + async def test_coordinate_with_specific_agents( + self, + coordination_client: TestClient[Any], + mock_coordinator: AsyncMock, + agent_registry: AgentRegistryService, + ) -> None: + agent = _make_agent("alice") + await agent_registry.register(agent) + + resp = coordination_client.post( + "/api/v1/tasks", + json={ + "title": "Test task", + "description": "Coordination test", + "type": "development", + "project": "proj-1", + "created_by": "api", + }, + ) + task_id = resp.json()["data"]["id"] + mock_coordinator.coordinate.return_value = _make_coordination_result(task_id) + + resp = coordination_client.post( + f"/api/v1/tasks/{task_id}/coordinate", + json={"agent_names": ["alice"]}, + ) + assert resp.status_code == 200 + assert resp.json()["success"] is True + + # Verify the coordinator received the resolved agent + mock_coordinator.coordinate.assert_awaited_once() + call_context = mock_coordinator.coordinate.call_args[0][0] + resolved_names = [a.name for a in call_context.available_agents] + assert resolved_names == ["alice"] + + async def test_coordinate_with_failed_phases( + self, + coordination_client: TestClient[Any], + mock_coordinator: AsyncMock, + agent_registry: AgentRegistryService, + ) -> None: + """Coordination returns is_success=False for failed phases.""" + agent = _make_agent() + await agent_registry.register(agent) + + resp = coordination_client.post( + "/api/v1/tasks", + json={ + "title": "Test task", + "description": "Test", + "type": "development", + "project": "proj-1", + "created_by": "api", + }, + ) + task_id = resp.json()["data"]["id"] + mock_coordinator.coordinate.return_value = _make_coordination_result( + task_id, is_success=False + ) + + resp = coordination_client.post( + f"/api/v1/tasks/{task_id}/coordinate", + json={}, + ) + assert resp.status_code == 200 + body = resp.json() + assert body["success"] is True + assert body["data"]["is_success"] is False + + +@pytest.mark.unit +class TestCoordinationControllerErrors: + def test_task_not_found( + self, + coordination_client: TestClient[Any], + ) -> None: + resp = coordination_client.post( + "/api/v1/tasks/nonexistent/coordinate", + json={}, + ) + assert resp.status_code == 404 + assert resp.json()["success"] is False + + async def test_unknown_agent_name( + self, + coordination_client: TestClient[Any], + ) -> None: + resp = coordination_client.post( + "/api/v1/tasks", + json={ + "title": "Test task", + "description": "Test", + "type": "development", + "project": "proj-1", + "created_by": "api", + }, + ) + task_id = resp.json()["data"]["id"] + + resp = coordination_client.post( + f"/api/v1/tasks/{task_id}/coordinate", + json={"agent_names": ["nonexistent-agent"]}, + ) + assert resp.status_code == 422 + assert "not found" in resp.json()["error"].lower() + + async def test_no_active_agents( + self, + coordination_client: TestClient[Any], + ) -> None: + resp = coordination_client.post( + "/api/v1/tasks", + json={ + "title": "Test task", + "description": "Test", + "type": "development", + "project": "proj-1", + "created_by": "api", + }, + ) + task_id = resp.json()["data"]["id"] + + resp = coordination_client.post( + f"/api/v1/tasks/{task_id}/coordinate", + json={}, + ) + assert resp.status_code == 422 + assert "no active agents" in resp.json()["error"].lower() + + async def test_coordination_phase_error( + self, + coordination_client: TestClient[Any], + mock_coordinator: AsyncMock, + agent_registry: AgentRegistryService, + ) -> None: + agent = _make_agent() + await agent_registry.register(agent) + + resp = coordination_client.post( + "/api/v1/tasks", + json={ + "title": "Test task", + "description": "Test", + "type": "development", + "project": "proj-1", + "created_by": "api", + }, + ) + task_id = resp.json()["data"]["id"] + + mock_coordinator.coordinate.side_effect = CoordinationPhaseError( + "Decomposition failed: test error", + phase="decompose", + ) + + resp = coordination_client.post( + f"/api/v1/tasks/{task_id}/coordinate", + json={}, + ) + assert resp.status_code == 422 + assert "coordination failed at phase" in resp.json()["error"].lower() + + +@pytest.mark.unit +class TestCoordinationControllerNoCoordinator: + def test_503_when_coordinator_not_configured( + self, + test_client: TestClient[Any], + fake_persistence: FakePersistenceBackend, + ) -> None: + """503 when coordinator is not configured (uses shared client).""" + from tests.unit.api.conftest import make_task + + task = make_task() + fake_persistence.tasks._tasks[task.id] = task + + resp = test_client.post( + f"/api/v1/tasks/{task.id}/coordinate", + json={}, + ) + assert resp.status_code == 503 diff --git a/tests/unit/api/test_dto.py b/tests/unit/api/test_dto.py index 810717f85c..4f6a37e383 100644 --- a/tests/unit/api/test_dto.py +++ b/tests/unit/api/test_dto.py @@ -1,8 +1,16 @@ """Tests for DTO models and response envelopes.""" import pytest +from pydantic import ValidationError -from ai_company.api.dto import ApiResponse, ApproveRequest, CreateApprovalRequest +from ai_company.api.dto import ( + ApiResponse, + ApproveRequest, + CoordinateTaskRequest, + CoordinationPhaseResponse, + CoordinationResultResponse, + CreateApprovalRequest, +) from ai_company.core.enums import ApprovalRiskLevel @@ -173,3 +181,138 @@ def test_comment_within_bounds(self) -> None: def test_comment_too_long(self) -> None: with pytest.raises(ValueError, match="at most 4096"): ApproveRequest(comment="x" * 5000) + + +@pytest.mark.unit +class TestCoordinateTaskRequest: + """Validation tests for CoordinateTaskRequest.""" + + def test_valid_minimal(self) -> None: + req = CoordinateTaskRequest() + assert req.agent_names is None + assert req.max_subtasks == 10 + + def test_agent_names_non_empty(self) -> None: + with pytest.raises(ValidationError): + CoordinateTaskRequest(agent_names=()) + + def test_agent_names_max_length(self) -> None: + names = tuple(f"agent-{i}" for i in range(51)) + with pytest.raises(ValidationError): + CoordinateTaskRequest(agent_names=names) + + def test_duplicate_agent_names_rejected(self) -> None: + with pytest.raises(ValidationError, match="Duplicate agent name"): + CoordinateTaskRequest(agent_names=("alice", "Alice")) + + def test_unique_agent_names_accepted(self) -> None: + req = CoordinateTaskRequest(agent_names=("alice", "bob")) + assert req.agent_names == ("alice", "bob") + + @pytest.mark.parametrize( + ("field", "value"), + [ + ("max_subtasks", 0), + ("max_subtasks", 51), + ("max_concurrency_per_wave", 0), + ("max_concurrency_per_wave", 51), + ], + ) + def test_bounds_rejected(self, field: str, value: int) -> None: + with pytest.raises(ValidationError): + CoordinateTaskRequest(**{field: value}) # type: ignore[arg-type] + + @pytest.mark.parametrize( + ("field", "value"), + [ + ("max_subtasks", 1), + ("max_subtasks", 50), + ("max_concurrency_per_wave", 1), + ("max_concurrency_per_wave", 50), + ], + ) + def test_bounds_accepted(self, field: str, value: int) -> None: + req = CoordinateTaskRequest(**{field: value}) # type: ignore[arg-type] + assert getattr(req, field) == value + + +@pytest.mark.unit +class TestCoordinationPhaseResponse: + """Validation tests for CoordinationPhaseResponse consistency.""" + + def test_success_with_error_rejected(self) -> None: + with pytest.raises(ValidationError, match="successful phase"): + CoordinationPhaseResponse( + phase="test", success=True, duration_seconds=0.1, error="oops" + ) + + def test_failure_without_error_rejected(self) -> None: + with pytest.raises(ValidationError, match="failed phase"): + CoordinationPhaseResponse( + phase="test", success=False, duration_seconds=0.1, error=None + ) + + def test_failure_with_blank_error_rejected(self) -> None: + with pytest.raises(ValidationError): + CoordinationPhaseResponse( + phase="test", success=False, duration_seconds=0.1, error=" " + ) + + def test_valid_success(self) -> None: + p = CoordinationPhaseResponse( + phase="decompose", success=True, duration_seconds=0.5 + ) + assert p.success is True + assert p.error is None + + def test_valid_failure(self) -> None: + p = CoordinationPhaseResponse( + phase="route", success=False, duration_seconds=0.1, error="fail" + ) + assert p.error == "fail" + + +@pytest.mark.unit +class TestCoordinationResultResponse: + """Tests for CoordinationResultResponse computed field.""" + + def _ok_phase(self) -> CoordinationPhaseResponse: + return CoordinationPhaseResponse(phase="p1", success=True, duration_seconds=0.1) + + def _fail_phase(self) -> CoordinationPhaseResponse: + return CoordinationPhaseResponse( + phase="p2", success=False, duration_seconds=0.1, error="err" + ) + + def test_is_success_all_pass(self) -> None: + r = CoordinationResultResponse( + parent_task_id="t1", + topology="sas", + total_duration_seconds=1.0, + total_cost_usd=0.01, + phases=(self._ok_phase(),), + wave_count=0, + ) + assert r.is_success is True + + def test_is_success_with_failure(self) -> None: + r = CoordinationResultResponse( + parent_task_id="t1", + topology="sas", + total_duration_seconds=1.0, + total_cost_usd=0.01, + phases=(self._ok_phase(), self._fail_phase()), + wave_count=1, + ) + assert r.is_success is False + + def test_empty_phases_rejected(self) -> None: + with pytest.raises(ValidationError): + CoordinationResultResponse( + parent_task_id="t1", + topology="sas", + total_duration_seconds=1.0, + total_cost_usd=0.01, + phases=(), + wave_count=0, + ) diff --git a/tests/unit/api/test_state.py b/tests/unit/api/test_state.py index 404a13d51a..909fee3725 100644 --- a/tests/unit/api/test_state.py +++ b/tests/unit/api/test_state.py @@ -134,3 +134,59 @@ def test_set_task_engine_twice_raises(self) -> None: state = _make_state(task_engine=engine) with pytest.raises(RuntimeError, match="already configured"): state.set_task_engine(engine) + + +@pytest.mark.unit +class TestAppStateCoordinator: + """Tests for coordinator property and has_coordinator.""" + + def test_coordinator_raises_when_none(self) -> None: + state = _make_state(coordinator=None) + with pytest.raises(ServiceUnavailableError): + _ = state.coordinator + + def test_coordinator_returns_when_set(self) -> None: + from unittest.mock import MagicMock + + coordinator = MagicMock() + state = _make_state(coordinator=coordinator) + assert state.coordinator is coordinator + + def test_has_coordinator_false_when_none(self) -> None: + state = _make_state(coordinator=None) + assert state.has_coordinator is False + + def test_has_coordinator_true_when_set(self) -> None: + from unittest.mock import MagicMock + + coordinator = MagicMock() + state = _make_state(coordinator=coordinator) + assert state.has_coordinator is True + + +@pytest.mark.unit +class TestAppStateAgentRegistry: + """Tests for agent_registry property.""" + + def test_agent_registry_raises_when_none(self) -> None: + state = _make_state(agent_registry=None) + with pytest.raises(ServiceUnavailableError): + _ = state.agent_registry + + def test_agent_registry_returns_when_set(self) -> None: + from ai_company.hr.registry import AgentRegistryService + + registry = AgentRegistryService() + state = _make_state(agent_registry=registry) + assert state.agent_registry is registry + + def test_has_agent_registry_false_when_none(self) -> None: + state = _make_state(agent_registry=None) + assert state.has_agent_registry is False + + def test_has_agent_registry_true_when_set(self) -> None: + from ai_company.hr.registry import AgentRegistryService + + registry = AgentRegistryService() + state = _make_state(agent_registry=registry) + assert state.has_agent_registry is True diff --git a/tests/unit/config/conftest.py b/tests/unit/config/conftest.py index c84857ab56..54ca753fe4 100644 --- a/tests/unit/config/conftest.py +++ b/tests/unit/config/conftest.py @@ -22,6 +22,7 @@ ) from ai_company.core.company import CompanyConfig from ai_company.core.resilience_config import RateLimiterConfig, RetryConfig +from ai_company.engine.coordination.section_config import CoordinationSectionConfig from ai_company.hr.promotion.config import PromotionConfig from ai_company.memory.config import CompanyMemoryConfig from ai_company.memory.org.config import OrgMemoryConfig @@ -96,6 +97,7 @@ class RootConfigFactory(ModelFactory[RootConfig]): security = SecurityConfig() trust = TrustConfig() promotion = PromotionConfig() + coordination = CoordinationSectionConfig() # ── Sample YAML strings ────────────────────────────────────────── diff --git a/tests/unit/engine/test_agent_engine.py b/tests/unit/engine/test_agent_engine.py index bcda021ad9..8b1296126a 100644 --- a/tests/unit/engine/test_agent_engine.py +++ b/tests/unit/engine/test_agent_engine.py @@ -1315,3 +1315,55 @@ def test_snapshot_channel_matches_api_channel() -> None: from ai_company.engine.task_engine import TaskEngine assert TaskEngine._SNAPSHOT_CHANNEL == CHANNEL_TASKS + + +@pytest.mark.unit +class TestAgentEngineCoordinator: + """Tests for coordinator property and coordinate() method.""" + + def test_coordinator_default_is_none( + self, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + response = _make_completion_response() + provider = mock_provider_factory([response]) + engine = AgentEngine(provider=provider) + assert engine.coordinator is None + + def test_coordinator_property_returns_coordinator( + self, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + response = _make_completion_response() + provider = mock_provider_factory([response]) + mock_coordinator = MagicMock() + engine = AgentEngine(provider=provider, coordinator=mock_coordinator) + assert engine.coordinator is mock_coordinator + + async def test_coordinate_raises_when_no_coordinator( + self, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + response = _make_completion_response() + provider = mock_provider_factory([response]) + engine = AgentEngine(provider=provider) + mock_context = MagicMock() + with pytest.raises(ExecutionStateError, match="No coordinator configured"): + await engine.coordinate(mock_context) + + async def test_coordinate_delegates_to_coordinator( + self, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + response = _make_completion_response() + provider = mock_provider_factory([response]) + mock_coordinator = AsyncMock() + expected_result = MagicMock() + mock_coordinator.coordinate.return_value = expected_result + + engine = AgentEngine(provider=provider, coordinator=mock_coordinator) + mock_context = MagicMock() + result = await engine.coordinate(mock_context) + + assert result is expected_result + mock_coordinator.coordinate.assert_awaited_once_with(mock_context) diff --git a/tests/unit/engine/test_coordination_factory.py b/tests/unit/engine/test_coordination_factory.py new file mode 100644 index 0000000000..999b6cb461 --- /dev/null +++ b/tests/unit/engine/test_coordination_factory.py @@ -0,0 +1,180 @@ +"""Tests for build_coordinator factory.""" + +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from ai_company.config.schema import TaskAssignmentConfig +from ai_company.engine.coordination.factory import ( + _NoProviderDecompositionStrategy, + build_coordinator, +) +from ai_company.engine.coordination.section_config import ( + CoordinationSectionConfig, +) +from ai_company.engine.coordination.service import MultiAgentCoordinator +from ai_company.engine.decomposition.service import DecompositionService +from ai_company.engine.errors import DecompositionError +from ai_company.engine.parallel import ParallelExecutor +from ai_company.engine.routing.service import TaskRoutingService + +pytestmark = pytest.mark.timeout(30) + + +def _mock_engine() -> MagicMock: + """Create a mock AgentEngine for the factory.""" + return MagicMock() + + +@pytest.mark.unit +class TestBuildCoordinator: + """build_coordinator() produces a working MultiAgentCoordinator.""" + + def test_returns_coordinator(self) -> None: + coordinator = build_coordinator( + config=CoordinationSectionConfig(), + engine=_mock_engine(), + task_assignment_config=TaskAssignmentConfig(), + ) + assert isinstance(coordinator, MultiAgentCoordinator) + + def test_with_provider_and_model(self) -> None: + """Provider and model are wired into decomposition strategy.""" + provider = AsyncMock() + coordinator = build_coordinator( + config=CoordinationSectionConfig(), + engine=_mock_engine(), + task_assignment_config=TaskAssignmentConfig(), + provider=provider, + decomposition_model="test-model-001", + ) + assert isinstance(coordinator, MultiAgentCoordinator) + # Verify LLM strategy is used (not the placeholder) + decomp = coordinator._decomposition_service + assert isinstance(decomp, DecompositionService) + assert not isinstance(decomp._strategy, _NoProviderDecompositionStrategy) + + def test_without_provider_uses_placeholder(self) -> None: + """No provider/model → placeholder strategy.""" + coordinator = build_coordinator( + config=CoordinationSectionConfig(), + engine=_mock_engine(), + task_assignment_config=TaskAssignmentConfig(), + ) + decomp = coordinator._decomposition_service + assert isinstance(decomp._strategy, _NoProviderDecompositionStrategy) + + def test_with_task_engine(self) -> None: + """task_engine is wired into the coordinator.""" + task_engine = AsyncMock() + coordinator = build_coordinator( + config=CoordinationSectionConfig(), + engine=_mock_engine(), + task_assignment_config=TaskAssignmentConfig(), + task_engine=task_engine, + ) + assert coordinator._task_engine is task_engine + + def test_with_workspace_deps(self) -> None: + """workspace_strategy + workspace_config produce a workspace service.""" + from ai_company.engine.workspace.config import ( + WorkspaceIsolationConfig, + ) + from ai_company.engine.workspace.service import ( + WorkspaceIsolationService, + ) + + ws_strategy = MagicMock() + ws_config = WorkspaceIsolationConfig() + coordinator = build_coordinator( + config=CoordinationSectionConfig(), + engine=_mock_engine(), + task_assignment_config=TaskAssignmentConfig(), + workspace_strategy=ws_strategy, + workspace_config=ws_config, + ) + ws_service = coordinator._workspace_service + assert isinstance(ws_service, WorkspaceIsolationService) + + def test_custom_min_score(self) -> None: + """min_score from TaskAssignmentConfig is forwarded to the scorer.""" + coordinator = build_coordinator( + config=CoordinationSectionConfig(), + engine=_mock_engine(), + task_assignment_config=TaskAssignmentConfig(min_score=0.5), + ) + routing = coordinator._routing_service + assert isinstance(routing, TaskRoutingService) + assert routing._scorer._min_score == 0.5 + + def test_shutdown_manager_passed_to_executor(self) -> None: + """shutdown_manager is forwarded to the parallel executor.""" + shutdown_mgr = MagicMock() + engine = _mock_engine() + coordinator = build_coordinator( + config=CoordinationSectionConfig(), + engine=engine, + task_assignment_config=TaskAssignmentConfig(), + shutdown_manager=shutdown_mgr, + ) + executor = coordinator._parallel_executor + assert isinstance(executor, ParallelExecutor) + assert executor._shutdown_manager is shutdown_mgr + assert executor._engine is engine + + def test_provider_only_raises_value_error(self) -> None: + """Provider without model raises ValueError.""" + with pytest.raises(ValueError, match="missing decomposition_model"): + build_coordinator( + config=CoordinationSectionConfig(), + engine=_mock_engine(), + task_assignment_config=TaskAssignmentConfig(), + provider=AsyncMock(), + ) + + def test_model_only_raises_value_error(self) -> None: + """Model without provider raises ValueError.""" + with pytest.raises(ValueError, match="missing provider"): + build_coordinator( + config=CoordinationSectionConfig(), + engine=_mock_engine(), + task_assignment_config=TaskAssignmentConfig(), + decomposition_model="test-model-001", + ) + + def test_workspace_strategy_only_raises_value_error(self) -> None: + """workspace_strategy without workspace_config raises ValueError.""" + with pytest.raises(ValueError, match="missing workspace_config"): + build_coordinator( + config=CoordinationSectionConfig(), + engine=_mock_engine(), + task_assignment_config=TaskAssignmentConfig(), + workspace_strategy=MagicMock(), + ) + + def test_workspace_config_only_raises_value_error(self) -> None: + """workspace_config without workspace_strategy raises ValueError.""" + from ai_company.engine.workspace.config import ( + WorkspaceIsolationConfig, + ) + + with pytest.raises(ValueError, match="missing workspace_strategy"): + build_coordinator( + config=CoordinationSectionConfig(), + engine=_mock_engine(), + task_assignment_config=TaskAssignmentConfig(), + workspace_config=WorkspaceIsolationConfig(), + ) + + +@pytest.mark.unit +class TestNoProviderDecompositionStrategy: + """Placeholder strategy raises clear error.""" + + async def test_raises_decomposition_error(self) -> None: + strategy = _NoProviderDecompositionStrategy() + with pytest.raises( + DecompositionError, + match="No LLM provider configured", + ): + await strategy.decompose(MagicMock(), MagicMock()) diff --git a/tests/unit/engine/test_coordination_section_config.py b/tests/unit/engine/test_coordination_section_config.py new file mode 100644 index 0000000000..d8124d1bc9 --- /dev/null +++ b/tests/unit/engine/test_coordination_section_config.py @@ -0,0 +1,122 @@ +"""Tests for CoordinationSectionConfig.""" + +import pytest + +from ai_company.core.enums import CoordinationTopology +from ai_company.engine.coordination.config import CoordinationConfig +from ai_company.engine.coordination.section_config import ( + CoordinationSectionConfig, +) +from ai_company.engine.routing.models import AutoTopologyConfig + +pytestmark = pytest.mark.timeout(30) + + +@pytest.mark.unit +class TestCoordinationSectionConfigDefaults: + """Default values match the plan specification.""" + + def test_default_topology_is_auto(self) -> None: + cfg = CoordinationSectionConfig() + assert cfg.topology == CoordinationTopology.AUTO + + def test_default_max_concurrency_is_none(self) -> None: + cfg = CoordinationSectionConfig() + assert cfg.max_concurrency_per_wave is None + + def test_default_fail_fast_is_false(self) -> None: + cfg = CoordinationSectionConfig() + assert cfg.fail_fast is False + + def test_default_enable_workspace_isolation_is_true(self) -> None: + cfg = CoordinationSectionConfig() + assert cfg.enable_workspace_isolation is True + + def test_default_base_branch_is_main(self) -> None: + cfg = CoordinationSectionConfig() + assert cfg.base_branch == "main" + + def test_default_auto_topology_rules(self) -> None: + cfg = CoordinationSectionConfig() + assert isinstance(cfg.auto_topology_rules, AutoTopologyConfig) + + def test_frozen_model(self) -> None: + from pydantic import ValidationError + + cfg = CoordinationSectionConfig() + with pytest.raises(ValidationError): + cfg.fail_fast = True # type: ignore[misc] + + +@pytest.mark.unit +class TestCoordinationSectionConfigToCoordinationConfig: + """to_coordination_config() produces correct CoordinationConfig.""" + + def test_default_conversion(self) -> None: + cfg = CoordinationSectionConfig() + result = cfg.to_coordination_config() + assert isinstance(result, CoordinationConfig) + assert result.max_concurrency_per_wave is None + assert result.fail_fast is False + assert result.enable_workspace_isolation is True + assert result.base_branch == "main" + + def test_custom_values_carried_through(self) -> None: + cfg = CoordinationSectionConfig( + max_concurrency_per_wave=4, + fail_fast=True, + enable_workspace_isolation=False, + base_branch="develop", + ) + result = cfg.to_coordination_config() + assert result.max_concurrency_per_wave == 4 + assert result.fail_fast is True + assert result.enable_workspace_isolation is False + assert result.base_branch == "develop" + + def test_request_overrides_take_precedence(self) -> None: + cfg = CoordinationSectionConfig( + max_concurrency_per_wave=4, + fail_fast=False, + ) + result = cfg.to_coordination_config( + max_concurrency_per_wave=8, + fail_fast=True, + ) + assert result.max_concurrency_per_wave == 8 + assert result.fail_fast is True + + def test_none_overrides_use_section_defaults(self) -> None: + cfg = CoordinationSectionConfig( + max_concurrency_per_wave=4, + fail_fast=True, + ) + result = cfg.to_coordination_config( + max_concurrency_per_wave=None, + fail_fast=None, + ) + assert result.max_concurrency_per_wave == 4 + assert result.fail_fast is True + + +@pytest.mark.unit +class TestCoordinationSectionConfigValidation: + """Validation constraints on CoordinationSectionConfig.""" + + def test_max_concurrency_must_be_positive(self) -> None: + from pydantic import ValidationError + + with pytest.raises(ValidationError): + CoordinationSectionConfig(max_concurrency_per_wave=0) + + def test_base_branch_must_not_be_blank(self) -> None: + from pydantic import ValidationError + + with pytest.raises(ValidationError): + CoordinationSectionConfig(base_branch=" ") + + def test_extra_fields_forbidden(self) -> None: + from pydantic import ValidationError + + with pytest.raises(ValidationError, match="extra"): + CoordinationSectionConfig(unknown_field="oops") # type: ignore[call-arg]