Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions DESIGN_SPEC.md
Original file line number Diff line number Diff line change
Expand Up @@ -602,13 +602,12 @@ conflict_resolution:

#### Strategy 2: Structured Debate + Judge

Both agents present arguments (1 round each, capped at `max_tokens_per_argument`). A judge — their shared manager, the CEO, or a configurable arbitrator agent — evaluates both positions and decides. The judge's reasoning and both arguments are logged as a dissent record.
Both agents present arguments (1 round each). A judge — their shared manager, the CEO, or a configurable arbitrator agent — evaluates both positions and decides. The judge's reasoning and both arguments are logged as a dissent record.

```yaml
conflict_resolution:
strategy: "debate"
debate:
max_tokens_per_argument: 500
judge: "shared_manager" # shared_manager, ceo, designated_agent
```

Expand All @@ -631,7 +630,7 @@ conflict_resolution:

Combines strategies with an intelligent review layer:

1. Both agents present arguments (1 round, capped tokens) — preserving dissent
1. Both agents present arguments (1 round) — preserving dissent
2. A **conflict review agent** evaluates the result:
- If the resolution is **clear** (one position is objectively better, or authority applies cleanly) → resolve automatically, log dissent record
- If the resolution is **ambiguous** (genuine trade-offs, no clear winner) → escalate to human queue with both positions + the review agent's analysis
Expand All @@ -640,7 +639,6 @@ Combines strategies with an intelligent review layer:
conflict_resolution:
strategy: "hybrid"
hybrid:
max_tokens_per_argument: 500
review_agent: "conflict_reviewer" # dedicated agent or role
escalate_on_ambiguity: true
```
Expand All @@ -652,7 +650,7 @@ conflict_resolution:

Meetings (§5.1 Pattern 3) follow configurable protocols that determine how agents interact during structured multi-agent conversations. Different meeting types naturally suit different protocols. All protocols implement a `MeetingProtocol` protocol, making the system extensible — new protocols can be registered and selected per meeting type. Cost bounds are enforced by `duration_tokens` in meeting config (§5.4).

> **Current state (M4 complete):** All 3 meeting protocols are implemented in `communication/meeting/`: `RoundRobinProtocol`, `PositionPapersProtocol`, and `StructuredPhasesProtocol`. The `MeetingOrchestrator` runs meetings end-to-end with token budget enforcement via `TokenTracker`. All protocols implement the `MeetingProtocol` protocol interface.
> **Current state (M4 complete):** All 3 meeting protocols are implemented in `communication/meeting/`: `RoundRobinProtocol`, `PositionPapersProtocol`, and `StructuredPhasesProtocol`. The `MeetingOrchestrator` runs meetings end-to-end with token budget enforcement via `TokenTracker`. Shared LLM response parsing for decisions and action items is in `_parsing.py`. All protocols implement the `MeetingProtocol` protocol interface.

#### Protocol 1: Round-Robin Transcript

Expand Down Expand Up @@ -2414,7 +2412,7 @@ ai-company/
│ │ ├── config.py # Communication config
│ │ ├── conflict_resolution/ # Conflict resolution subsystem (§5.6)
│ │ │ ├── __init__.py # Package exports
│ │ │ ├── _helpers.py # Shared utility (find_loser)
│ │ │ ├── _helpers.py # Shared utility (find_losers, pick_highest_seniority)
│ │ │ ├── authority_strategy.py # AuthorityResolver (Strategy 1)
│ │ │ ├── config.py # ConflictResolutionConfig, DebateConfig, HybridConfig
│ │ │ ├── debate_strategy.py # DebateResolver (Strategy 2)
Expand Down Expand Up @@ -2446,12 +2444,13 @@ ai-company/
│ │ ├── message.py # Message model
│ │ ├── meeting/ # Meeting protocol subsystem
│ │ │ ├── __init__.py # Package exports
│ │ │ ├── _parsing.py # Shared helpers for parsing decisions and action items
│ │ │ ├── _prompts.py # LLM prompt templates for meeting phases
│ │ │ ├── _token_tracker.py # TokenTracker for duration_tokens enforcement
│ │ │ ├── config.py # MeetingProtocolConfig, protocol-specific config models
│ │ │ ├── enums.py # MeetingProtocolType, MeetingPhase enums
│ │ │ ├── errors.py # Meeting error hierarchy
│ │ │ ├── models.py # MeetingRecord, AgendaItem, ActionItem, etc.
│ │ │ ├── models.py # MeetingRecord, MeetingAgendaItem, ActionItem, etc.
│ │ │ ├── orchestrator.py # MeetingOrchestrator (runs meetings end-to-end)
│ │ │ ├── position_papers.py # PositionPapersProtocol implementation
│ │ │ ├── protocol.py # MeetingProtocol protocol interface
Expand Down Expand Up @@ -2647,7 +2646,7 @@ These conventions were established during the M0–M2+ review cycle. **Adopted**
| **Pydantic alias for YAML directives** | Adopted (M2.5) | `Field(alias="_remove")` in `TemplateAgentConfig` — YAML uses `_remove: true`, Python accesses `agent.remove`. Keeps the YAML-facing name (underscore prefix signals internal directive) separate from the Python attribute name. | Underscore-prefixed YAML keys signal merge directives vs regular fields. Pydantic alias bridges the naming convention gap cleanly. |
| **Communication foundation** | Adopted (M4) | `MessageBus` protocol with `InMemoryMessageBus` backend (asyncio queues, pull-model `receive()` with shutdown signaling via `asyncio.Event`). `MessageDispatcher` routes to concurrent handlers via `asyncio.TaskGroup` with pre-allocated error collection. `AgentMessenger` per-agent facade auto-fills sender/timestamp/ID; deterministic direct-channel naming `@{sorted_a}:{sorted_b}`. `DeliveryEnvelope` for delivery tracking. `NotBlankStr` validation on all protocol boundary identifiers. | Pull-model avoids callback complexity and enables agents to consume at their own pace. Protocol + backend split enables future persistent/distributed bus implementations. Deterministic DM channel names prevent duplicates. See §5. |
| **Delegation & loop prevention** | Adopted (M4) | `HierarchyResolver` resolves org hierarchy from `Company` at construction (cycle-detected, `MappingProxyType`-frozen). `AuthorityValidator` checks chain-of-command + role permissions. `DelegationGuard` orchestrates five mechanisms (ancestry, depth, dedup, rate limit, circuit breaker) in sequence, short-circuiting on first rejection. `DelegationService` is synchronous (CPU-only); messaging integration deferred. Stateful mechanisms use injectable clock for deterministic testing. Task model extended with `parent_task_id` and `delegation_chain` fields. | Synchronous delegation avoids async complexity for CPU-only validation. Five-mechanism guard provides defence-in-depth against all loop patterns. Injectable clocks enable deterministic testing. See §5.4, §5.5. |
| **Conflict resolution** | Adopted (M4) | `ConflictResolver` protocol with async `resolve()` + sync `build_dissent_record()` split (resolve may call LLM, dissent record is pure construction). Four strategies: `AuthorityResolver` (seniority comparison iterating all N positions, hierarchy proximity tiebreaker via `get_lowest_common_manager`), `DebateResolver` (LLM judge via `JudgeEvaluator` protocol, authority fallback when absent), `HumanEscalationResolver` (stub, returns `ESCALATED_TO_HUMAN`), `HybridResolver` (LLM review + ambiguity escalation/authority fallback). `ConflictResolutionService` follows `DelegationService` pattern (`__slots__`, keyword-only constructor, `MappingProxyType`-wrapped resolver mapping, audit trail). `DissentRecord` preserves losing agent's reasoning. `Conflict.is_cross_department` is a `@computed_field` derived from positions. `HierarchyResolver` extended with `get_lowest_common_manager()` and `get_delegation_depth()`. | Protocol + strategy pattern enables adding new resolution approaches without modifying existing code. Async resolve accommodates LLM calls; sync dissent record avoids unnecessary async overhead. Shared `find_loser` utility prevents code duplication across strategies. See §5.6. |
| **Conflict resolution** | Adopted (M4) | `ConflictResolver` protocol with async `resolve()` + sync `build_dissent_records()` split (resolve may call LLM, dissent record is pure construction). Four strategies: `AuthorityResolver` (seniority comparison iterating all N positions, hierarchy proximity tiebreaker via `get_lowest_common_manager`), `DebateResolver` (LLM judge via `JudgeEvaluator` protocol, authority fallback when absent), `HumanEscalationResolver` (stub, returns `ESCALATED_TO_HUMAN`), `HybridResolver` (LLM review + ambiguity escalation/authority fallback). `ConflictResolutionService` follows `DelegationService` pattern (`__slots__`, keyword-only constructor, `MappingProxyType`-wrapped resolver mapping, audit trail). `DissentRecord` preserves losing agent's reasoning. `Conflict.is_cross_department` is a `@computed_field` derived from positions. `HierarchyResolver` extended with `get_lowest_common_manager()` and `get_delegation_depth()`. | Protocol + strategy pattern enables adding new resolution approaches without modifying existing code. Async resolve accommodates LLM calls; sync dissent record avoids unnecessary async overhead. Shared `find_losers` utility prevents code duplication across strategies. See §5.6. |

---

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ AI Company lets you spin up a virtual organization staffed entirely by AI agents

## Status

**M4: Multi-Agent** in progress (M0 Tooling, M1 Config & Core, M2 Providers, M3 Single Agent — all done). See [DESIGN_SPEC.md](DESIGN_SPEC.md) for the full high-level specification.
**M3: Single Agent** and **M4: Multi-Agent** in progress (M0 Tooling, M1 Config & Core, M2 Providers — all done). See [DESIGN_SPEC.md](DESIGN_SPEC.md) for the full high-level specification.

## Tech Stack

Expand Down
74 changes: 49 additions & 25 deletions src/ai_company/communication/bus_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
COMM_SUBSCRIPTION_CREATED,
COMM_SUBSCRIPTION_NOT_FOUND,
COMM_SUBSCRIPTION_REMOVED,
COMM_UNSUBSCRIBE_SENTINEL_FAILED,
)

logger = get_logger(__name__)
Expand Down Expand Up @@ -100,6 +99,7 @@ def __init__(self, *, config: MessageBusConfig) -> None:
self._queues: dict[tuple[str, str], asyncio.Queue[DeliveryEnvelope | None]] = {}
self._history: dict[str, deque[Message]] = {}
self._known_agents: set[str] = set()
self._waiters: dict[tuple[str, str], int] = {}
self._running = False
self._shutdown_event = asyncio.Event()

Expand Down Expand Up @@ -399,17 +399,17 @@ async def unsubscribe(
self._channels[channel_name] = channel.model_copy(
update={"subscribers": new_subs},
)
queue = self._queues.pop((channel_name, subscriber_id), None)
key = (channel_name, subscriber_id)
queue = self._queues.pop(key, None)
if queue is not None:
try:
# Put a sentinel for each pending waiter so all
# concurrent receive() calls are woken up.
# Safe to use put_nowait: queues are unbounded
# (maxsize=0), so QueueFull cannot be raised.
pending = self._waiters.pop(key, 0)
sentinels = max(1, pending)
for _ in range(sentinels):
queue.put_nowait(None)
except asyncio.QueueFull:
logger.exception(
COMM_UNSUBSCRIBE_SENTINEL_FAILED,
channel=channel_name,
subscriber=subscriber_id,
detail="Queue full — unsubscribe sentinel not delivered",
)
logger.info(
COMM_SUBSCRIPTION_REMOVED,
channel=channel_name,
Expand All @@ -429,19 +429,18 @@ async def receive(
the bus is stopped. When ``timeout`` is ``None``, awaits
indefinitely (or until shutdown).

Note: Only one ``receive()`` call should be pending per
``(channel_name, subscriber_id)`` pair at a time. The
unsubscribe sentinel wakes a single waiter; concurrent
receivers on the same subscription are not supported.

Args:
channel_name: Channel to receive from.
subscriber_id: Agent ID receiving.
timeout: Seconds to wait before returning ``None``.

Returns:
A delivery envelope, or ``None`` on timeout, shutdown,
or when an in-flight receive is woken by :meth:`unsubscribe`.
The next delivery envelope, or ``None`` when:

- *timeout* expires without a message arriving.
- The bus is shut down while waiting.
- The subscription is cancelled via :meth:`unsubscribe`
while a ``receive()`` call is in flight.

Raises:
MessageBusNotRunningError: If the bus is not running.
Expand All @@ -460,25 +459,50 @@ async def receive(
):
_raise_not_subscribed(channel_name, subscriber_id)
queue = self._ensure_queue(channel_name, subscriber_id)
result = await self._await_with_shutdown(queue, timeout)
key = (channel_name, subscriber_id)
self._waiters[key] = self._waiters.get(key, 0) + 1
try:
result = await self._await_with_shutdown(queue, timeout)
finally:
# Decrement outside the lock: no ``await`` separates the
# read and write of ``_waiters``, so no other coroutine
# can interleave in a single-threaded asyncio event loop.
# The asymmetry with the lock-guarded increment is
# intentional — the decrement must happen after
# _await_with_shutdown completes.
current = self._waiters.get(key)
if current is None:
# Key was removed (e.g. by unsubscribe); don't recreate.
pass
elif current <= 1:
self._waiters.pop(key, None)
else:
self._waiters[key] = current - 1
if result is None:
self._log_receive_null(channel_name, subscriber_id, timeout)
await self._log_receive_null(channel_name, subscriber_id, timeout)
return result

def _log_receive_null(
async def _log_receive_null(
self,
channel_name: str,
subscriber_id: str,
timeout: float | None,
timeout_seconds: float | None,
) -> None:
"""Log the cause when ``receive()`` returns ``None``."""
if self._shutdown_event.is_set():
"""Log the cause when ``receive()`` returns ``None``.

Acquires the lock to safely inspect bus state (queue map
and shutdown flag) so the inferred reason is not racy.
"""
async with self._lock:
is_shutdown = self._shutdown_event.is_set()
is_unsubscribed = (channel_name, subscriber_id) not in self._queues
if is_shutdown:
logger.debug(
COMM_RECEIVE_SHUTDOWN,
channel=channel_name,
subscriber=subscriber_id,
)
elif (channel_name, subscriber_id) not in self._queues:
elif is_unsubscribed:
logger.debug(
COMM_RECEIVE_UNSUBSCRIBED,
channel=channel_name,
Expand All @@ -489,7 +513,7 @@ def _log_receive_null(
COMM_RECEIVE_TIMEOUT,
channel=channel_name,
subscriber=subscriber_id,
timeout=timeout,
timeout=timeout_seconds,
)

async def _await_with_shutdown(
Expand Down
83 changes: 73 additions & 10 deletions src/ai_company/communication/conflict_resolution/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
ConflictPosition,
ConflictResolution,
)
from ai_company.communication.delegation.hierarchy import ( # noqa: TC001
HierarchyResolver,
)
from ai_company.communication.errors import (
ConflictStrategyError,
)
Expand All @@ -21,8 +24,9 @@ def find_losers(
) -> tuple[ConflictPosition, ...]:
"""Find all non-winning positions in a conflict.

For N-party conflicts (3+ agents), returns every position whose
agent was not the winner.
Returns every position whose agent was not the winner. For
2-party conflicts this is a single position; for N-party
conflicts (3+ agents) this may return multiple.

Args:
conflict: The original conflict.
Expand All @@ -32,18 +36,40 @@ def find_losers(
All losing agents' positions.

Raises:
ConflictStrategyError: If no losing position is found
(data integrity violation).
ConflictStrategyError: If the winning agent ID is not found
in the conflict positions, or if no losing position is
found (data integrity violation).
"""
losers = tuple(
pos for pos in conflict.positions if pos.agent_id != resolution.winning_agent_id
)
winner_id = resolution.winning_agent_id
position_ids = {pos.agent_id for pos in conflict.positions}
if winner_id not in position_ids:
msg = (
f"Winning agent {winner_id!r} not found in "
f"conflict positions {sorted(position_ids)!r}"
)
logger.error(
CONFLICT_STRATEGY_ERROR,
conflict_id=conflict.id,
winning_agent_id=winner_id,
position_agent_ids=sorted(position_ids),
error=msg,
)
raise ConflictStrategyError(
msg,
context={
"conflict_id": conflict.id,
"winning_agent_id": winner_id,
"position_agent_ids": sorted(position_ids),
},
)

losers = tuple(pos for pos in conflict.positions if pos.agent_id != winner_id)
if not losers:
msg = f"No losing position found for winner {resolution.winning_agent_id!r}"
msg = f"No losing position found for winner {winner_id!r}"
logger.warning(
CONFLICT_STRATEGY_ERROR,
conflict_id=conflict.id,
winning_agent_id=resolution.winning_agent_id,
winning_agent_id=winner_id,
error=msg,
)
raise ConflictStrategyError(
Expand Down Expand Up @@ -109,17 +135,54 @@ def find_position_or_raise(

def pick_highest_seniority(
conflict: Conflict,
*,
hierarchy: HierarchyResolver | None = None,
) -> ConflictPosition:
"""Pick the position with the highest seniority level.

When two agents share the same seniority level and a
``hierarchy`` is provided, the agent closer to the hierarchy
root (fewer ancestors) wins. Without a hierarchy, the
incumbent (first encountered) is kept on ties.

Args:
conflict: The conflict with agent positions.
hierarchy: Optional hierarchy resolver for tiebreaking
when seniority levels are equal.

Returns:
The position with the highest seniority.
"""
best = conflict.positions[0]
for pos in conflict.positions[1:]:
if compare_seniority(pos.agent_level, best.agent_level) > 0:
cmp = compare_seniority(pos.agent_level, best.agent_level)
if cmp > 0:
best = pos
elif cmp == 0 and hierarchy is not None:
best = _hierarchy_tiebreak(best, pos, hierarchy)
return best


def _hierarchy_tiebreak(
incumbent: ConflictPosition,
challenger: ConflictPosition,
hierarchy: HierarchyResolver,
) -> ConflictPosition:
"""Break a seniority tie using hierarchy depth.

The agent with fewer ancestors (closer to root) wins.
If both have the same depth, the incumbent is kept.

Args:
incumbent: Current best position.
challenger: Position challenging the incumbent.
hierarchy: Hierarchy resolver for depth lookup.

Returns:
The winning position.
"""
depth_inc = len(hierarchy.get_ancestors(incumbent.agent_id))
depth_chl = len(hierarchy.get_ancestors(challenger.agent_id))
if depth_chl < depth_inc:
return challenger
return incumbent
Loading