Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ src/ai_company/
api/ # FastAPI REST + WebSocket routes
budget/ # Per-agent cost tracking and spending controls
cli/ # Typer CLI commands
communication/ # Message bus (protocol + in-memory backend), dispatcher, messenger facade, channels
communication/ # Message bus, dispatcher, messenger, channels, delegation, loop prevention
config/ # YAML company config loading and validation
core/ # Shared domain models and base classes
engine/ # Agent orchestration, execution loops, task lifecycle, recovery, and shutdown
Expand Down
24 changes: 22 additions & 2 deletions DESIGN_SPEC.md
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ When a loop is detected, the framework:
3. Escalates to the sender's manager (or human if at top of hierarchy)
4. Logs the loop for analytics and process improvement

> **Current state (M4 in-progress):** The communication foundation is implemented: `MessageBus` protocol with `InMemoryMessageBus` backend (asyncio queues, pull-model `receive()`), `MessageDispatcher` for concurrent handler routing via `asyncio.TaskGroup`, `AgentMessenger` per-agent facade (auto-fills sender/timestamp/ID, deterministic direct-channel naming `@{sorted_a}:{sorted_b}`), and `DeliveryEnvelope` for delivery tracking. Loop prevention (§5.5), conflict resolution (§5.6), and meeting protocol (§5.7) are planned for later M4 work.
> **Current state (M4 in-progress):** The communication foundation is implemented: `MessageBus` protocol with `InMemoryMessageBus` backend (asyncio queues, pull-model `receive()`), `MessageDispatcher` for concurrent handler routing via `asyncio.TaskGroup`, `AgentMessenger` per-agent facade (auto-fills sender/timestamp/ID, deterministic direct-channel naming `@{sorted_a}:{sorted_b}`), and `DeliveryEnvelope` for delivery tracking. Loop prevention (§5.5) is implemented: `DelegationGuard` orchestrates five mechanisms (ancestry, depth, dedup, rate limit, circuit breaker) with `LoopPreventionConfig`. Hierarchical delegation is implemented via `DelegationService` with `HierarchyResolver` and `AuthorityValidator`. Task model extended with `parent_task_id` and `delegation_chain` fields. Conflict resolution (§5.6) and meeting protocol (§5.7) are planned for later M4 work.

### 5.6 Conflict Resolution Protocol

