Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ src/ai_company/
api/ # FastAPI REST + WebSocket routes
budget/ # Per-agent cost tracking and spending controls
cli/ # Typer CLI commands
communication/ # Inter-agent message bus and channels
communication/ # Message bus (protocol + in-memory backend), dispatcher, messenger facade, channels
config/ # YAML company config loading and validation
core/ # Shared domain models and base classes
engine/ # Agent orchestration, execution loops, task lifecycle, recovery, and shutdown
Expand Down
15 changes: 13 additions & 2 deletions DESIGN_SPEC.md
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,8 @@ When a loop is detected, the framework:
3. Escalates to the sender's manager (or human if at top of hierarchy)
4. Logs the loop for analytics and process improvement

> **Current state (M4 in-progress):** The communication foundation is implemented: `MessageBus` protocol with `InMemoryMessageBus` backend (asyncio queues, pull-model `receive()`), `MessageDispatcher` for concurrent handler routing via `asyncio.TaskGroup`, `AgentMessenger` per-agent facade (auto-fills sender/timestamp/ID, deterministic direct-channel naming `@{sorted_a}:{sorted_b}`), and `DeliveryEnvelope` for delivery tracking. Loop prevention (§5.5), conflict resolution (§5.6), and meeting protocol (§5.7) are planned for later M4 work.

### 5.6 Conflict Resolution Protocol

When two or more agents disagree on an approach (architecture, implementation, priority, etc.), the framework provides multiple configurable resolution strategies behind a `ConflictResolver` protocol. New strategies can be added without modifying existing ones. The strategy is configurable per company, per department, or per conflict type.
Expand Down Expand Up @@ -2318,10 +2320,17 @@ ai-company/
│ │ ├── meeting_engine.py # Meeting coordination (M4)
│ │ └── hr_engine.py # Hiring, firing, performance (M7)
│ ├── communication/ # Inter-agent communication
│ │ ├── bus_memory.py # InMemoryMessageBus implementation
│ │ ├── bus_protocol.py # MessageBus protocol interface
│ │ ├── channel.py # Channel model
│ │ ├── message.py # Message model
│ │ ├── config.py # Communication config
│ │ └── enums.py # Communication enums
│ │ ├── dispatcher.py # MessageDispatcher + DispatchResult
│ │ ├── enums.py # Communication enums
│ │ ├── errors.py # Communication error hierarchy
│ │ ├── handler.py # MessageHandler protocol, FunctionHandler, HandlerRegistration
│ │ ├── message.py # Message model
│ │ ├── messenger.py # AgentMessenger per-agent facade
│ │ └── subscription.py # Subscription + DeliveryEnvelope models
│ ├── memory/ # Agent memory system (M5, stubs only)
│ │ ├── store.py # Memory storage backend (M5)
│ │ ├── retrieval.py # Memory retrieval & ranking (M5)
Expand All @@ -2336,6 +2345,7 @@ ai-company/
│ │ ├── events/ # Per-domain event constants
│ │ │ ├── __init__.py # Package marker with usage docs; no re-exports
│ │ │ ├── budget.py # BUDGET_* constants
│ │ │ ├── communication.py # COMM_* constants
│ │ │ ├── config.py # CONFIG_* constants
│ │ │ ├── correlation.py # CORRELATION_* constants
│ │ │ ├── execution.py # EXECUTION_* constants
Expand Down Expand Up @@ -2489,6 +2499,7 @@ These conventions were established during the M0–M2+ review cycle. **Adopted**
| **State coordination** | Planned (M4) | Centralized single-writer: `TaskEngine` owns all task/project mutations via `asyncio.Queue`. Agents submit requests, engine applies `model_copy(update=...)` sequentially and publishes snapshots. `version: int` field on state models for future optimistic concurrency if multi-process scaling is needed. | Prevents lost updates by design. Trivial in single-threaded asyncio (no locks). Perfect audit trail. Industry consensus: MetaGPT, CrewAI, AutoGen all use prevention-by-design, not conflict resolution. See §6.8 State Coordination table. |
| **Workspace isolation** | Planned (M4) | Pluggable `WorkspaceIsolationStrategy` protocol. Default: planner + git worktrees. Each agent works in an isolated worktree; sequential merge on completion. Textual conflicts detected by git; semantic conflicts reviewed by agent or human. | Industry standard (Codex, Cursor, Claude Code, VS Code). Maximum parallelism. Leverages mature git infrastructure. See §6.8. |
| **Graceful shutdown** | Adopted (M3) | Pluggable `ShutdownStrategy` protocol. Default: cooperative with 30s timeout. Agents check shutdown event at turn boundaries. Force-cancel after timeout. `INTERRUPTED` status for force-cancelled tasks. M4/M5: upgrade to checkpoint-and-stop. | Cross-platform (Windows `signal.signal()` fallback). Bounded shutdown time. Mirrors cooperative shutdown in §6.7. |
| **Communication foundation** | Adopted (M4) | `MessageBus` protocol with `InMemoryMessageBus` backend (asyncio queues, pull-model `receive()` with shutdown signaling via `asyncio.Event`). `MessageDispatcher` routes to concurrent handlers via `asyncio.TaskGroup` with pre-allocated error collection. `AgentMessenger` per-agent facade auto-fills sender/timestamp/ID; deterministic direct-channel naming `@{sorted_a}:{sorted_b}`. `DeliveryEnvelope` for delivery tracking. `NotBlankStr` validation on all protocol boundary identifiers. | Pull-model avoids callback complexity and enables agents to consume at their own pace. Protocol + backend split enables future persistent/distributed bus implementations. Deterministic DM channel names prevent duplicates. See §5. |

