Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ src/ai_company/
api/ # FastAPI REST + WebSocket routes
budget/ # Per-agent cost tracking and spending controls
cli/ # Typer CLI commands
communication/ # Inter-agent message bus and channels
communication/ # Message bus (protocol + in-memory backend), dispatcher, messenger facade, channels
config/ # YAML company config loading and validation
core/ # Shared domain models and base classes
engine/ # Agent orchestration, execution loops, task lifecycle, recovery, and shutdown
Expand Down
15 changes: 13 additions & 2 deletions DESIGN_SPEC.md
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,8 @@ When a loop is detected, the framework:
3. Escalates to the sender's manager (or human if at top of hierarchy)
4. Logs the loop for analytics and process improvement

> **Current state (M4 in-progress):** The communication foundation is implemented: `MessageBus` protocol with `InMemoryMessageBus` backend (asyncio queues, pull-model `receive()`), `MessageDispatcher` for concurrent handler routing via `asyncio.TaskGroup`, `AgentMessenger` per-agent facade (auto-fills sender/timestamp/ID, deterministic direct-channel naming `@{sorted_a}:{sorted_b}`), and `DeliveryEnvelope` for delivery tracking. Loop prevention (§5.5), conflict resolution (§5.6), and meeting protocol (§5.7) are planned for later M4 work.

### 5.6 Conflict Resolution Protocol

When two or more agents disagree on an approach (architecture, implementation, priority, etc.), the framework provides multiple configurable resolution strategies behind a `ConflictResolver` protocol. New strategies can be added without modifying existing ones. The strategy is configurable per company, per department, or per conflict type.
Expand Down Expand Up @@ -2318,10 +2320,17 @@ ai-company/
│ │ ├── meeting_engine.py # Meeting coordination (M4)
│ │ └── hr_engine.py # Hiring, firing, performance (M7)
│ ├── communication/ # Inter-agent communication
│ │ ├── bus_memory.py # InMemoryMessageBus implementation
│ │ ├── bus_protocol.py # MessageBus protocol interface
│ │ ├── channel.py # Channel model
│ │ ├── message.py # Message model
│ │ ├── config.py # Communication config
│ │ └── enums.py # Communication enums
│ │ ├── dispatcher.py # MessageDispatcher + DispatchResult
│ │ ├── enums.py # Communication enums
│ │ ├── errors.py # Communication error hierarchy
│ │ ├── handler.py # MessageHandler protocol, FunctionHandler, HandlerRegistration
│ │ ├── message.py # Message model
│ │ ├── messenger.py # AgentMessenger per-agent facade
│ │ └── subscription.py # Subscription + DeliveryEnvelope models
│ ├── memory/ # Agent memory system (M5, stubs only)
│ │ ├── store.py # Memory storage backend (M5)
│ │ ├── retrieval.py # Memory retrieval & ranking (M5)
Expand All @@ -2336,6 +2345,7 @@ ai-company/
│ │ ├── events/ # Per-domain event constants
│ │ │ ├── __init__.py # Package marker with usage docs; no re-exports
│ │ │ ├── budget.py # BUDGET_* constants
│ │ │ ├── communication.py # COMM_* constants
│ │ │ ├── config.py # CONFIG_* constants
│ │ │ ├── correlation.py # CORRELATION_* constants
│ │ │ ├── execution.py # EXECUTION_* constants
Expand Down Expand Up @@ -2489,6 +2499,7 @@ These conventions were established during the M0–M2+ review cycle. **Adopted**
| **State coordination** | Planned (M4) | Centralized single-writer: `TaskEngine` owns all task/project mutations via `asyncio.Queue`. Agents submit requests, engine applies `model_copy(update=...)` sequentially and publishes snapshots. `version: int` field on state models for future optimistic concurrency if multi-process scaling is needed. | Prevents lost updates by design. Trivial in single-threaded asyncio (no locks). Perfect audit trail. Industry consensus: MetaGPT, CrewAI, AutoGen all use prevention-by-design, not conflict resolution. See §6.8 State Coordination table. |
| **Workspace isolation** | Planned (M4) | Pluggable `WorkspaceIsolationStrategy` protocol. Default: planner + git worktrees. Each agent works in an isolated worktree; sequential merge on completion. Textual conflicts detected by git; semantic conflicts reviewed by agent or human. | Industry standard (Codex, Cursor, Claude Code, VS Code). Maximum parallelism. Leverages mature git infrastructure. See §6.8. |
| **Graceful shutdown** | Adopted (M3) | Pluggable `ShutdownStrategy` protocol. Default: cooperative with 30s timeout. Agents check shutdown event at turn boundaries. Force-cancel after timeout. `INTERRUPTED` status for force-cancelled tasks. M4/M5: upgrade to checkpoint-and-stop. | Cross-platform (Windows `signal.signal()` fallback). Bounded shutdown time. Mirrors cooperative shutdown in §6.7. |
| **Communication foundation** | Adopted (M4) | `MessageBus` protocol with `InMemoryMessageBus` backend (asyncio queues, pull-model `receive()` with shutdown signaling via `asyncio.Event`). `MessageDispatcher` routes to concurrent handlers via `asyncio.TaskGroup` with pre-allocated error collection. `AgentMessenger` per-agent facade auto-fills sender/timestamp/ID; deterministic direct-channel naming `@{sorted_a}:{sorted_b}`. `DeliveryEnvelope` for delivery tracking. `NotBlankStr` validation on all protocol boundary identifiers. | Pull-model avoids callback complexity and enables agents to consume at their own pace. Protocol + backend split enables future persistent/distributed bus implementations. Deterministic DM channel names prevent duplicates. See §5. |