Expand Down Expand Up @@ -785,6 +785,8 @@ task:
deadline: null
max_retries: 1 # max reassignment attempts after failure (0 = no retry)
status: "assigned"
parent_task_id: null # parent task ID when created via delegation
delegation_chain: [] # ordered agent IDs of delegators (root first)
```

### 6.3 Workflow Types
Expand Down Expand Up @@ -2350,10 +2352,26 @@ ai-company/
│ │ ├── bus_protocol.py # MessageBus protocol interface
│ │ ├── channel.py # Channel model
│ │ ├── config.py # Communication config
│ │ ├── delegation/ # Hierarchical delegation subsystem
│ │ │ ├── __init__.py # Package exports
│ │ │ ├── authority.py # AuthorityValidator + AuthorityCheckResult
│ │ │ ├── hierarchy.py # HierarchyResolver (org hierarchy from Company)
│ │ │ ├── models.py # DelegationRequest, DelegationResult, DelegationRecord
│ │ │ └── service.py # DelegationService (orchestrates delegation flow)
│ │ ├── dispatcher.py # MessageDispatcher + DispatchResult
│ │ ├── enums.py # Communication enums
│ │ ├── errors.py # Communication error hierarchy
│ │ ├── errors.py # Communication + delegation error hierarchy
│ │ ├── handler.py # MessageHandler protocol, FunctionHandler, HandlerRegistration
│ │ ├── loop_prevention/ # Delegation loop prevention mechanisms
│ │ │ ├── __init__.py # Package exports
│ │ │ ├── _pair_key.py # Canonical agent-pair key utility
│ │ │ ├── ancestry.py # Ancestry cycle detection (pure function)
│ │ │ ├── circuit_breaker.py # DelegationCircuitBreaker, CircuitBreakerState
│ │ │ ├── dedup.py # DelegationDeduplicator (time-windowed)
│ │ │ ├── depth.py # Max delegation depth check (pure function)
│ │ │ ├── guard.py # DelegationGuard (orchestrates all mechanisms)
│ │ │ ├── models.py # GuardCheckOutcome
│ │ │ └── rate_limit.py # DelegationRateLimiter (per-pair)
│ │ ├── message.py # Message model
│ │ ├── messenger.py # AgentMessenger per-agent facade
│ │ └── subscription.py # Subscription + DeliveryEnvelope models
Expand All @@ -2373,6 +2391,7 @@ ai-company/
│ │ │ ├── budget.py # BUDGET_* constants
│ │ │ ├── communication.py # COMM_* constants
│ │ │ ├── config.py # CONFIG_* constants
│ │ │ ├── delegation.py # DELEGATION_* constants
│ │ │ ├── correlation.py # CORRELATION_* constants
│ │ │ ├── execution.py # EXECUTION_* constants
│ │ │ ├── git.py # GIT_* constants
Expand Down Expand Up @@ -2532,6 +2551,7 @@ These conventions were established during the M0–M2+ review cycle. **Adopted**
| **Workspace isolation** | Planned (M4) | Pluggable `WorkspaceIsolationStrategy` protocol. Default: planner + git worktrees. Each agent works in an isolated worktree; sequential merge on completion. Textual conflicts detected by git; semantic conflicts reviewed by agent or human. | Industry standard (Codex, Cursor, Claude Code, VS Code). Maximum parallelism. Leverages mature git infrastructure. See §6.8. |
| **Graceful shutdown** | Adopted (M3) | Pluggable `ShutdownStrategy` protocol. Default: cooperative with 30s timeout. Agents check shutdown event at turn boundaries. Force-cancel after timeout. `INTERRUPTED` status for force-cancelled tasks. M4/M5: upgrade to checkpoint-and-stop. | Cross-platform (Windows `signal.signal()` fallback). Bounded shutdown time. Mirrors cooperative shutdown in §6.7. |
| **Communication foundation** | Adopted (M4) | `MessageBus` protocol with `InMemoryMessageBus` backend (asyncio queues, pull-model `receive()` with shutdown signaling via `asyncio.Event`). `MessageDispatcher` routes to concurrent handlers via `asyncio.TaskGroup` with pre-allocated error collection. `AgentMessenger` per-agent facade auto-fills sender/timestamp/ID; deterministic direct-channel naming `@{sorted_a}:{sorted_b}`. `DeliveryEnvelope` for delivery tracking. `NotBlankStr` validation on all protocol boundary identifiers. | Pull-model avoids callback complexity and enables agents to consume at their own pace. Protocol + backend split enables future persistent/distributed bus implementations. Deterministic DM channel names prevent duplicates. See §5. |
| **Delegation & loop prevention** | Adopted (M4) | `HierarchyResolver` resolves org hierarchy from `Company` at construction (cycle-detected, `MappingProxyType`-frozen). `AuthorityValidator` checks chain-of-command + role permissions. `DelegationGuard` orchestrates five mechanisms (ancestry, depth, dedup, rate limit, circuit breaker) in sequence, short-circuiting on first rejection. `DelegationService` is synchronous (CPU-only); messaging integration deferred. Stateful mechanisms use injectable clock for deterministic testing. Task model extended with `parent_task_id` and `delegation_chain` fields. | Synchronous delegation avoids async complexity for CPU-only validation. Five-mechanism guard provides defence-in-depth against all loop patterns. Injectable clocks enable deterministic testing. See §5.4, §5.5. |

---

Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ AI Company lets you spin up a virtual organization staffed entirely by AI agents
- **Deep Agent Identity** - Names, personalities, skills, seniority levels, performance tracking
- **Multi-Provider** - Any LLM via LiteLLM — cloud APIs, OpenRouter (400+ models), local Ollama, and more
- **Smart Cost Management** - Per-agent budget tracking, auto model routing, CFO agent optimization
- **Hierarchical Delegation** - Chain-of-command task delegation with five-mechanism loop prevention
- **Configurable Autonomy** - From fully autonomous to human-approves-everything, with a Security Ops agent in between
- **Persistent Memory** - Agents remember past decisions, code, relationships (memory layer TBD)
- **HR System** - Hire, fire, promote agents. HR agent analyzes skill gaps and proposes candidates
Expand All @@ -23,7 +24,7 @@ AI Company lets you spin up a virtual organization staffed entirely by AI agents

## Status

**M3: Single Agent** in progress (M0 Tooling, M1 Config & Core, M2 Providers — all done). See [DESIGN_SPEC.md](DESIGN_SPEC.md) for the full high-level specification.
**M4: Multi-Agent** in progress (M0 Tooling, M1 Config & Core, M2 Providers, M3 Single Agent — all done). See [DESIGN_SPEC.md](DESIGN_SPEC.md) for the full high-level specification.

## Tech Stack

Expand Down
51 changes: 51 additions & 0 deletions src/ai_company/communication/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@
MessageRetentionConfig,
RateLimitConfig,
)
from ai_company.communication.delegation import (
AuthorityCheckResult,
AuthorityValidator,
DelegationRecord,
DelegationRequest,
DelegationResult,
DelegationService,
HierarchyResolver,
)
from ai_company.communication.dispatcher import DispatchResult, MessageDispatcher
from ai_company.communication.enums import (
AttachmentType,
Expand All @@ -27,6 +36,15 @@
ChannelAlreadyExistsError,
ChannelNotFoundError,
CommunicationError,
DelegationAncestryError,
DelegationAuthorityError,
DelegationCircuitOpenError,
DelegationDepthError,
DelegationDuplicateError,
DelegationError,
DelegationLoopError,
DelegationRateLimitError,
HierarchyResolutionError,
MessageBusAlreadyRunningError,
MessageBusNotRunningError,
NotSubscribedError,
Expand All @@ -37,6 +55,16 @@
MessageHandler,
MessageHandlerFunc,
)
from ai_company.communication.loop_prevention import (
CircuitBreakerState,
DelegationCircuitBreaker,
DelegationDeduplicator,
DelegationGuard,
DelegationRateLimiter,
GuardCheckOutcome,
check_ancestry,
check_delegation_depth,
)
from ai_company.communication.message import Attachment, Message, MessageMetadata
from ai_company.communication.messenger import AgentMessenger
from ai_company.communication.subscription import DeliveryEnvelope, Subscription
Expand All @@ -45,19 +73,40 @@
"AgentMessenger",
"Attachment",
"AttachmentType",
"AuthorityCheckResult",
"AuthorityValidator",
"Channel",
"ChannelAlreadyExistsError",
"ChannelNotFoundError",
"ChannelType",
"CircuitBreakerConfig",
"CircuitBreakerState",
"CommunicationConfig",
"CommunicationError",
"CommunicationPattern",
"DelegationAncestryError",
"DelegationAuthorityError",
"DelegationCircuitBreaker",
"DelegationCircuitOpenError",
"DelegationDeduplicator",
"DelegationDepthError",
"DelegationDuplicateError",
"DelegationError",
"DelegationGuard",
"DelegationLoopError",
"DelegationRateLimiter",
"DelegationRecord",
"DelegationRequest",
"DelegationResult",
"DelegationService",
"DeliveryEnvelope",
"DispatchResult",
"FunctionHandler",
"GuardCheckOutcome",
"HandlerRegistration",
"HierarchyConfig",
"HierarchyResolutionError",
"HierarchyResolver",
"InMemoryMessageBus",
"LoopPreventionConfig",
"MeetingTypeConfig",
Expand All @@ -78,4 +127,6 @@
"NotSubscribedError",
"RateLimitConfig",
"Subscription",
"check_ancestry",
"check_delegation_depth",
]
27 changes: 27 additions & 0 deletions src/ai_company/communication/delegation/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Hierarchical delegation subsystem."""

from ai_company.communication.delegation.authority import (
AuthorityCheckResult,
AuthorityValidator,
)
from ai_company.communication.delegation.hierarchy import (
HierarchyResolver,
)
from ai_company.communication.delegation.models import (
DelegationRecord,
DelegationRequest,
DelegationResult,
)
from ai_company.communication.delegation.service import (
DelegationService,
)

__all__ = [
"AuthorityCheckResult",
"AuthorityValidator",
"DelegationRecord",
"DelegationRequest",
"DelegationResult",
"DelegationService",
"HierarchyResolver",
]
158 changes: 158 additions & 0 deletions src/ai_company/communication/delegation/authority.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
"""Authority validation for hierarchical delegation."""

from typing import Self

from pydantic import BaseModel, ConfigDict, Field, model_validator

from ai_company.communication.config import HierarchyConfig # noqa: TC001
from ai_company.communication.delegation.hierarchy import ( # noqa: TC001
HierarchyResolver,
)
from ai_company.core.agent import AgentIdentity # noqa: TC001
from ai_company.observability import get_logger
from ai_company.observability.events.delegation import (
DELEGATION_AUTHORITY_DENIED,
DELEGATION_AUTHORIZED,
)

logger = get_logger(__name__)


class AuthorityCheckResult(BaseModel):
"""Result of an authority validation check.

Attributes:
allowed: Whether the delegation is authorized.
reason: Explanation (empty on success).
"""

model_config = ConfigDict(frozen=True)

allowed: bool = Field(description="Whether delegation is allowed")
reason: str = Field(default="", description="Explanation")

@model_validator(mode="after")
def _validate_allowed_reason(self) -> Self:
"""Enforce allowed/reason correlation."""
if self.allowed and self.reason:
msg = "reason must be empty when allowed is True"
raise ValueError(msg)
if not self.allowed and not self.reason.strip():
msg = "reason is required when allowed is False"
raise ValueError(msg)
return self
Comment thread
coderabbitai[bot] marked this conversation as resolved.


class AuthorityValidator:
"""Validates delegation authority using hierarchy and role permissions.

Checks:
1. Hierarchy: delegatee must be a subordinate of delegator
(direct or skip-level depending on config).
2. Roles: if ``delegator.authority.can_delegate_to`` is
non-empty, ``delegatee.role`` must be in it; if empty,
all roles are permitted.

Args:
hierarchy: Resolved org hierarchy.
hierarchy_config: Hierarchy enforcement configuration.
"""

__slots__ = ("_config", "_hierarchy")

def __init__(
self,
hierarchy: HierarchyResolver,
hierarchy_config: HierarchyConfig,
) -> None:
self._hierarchy = hierarchy
self._config = hierarchy_config

def validate(
self,
delegator: AgentIdentity,
delegatee: AgentIdentity,
) -> AuthorityCheckResult:
"""Validate whether delegator can delegate to delegatee.

Args:
delegator: Identity of the delegating agent.
delegatee: Identity of the target agent.

Returns:
Result indicating whether delegation is authorized.
"""
if self._config.enforce_chain_of_command:
result = self._check_hierarchy(delegator, delegatee)
if not result.allowed:
return result

result = self._check_role_permissions(delegator, delegatee)
if not result.allowed:
return result

logger.info(
DELEGATION_AUTHORIZED,
delegator=delegator.name,
delegatee=delegatee.name,
)
return AuthorityCheckResult(allowed=True)

def _check_hierarchy(
self,
delegator: AgentIdentity,
delegatee: AgentIdentity,
) -> AuthorityCheckResult:
"""Check hierarchy constraints."""
is_direct = self._hierarchy.is_direct_report(delegator.name, delegatee.name)
if is_direct:
return AuthorityCheckResult(allowed=True)

if self._config.allow_skip_level:
is_sub = self._hierarchy.is_subordinate(delegator.name, delegatee.name)
if is_sub:
return AuthorityCheckResult(allowed=True)

reason = (
f"{delegatee.name!r} is not a "
f"{'subordinate' if self._config.allow_skip_level else 'direct report'} "
f"of {delegator.name!r}"
)
logger.info(
DELEGATION_AUTHORITY_DENIED,
delegator=delegator.name,
delegatee=delegatee.name,
reason=reason,
)
return AuthorityCheckResult(
allowed=False,
reason=reason,
)

def _check_role_permissions(
self,
delegator: AgentIdentity,
delegatee: AgentIdentity,
) -> AuthorityCheckResult:
"""Check role-based delegation permissions."""
allowed_roles = delegator.authority.can_delegate_to
if not allowed_roles:
return AuthorityCheckResult(allowed=True)

if delegatee.role in allowed_roles:
return AuthorityCheckResult(allowed=True)

reason = (
f"Role {delegatee.role!r} is not in "
f"delegator's can_delegate_to: {allowed_roles}"
)
logger.info(
DELEGATION_AUTHORITY_DENIED,
delegator=delegator.name,
delegatee=delegatee.name,
reason=reason,
)
return AuthorityCheckResult(
allowed=False,
reason=reason,
)
Loading