---

Expand Down
40 changes: 39 additions & 1 deletion src/ai_company/communication/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Communication domain models for the AI company framework."""
"""Communication subsystem for the AI company framework."""

from ai_company.communication.bus_memory import InMemoryMessageBus
from ai_company.communication.bus_protocol import MessageBus
from ai_company.communication.channel import Channel
from ai_company.communication.config import (
CircuitBreakerConfig,
Expand All @@ -9,8 +11,10 @@
MeetingsConfig,
MeetingTypeConfig,
MessageBusConfig,
MessageRetentionConfig,
RateLimitConfig,
)
from ai_company.communication.dispatcher import DispatchResult, MessageDispatcher
from ai_company.communication.enums import (
AttachmentType,
ChannelType,
Expand All @@ -19,25 +23,59 @@
MessagePriority,
MessageType,
)
from ai_company.communication.errors import (
ChannelAlreadyExistsError,
ChannelNotFoundError,
CommunicationError,
MessageBusAlreadyRunningError,
MessageBusNotRunningError,
NotSubscribedError,
)
from ai_company.communication.handler import (
FunctionHandler,
HandlerRegistration,
MessageHandler,
MessageHandlerFunc,
)
from ai_company.communication.message import Attachment, Message, MessageMetadata
from ai_company.communication.messenger import AgentMessenger
from ai_company.communication.subscription import DeliveryEnvelope, Subscription

__all__ = [
"AgentMessenger",
"Attachment",
"AttachmentType",
"Channel",
"ChannelAlreadyExistsError",
"ChannelNotFoundError",
"ChannelType",
"CircuitBreakerConfig",
"CommunicationConfig",
"CommunicationError",
"CommunicationPattern",
"DeliveryEnvelope",
"DispatchResult",
"FunctionHandler",
"HandlerRegistration",
"HierarchyConfig",
"InMemoryMessageBus",
"LoopPreventionConfig",
"MeetingTypeConfig",
"MeetingsConfig",
"Message",
"MessageBus",
"MessageBusAlreadyRunningError",
"MessageBusBackend",
"MessageBusConfig",
"MessageBusNotRunningError",
"MessageDispatcher",
"MessageHandler",
"MessageHandlerFunc",
"MessageMetadata",
"MessagePriority",
"MessageRetentionConfig",
"MessageType",
"NotSubscribedError",
"RateLimitConfig",
"Subscription",
]
Loading