---

Expand Down
40 changes: 39 additions & 1 deletion src/ai_company/communication/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Communication domain models for the AI company framework."""
"""Communication subsystem for the AI company framework."""

from ai_company.communication.bus_memory import InMemoryMessageBus
from ai_company.communication.bus_protocol import MessageBus
from ai_company.communication.channel import Channel
from ai_company.communication.config import (
CircuitBreakerConfig,
Expand All @@ -9,8 +11,10 @@
MeetingsConfig,
MeetingTypeConfig,
MessageBusConfig,
MessageRetentionConfig,
RateLimitConfig,
)
from ai_company.communication.dispatcher import DispatchResult, MessageDispatcher
from ai_company.communication.enums import (
AttachmentType,
ChannelType,
Expand All @@ -19,25 +23,59 @@
MessagePriority,
MessageType,
)
from ai_company.communication.errors import (
ChannelAlreadyExistsError,
ChannelNotFoundError,
CommunicationError,
MessageBusAlreadyRunningError,
MessageBusNotRunningError,
NotSubscribedError,
)
from ai_company.communication.handler import (
FunctionHandler,
HandlerRegistration,
MessageHandler,
MessageHandlerFunc,
)
from ai_company.communication.message import Attachment, Message, MessageMetadata
from ai_company.communication.messenger import AgentMessenger
from ai_company.communication.subscription import DeliveryEnvelope, Subscription

__all__ = [
"AgentMessenger",
"Attachment",
"AttachmentType",
"Channel",
"ChannelAlreadyExistsError",
"ChannelNotFoundError",
"ChannelType",
"CircuitBreakerConfig",
"CommunicationConfig",
"CommunicationError",
"CommunicationPattern",
"DeliveryEnvelope",
"DispatchResult",
"FunctionHandler",
"HandlerRegistration",
"HierarchyConfig",
"InMemoryMessageBus",
"LoopPreventionConfig",
"MeetingTypeConfig",
"MeetingsConfig",
"Message",
"MessageBus",
"MessageBusAlreadyRunningError",
"MessageBusBackend",
"MessageBusConfig",
"MessageBusNotRunningError",
"MessageDispatcher",
"MessageHandler",
"MessageHandlerFunc",
"MessageMetadata",
"MessagePriority",
"MessageRetentionConfig",
"MessageType",
"NotSubscribedError",
"RateLimitConfig",
"Subscription",
]
Loading