Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ src/ai_company/
communication/ # Message bus, dispatcher, messenger, channels, delegation, loop prevention, conflict resolution, meeting protocol
config/ # YAML company config loading and validation
core/ # Shared domain models, base classes, and resilience config (RetryConfig, RateLimiterConfig)
engine/ # Agent orchestration, execution loops, parallel execution, task decomposition, routing, task assignment, centralized single-writer task state engine (TaskEngine), task lifecycle, recovery, shutdown, workspace isolation, coordination error classification, prompt policy validation, and AgentEngine-TaskEngine incremental status sync
engine/ # Agent orchestration, execution loops, parallel execution, task decomposition, routing, task assignment, centralized single-writer task state engine (TaskEngine), task lifecycle, recovery, shutdown, workspace isolation, coordination (multi-agent pipeline: TopologyDispatcher protocol, 4 dispatchers — SAS/centralized/decentralized/context-dependent, wave execution, workspace lifecycle integration), coordination error classification, and prompt policy validation
hr/ # HR engine: hiring, firing, onboarding, offboarding, agent registry, performance tracking (task metrics, collaboration scoring, trend detection), promotion/demotion (criteria evaluation, approval strategies, model mapping)
memory/ # Persistent agent memory (Mem0 initial, custom stack future — see Decision Log), retrieval pipeline (ranking, injection, context formatting, non-inferable filtering), shared org memory (org/), consolidation/archival (consolidation/)
persistence/ # Operational data persistence — pluggable PersistenceBackend protocol, SQLite initial (see Memory & Persistence design page)
Expand Down Expand Up @@ -127,7 +127,7 @@ src/ai_company/
- **Every module** with business logic MUST have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`
- **Never** use `import logging` / `logging.getLogger()` / `print()` in application code
- **Variable name**: always `logger` (not `_logger`, not `log`)
- **Event names**: always use constants from the domain-specific module under `ai_company.observability.events` (e.g. `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`, `CFO_ANOMALY_DETECTED` from `events.cfo`, `CONFLICT_DETECTED` from `events.conflict`, `MEETING_STARTED` from `events.meeting`, `CLASSIFICATION_START` from `events.classification`, `CONSOLIDATION_START` from `events.consolidation`, `ORG_MEMORY_QUERY_START` from `events.org_memory`, `API_REQUEST_STARTED` from `events.api`, `CODE_RUNNER_EXECUTE_START` from `events.code_runner`, `DOCKER_EXECUTE_START` from `events.docker`, `MCP_INVOKE_START` from `events.mcp`, `SECURITY_EVALUATE_START` from `events.security`, `HR_HIRING_REQUEST_CREATED` from `events.hr`, `PERF_METRIC_RECORDED` from `events.performance`, `TRUST_EVALUATE_START` from `events.trust`, `PROMOTION_EVALUATE_START` from `events.promotion`, `PROMPT_BUILD_START` from `events.prompt`, `MEMORY_RETRIEVAL_START` from `events.memory`, `AUTONOMY_ACTION_AUTO_APPROVED` from `events.autonomy`, `TIMEOUT_POLICY_EVALUATED` from `events.timeout`, `PERSISTENCE_AUDIT_ENTRY_SAVED` from `events.persistence`, `TASK_ENGINE_STARTED` from `events.task_engine`). Import directly: `from ai_company.observability.events.<domain> import EVENT_CONSTANT`
- **Event names**: always use constants from the domain-specific module under `ai_company.observability.events` (e.g. `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`, `CFO_ANOMALY_DETECTED` from `events.cfo`, `CONFLICT_DETECTED` from `events.conflict`, `MEETING_STARTED` from `events.meeting`, `CLASSIFICATION_START` from `events.classification`, `CONSOLIDATION_START` from `events.consolidation`, `ORG_MEMORY_QUERY_START` from `events.org_memory`, `API_REQUEST_STARTED` from `events.api`, `CODE_RUNNER_EXECUTE_START` from `events.code_runner`, `DOCKER_EXECUTE_START` from `events.docker`, `MCP_INVOKE_START` from `events.mcp`, `SECURITY_EVALUATE_START` from `events.security`, `HR_HIRING_REQUEST_CREATED` from `events.hr`, `PERF_METRIC_RECORDED` from `events.performance`, `TRUST_EVALUATE_START` from `events.trust`, `PROMOTION_EVALUATE_START` from `events.promotion`, `PROMPT_BUILD_START` from `events.prompt`, `MEMORY_RETRIEVAL_START` from `events.memory`, `AUTONOMY_ACTION_AUTO_APPROVED` from `events.autonomy`, `TIMEOUT_POLICY_EVALUATED` from `events.timeout`, `PERSISTENCE_AUDIT_ENTRY_SAVED` from `events.persistence`, `TASK_ENGINE_STARTED` from `events.task_engine`, `COORDINATION_STARTED` from `events.coordination`). Import directly: `from ai_company.observability.events.<domain> import EVENT_CONSTANT`
- **Structured kwargs**: always `logger.info(EVENT, key=value)` — never `logger.info("msg %s", val)`
- **All error paths** must log at WARNING or ERROR with context before raising
- **All state transitions** must log at INFO
Expand Down
45 changes: 45 additions & 0 deletions docs/design/engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -793,3 +793,48 @@ the memory subsystem) historical single-agent success rate as inputs.
held-out configurations. The SynthOrg context differs (role-differentiated
agents vs. identical agents), so thresholds should be validated empirically
once multi-agent execution is implemented.

### Multi-Agent Coordination Pipeline

The `MultiAgentCoordinator` orchestrates the end-to-end pipeline that transforms
a parent task into parallel agent work:

```text
decompose → route → resolve topology → validate → dispatch → rollup → update parent
```
Comment thread
coderabbitai[bot] marked this conversation as resolved.

**Pipeline phases:**

1. **Decompose** — `DecompositionService` breaks the parent task into subtasks
with a dependency DAG
2. **Route** — `TaskRoutingService` assigns each subtask to an agent based on
skills, workload, and topology
3. **Resolve topology** — reads topology from routing decisions; falls back to
`CENTRALIZED` if `AUTO` was not resolved upstream
4. **Validate** — fails the pipeline if all subtasks are unroutable
5. **Dispatch** — a `TopologyDispatcher` executes waves (workspace setup →
parallel execution → merge → teardown)
6. **Rollup** — aggregates subtask statuses into a `SubtaskStatusRollup`
7. **Update parent** — transitions the parent task via `TaskEngine` (if provided)

Each phase produces a `CoordinationPhaseResult` (success/failure + duration).
Phase failures in decompose/route/validate raise `CoordinationPhaseError` with
partial results; rollup and update-parent failures are captured but do not abort
the pipeline.

**Topology dispatchers:**

| Dispatcher | Topology | Workspace Isolation | Wave Strategy |
|-----------|----------|-------------------|---------------|
| `SasDispatcher` | SAS | Never | Sequential waves from DAG |
| `CentralizedDispatcher` | Centralized | Optional (config-driven) | DAG waves, post-execution merge |
| `DecentralizedDispatcher` | Decentralized | Mandatory (raises if unavailable) | DAG waves, post-execution merge |
| `ContextDependentDispatcher` | Context-dependent | Per-wave (multi-subtask waves only) | DAG waves, per-wave merge/teardown |

The `select_dispatcher` factory maps a resolved `CoordinationTopology` to the
appropriate dispatcher; `AUTO` must be resolved before dispatch.

**Wave execution** (`group_builder.build_execution_waves`) converts DAG parallel
groups and routing decisions into `ParallelExecutionGroup` instances. Subtasks
without routing decisions are skipped. Empty waves (all subtasks unroutable) are
dropped.
34 changes: 34 additions & 0 deletions src/ai_company/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,22 @@
AgentContext,
AgentContextSnapshot,
)
from ai_company.engine.coordination import (
CentralizedDispatcher,
ContextDependentDispatcher,
CoordinationConfig,
CoordinationContext,
CoordinationPhaseResult,
CoordinationResult,
CoordinationWave,
DecentralizedDispatcher,
DispatchResult,
MultiAgentCoordinator,
SasDispatcher,
TopologyDispatcher,
build_execution_waves,
select_dispatcher,
)
from ai_company.engine.decomposition import (
DecompositionContext,
DecompositionPlan,
Expand All @@ -55,6 +71,8 @@
TaskStructureClassifier,
)
from ai_company.engine.errors import (
CoordinationError,
CoordinationPhaseError,
DecompositionCycleError,
DecompositionDepthError,
DecompositionError,
Expand Down Expand Up @@ -189,12 +207,22 @@
"AutoTopologyConfig",
"BudgetChecker",
"CancelTaskMutation",
"CentralizedDispatcher",
"ClassificationResult",
"CleanupCallback",
"ContextDependentDispatcher",
"CooperativeTimeoutStrategy",
"CoordinationConfig",
"CoordinationContext",
"CoordinationError",
"CoordinationPhaseError",
"CoordinationPhaseResult",
"CoordinationResult",
"CoordinationWave",
"CostOptimizedAssignmentStrategy",
"CreateTaskData",
"CreateTaskMutation",
"DecentralizedDispatcher",
"DecompositionContext",
"DecompositionCycleError",
"DecompositionDepthError",
Expand All @@ -206,6 +234,7 @@
"DefaultTokenEstimator",
"DeleteTaskMutation",
"DependencyGraph",
"DispatchResult",
"EngineError",
"ErrorFinding",
"ErrorSeverity",
Expand All @@ -226,6 +255,7 @@
"MergeConflict",
"MergeOrchestrator",
"MergeResult",
"MultiAgentCoordinator",
"NoEligibleAgentError",
"ParallelExecutionError",
"ParallelExecutionGroup",
Expand All @@ -249,6 +279,7 @@
"RoutingCandidate",
"RoutingDecision",
"RoutingResult",
"SasDispatcher",
"ShutdownChecker",
"ShutdownManager",
"ShutdownResult",
Expand Down Expand Up @@ -280,6 +311,7 @@
"TaskStructureClassifier",
"TaskVersionConflictError",
"TerminationReason",
"TopologyDispatcher",
"TopologySelector",
"TransitionTaskMutation",
"TurnRecord",
Expand All @@ -296,7 +328,9 @@
"WorkspaceRequest",
"WorkspaceSetupError",
"add_token_usage",
"build_execution_waves",
"build_strategy_map",
"build_system_prompt",
"classify_execution_errors",
"select_dispatcher",
]
42 changes: 42 additions & 0 deletions src/ai_company/engine/coordination/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""Multi-agent coordination engine.

Connects decomposition, routing, workspace isolation, and parallel
execution into an end-to-end pipeline orchestrated by topology-driven
dispatchers.
"""

from ai_company.engine.coordination.config import CoordinationConfig
from ai_company.engine.coordination.dispatchers import (
CentralizedDispatcher,
ContextDependentDispatcher,
DecentralizedDispatcher,
DispatchResult,
SasDispatcher,
TopologyDispatcher,
select_dispatcher,
)
from ai_company.engine.coordination.group_builder import build_execution_waves
from ai_company.engine.coordination.models import (
CoordinationContext,
CoordinationPhaseResult,
CoordinationResult,
CoordinationWave,
)
from ai_company.engine.coordination.service import MultiAgentCoordinator

__all__ = [
"CentralizedDispatcher",
"ContextDependentDispatcher",
"CoordinationConfig",
"CoordinationContext",
"CoordinationPhaseResult",
"CoordinationResult",
"CoordinationWave",
"DecentralizedDispatcher",
"DispatchResult",
"MultiAgentCoordinator",
"SasDispatcher",
"TopologyDispatcher",
"build_execution_waves",
"select_dispatcher",
]
38 changes: 38 additions & 0 deletions src/ai_company/engine/coordination/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Coordination configuration."""

from pydantic import BaseModel, ConfigDict, Field

from ai_company.core.types import NotBlankStr # noqa: TC001


class CoordinationConfig(BaseModel):
"""Configuration for a multi-agent coordination run.

Attributes:
max_concurrency_per_wave: Max parallel agents per wave
(``None`` = unlimited).
fail_fast: Stop on first wave failure instead of continuing.
enable_workspace_isolation: Create isolated workspaces for
multi-agent execution.
base_branch: Git branch to use for workspace isolation.
"""

model_config = ConfigDict(frozen=True, extra="forbid")

max_concurrency_per_wave: int | None = Field(
default=None,
ge=1,
description="Max parallel agents per wave (None = unlimited)",
)
fail_fast: bool = Field(
default=False,
description="Stop on first wave failure",
)
enable_workspace_isolation: bool = Field(
default=True,
description="Create isolated workspaces for multi-agent execution",
)
base_branch: NotBlankStr = Field(
default="main",
description="Git branch for workspace isolation",
)
Loading
Loading