From f30d41d60b5cdc782be20e54887c8df5aa47eedd Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Sun, 8 Mar 2026 11:29:12 +0100 Subject: [PATCH 1/3] fix: address 40 post-merge review findings from PRs #164-#167 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Critical (C1-C4): - Parse decisions/action_items from LLM synthesis in all 3 meeting protocols - Validate winning_agent_id exists in find_losers() before computing losers Major (M1-M17): - Guard summary budget reserve when leader_summarizes=False - Add synthesis sub-reserve in structured phases discussion - Reject duplicate participant_ids in meeting orchestrator - Freeze protocol registry with MappingProxyType - Warn when token tracker exceeds budget - Add hierarchy tiebreaker to pick_highest_seniority() - Wire hierarchy into debate/hybrid authority fallbacks - Fast-path get_lowest_common_manager(a, a) → a - Validate _SENIORITY_ORDER matches enum members at import - Remove dead max_tokens_per_argument config field - Verify task IDs match plan subtask IDs in DecompositionResult - Return CANCELLED for mixed completed+cancelled terminal states - Fix double-logging in rollup compute() for empty case - Copy subtask dependencies from plan to created Tasks - Reject duplicate subtask IDs in RoutingResult - Wake all pending waiters on unsubscribe (not just one) Minor (m1-m15): - Remove duplicate MEETING_CONFLICT_DETECTED log events - Replace assert with explicit raises in meeting protocols - Include presenter_id in formatted agenda prompt - Validate token aggregates in MeetingMinutes - Require non-empty error_message for FAILED/BUDGET_EXHAUSTED - Move _MIN_POSITIONS to local constant in service.py - Precompute seniority rank dict for O(1) lookups - Remove dead asyncio.QueueFull catch on unbounded queue - Fix racy state check in _log_receive_null (acquire lock) - Type channel_name as NotBlankStr in messenger - Document unsubscribe as None 
return path in receive() - Preserve traceback context in parallel.py re-raise - Validate parent_task.id matches plan.parent_task_id - Add logging before raises in routing model validators Trivial (t1-t4): - Use centralized event constant in routing scorer - Add task_structure/coordination_topology to Task docstring - Fix DESIGN_SPEC.md model/function names to match code - Fix StructuredPhasesConfig docstring Tests (T1-T5): - Assert MEETING_CONTRIBUTION enum value - Add timeout markers to all meeting test modules - Add 3+ participant test for authority/debate strategies - Remove dead max_tokens_per_argument test references - Update HybridResolver tests for new hierarchy parameter Closes #169 --- DESIGN_SPEC.md | 6 +- src/ai_company/communication/bus_memory.py | 51 +++--- .../conflict_resolution/_helpers.py | 78 ++++++++- .../conflict_resolution/config.py | 12 -- .../conflict_resolution/debate_strategy.py | 6 +- .../conflict_resolution/hybrid_strategy.py | 12 +- .../conflict_resolution/service.py | 3 +- .../communication/delegation/hierarchy.py | 5 +- .../communication/meeting/_parsing.py | 154 ++++++++++++++++++ .../communication/meeting/_prompts.py | 2 + .../communication/meeting/_token_tracker.py | 18 +- .../communication/meeting/config.py | 3 +- .../communication/meeting/models.py | 29 ++++ .../communication/meeting/orchestrator.py | 23 ++- .../communication/meeting/position_papers.py | 24 ++- .../communication/meeting/round_robin.py | 17 +- .../meeting/structured_phases.py | 40 +++-- src/ai_company/communication/messenger.py | 15 +- src/ai_company/core/enums.py | 26 ++- src/ai_company/core/task.py | 4 + src/ai_company/engine/decomposition/models.py | 20 ++- src/ai_company/engine/decomposition/rollup.py | 9 + .../engine/decomposition/service.py | 1 + src/ai_company/engine/parallel.py | 6 +- src/ai_company/engine/routing/models.py | 53 +++++- src/ai_company/engine/routing/scorer.py | 3 +- src/ai_company/engine/routing/service.py | 19 +++ 
.../observability/events/task_routing.py | 1 + .../communication/test_meeting_integration.py | 2 + .../test_authority_strategy.py | 65 ++++++++ .../conflict_resolution/test_config.py | 14 +- .../test_debate_strategy.py | 12 +- .../test_hybrid_strategy.py | 39 ++++- .../unit/communication/meeting/test_config.py | 2 + .../unit/communication/meeting/test_enums.py | 2 + .../unit/communication/meeting/test_errors.py | 2 + .../unit/communication/meeting/test_models.py | 8 +- .../meeting/test_orchestrator.py | 2 + .../meeting/test_position_papers.py | 2 + .../communication/meeting/test_prompts.py | 2 + .../communication/meeting/test_protocol.py | 2 + .../communication/meeting/test_round_robin.py | 2 + .../meeting/test_structured_phases.py | 2 + .../meeting/test_token_tracker.py | 2 + tests/unit/communication/test_enums.py | 1 + .../unit/engine/test_decomposition_models.py | 8 +- 46 files changed, 690 insertions(+), 119 deletions(-) create mode 100644 src/ai_company/communication/meeting/_parsing.py diff --git a/DESIGN_SPEC.md b/DESIGN_SPEC.md index b42269a0c8..fe36fb40f8 100644 --- a/DESIGN_SPEC.md +++ b/DESIGN_SPEC.md @@ -2414,7 +2414,7 @@ ai-company/ │ │ ├── config.py # Communication config │ │ ├── conflict_resolution/ # Conflict resolution subsystem (§5.6) │ │ │ ├── __init__.py # Package exports -│ │ │ ├── _helpers.py # Shared utility (find_loser) +│ │ │ ├── _helpers.py # Shared utility (find_losers, build_dissent_records) │ │ │ ├── authority_strategy.py # AuthorityResolver (Strategy 1) │ │ │ ├── config.py # ConflictResolutionConfig, DebateConfig, HybridConfig │ │ │ ├── debate_strategy.py # DebateResolver (Strategy 2) @@ -2451,7 +2451,7 @@ ai-company/ │ │ │ ├── config.py # MeetingProtocolConfig, protocol-specific config models │ │ │ ├── enums.py # MeetingProtocolType, MeetingPhase enums │ │ │ ├── errors.py # Meeting error hierarchy -│ │ │ ├── models.py # MeetingRecord, AgendaItem, ActionItem, etc. 
+│ │ │ ├── models.py # MeetingRecord, MeetingAgendaItem, ActionItem, etc. │ │ │ ├── orchestrator.py # MeetingOrchestrator (runs meetings end-to-end) │ │ │ ├── position_papers.py # PositionPapersProtocol implementation │ │ │ ├── protocol.py # MeetingProtocol protocol interface @@ -2647,7 +2647,7 @@ These conventions were established during the M0–M2+ review cycle. **Adopted** | **Pydantic alias for YAML directives** | Adopted (M2.5) | `Field(alias="_remove")` in `TemplateAgentConfig` — YAML uses `_remove: true`, Python accesses `agent.remove`. Keeps the YAML-facing name (underscore prefix signals internal directive) separate from the Python attribute name. | Underscore-prefixed YAML keys signal merge directives vs regular fields. Pydantic alias bridges the naming convention gap cleanly. | | **Communication foundation** | Adopted (M4) | `MessageBus` protocol with `InMemoryMessageBus` backend (asyncio queues, pull-model `receive()` with shutdown signaling via `asyncio.Event`). `MessageDispatcher` routes to concurrent handlers via `asyncio.TaskGroup` with pre-allocated error collection. `AgentMessenger` per-agent facade auto-fills sender/timestamp/ID; deterministic direct-channel naming `@{sorted_a}:{sorted_b}`. `DeliveryEnvelope` for delivery tracking. `NotBlankStr` validation on all protocol boundary identifiers. | Pull-model avoids callback complexity and enables agents to consume at their own pace. Protocol + backend split enables future persistent/distributed bus implementations. Deterministic DM channel names prevent duplicates. See §5. | | **Delegation & loop prevention** | Adopted (M4) | `HierarchyResolver` resolves org hierarchy from `Company` at construction (cycle-detected, `MappingProxyType`-frozen). `AuthorityValidator` checks chain-of-command + role permissions. `DelegationGuard` orchestrates five mechanisms (ancestry, depth, dedup, rate limit, circuit breaker) in sequence, short-circuiting on first rejection. 
`DelegationService` is synchronous (CPU-only); messaging integration deferred. Stateful mechanisms use injectable clock for deterministic testing. Task model extended with `parent_task_id` and `delegation_chain` fields. | Synchronous delegation avoids async complexity for CPU-only validation. Five-mechanism guard provides defence-in-depth against all loop patterns. Injectable clocks enable deterministic testing. See §5.4, §5.5. | -| **Conflict resolution** | Adopted (M4) | `ConflictResolver` protocol with async `resolve()` + sync `build_dissent_record()` split (resolve may call LLM, dissent record is pure construction). Four strategies: `AuthorityResolver` (seniority comparison iterating all N positions, hierarchy proximity tiebreaker via `get_lowest_common_manager`), `DebateResolver` (LLM judge via `JudgeEvaluator` protocol, authority fallback when absent), `HumanEscalationResolver` (stub, returns `ESCALATED_TO_HUMAN`), `HybridResolver` (LLM review + ambiguity escalation/authority fallback). `ConflictResolutionService` follows `DelegationService` pattern (`__slots__`, keyword-only constructor, `MappingProxyType`-wrapped resolver mapping, audit trail). `DissentRecord` preserves losing agent's reasoning. `Conflict.is_cross_department` is a `@computed_field` derived from positions. `HierarchyResolver` extended with `get_lowest_common_manager()` and `get_delegation_depth()`. | Protocol + strategy pattern enables adding new resolution approaches without modifying existing code. Async resolve accommodates LLM calls; sync dissent record avoids unnecessary async overhead. Shared `find_loser` utility prevents code duplication across strategies. See §5.6. | +| **Conflict resolution** | Adopted (M4) | `ConflictResolver` protocol with async `resolve()` + sync `build_dissent_records()` split (resolve may call LLM, dissent record is pure construction). 
Four strategies: `AuthorityResolver` (seniority comparison iterating all N positions, hierarchy proximity tiebreaker via `get_lowest_common_manager`), `DebateResolver` (LLM judge via `JudgeEvaluator` protocol, authority fallback when absent), `HumanEscalationResolver` (stub, returns `ESCALATED_TO_HUMAN`), `HybridResolver` (LLM review + ambiguity escalation/authority fallback). `ConflictResolutionService` follows `DelegationService` pattern (`__slots__`, keyword-only constructor, `MappingProxyType`-wrapped resolver mapping, audit trail). `DissentRecord` preserves losing agent's reasoning. `Conflict.is_cross_department` is a `@computed_field` derived from positions. `HierarchyResolver` extended with `get_lowest_common_manager()` and `get_delegation_depth()`. | Protocol + strategy pattern enables adding new resolution approaches without modifying existing code. Async resolve accommodates LLM calls; sync dissent record avoids unnecessary async overhead. Shared `find_losers` utility prevents code duplication across strategies. See §5.6. 
| --- diff --git a/src/ai_company/communication/bus_memory.py b/src/ai_company/communication/bus_memory.py index d06d957117..a1129afd0c 100644 --- a/src/ai_company/communication/bus_memory.py +++ b/src/ai_company/communication/bus_memory.py @@ -46,7 +46,6 @@ COMM_SUBSCRIPTION_CREATED, COMM_SUBSCRIPTION_NOT_FOUND, COMM_SUBSCRIPTION_REMOVED, - COMM_UNSUBSCRIBE_SENTINEL_FAILED, ) logger = get_logger(__name__) @@ -100,6 +99,7 @@ def __init__(self, *, config: MessageBusConfig) -> None: self._queues: dict[tuple[str, str], asyncio.Queue[DeliveryEnvelope | None]] = {} self._history: dict[str, deque[Message]] = {} self._known_agents: set[str] = set() + self._waiters: dict[tuple[str, str], int] = {} self._running = False self._shutdown_event = asyncio.Event() @@ -399,17 +399,15 @@ async def unsubscribe( self._channels[channel_name] = channel.model_copy( update={"subscribers": new_subs}, ) - queue = self._queues.pop((channel_name, subscriber_id), None) + key = (channel_name, subscriber_id) + queue = self._queues.pop(key, None) if queue is not None: - try: + # Put a sentinel for each pending waiter so all + # concurrent receive() calls are woken up. + pending = self._waiters.pop(key, 0) + sentinels = max(1, pending) + for _ in range(sentinels): queue.put_nowait(None) - except asyncio.QueueFull: - logger.exception( - COMM_UNSUBSCRIBE_SENTINEL_FAILED, - channel=channel_name, - subscriber=subscriber_id, - detail="Queue full — unsubscribe sentinel not delivered", - ) logger.info( COMM_SUBSCRIPTION_REMOVED, channel=channel_name, @@ -429,11 +427,6 @@ async def receive( the bus is stopped. When ``timeout`` is ``None``, awaits indefinitely (or until shutdown). - Note: Only one ``receive()`` call should be pending per - ``(channel_name, subscriber_id)`` pair at a time. The - unsubscribe sentinel wakes a single waiter; concurrent - receivers on the same subscription are not supported. - Args: channel_name: Channel to receive from. subscriber_id: Agent ID receiving. 
@@ -460,25 +453,37 @@ async def receive( ): _raise_not_subscribed(channel_name, subscriber_id) queue = self._ensure_queue(channel_name, subscriber_id) - result = await self._await_with_shutdown(queue, timeout) + key = (channel_name, subscriber_id) + self._waiters[key] = self._waiters.get(key, 0) + 1 + try: + result = await self._await_with_shutdown(queue, timeout) + finally: + self._waiters[key] = max(0, self._waiters.get(key, 0) - 1) if result is None: - self._log_receive_null(channel_name, subscriber_id, timeout) + await self._log_receive_null(channel_name, subscriber_id, timeout) return result - def _log_receive_null( + async def _log_receive_null( self, channel_name: str, subscriber_id: str, - timeout: float | None, + timeout_seconds: float | None, ) -> None: - """Log the cause when ``receive()`` returns ``None``.""" - if self._shutdown_event.is_set(): + """Log the cause when ``receive()`` returns ``None``. + + Acquires the lock to safely inspect bus state (queue map + and shutdown flag) so the inferred reason is not racy. 
+ """ + async with self._lock: + is_shutdown = self._shutdown_event.is_set() + is_unsubscribed = (channel_name, subscriber_id) not in self._queues + if is_shutdown: logger.debug( COMM_RECEIVE_SHUTDOWN, channel=channel_name, subscriber=subscriber_id, ) - elif (channel_name, subscriber_id) not in self._queues: + elif is_unsubscribed: logger.debug( COMM_RECEIVE_UNSUBSCRIBED, channel=channel_name, @@ -489,7 +494,7 @@ def _log_receive_null( COMM_RECEIVE_TIMEOUT, channel=channel_name, subscriber=subscriber_id, - timeout=timeout, + timeout=timeout_seconds, ) async def _await_with_shutdown( diff --git a/src/ai_company/communication/conflict_resolution/_helpers.py b/src/ai_company/communication/conflict_resolution/_helpers.py index eb9a6330b4..6761c489d2 100644 --- a/src/ai_company/communication/conflict_resolution/_helpers.py +++ b/src/ai_company/communication/conflict_resolution/_helpers.py @@ -5,6 +5,9 @@ ConflictPosition, ConflictResolution, ) +from ai_company.communication.delegation.hierarchy import ( # noqa: TC001 + HierarchyResolver, +) from ai_company.communication.errors import ( ConflictStrategyError, ) @@ -32,18 +35,40 @@ def find_losers( All losing agents' positions. Raises: - ConflictStrategyError: If no losing position is found - (data integrity violation). + ConflictStrategyError: If the winning agent ID is not found + in the conflict positions, or if no losing position is + found (data integrity violation). 
""" - losers = tuple( - pos for pos in conflict.positions if pos.agent_id != resolution.winning_agent_id - ) + winner_id = resolution.winning_agent_id + position_ids = {pos.agent_id for pos in conflict.positions} + if winner_id not in position_ids: + msg = ( + f"Winning agent {winner_id!r} not found in " + f"conflict positions {sorted(position_ids)!r}" + ) + logger.error( + CONFLICT_STRATEGY_ERROR, + conflict_id=conflict.id, + winning_agent_id=winner_id, + position_agent_ids=sorted(position_ids), + error=msg, + ) + raise ConflictStrategyError( + msg, + context={ + "conflict_id": conflict.id, + "winning_agent_id": winner_id, + "position_agent_ids": sorted(position_ids), + }, + ) + + losers = tuple(pos for pos in conflict.positions if pos.agent_id != winner_id) if not losers: - msg = f"No losing position found for winner {resolution.winning_agent_id!r}" + msg = f"No losing position found for winner {winner_id!r}" logger.warning( CONFLICT_STRATEGY_ERROR, conflict_id=conflict.id, - winning_agent_id=resolution.winning_agent_id, + winning_agent_id=winner_id, error=msg, ) raise ConflictStrategyError( @@ -109,17 +134,54 @@ def find_position_or_raise( def pick_highest_seniority( conflict: Conflict, + *, + hierarchy: HierarchyResolver | None = None, ) -> ConflictPosition: """Pick the position with the highest seniority level. + When two agents share the same seniority level and a + ``hierarchy`` is provided, the agent closer to the hierarchy + root (fewer ancestors) wins. Without a hierarchy, the + incumbent (first encountered) is kept on ties. + Args: conflict: The conflict with agent positions. + hierarchy: Optional hierarchy resolver for tiebreaking + when seniority levels are equal. Returns: The position with the highest seniority. 
""" best = conflict.positions[0] for pos in conflict.positions[1:]: - if compare_seniority(pos.agent_level, best.agent_level) > 0: + cmp = compare_seniority(pos.agent_level, best.agent_level) + if cmp > 0: best = pos + elif cmp == 0 and hierarchy is not None: + best = _hierarchy_tiebreak(best, pos, hierarchy) return best + + +def _hierarchy_tiebreak( + incumbent: ConflictPosition, + challenger: ConflictPosition, + hierarchy: HierarchyResolver, +) -> ConflictPosition: + """Break a seniority tie using hierarchy depth. + + The agent with fewer ancestors (closer to root) wins. + If both have the same depth, the incumbent is kept. + + Args: + incumbent: Current best position. + challenger: Position challenging the incumbent. + hierarchy: Hierarchy resolver for depth lookup. + + Returns: + The winning position. + """ + depth_inc = len(hierarchy.get_ancestors(incumbent.agent_id)) + depth_chl = len(hierarchy.get_ancestors(challenger.agent_id)) + if depth_chl < depth_inc: + return challenger + return incumbent diff --git a/src/ai_company/communication/conflict_resolution/config.py b/src/ai_company/communication/conflict_resolution/config.py index 26c1f71a83..44198ea94e 100644 --- a/src/ai_company/communication/conflict_resolution/config.py +++ b/src/ai_company/communication/conflict_resolution/config.py @@ -10,18 +10,12 @@ class DebateConfig(BaseModel): """Configuration for the structured debate strategy. Attributes: - max_tokens_per_argument: Token budget per argument. judge: Judge selection — ``"shared_manager"`` (lowest common manager), ``"ceo"`` (hierarchy root), or a named agent. """ model_config = ConfigDict(frozen=True) - max_tokens_per_argument: int = Field( - default=500, - gt=0, - description="Token budget per argument", - ) judge: NotBlankStr = Field( default="shared_manager", description='Judge selection: "shared_manager", "ceo", or agent name', @@ -32,7 +26,6 @@ class HybridConfig(BaseModel): """Configuration for the hybrid resolution strategy. 
Attributes: - max_tokens_per_argument: Token budget per argument. review_agent: Agent tasked with reviewing positions. escalate_on_ambiguity: Whether to escalate to human when the review result is ambiguous. @@ -40,11 +33,6 @@ class HybridConfig(BaseModel): model_config = ConfigDict(frozen=True) - max_tokens_per_argument: int = Field( - default=500, - gt=0, - description="Token budget per argument", - ) review_agent: NotBlankStr = Field( default="conflict_reviewer", description="Agent tasked with reviewing positions", diff --git a/src/ai_company/communication/conflict_resolution/debate_strategy.py b/src/ai_company/communication/conflict_resolution/debate_strategy.py index ba30dcfc5d..8c5a2bb0ec 100644 --- a/src/ai_company/communication/conflict_resolution/debate_strategy.py +++ b/src/ai_company/communication/conflict_resolution/debate_strategy.py @@ -238,19 +238,21 @@ def _determine_judge(self, conflict: Conflict) -> str: # invalid names surface at evaluation time. return self._config.judge - @staticmethod def _authority_fallback( + self, conflict: Conflict, ) -> JudgeDecision: """Fall back to authority when no judge evaluator is available. + Uses hierarchy as a tiebreaker when seniority levels are equal. + Args: conflict: The conflict to resolve. Returns: Decision with winning agent ID and reasoning. 
""" - best = pick_highest_seniority(conflict) + best = pick_highest_seniority(conflict, hierarchy=self._hierarchy) return JudgeDecision( winning_agent_id=best.agent_id, reasoning=( diff --git a/src/ai_company/communication/conflict_resolution/hybrid_strategy.py b/src/ai_company/communication/conflict_resolution/hybrid_strategy.py index ef5e48b49e..3019893b41 100644 --- a/src/ai_company/communication/conflict_resolution/hybrid_strategy.py +++ b/src/ai_company/communication/conflict_resolution/hybrid_strategy.py @@ -27,6 +27,9 @@ ConflictResolver, JudgeEvaluator, ) +from ai_company.communication.delegation.hierarchy import ( # noqa: TC001 + HierarchyResolver, +) from ai_company.communication.enums import ConflictResolutionStrategy from ai_company.observability import get_logger from ai_company.observability.events.conflict import ( @@ -53,6 +56,7 @@ class HybridResolver: When no evaluator is provided, falls back to authority. Args: + hierarchy: Resolved organizational hierarchy. config: Hybrid strategy configuration. human_resolver: Human escalation resolver for ambiguous cases. review_evaluator: Optional LLM-based reviewer. @@ -60,6 +64,7 @@ class HybridResolver: __slots__ = ( "_config", + "_hierarchy", "_human_resolver", "_review_evaluator", ) @@ -67,10 +72,12 @@ class HybridResolver: def __init__( self, *, + hierarchy: HierarchyResolver, config: HybridConfig, human_resolver: ConflictResolver, review_evaluator: JudgeEvaluator | None = None, ) -> None: + self._hierarchy = hierarchy self._config = config self._human_resolver = human_resolver self._review_evaluator = review_evaluator @@ -201,7 +208,8 @@ def _authority_fallback( ) -> ConflictResolution: """Fall back to authority-based resolution. - Logs the fallback reason and resolves by highest seniority. + Logs the fallback reason and resolves by highest seniority, + using hierarchy as a tiebreaker when seniority levels are equal. Args: conflict: The conflict to resolve. 
@@ -216,7 +224,7 @@ def _authority_fallback( strategy="hybrid", reason=reason, ) - best = pick_highest_seniority(conflict) + best = pick_highest_seniority(conflict, hierarchy=self._hierarchy) return ConflictResolution( conflict_id=conflict.id, diff --git a/src/ai_company/communication/conflict_resolution/service.py b/src/ai_company/communication/conflict_resolution/service.py index f35bdf39d9..67be30b118 100644 --- a/src/ai_company/communication/conflict_resolution/service.py +++ b/src/ai_company/communication/conflict_resolution/service.py @@ -15,7 +15,6 @@ ConflictResolutionConfig, ) from ai_company.communication.conflict_resolution.models import ( - _MIN_POSITIONS, Conflict, ConflictPosition, ConflictResolution, @@ -44,6 +43,8 @@ logger = get_logger(__name__) + +_MIN_POSITIONS = 2 + class ConflictResolutionService: """Orchestrates conflict detection, resolution, and audit. diff --git a/src/ai_company/communication/delegation/hierarchy.py b/src/ai_company/communication/delegation/hierarchy.py index 9914399fa8..b609389722 100644 --- a/src/ai_company/communication/delegation/hierarchy.py +++ b/src/ai_company/communication/delegation/hierarchy.py @@ -217,7 +217,8 @@ def get_lowest_common_manager( """Find the lowest common manager of two agents. If one agent is an ancestor of the other, that agent is - returned as the LCM. + returned as the LCM. When both arguments refer to the same + agent, that agent is returned directly. Args: agent_a: First agent name. @@ -227,6 +228,8 @@ def get_lowest_common_manager( Name of the lowest common manager, or None if no common manager exists.
""" + if agent_a == agent_b: + return agent_a ancestors_a = self.get_ancestors(agent_a) ancestors_b_set = set(self.get_ancestors(agent_b)) # Check if agent_a is an ancestor of agent_b diff --git a/src/ai_company/communication/meeting/_parsing.py b/src/ai_company/communication/meeting/_parsing.py new file mode 100644 index 0000000000..2b711ce437 --- /dev/null +++ b/src/ai_company/communication/meeting/_parsing.py @@ -0,0 +1,154 @@ +"""Shared helpers for parsing decisions and action items from LLM text. + +Extracts structured ``Decision`` and ``ActionItem`` data from free-form +synthesis/summary responses produced by meeting protocol LLM calls. +""" + +import re + +from ai_company.communication.meeting.models import ActionItem + +# Patterns for section headers +_DECISIONS_HEADER_RE = re.compile( + r"^#+\s*decisions?\b|^decisions?\s*:", + re.IGNORECASE | re.MULTILINE, +) +_ACTION_ITEMS_HEADER_RE = re.compile( + r"^#+\s*action\s+items?\b|^action\s+items?\s*:", + re.IGNORECASE | re.MULTILINE, +) +_ANY_HEADER_RE = re.compile( + r"^#+\s+\S|^\S.*:\s*$", + re.MULTILINE, +) + +# List item patterns (numbered or bulleted) +_LIST_ITEM_RE = re.compile( + r"^\s*(?:\d+[\.\)]\s*|-\s*|\*\s*|\u2022\s*)(.+)", + re.MULTILINE, +) + +# Pattern for "assignee: <name>" or "(assigned to <name>)" or +# "- assignee: <name>" at end of line +_ASSIGNEE_RE = re.compile( + r"(?:" + r"\(?assigned?\s+to:?\s*(.+?)\)?" + r"|assignee:?\s*(.+?)" + r")\s*$", + re.IGNORECASE, +) + + +def _extract_section( + text: str, + header_re: re.Pattern[str], +) -> str: + """Extract text between a section header and the next header. + + Args: + text: Full response text. + header_re: Compiled regex matching the section header. + + Returns: + Section body text, or empty string if header not found.
+ """ + match = header_re.search(text) + if match is None: + return "" + + start = match.end() + # Find the next header after this section + next_header = _ANY_HEADER_RE.search(text, start) + end = next_header.start() if next_header is not None else len(text) + + return text[start:end] + + +def parse_decisions(summary_text: str) -> tuple[str, ...]: + """Parse decisions from an LLM summary/synthesis response. + + Looks for a "Decisions" section header, then extracts numbered + or bulleted list items. Falls back to empty tuple if no + decisions section is found. + + Args: + summary_text: The full summary/synthesis text from the LLM. + + Returns: + Tuple of decision strings (may be empty). + """ + section = _extract_section(summary_text, _DECISIONS_HEADER_RE) + if not section: + return () + + decisions: list[str] = [] + for match in _LIST_ITEM_RE.finditer(section): + text = match.group(1).strip() + if text: + decisions.append(text) + + return tuple(decisions) + + +def _parse_assignee(text: str) -> tuple[str, str | None]: + """Extract assignee from an action item line. + + Args: + text: The action item text (may contain assignee info). + + Returns: + Tuple of (cleaned description, assignee_id or None). + """ + match = _ASSIGNEE_RE.search(text) + if match is None: + return text, None + + assignee = (match.group(1) or match.group(2) or "").strip() + # Remove the assignee part from the description + description = text[: match.start()].strip() + # Strip trailing punctuation left over + description = description.rstrip(" -,;:") + + if not assignee or not description: + return text, None + + return description, assignee + + +def parse_action_items( + summary_text: str, +) -> tuple[ActionItem, ...]: + """Parse action items from an LLM summary/synthesis response. + + Looks for an "Action Items" section header, then extracts + bulleted or numbered list items. Attempts to detect assignee + information within each item. 
+ + Args: + summary_text: The full summary/synthesis text from the LLM. + + Returns: + Tuple of ActionItem instances (may be empty). + """ + section = _extract_section(summary_text, _ACTION_ITEMS_HEADER_RE) + if not section: + return () + + items: list[ActionItem] = [] + for match in _LIST_ITEM_RE.finditer(section): + raw_text = match.group(1).strip() + if not raw_text: + continue + + description, assignee_id = _parse_assignee(raw_text) + if not description: + continue + + items.append( + ActionItem( + description=description, + assignee_id=assignee_id, + ) + ) + + return tuple(items) diff --git a/src/ai_company/communication/meeting/_prompts.py b/src/ai_company/communication/meeting/_prompts.py index 009b5d3591..448e5e31f4 100644 --- a/src/ai_company/communication/meeting/_prompts.py +++ b/src/ai_company/communication/meeting/_prompts.py @@ -24,5 +24,7 @@ def build_agenda_prompt(agenda: MeetingAgenda) -> str: entry = f" {i}. {item.title}" if item.description: entry += f" — {item.description}" + if item.presenter_id: + entry += f" (presenter: {item.presenter_id})" parts.append(entry) return "\n".join(parts) diff --git a/src/ai_company/communication/meeting/_token_tracker.py b/src/ai_company/communication/meeting/_token_tracker.py index 2493873c25..13dba01aea 100644 --- a/src/ai_company/communication/meeting/_token_tracker.py +++ b/src/ai_company/communication/meeting/_token_tracker.py @@ -12,6 +12,13 @@ import dataclasses +from ai_company.observability import get_logger +from ai_company.observability.events.meeting import ( + MEETING_BUDGET_EXHAUSTED, +) + +logger = get_logger(__name__) + @dataclasses.dataclass class TokenTracker: @@ -61,8 +68,17 @@ def record(self, input_tokens: int, output_tokens: int) -> None: if input_tokens < 0 or output_tokens < 0: msg = ( f"Token counts must be non-negative, got " - f"input_tokens={input_tokens}, output_tokens={output_tokens}" + f"input_tokens={input_tokens}, " + f"output_tokens={output_tokens}" ) raise ValueError(msg) 
self.input_tokens += input_tokens self.output_tokens += output_tokens + + if self.used > self.budget: + logger.warning( + MEETING_BUDGET_EXHAUSTED, + tokens_used=self.used, + token_budget=self.budget, + overage=self.used - self.budget, + ) diff --git a/src/ai_company/communication/meeting/config.py b/src/ai_company/communication/meeting/config.py index e07abcb998..cd32bf8453 100644 --- a/src/ai_company/communication/meeting/config.py +++ b/src/ai_company/communication/meeting/config.py @@ -62,7 +62,8 @@ class StructuredPhasesConfig(BaseModel): Attributes: skip_discussion_if_no_conflicts: Skip discussion when no conflicts are detected. - max_discussion_tokens: Token budget for the discussion round. + max_discussion_tokens: Token budget for the discussion + round. """ model_config = ConfigDict(frozen=True) diff --git a/src/ai_company/communication/meeting/models.py b/src/ai_company/communication/meeting/models.py index 1a0d4f1ef3..7a3825802a 100644 --- a/src/ai_company/communication/meeting/models.py +++ b/src/ai_company/communication/meeting/models.py @@ -234,6 +234,29 @@ def _validate_participants(self) -> Self: raise ValueError(msg) return self + @model_validator(mode="after") + def _validate_token_aggregates(self) -> Self: + """Ensure aggregate token counts match sum of contributions.""" + if not self.contributions: + return self + sum_input = sum(c.input_tokens for c in self.contributions) + sum_output = sum(c.output_tokens for c in self.contributions) + if self.total_input_tokens != sum_input: + msg = ( + f"total_input_tokens ({self.total_input_tokens}) " + f"does not match sum of contributions " + f"({sum_input})" + ) + raise ValueError(msg) + if self.total_output_tokens != sum_output: + msg = ( + f"total_output_tokens ({self.total_output_tokens})" + f" does not match sum of contributions " + f"({sum_output})" + ) + raise ValueError(msg) + return self + class MeetingRecord(BaseModel): """Audit trail entry for a meeting execution. 
@@ -291,6 +314,12 @@ def _validate_status_consistency(self) -> Self: "failed or budget_exhausted" ) raise ValueError(msg) + if not self.error_message.strip(): + msg = ( + "error_message must not be empty when status " + "is failed or budget_exhausted" + ) + raise ValueError(msg) if self.minutes is not None: msg = "minutes must be None when status is failed or budget_exhausted" raise ValueError(msg) diff --git a/src/ai_company/communication/meeting/orchestrator.py b/src/ai_company/communication/meeting/orchestrator.py index f19ebba5b9..3d7d79beaf 100644 --- a/src/ai_company/communication/meeting/orchestrator.py +++ b/src/ai_company/communication/meeting/orchestrator.py @@ -5,7 +5,10 @@ from action items, and records audit trail entries. """ +import copy +from collections import Counter from collections.abc import Mapping # noqa: TC003 +from types import MappingProxyType from uuid import uuid4 from ai_company.communication.meeting.config import MeetingProtocolConfig # noqa: TC001 @@ -91,7 +94,9 @@ def __init__( agent_caller: AgentCaller, task_creator: TaskCreator | None = None, ) -> None: - self._protocol_registry = protocol_registry + self._protocol_registry: MappingProxyType[ + MeetingProtocolType, MeetingProtocol + ] = MappingProxyType(copy.deepcopy(dict(protocol_registry))) self._agent_caller = agent_caller self._task_creator = task_creator self._records: list[MeetingRecord] = [] @@ -390,6 +395,22 @@ def _validate_inputs( msg, context={"meeting_id": meeting_id}, ) + if len(participant_ids) != len(set(participant_ids)): + dupes = sorted(v for v, c in Counter(participant_ids).items() if c > 1) + logger.warning( + MEETING_VALIDATION_FAILED, + meeting_id=meeting_id, + error="duplicate participant_ids", + duplicates=dupes, + ) + msg = f"Duplicate participant IDs: {dupes}" + raise MeetingParticipantError( + msg, + context={ + "meeting_id": meeting_id, + "duplicates": dupes, + }, + ) if leader_id in participant_ids: logger.warning( MEETING_VALIDATION_FAILED, diff --git 
a/src/ai_company/communication/meeting/position_papers.py b/src/ai_company/communication/meeting/position_papers.py index 3b3fa819a7..d471d9bc92 100644 --- a/src/ai_company/communication/meeting/position_papers.py +++ b/src/ai_company/communication/meeting/position_papers.py @@ -9,6 +9,10 @@ import asyncio from datetime import UTC, datetime +from ai_company.communication.meeting._parsing import ( + parse_action_items, + parse_decisions, +) from ai_company.communication.meeting._prompts import build_agenda_prompt from ai_company.communication.meeting._token_tracker import TokenTracker from ai_company.communication.meeting.config import PositionPapersConfig # noqa: TC001 @@ -146,6 +150,10 @@ async def run( # noqa: PLR0913 synthesis_contribution, ) + synthesis_text = synthesis_contribution.content + decisions = parse_decisions(synthesis_text) + action_items = parse_action_items(synthesis_text) + logger.debug( MEETING_TOKENS_RECORDED, meeting_id=meeting_id, @@ -163,7 +171,9 @@ async def run( # noqa: PLR0913 participant_ids=participant_ids, agenda=agenda, contributions=tuple(contributions), - summary=synthesis_contribution.content, + summary=synthesis_text, + decisions=decisions, + action_items=action_items, total_input_tokens=tracker.input_tokens, total_output_tokens=tracker.output_tokens, started_at=started_at, @@ -269,12 +279,12 @@ async def _collect_paper( # All slots must be filled — TaskGroup propagates ExceptionGroup # on any task failure, so reaching this point means all succeeded. 
- assert all(r is not None for r in results), ( # noqa: S101 - f"Expected {n} position papers but some slots are None" - ) - assert all(c is not None for c in contrib_results), ( # noqa: S101 - f"Expected {n} contributions but some slots are None" - ) + if not all(r is not None for r in results): + msg = f"Expected {n} position papers but some slots are None" + raise RuntimeError(msg) + if not all(c is not None for c in contrib_results): + msg = f"Expected {n} contributions but some slots are None" + raise RuntimeError(msg) papers: list[tuple[str, str]] = list(results) # type: ignore[arg-type] paper_contributions: list[MeetingContribution] = list(contrib_results) # type: ignore[arg-type] diff --git a/src/ai_company/communication/meeting/round_robin.py b/src/ai_company/communication/meeting/round_robin.py index a88b30a5d1..dce8ed7b24 100644 --- a/src/ai_company/communication/meeting/round_robin.py +++ b/src/ai_company/communication/meeting/round_robin.py @@ -8,6 +8,10 @@ from datetime import UTC, datetime +from ai_company.communication.meeting._parsing import ( + parse_action_items, + parse_decisions, +) from ai_company.communication.meeting._prompts import build_agenda_prompt from ai_company.communication.meeting._token_tracker import TokenTracker from ai_company.communication.meeting.config import RoundRobinConfig # noqa: TC001 @@ -19,6 +23,7 @@ MeetingBudgetExhaustedError, ) from ai_company.communication.meeting.models import ( + ActionItem, MeetingAgenda, MeetingContribution, MeetingMinutes, @@ -122,7 +127,11 @@ async def run( # noqa: PLR0913 tracker = TokenTracker(budget=token_budget) agenda_text = build_agenda_prompt(agenda) - summary_reserve = int(token_budget * _SUMMARY_RESERVE_FRACTION) + summary_reserve = ( + int(token_budget * _SUMMARY_RESERVE_FRACTION) + if self._config.leader_summarizes + else 0 + ) discussion_budget = token_budget - summary_reserve contributions = await self._run_discussion_rounds( @@ -139,6 +148,8 @@ async def run( # noqa: PLR0913 # 
Summary phase summary = "" + decisions: tuple[str, ...] = () + action_items: tuple[ActionItem, ...] = () all_contributions: tuple[MeetingContribution, ...] = tuple(contributions) if self._config.leader_summarizes and not tracker.is_exhausted: summary, summary_contribution = await self._run_summary( @@ -151,6 +162,8 @@ async def run( # noqa: PLR0913 turn_number=turn_number, ) all_contributions = (*contributions, summary_contribution) + decisions = parse_decisions(summary) + action_items = parse_action_items(summary) elif self._config.leader_summarizes and tracker.is_exhausted: logger.warning( MEETING_SUMMARY_SKIPPED, @@ -190,6 +203,8 @@ async def run( # noqa: PLR0913 agenda=agenda, contributions=all_contributions, summary=summary, + decisions=decisions, + action_items=action_items, total_input_tokens=tracker.input_tokens, total_output_tokens=tracker.output_tokens, started_at=started_at, diff --git a/src/ai_company/communication/meeting/structured_phases.py b/src/ai_company/communication/meeting/structured_phases.py index 98abebabdc..3fc57881d0 100644 --- a/src/ai_company/communication/meeting/structured_phases.py +++ b/src/ai_company/communication/meeting/structured_phases.py @@ -9,6 +9,10 @@ import asyncio from datetime import UTC, datetime +from ai_company.communication.meeting._parsing import ( + parse_action_items, + parse_decisions, +) from ai_company.communication.meeting._prompts import build_agenda_prompt from ai_company.communication.meeting._token_tracker import TokenTracker from ai_company.communication.meeting.config import ( @@ -272,6 +276,9 @@ async def run( # noqa: PLR0913 synthesis_contribution, ) + decisions = parse_decisions(summary) + action_items = parse_action_items(summary) + logger.debug( MEETING_TOKENS_RECORDED, meeting_id=meeting_id, @@ -290,6 +297,8 @@ async def run( # noqa: PLR0913 agenda=agenda, contributions=contributions, summary=summary, + decisions=decisions, + action_items=action_items, conflicts_detected=conflicts_detected, 
total_input_tokens=tracker.input_tokens, total_output_tokens=tracker.output_tokens, @@ -389,12 +398,12 @@ async def _collect_input( # All slots must be filled — TaskGroup propagates ExceptionGroup # on any task failure, so reaching this point means all succeeded. - assert all(r is not None for r in result_inputs), ( # noqa: S101 - f"Expected {num_participants} inputs but some slots are None" - ) - assert all(c is not None for c in result_contributions), ( # noqa: S101 - f"Expected {num_participants} contributions but some slots are None" - ) + if not all(r is not None for r in result_inputs): + msg = f"Expected {num_participants} inputs but some slots are None" + raise RuntimeError(msg) + if not all(c is not None for c in result_contributions): + msg = f"Expected {num_participants} contributions but some slots are None" + raise RuntimeError(msg) inputs: list[tuple[str, str]] = list(result_inputs) # type: ignore[arg-type] input_contributions: list[MeetingContribution] = list( result_contributions, # type: ignore[arg-type] @@ -476,12 +485,6 @@ async def _run_discussion( # noqa: PLR0913 meeting_id=meeting_id, conflicts_found=conflicts_detected, ) - logger.debug( - MEETING_CONFLICT_DETECTED, - meeting_id=meeting_id, - conflicts_found=conflicts_detected, - raw_response=conflict_response.content, - ) should_discuss = conflicts_detected or ( not self._config.skip_discussion_if_no_conflicts @@ -540,9 +543,13 @@ async def _run_discussion_round( # noqa: PLR0913 phase=MeetingPhase.DISCUSSION, ) + # Reserve tokens for the synthesis phase that follows + # discussion so that discussion cannot exhaust the budget. 
+ synthesis_reserve = int(tracker.remaining * _SYNTHESIS_RESERVE_FRACTION) + available_for_discussion = max(0, tracker.remaining - synthesis_reserve) discussion_budget = min( self._config.max_discussion_tokens, - tracker.remaining, + available_for_discussion, ) tokens_per_agent = max( 1, @@ -551,9 +558,10 @@ async def _run_discussion_round( # noqa: PLR0913 round_contributions: list[MeetingContribution] = [] round_discussion: list[tuple[str, str]] = [] + discussion_used = 0 for pid in participant_ids: - if tracker.is_exhausted: + if tracker.is_exhausted or discussion_used >= discussion_budget: logger.warning( MEETING_BUDGET_EXHAUSTED, meeting_id=meeting_id, @@ -576,15 +584,17 @@ async def _run_discussion_round( # noqa: PLR0913 phase=MeetingPhase.DISCUSSION, ) + remaining_discussion = discussion_budget - discussion_used disc_response = await agent_caller( pid, disc_prompt, - min(tokens_per_agent, tracker.remaining), + min(tokens_per_agent, remaining_discussion), ) tracker.record( disc_response.input_tokens, disc_response.output_tokens, ) + discussion_used += disc_response.input_tokens + disc_response.output_tokens disc_contribution = MeetingContribution( agent_id=pid, diff --git a/src/ai_company/communication/messenger.py b/src/ai_company/communication/messenger.py index eaba761199..fe2ffac9b5 100644 --- a/src/ai_company/communication/messenger.py +++ b/src/ai_company/communication/messenger.py @@ -14,6 +14,7 @@ DeliveryEnvelope, Subscription, ) +from ai_company.core.types import NotBlankStr # noqa: TC001 from ai_company.observability import get_logger from ai_company.observability.events.communication import ( COMM_DISPATCH_NO_DISPATCHER, @@ -235,7 +236,7 @@ async def broadcast( ) return msg - async def subscribe(self, channel_name: str) -> Subscription: + async def subscribe(self, channel_name: NotBlankStr) -> Subscription: """Subscribe this agent to a channel. 
Args: @@ -256,7 +257,7 @@ async def subscribe(self, channel_name: str) -> Subscription: ) return sub - async def unsubscribe(self, channel_name: str) -> None: + async def unsubscribe(self, channel_name: NotBlankStr) -> None: """Unsubscribe this agent from a channel. Args: @@ -274,7 +275,7 @@ async def unsubscribe(self, channel_name: str) -> None: async def receive( self, - channel_name: str, + channel_name: NotBlankStr, *, timeout: float | None = None, # noqa: ASYNC109 ) -> DeliveryEnvelope | None: @@ -285,8 +286,12 @@ async def receive( timeout: Max seconds to wait, or ``None`` for indefinite. Returns: - The next delivery envelope, or ``None`` on timeout, shutdown, - or when an in-flight receive is woken by :meth:`unsubscribe`. + The next delivery envelope, or ``None`` when: + + - *timeout* expires without a message arriving. + - The bus is shut down while waiting. + - The subscription is cancelled via :meth:`unsubscribe` + while a ``receive()`` call is in flight. """ return await self._bus.receive( channel_name, diff --git a/src/ai_company/core/enums.py b/src/ai_company/core/enums.py index 8c348dc1ac..2cc19161f8 100644 --- a/src/ai_company/core/enums.py +++ b/src/ai_company/core/enums.py @@ -23,6 +23,30 @@ class SeniorityLevel(StrEnum): _SENIORITY_ORDER: tuple[SeniorityLevel, ...] = tuple(SeniorityLevel) +# Validate that _SENIORITY_ORDER contains every SeniorityLevel member +# exactly once (set equality plus a duplicate check). This guards +# against silent breakage if the ordering tuple is ever hand-maintained +# and drifts from the enum; relative order itself is not verified here. 
+_all_members = set(SeniorityLevel) +_order_set = set(_SENIORITY_ORDER) +if _order_set != _all_members: + _missing = _all_members - _order_set + _extra = _order_set - _all_members + _msg = ( + f"_SENIORITY_ORDER is out of sync with SeniorityLevel: " + f"missing={_missing}, extra={_extra}" + ) + raise RuntimeError(_msg) +if len(_SENIORITY_ORDER) != len(_order_set): + _msg = "_SENIORITY_ORDER contains duplicate entries" + raise RuntimeError(_msg) +del _all_members, _order_set + +# Precomputed rank lookup for O(1) seniority comparison. +_SENIORITY_RANK: dict[SeniorityLevel, int] = { + level: idx for idx, level in enumerate(_SENIORITY_ORDER) +} + def compare_seniority(a: SeniorityLevel, b: SeniorityLevel) -> int: """Compare two seniority levels. @@ -37,7 +61,7 @@ def compare_seniority(a: SeniorityLevel, b: SeniorityLevel) -> int: Returns: Integer indicating relative seniority. """ - return _SENIORITY_ORDER.index(a) - _SENIORITY_ORDER.index(b) + return _SENIORITY_RANK[a] - _SENIORITY_RANK[b] class AgentStatus(StrEnum): diff --git a/src/ai_company/core/task.py b/src/ai_company/core/task.py index 95c33f1ce9..2973031a3a 100644 --- a/src/ai_company/core/task.py +++ b/src/ai_company/core/task.py @@ -70,6 +70,10 @@ class Task(BaseModel): parent_task_id: Parent task ID when created via delegation (``None`` for root tasks). delegation_chain: Ordered agent IDs of delegators (root first). + task_structure: Classification of how subtasks relate to each + other (``None`` when not yet classified). + coordination_topology: Coordination topology for multi-agent + execution (defaults to ``AUTO``). 
""" model_config = ConfigDict(frozen=True) diff --git a/src/ai_company/engine/decomposition/models.py b/src/ai_company/engine/decomposition/models.py index 96556d7a22..38f42877a3 100644 --- a/src/ai_company/engine/decomposition/models.py +++ b/src/ai_company/engine/decomposition/models.py @@ -148,11 +148,22 @@ def _validate_plan_task_consistency(self) -> Self: if len(self.created_tasks) != len(self.plan.subtasks): msg = ( f"created_tasks count ({len(self.created_tasks)}) " - f"does not match plan subtask count ({len(self.plan.subtasks)})" + f"does not match plan subtask count " + f"({len(self.plan.subtasks)})" ) raise ValueError(msg) task_ids = {t.id for t in self.created_tasks} + plan_ids = {s.id for s in self.plan.subtasks} + if task_ids != plan_ids: + missing = sorted(plan_ids - task_ids) + extra = sorted(task_ids - plan_ids) + msg = ( + f"created_tasks IDs do not match plan subtask IDs" + f" (missing={missing}, extra={extra})" + ) + raise ValueError(msg) + edge_ids = {eid for edge in self.dependency_edges for eid in edge} unknown_edge_ids = edge_ids - task_ids if unknown_edge_ids: @@ -234,9 +245,12 @@ def derived_parent_status(self) -> TaskStatus: # noqa: PLR0911 if self.blocked > 0: return TaskStatus.BLOCKED - # All subtasks in terminal states (completed + cancelled mix) + # All subtasks in terminal states but mixed completed + cancelled + # — not fully completed (pure completed already handled above), + # and not fully cancelled (pure cancelled already handled above). + # Report as CANCELLED since some work was abandoned. 
if self.completed + self.cancelled == self.total: - return TaskStatus.COMPLETED + return TaskStatus.CANCELLED return TaskStatus.IN_PROGRESS diff --git a/src/ai_company/engine/decomposition/rollup.py b/src/ai_company/engine/decomposition/rollup.py index 179d964e96..9daa3b51ff 100644 --- a/src/ai_company/engine/decomposition/rollup.py +++ b/src/ai_company/engine/decomposition/rollup.py @@ -49,6 +49,15 @@ def compute( total=0, reason="rollup computed with no subtask statuses", ) + return SubtaskStatusRollup( + parent_task_id=parent_task_id, + total=0, + completed=0, + failed=0, + in_progress=0, + blocked=0, + cancelled=0, + ) completed = subtask_statuses.count(TaskStatus.COMPLETED) failed = subtask_statuses.count(TaskStatus.FAILED) diff --git a/src/ai_company/engine/decomposition/service.py b/src/ai_company/engine/decomposition/service.py index d9e983fbfd..3624fac7a9 100644 --- a/src/ai_company/engine/decomposition/service.py +++ b/src/ai_company/engine/decomposition/service.py @@ -128,6 +128,7 @@ async def _do_decompose( created_by=task.created_by, parent_task_id=task.id, delegation_chain=task.delegation_chain, + dependencies=subtask_def.dependencies, status=TaskStatus.CREATED, estimated_complexity=subtask_def.estimated_complexity, ) diff --git a/src/ai_company/engine/parallel.py b/src/ai_company/engine/parallel.py index bc870aad07..68ba97e55b 100644 --- a/src/ai_company/engine/parallel.py +++ b/src/ai_company/engine/parallel.py @@ -178,10 +178,12 @@ async def execute_group( if task_error is not None: task_error.add_note(lock_msg) else: - raise ParallelExecutionError(lock_msg) from release_error + raise ParallelExecutionError( + lock_msg, + ) from release_error if task_error is not None: - raise task_error + raise task_error from task_error result = self._build_result( group, diff --git a/src/ai_company/engine/routing/models.py b/src/ai_company/engine/routing/models.py index 47341932d8..eed25f9927 100644 --- a/src/ai_company/engine/routing/models.py +++ 
b/src/ai_company/engine/routing/models.py @@ -4,6 +4,7 @@ results, and topology configuration. """ +from collections import Counter from typing import Self from pydantic import BaseModel, ConfigDict, Field, model_validator @@ -11,6 +12,12 @@ from ai_company.core.agent import AgentIdentity # noqa: TC001 from ai_company.core.enums import CoordinationTopology from ai_company.core.types import NotBlankStr # noqa: TC001 +from ai_company.observability import get_logger +from ai_company.observability.events.task_routing import ( + TASK_ROUTING_FAILED, +) + +logger = get_logger(__name__) class RoutingCandidate(BaseModel): @@ -72,6 +79,11 @@ def _validate_selected_not_in_alternatives(self) -> Self: msg = ( f"Selected candidate {selected_name!r} also appears in alternatives" ) + logger.warning( + TASK_ROUTING_FAILED, + subtask_id=self.subtask_id, + error=msg, + ) raise ValueError(msg) return self @@ -99,8 +111,36 @@ class RoutingResult(BaseModel): @model_validator(mode="after") def _validate_unique_subtask_ids(self) -> Self: - """Ensure no subtask appears in both decisions and unroutable.""" - decision_ids = {d.subtask_id for d in self.decisions} + """Ensure no duplicate or overlapping subtask IDs.""" + # Check for duplicate IDs within decisions + decision_id_list = [d.subtask_id for d in self.decisions] + decision_dupes = sorted( + i for i, c in Counter(decision_id_list).items() if c > 1 + ) + if decision_dupes: + msg = f"Duplicate subtask IDs in decisions: {decision_dupes}" + logger.warning( + TASK_ROUTING_FAILED, + parent_task_id=self.parent_task_id, + error=msg, + ) + raise ValueError(msg) + + # Check for duplicate IDs within unroutable + unroutable_dupes = sorted( + i for i, c in Counter(self.unroutable).items() if c > 1 + ) + if unroutable_dupes: + msg = f"Duplicate subtask IDs in unroutable: {unroutable_dupes}" + logger.warning( + TASK_ROUTING_FAILED, + parent_task_id=self.parent_task_id, + error=msg, + ) + raise ValueError(msg) + + # Check for overlap between 
decisions and unroutable + decision_ids = set(decision_id_list) unroutable_set = set(self.unroutable) overlap = decision_ids & unroutable_set if overlap: @@ -108,6 +148,11 @@ def _validate_unique_subtask_ids(self) -> Self: f"Subtask IDs appear in both decisions and " f"unroutable: {sorted(overlap)}" ) + logger.warning( + TASK_ROUTING_FAILED, + parent_task_id=self.parent_task_id, + error=msg, + ) raise ValueError(msg) return self @@ -154,5 +199,9 @@ def _validate_no_auto_defaults(self) -> Self: value = getattr(self, field_name) if value == CoordinationTopology.AUTO: msg = f"{field_name} cannot be AUTO — would cause infinite resolution" + logger.warning( + TASK_ROUTING_FAILED, + error=msg, + ) raise ValueError(msg) return self diff --git a/src/ai_company/engine/routing/scorer.py b/src/ai_company/engine/routing/scorer.py index d656b9c1e4..613cf951f2 100644 --- a/src/ai_company/engine/routing/scorer.py +++ b/src/ai_company/engine/routing/scorer.py @@ -11,6 +11,7 @@ from ai_company.observability import get_logger from ai_company.observability.events.task_routing import ( TASK_ROUTING_AGENT_SCORED, + TASK_ROUTING_SCORER_INVALID_CONFIG, ) if TYPE_CHECKING: @@ -58,7 +59,7 @@ def __init__(self, min_score: float = 0.1) -> None: if not 0.0 <= min_score <= 1.0: msg = f"min_score must be between 0.0 and 1.0, got {min_score}" logger.warning( - "agent_task_scorer.invalid_min_score", + TASK_ROUTING_SCORER_INVALID_CONFIG, min_score=min_score, error=msg, ) diff --git a/src/ai_company/engine/routing/service.py b/src/ai_company/engine/routing/service.py index 7f2f69cf2c..33c02d3f55 100644 --- a/src/ai_company/engine/routing/service.py +++ b/src/ai_company/engine/routing/service.py @@ -114,8 +114,27 @@ def _do_route( Returns: Routing result with decisions and unroutable subtask IDs. + + Raises: + ValueError: If parent_task.id does not match + plan.parent_task_id. 
""" plan = decomposition_result.plan + + if parent_task.id != plan.parent_task_id: + msg = ( + f"parent_task.id {parent_task.id!r} does not " + f"match plan.parent_task_id " + f"{plan.parent_task_id!r}" + ) + logger.warning( + TASK_ROUTING_FAILED, + parent_task_id=parent_task.id, + plan_parent_task_id=plan.parent_task_id, + error=msg, + ) + raise ValueError(msg) + topology = self._topology_selector.select(parent_task, plan) decisions: list[RoutingDecision] = [] diff --git a/src/ai_company/observability/events/task_routing.py b/src/ai_company/observability/events/task_routing.py index 57d2b68633..724b6f2431 100644 --- a/src/ai_company/observability/events/task_routing.py +++ b/src/ai_company/observability/events/task_routing.py @@ -11,3 +11,4 @@ TASK_ROUTING_COMPLETE: Final[str] = "task_routing.complete" TASK_ROUTING_FAILED: Final[str] = "task_routing.failed" TASK_ROUTING_NO_AGENTS: Final[str] = "task_routing.no_agents" +TASK_ROUTING_SCORER_INVALID_CONFIG: Final[str] = "task_routing.scorer.invalid_config" diff --git a/tests/integration/communication/test_meeting_integration.py b/tests/integration/communication/test_meeting_integration.py index ba30f5ae3b..bd0ceb8147 100644 --- a/tests/integration/communication/test_meeting_integration.py +++ b/tests/integration/communication/test_meeting_integration.py @@ -34,6 +34,8 @@ StructuredPhasesProtocol, ) +pytestmark = pytest.mark.timeout(30) + def _make_agent_caller( responses: dict[str, list[str]] | None = None, diff --git a/tests/unit/communication/conflict_resolution/test_authority_strategy.py b/tests/unit/communication/conflict_resolution/test_authority_strategy.py index 45765de496..d2a590426b 100644 --- a/tests/unit/communication/conflict_resolution/test_authority_strategy.py +++ b/tests/unit/communication/conflict_resolution/test_authority_strategy.py @@ -166,6 +166,71 @@ async def test_subordinate_vs_supervisor_equal_seniority( assert resolution.winning_agent_id == "backend_lead" +@pytest.mark.unit +class 
TestAuthorityResolverThreeParticipants: + async def test_three_participants_highest_seniority_wins( + self, + hierarchy: HierarchyResolver, + ) -> None: + """Authority resolution picks highest seniority among 3+ agents.""" + resolver = AuthorityResolver(hierarchy=hierarchy) + conflict = make_conflict( + positions=( + make_position( + agent_id="jr_dev", + level=SeniorityLevel.JUNIOR, + position="Quick hack", + ), + make_position( + agent_id="sr_dev", + level=SeniorityLevel.SENIOR, + position="Proper refactor", + ), + make_position( + agent_id="backend_lead", + level=SeniorityLevel.LEAD, + position="Full redesign", + ), + ), + ) + resolution = await resolver.resolve(conflict) + assert resolution.winning_agent_id == "backend_lead" + assert resolution.outcome == ConflictResolutionOutcome.RESOLVED_BY_AUTHORITY + + async def test_three_participants_produces_two_dissent_records( + self, + hierarchy: HierarchyResolver, + ) -> None: + """With 3 participants, the two losers each produce a dissent record.""" + resolver = AuthorityResolver(hierarchy=hierarchy) + conflict = make_conflict( + positions=( + make_position( + agent_id="jr_dev", + level=SeniorityLevel.JUNIOR, + position="Approach A", + ), + make_position( + agent_id="sr_dev", + level=SeniorityLevel.SENIOR, + position="Approach B", + ), + make_position( + agent_id="backend_lead", + level=SeniorityLevel.LEAD, + position="Approach C", + ), + ), + ) + resolution = await resolver.resolve(conflict) + records = resolver.build_dissent_records(conflict, resolution) + assert len(records) == 2 + dissenter_ids = {r.dissenting_agent_id for r in records} + assert dissenter_ids == {"jr_dev", "sr_dev"} + for record in records: + assert record.strategy_used == ConflictResolutionStrategy.AUTHORITY + + @pytest.mark.unit class TestAuthorityResolverDissentRecord: async def test_dissent_record_has_loser_info( diff --git a/tests/unit/communication/conflict_resolution/test_config.py 
b/tests/unit/communication/conflict_resolution/test_config.py index b26936d7ae..fc72a5ab99 100644 --- a/tests/unit/communication/conflict_resolution/test_config.py +++ b/tests/unit/communication/conflict_resolution/test_config.py @@ -17,12 +17,10 @@ class TestDebateConfig: def test_defaults(self) -> None: cfg = DebateConfig() - assert cfg.max_tokens_per_argument == 500 assert cfg.judge == "shared_manager" def test_custom_values(self) -> None: - cfg = DebateConfig(max_tokens_per_argument=1000, judge="ceo") - assert cfg.max_tokens_per_argument == 1000 + cfg = DebateConfig(judge="ceo") assert cfg.judge == "ceo" def test_frozen(self) -> None: @@ -30,26 +28,16 @@ def test_frozen(self) -> None: with pytest.raises(ValidationError): cfg.judge = "changed" # type: ignore[misc] - def test_zero_tokens_rejected(self) -> None: - with pytest.raises(ValidationError): - DebateConfig(max_tokens_per_argument=0) - - def test_negative_tokens_rejected(self) -> None: - with pytest.raises(ValidationError): - DebateConfig(max_tokens_per_argument=-1) - @pytest.mark.unit class TestHybridConfig: def test_defaults(self) -> None: cfg = HybridConfig() - assert cfg.max_tokens_per_argument == 500 assert cfg.review_agent == "conflict_reviewer" assert cfg.escalate_on_ambiguity is True def test_custom_values(self) -> None: cfg = HybridConfig( - max_tokens_per_argument=300, review_agent="senior_reviewer", escalate_on_ambiguity=False, ) diff --git a/tests/unit/communication/conflict_resolution/test_debate_strategy.py b/tests/unit/communication/conflict_resolution/test_debate_strategy.py index cf82791765..d8fe59666e 100644 --- a/tests/unit/communication/conflict_resolution/test_debate_strategy.py +++ b/tests/unit/communication/conflict_resolution/test_debate_strategy.py @@ -265,13 +265,15 @@ async def test_three_party_shared_manager_judge( self, hierarchy: HierarchyResolver, ) -> None: - """3-party conflict uses iterative LCM for judge selection.""" + """3-party conflict across teams uses iterative LCM 
for judge.""" judge = FakeJudgeEvaluator(winner_id="sr_dev") resolver = DebateResolver( hierarchy=hierarchy, config=DebateConfig(judge="shared_manager"), judge_evaluator=judge, ) + # sr_dev, jr_dev are under backend_lead; ui_dev is under + # frontend_lead. LCM of agents from different teams = cto. conflict = make_conflict( positions=( make_position(agent_id="sr_dev", level=SeniorityLevel.SENIOR), @@ -281,16 +283,16 @@ async def test_three_party_shared_manager_judge( position="Approach B", ), make_position( - agent_id="backend_lead", - level=SeniorityLevel.LEAD, + agent_id="ui_dev", + level=SeniorityLevel.JUNIOR, position="Approach C", - reasoning="Lead perspective", + reasoning="Frontend perspective", ), ), ) resolution = await resolver.resolve(conflict) assert resolution.winning_agent_id == "sr_dev" - # Judge should be LCM of all three — cto (all are under cto) + # Judge should be LCM of all three — cto (cross-team) _, judge_id = judge.calls[0] assert judge_id == "cto" diff --git a/tests/unit/communication/conflict_resolution/test_hybrid_strategy.py b/tests/unit/communication/conflict_resolution/test_hybrid_strategy.py index d07e1d3669..c223168d0d 100644 --- a/tests/unit/communication/conflict_resolution/test_hybrid_strategy.py +++ b/tests/unit/communication/conflict_resolution/test_hybrid_strategy.py @@ -14,6 +14,9 @@ ConflictResolutionOutcome, ) from ai_company.communication.conflict_resolution.protocol import JudgeDecision +from ai_company.communication.delegation.hierarchy import ( + HierarchyResolver, # noqa: TC001 +) from ai_company.communication.enums import ConflictResolutionStrategy from ai_company.core.enums import SeniorityLevel @@ -39,9 +42,13 @@ async def evaluate( @pytest.mark.unit class TestHybridResolverAutoResolve: - async def test_clear_winner_auto_resolves(self) -> None: + async def test_clear_winner_auto_resolves( + self, + hierarchy: HierarchyResolver, + ) -> None: evaluator = FakeReviewEvaluator(winner_id="sr_dev") resolver = HybridResolver( 
+ hierarchy=hierarchy, config=HybridConfig(), human_resolver=HumanEscalationResolver(), review_evaluator=evaluator, @@ -63,10 +70,14 @@ async def test_clear_winner_auto_resolves(self) -> None: @pytest.mark.unit class TestHybridResolverAmbiguous: - async def test_ambiguous_escalates_to_human(self) -> None: + async def test_ambiguous_escalates_to_human( + self, + hierarchy: HierarchyResolver, + ) -> None: # Return a winner that is NOT a participant evaluator = FakeReviewEvaluator(winner_id="unknown_agent") resolver = HybridResolver( + hierarchy=hierarchy, config=HybridConfig(escalate_on_ambiguity=True), human_resolver=HumanEscalationResolver(), review_evaluator=evaluator, @@ -84,9 +95,13 @@ async def test_ambiguous_escalates_to_human(self) -> None: resolution = await resolver.resolve(conflict) assert resolution.outcome == ConflictResolutionOutcome.ESCALATED_TO_HUMAN - async def test_ambiguous_falls_back_to_authority(self) -> None: + async def test_ambiguous_falls_back_to_authority( + self, + hierarchy: HierarchyResolver, + ) -> None: evaluator = FakeReviewEvaluator(winner_id="unknown_agent") resolver = HybridResolver( + hierarchy=hierarchy, config=HybridConfig(escalate_on_ambiguity=False), human_resolver=HumanEscalationResolver(), review_evaluator=evaluator, @@ -108,8 +123,12 @@ async def test_ambiguous_falls_back_to_authority(self) -> None: @pytest.mark.unit class TestHybridResolverNoEvaluator: - async def test_no_evaluator_falls_back_to_authority(self) -> None: + async def test_no_evaluator_falls_back_to_authority( + self, + hierarchy: HierarchyResolver, + ) -> None: resolver = HybridResolver( + hierarchy=hierarchy, config=HybridConfig(), human_resolver=HumanEscalationResolver(), review_evaluator=None, @@ -131,9 +150,13 @@ async def test_no_evaluator_falls_back_to_authority(self) -> None: @pytest.mark.unit class TestHybridResolverDissentRecord: - async def test_dissent_record_for_resolved(self) -> None: + async def test_dissent_record_for_resolved( + self, + 
hierarchy: HierarchyResolver, + ) -> None: evaluator = FakeReviewEvaluator(winner_id="sr_dev") resolver = HybridResolver( + hierarchy=hierarchy, config=HybridConfig(), human_resolver=HumanEscalationResolver(), review_evaluator=evaluator, @@ -153,9 +176,13 @@ async def test_dissent_record_for_resolved(self) -> None: assert records[0].dissenting_agent_id == "jr_dev" assert records[0].strategy_used == ConflictResolutionStrategy.HYBRID - async def test_dissent_record_for_escalated(self) -> None: + async def test_dissent_record_for_escalated( + self, + hierarchy: HierarchyResolver, + ) -> None: evaluator = FakeReviewEvaluator(winner_id="unknown") resolver = HybridResolver( + hierarchy=hierarchy, config=HybridConfig(escalate_on_ambiguity=True), human_resolver=HumanEscalationResolver(), review_evaluator=evaluator, diff --git a/tests/unit/communication/meeting/test_config.py b/tests/unit/communication/meeting/test_config.py index 4976acb186..0839c36b9c 100644 --- a/tests/unit/communication/meeting/test_config.py +++ b/tests/unit/communication/meeting/test_config.py @@ -11,6 +11,8 @@ ) from ai_company.communication.meeting.enums import MeetingProtocolType +pytestmark = pytest.mark.timeout(30) + @pytest.mark.unit class TestRoundRobinConfig: diff --git a/tests/unit/communication/meeting/test_enums.py b/tests/unit/communication/meeting/test_enums.py index 5120028185..3f7501db46 100644 --- a/tests/unit/communication/meeting/test_enums.py +++ b/tests/unit/communication/meeting/test_enums.py @@ -8,6 +8,8 @@ MeetingStatus, ) +pytestmark = pytest.mark.timeout(30) + @pytest.mark.unit class TestMeetingProtocolType: diff --git a/tests/unit/communication/meeting/test_errors.py b/tests/unit/communication/meeting/test_errors.py index 70918e6b8d..f81fc00a3e 100644 --- a/tests/unit/communication/meeting/test_errors.py +++ b/tests/unit/communication/meeting/test_errors.py @@ -11,6 +11,8 @@ MeetingProtocolNotFoundError, ) +pytestmark = pytest.mark.timeout(30) + @pytest.mark.unit class 
TestMeetingErrorHierarchy: diff --git a/tests/unit/communication/meeting/test_models.py b/tests/unit/communication/meeting/test_models.py index 4d008544a5..9b0601ee36 100644 --- a/tests/unit/communication/meeting/test_models.py +++ b/tests/unit/communication/meeting/test_models.py @@ -24,6 +24,8 @@ _NOW = datetime(2026, 3, 8, 10, 0, tzinfo=UTC) _LATER = datetime(2026, 3, 8, 10, 30, tzinfo=UTC) +pytestmark = pytest.mark.timeout(30) + @pytest.mark.unit class TestAgentResponse: @@ -223,7 +225,11 @@ def test_with_contributions(self) -> None: output_tokens=20, timestamp=_NOW, ) - minutes = self._make_minutes(contributions=(contrib,)) + minutes = self._make_minutes( + contributions=(contrib,), + total_input_tokens=10, + total_output_tokens=20, + ) assert len(minutes.contributions) == 1 def test_with_action_items(self) -> None: diff --git a/tests/unit/communication/meeting/test_orchestrator.py b/tests/unit/communication/meeting/test_orchestrator.py index a2bab470d3..a5fc46b0d9 100644 --- a/tests/unit/communication/meeting/test_orchestrator.py +++ b/tests/unit/communication/meeting/test_orchestrator.py @@ -35,6 +35,8 @@ make_mock_agent_caller, ) +pytestmark = pytest.mark.timeout(30) + def _make_orchestrator( *, diff --git a/tests/unit/communication/meeting/test_position_papers.py b/tests/unit/communication/meeting/test_position_papers.py index 357b6df64b..9ba70531d0 100644 --- a/tests/unit/communication/meeting/test_position_papers.py +++ b/tests/unit/communication/meeting/test_position_papers.py @@ -19,6 +19,8 @@ make_mock_agent_caller, ) +pytestmark = pytest.mark.timeout(30) + @pytest.mark.unit class TestPositionPapersProtocolType: diff --git a/tests/unit/communication/meeting/test_prompts.py b/tests/unit/communication/meeting/test_prompts.py index 9b70aab502..cb32283179 100644 --- a/tests/unit/communication/meeting/test_prompts.py +++ b/tests/unit/communication/meeting/test_prompts.py @@ -5,6 +5,8 @@ from ai_company.communication.meeting._prompts import 
build_agenda_prompt from ai_company.communication.meeting.models import MeetingAgenda, MeetingAgendaItem +pytestmark = pytest.mark.timeout(30) + @pytest.mark.unit class TestBuildAgendaPrompt: diff --git a/tests/unit/communication/meeting/test_protocol.py b/tests/unit/communication/meeting/test_protocol.py index 8f814fbe54..9348eeca1b 100644 --- a/tests/unit/communication/meeting/test_protocol.py +++ b/tests/unit/communication/meeting/test_protocol.py @@ -14,6 +14,8 @@ TaskCreator, ) +pytestmark = pytest.mark.timeout(30) + @pytest.mark.unit class TestMeetingProtocolInterface: diff --git a/tests/unit/communication/meeting/test_round_robin.py b/tests/unit/communication/meeting/test_round_robin.py index b3bc19ebd5..c8c37d02f1 100644 --- a/tests/unit/communication/meeting/test_round_robin.py +++ b/tests/unit/communication/meeting/test_round_robin.py @@ -17,6 +17,8 @@ make_mock_agent_caller, ) +pytestmark = pytest.mark.timeout(30) + @pytest.mark.unit class TestRoundRobinProtocolType: diff --git a/tests/unit/communication/meeting/test_structured_phases.py b/tests/unit/communication/meeting/test_structured_phases.py index 5ac0955469..810d174294 100644 --- a/tests/unit/communication/meeting/test_structured_phases.py +++ b/tests/unit/communication/meeting/test_structured_phases.py @@ -20,6 +20,8 @@ make_mock_agent_caller, ) +pytestmark = pytest.mark.timeout(30) + @pytest.mark.unit class TestStructuredPhasesProtocolType: diff --git a/tests/unit/communication/meeting/test_token_tracker.py b/tests/unit/communication/meeting/test_token_tracker.py index e2cab2ce1e..f1d643a5ab 100644 --- a/tests/unit/communication/meeting/test_token_tracker.py +++ b/tests/unit/communication/meeting/test_token_tracker.py @@ -4,6 +4,8 @@ from ai_company.communication.meeting._token_tracker import TokenTracker +pytestmark = pytest.mark.timeout(30) + @pytest.mark.unit class TestTokenTracker: diff --git a/tests/unit/communication/test_enums.py b/tests/unit/communication/test_enums.py index 
c92a993f9b..0fcd9bb856 100644 --- a/tests/unit/communication/test_enums.py +++ b/tests/unit/communication/test_enums.py @@ -28,6 +28,7 @@ def test_values(self) -> None: assert MessageType.DELEGATION.value == "delegation" assert MessageType.STATUS_REPORT.value == "status_report" assert MessageType.ESCALATION.value == "escalation" + assert MessageType.MEETING_CONTRIBUTION.value == "meeting_contribution" def test_string_identity(self) -> None: assert str(MessageType.TASK_UPDATE) == "task_update" diff --git a/tests/unit/engine/test_decomposition_models.py b/tests/unit/engine/test_decomposition_models.py index 864e85b621..4184e406aa 100644 --- a/tests/unit/engine/test_decomposition_models.py +++ b/tests/unit/engine/test_decomposition_models.py @@ -364,7 +364,11 @@ def test_pending_work_defaults_to_in_progress(self) -> None: @pytest.mark.unit def test_completed_plus_cancelled_mix(self) -> None: - """Fully terminal mix of COMPLETED+CANCELLED -> COMPLETED.""" + """Fully terminal mix of COMPLETED+CANCELLED -> CANCELLED. + + When some subtasks were cancelled, the parent cannot be considered + fully completed — CANCELLED signals partial abandonment. 
+ """ rollup = SubtaskStatusRollup( parent_task_id="task-1", total=5, @@ -374,7 +378,7 @@ def test_completed_plus_cancelled_mix(self) -> None: blocked=0, cancelled=2, ) - assert rollup.derived_parent_status == TaskStatus.COMPLETED + assert rollup.derived_parent_status == TaskStatus.CANCELLED # --------------------------------------------------------------------------- From cf27048087800302f09d45d39ec2ca4a5575abc1 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Sun, 8 Mar 2026 13:13:45 +0100 Subject: [PATCH 2/3] fix: address pre-PR review findings for post-merge feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix list-item regex crossing line boundaries (\s* → [^\S\n]*) - Move parent_task_id validation before empty-agents early return - Fix circular exception cause in parallel.py re-raise - Remove unused COMM_UNSUBSCRIBE_SENTINEL_FAILED constant - Use NotBlankStr for error_message field (replaces manual check) - Add logger + logging before raises in parsing/position_papers/structured_phases - Fix import ordering in rollup.py - Remove dead max_tokens_per_argument from DESIGN_SPEC.md examples - Correct M3 status in README.md - Improve docstrings across bus_memory, helpers, hybrid_strategy, orchestrator - Add test_parsing.py (18 tests) + expand tests in 7 existing modules --- DESIGN_SPEC.md | 9 +- README.md | 2 +- src/ai_company/communication/bus_memory.py | 15 +- .../conflict_resolution/_helpers.py | 5 +- .../conflict_resolution/hybrid_strategy.py | 3 +- .../communication/meeting/_parsing.py | 13 +- .../communication/meeting/_token_tracker.py | 9 ++ .../communication/meeting/models.py | 14 +- .../communication/meeting/orchestrator.py | 8 +- .../communication/meeting/position_papers.py | 2 + .../meeting/structured_phases.py | 4 +- src/ai_company/engine/decomposition/rollup.py | 7 +- src/ai_company/engine/parallel.py | 2 +- src/ai_company/engine/routing/service.py | 33 ++--- 
.../observability/events/communication.py | 5 - .../conflict_resolution/test_helpers.py | 62 ++++++++ .../delegation/test_hierarchy.py | 6 + .../unit/communication/meeting/test_models.py | 67 +++++++++ .../meeting/test_orchestrator.py | 15 ++ .../communication/meeting/test_parsing.py | 132 ++++++++++++++++++ tests/unit/communication/test_bus_memory.py | 26 ++++ .../unit/engine/test_decomposition_models.py | 20 +++ .../unit/engine/test_decomposition_service.py | 17 +++ tests/unit/engine/test_routing_models.py | 37 +++++ tests/unit/engine/test_routing_service.py | 13 ++ 25 files changed, 468 insertions(+), 58 deletions(-) create mode 100644 tests/unit/communication/meeting/test_parsing.py diff --git a/DESIGN_SPEC.md b/DESIGN_SPEC.md index fe36fb40f8..0896a3afcd 100644 --- a/DESIGN_SPEC.md +++ b/DESIGN_SPEC.md @@ -602,13 +602,12 @@ conflict_resolution: #### Strategy 2: Structured Debate + Judge -Both agents present arguments (1 round each, capped at `max_tokens_per_argument`). A judge — their shared manager, the CEO, or a configurable arbitrator agent — evaluates both positions and decides. The judge's reasoning and both arguments are logged as a dissent record. +Both agents present arguments (1 round each). A judge — their shared manager, the CEO, or a configurable arbitrator agent — evaluates both positions and decides. The judge's reasoning and both arguments are logged as a dissent record. ```yaml conflict_resolution: strategy: "debate" debate: - max_tokens_per_argument: 500 judge: "shared_manager" # shared_manager, ceo, designated_agent ``` @@ -631,7 +630,7 @@ conflict_resolution: Combines strategies with an intelligent review layer: -1. Both agents present arguments (1 round, capped tokens) — preserving dissent +1. Both agents present arguments (1 round) — preserving dissent 2. 
A **conflict review agent** evaluates the result: - If the resolution is **clear** (one position is objectively better, or authority applies cleanly) → resolve automatically, log dissent record - If the resolution is **ambiguous** (genuine trade-offs, no clear winner) → escalate to human queue with both positions + the review agent's analysis @@ -640,7 +639,6 @@ Combines strategies with an intelligent review layer: conflict_resolution: strategy: "hybrid" hybrid: - max_tokens_per_argument: 500 review_agent: "conflict_reviewer" # dedicated agent or role escalate_on_ambiguity: true ``` @@ -652,7 +650,7 @@ conflict_resolution: Meetings (§5.1 Pattern 3) follow configurable protocols that determine how agents interact during structured multi-agent conversations. Different meeting types naturally suit different protocols. All protocols implement a `MeetingProtocol` protocol, making the system extensible — new protocols can be registered and selected per meeting type. Cost bounds are enforced by `duration_tokens` in meeting config (§5.4). -> **Current state (M4 complete):** All 3 meeting protocols are implemented in `communication/meeting/`: `RoundRobinProtocol`, `PositionPapersProtocol`, and `StructuredPhasesProtocol`. The `MeetingOrchestrator` runs meetings end-to-end with token budget enforcement via `TokenTracker`. All protocols implement the `MeetingProtocol` protocol interface. +> **Current state (M4 complete):** All 3 meeting protocols are implemented in `communication/meeting/`: `RoundRobinProtocol`, `PositionPapersProtocol`, and `StructuredPhasesProtocol`. The `MeetingOrchestrator` runs meetings end-to-end with token budget enforcement via `TokenTracker`. Shared LLM response parsing for decisions and action items is in `_parsing.py`. All protocols implement the `MeetingProtocol` protocol interface. 
#### Protocol 1: Round-Robin Transcript @@ -2446,6 +2444,7 @@ ai-company/ │ │ ├── message.py # Message model │ │ ├── meeting/ # Meeting protocol subsystem │ │ │ ├── __init__.py # Package exports +│ │ │ ├── _parsing.py # Shared helpers for parsing decisions and action items │ │ │ ├── _prompts.py # LLM prompt templates for meeting phases │ │ │ ├── _token_tracker.py # TokenTracker for duration_tokens enforcement │ │ │ ├── config.py # MeetingProtocolConfig, protocol-specific config models diff --git a/README.md b/README.md index c7f280982f..01f651d469 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ AI Company lets you spin up a virtual organization staffed entirely by AI agents ## Status -**M4: Multi-Agent** in progress (M0 Tooling, M1 Config & Core, M2 Providers, M3 Single Agent — all done). See [DESIGN_SPEC.md](DESIGN_SPEC.md) for the full high-level specification. +**M4: Multi-Agent** in progress (M0 Tooling, M1 Config & Core, M2 Providers — all done; M3 Single Agent in progress). See [DESIGN_SPEC.md](DESIGN_SPEC.md) for the full high-level specification. ## Tech Stack diff --git a/src/ai_company/communication/bus_memory.py b/src/ai_company/communication/bus_memory.py index a1129afd0c..846bb898fc 100644 --- a/src/ai_company/communication/bus_memory.py +++ b/src/ai_company/communication/bus_memory.py @@ -404,6 +404,8 @@ async def unsubscribe( if queue is not None: # Put a sentinel for each pending waiter so all # concurrent receive() calls are woken up. + # Safe to use put_nowait: queues are unbounded + # (maxsize=0), so QueueFull cannot be raised. pending = self._waiters.pop(key, 0) sentinels = max(1, pending) for _ in range(sentinels): @@ -433,8 +435,12 @@ async def receive( timeout: Seconds to wait before returning ``None``. Returns: - A delivery envelope, or ``None`` on timeout, shutdown, - or when an in-flight receive is woken by :meth:`unsubscribe`. + The next delivery envelope, or ``None`` when: + + - *timeout* expires without a message arriving. 
+ - The bus is shut down while waiting. + - The subscription is cancelled via :meth:`unsubscribe` + while a ``receive()`` call is in flight. Raises: MessageBusNotRunningError: If the bus is not running. @@ -458,6 +464,11 @@ async def receive( try: result = await self._await_with_shutdown(queue, timeout) finally: + # Decrement outside the lock: in asyncio cooperative + # multitasking, this dict mutation is atomic between await + # points. The asymmetry with the lock-guarded increment + # is intentional — the decrement must happen after + # _await_with_shutdown completes. self._waiters[key] = max(0, self._waiters.get(key, 0) - 1) if result is None: await self._log_receive_null(channel_name, subscriber_id, timeout) diff --git a/src/ai_company/communication/conflict_resolution/_helpers.py b/src/ai_company/communication/conflict_resolution/_helpers.py index 6761c489d2..72bcc24b26 100644 --- a/src/ai_company/communication/conflict_resolution/_helpers.py +++ b/src/ai_company/communication/conflict_resolution/_helpers.py @@ -24,8 +24,9 @@ def find_losers( ) -> tuple[ConflictPosition, ...]: """Find all non-winning positions in a conflict. - For N-party conflicts (3+ agents), returns every position whose - agent was not the winner. + Returns every position whose agent was not the winner. For + 2-party conflicts this is a single position; for N-party + conflicts (3+ agents) this may return multiple. Args: conflict: The original conflict. diff --git a/src/ai_company/communication/conflict_resolution/hybrid_strategy.py b/src/ai_company/communication/conflict_resolution/hybrid_strategy.py index 3019893b41..eefef67e7b 100644 --- a/src/ai_company/communication/conflict_resolution/hybrid_strategy.py +++ b/src/ai_company/communication/conflict_resolution/hybrid_strategy.py @@ -216,7 +216,8 @@ def _authority_fallback( reason: Why authority fallback was triggered. Returns: - Resolution with ``RESOLVED_BY_HYBRID`` outcome. 
+ Resolution with ``RESOLVED_BY_HYBRID`` outcome + (authority used as fallback within hybrid strategy). """ logger.warning( CONFLICT_AUTHORITY_FALLBACK, diff --git a/src/ai_company/communication/meeting/_parsing.py b/src/ai_company/communication/meeting/_parsing.py index 2b711ce437..6de9d5824e 100644 --- a/src/ai_company/communication/meeting/_parsing.py +++ b/src/ai_company/communication/meeting/_parsing.py @@ -1,12 +1,16 @@ """Shared helpers for parsing decisions and action items from LLM text. -Extracts structured ``Decision`` and ``ActionItem`` data from free-form -synthesis/summary responses produced by meeting protocol LLM calls. +Extracts structured decision strings and ``ActionItem`` instances from +free-form synthesis/summary responses produced by meeting protocol LLM +calls. """ import re from ai_company.communication.meeting.models import ActionItem +from ai_company.observability import get_logger + +logger = get_logger(__name__) # Patterns for section headers _DECISIONS_HEADER_RE = re.compile( @@ -24,12 +28,11 @@ # List item patterns (numbered or bulleted) _LIST_ITEM_RE = re.compile( - r"^\s*(?:\d+[\.\)]\s*|-\s*|\*\s*|\u2022\s*)(.+)", + r"^[^\S\n]*(?:\d+[\.\)][^\S\n]*|-[^\S\n]*|\*[^\S\n]*|\u2022[^\S\n]*)(.+)", re.MULTILINE, ) -# Pattern for "assignee: " or "(assigned to )" or -# "- assignee: " at end of line +# Pattern for "assignee: " or "(assigned to )" at end of line _ASSIGNEE_RE = re.compile( r"(?:" r"\(?assigned?\s+to:?\s*(.+?)\)?" diff --git a/src/ai_company/communication/meeting/_token_tracker.py b/src/ai_company/communication/meeting/_token_tracker.py index 13dba01aea..c32942a901 100644 --- a/src/ai_company/communication/meeting/_token_tracker.py +++ b/src/ai_company/communication/meeting/_token_tracker.py @@ -58,6 +58,9 @@ def is_exhausted(self) -> bool: def record(self, input_tokens: int, output_tokens: int) -> None: """Record token usage from an agent call. + Logs a warning when token usage exceeds the budget after + recording. 
+ Args: input_tokens: Prompt tokens consumed (must be >= 0). output_tokens: Response tokens generated (must be >= 0). @@ -71,6 +74,12 @@ def record(self, input_tokens: int, output_tokens: int) -> None: f"input_tokens={input_tokens}, " f"output_tokens={output_tokens}" ) + logger.warning( + MEETING_BUDGET_EXHAUSTED, + error=msg, + input_tokens=input_tokens, + output_tokens=output_tokens, + ) raise ValueError(msg) self.input_tokens += input_tokens self.output_tokens += output_tokens diff --git a/src/ai_company/communication/meeting/models.py b/src/ai_company/communication/meeting/models.py index 7a3825802a..ad057955ad 100644 --- a/src/ai_company/communication/meeting/models.py +++ b/src/ai_company/communication/meeting/models.py @@ -236,7 +236,11 @@ def _validate_participants(self) -> Self: @model_validator(mode="after") def _validate_token_aggregates(self) -> Self: - """Ensure aggregate token counts match sum of contributions.""" + """Ensure aggregate token counts match sum of contributions. + + Skipped when no contributions are present (allows default + zero totals). 
+ """ if not self.contributions: return self sum_input = sum(c.input_tokens for c in self.contributions) @@ -285,7 +289,7 @@ class MeetingRecord(BaseModel): default=None, description="Complete minutes on success", ) - error_message: str | None = Field( + error_message: NotBlankStr | None = Field( default=None, description="Error description on failure", ) @@ -314,12 +318,6 @@ def _validate_status_consistency(self) -> Self: "failed or budget_exhausted" ) raise ValueError(msg) - if not self.error_message.strip(): - msg = ( - "error_message must not be empty when status " - "is failed or budget_exhausted" - ) - raise ValueError(msg) if self.minutes is not None: msg = "minutes must be None when status is failed or budget_exhausted" raise ValueError(msg) diff --git a/src/ai_company/communication/meeting/orchestrator.py b/src/ai_company/communication/meeting/orchestrator.py index 3d7d79beaf..e233611c88 100644 --- a/src/ai_company/communication/meeting/orchestrator.py +++ b/src/ai_company/communication/meeting/orchestrator.py @@ -134,8 +134,8 @@ async def run_meeting( # noqa: PLR0913 Raises: MeetingProtocolNotFoundError: If the configured protocol is not in the registry. - MeetingParticipantError: If participant list is empty or - leader is in participants. + MeetingParticipantError: If participant list is empty, + contains duplicates, or leader is in participants. ValueError: If token_budget is not positive. """ meeting_id = f"mtg-{uuid4().hex[:12]}" @@ -371,8 +371,8 @@ def _validate_inputs( """Validate meeting inputs. Raises: - MeetingParticipantError: If participants are empty or leader - is in participants. + MeetingParticipantError: If participants are empty, contain + duplicates, or leader is in participants. ValueError: If token_budget is not positive. 
""" if token_budget <= 0: diff --git a/src/ai_company/communication/meeting/position_papers.py b/src/ai_company/communication/meeting/position_papers.py index d471d9bc92..487caccdbe 100644 --- a/src/ai_company/communication/meeting/position_papers.py +++ b/src/ai_company/communication/meeting/position_papers.py @@ -281,9 +281,11 @@ async def _collect_paper( # on any task failure, so reaching this point means all succeeded. if not all(r is not None for r in results): msg = f"Expected {n} position papers but some slots are None" + logger.error(msg, meeting_id=meeting_id) raise RuntimeError(msg) if not all(c is not None for c in contrib_results): msg = f"Expected {n} contributions but some slots are None" + logger.error(msg, meeting_id=meeting_id) raise RuntimeError(msg) papers: list[tuple[str, str]] = list(results) # type: ignore[arg-type] paper_contributions: list[MeetingContribution] = list(contrib_results) # type: ignore[arg-type] diff --git a/src/ai_company/communication/meeting/structured_phases.py b/src/ai_company/communication/meeting/structured_phases.py index 3fc57881d0..c8cb5b2d17 100644 --- a/src/ai_company/communication/meeting/structured_phases.py +++ b/src/ai_company/communication/meeting/structured_phases.py @@ -50,7 +50,7 @@ logger = get_logger(__name__) -# Reserve 20% of budget for later phases (conflict check + synthesis). +# Reserve 20% of remaining budget for the synthesis phase. _SYNTHESIS_RESERVE_FRACTION = 0.20 @@ -400,9 +400,11 @@ async def _collect_input( # on any task failure, so reaching this point means all succeeded. 
if not all(r is not None for r in result_inputs): msg = f"Expected {num_participants} inputs but some slots are None" + logger.error(msg, meeting_id=meeting_id) raise RuntimeError(msg) if not all(c is not None for c in result_contributions): msg = f"Expected {num_participants} contributions but some slots are None" + logger.error(msg, meeting_id=meeting_id) raise RuntimeError(msg) inputs: list[tuple[str, str]] = list(result_inputs) # type: ignore[arg-type] input_contributions: list[MeetingContribution] = list( diff --git a/src/ai_company/engine/decomposition/rollup.py b/src/ai_company/engine/decomposition/rollup.py index 9daa3b51ff..e22343ead0 100644 --- a/src/ai_company/engine/decomposition/rollup.py +++ b/src/ai_company/engine/decomposition/rollup.py @@ -7,15 +7,14 @@ from ai_company.core.enums import TaskStatus from ai_company.engine.decomposition.models import SubtaskStatusRollup - -if TYPE_CHECKING: - from ai_company.core.types import NotBlankStr - from ai_company.observability import get_logger from ai_company.observability.events.decomposition import ( DECOMPOSITION_ROLLUP_COMPUTED, ) +if TYPE_CHECKING: + from ai_company.core.types import NotBlankStr + logger = get_logger(__name__) diff --git a/src/ai_company/engine/parallel.py b/src/ai_company/engine/parallel.py index 68ba97e55b..56cc2e4481 100644 --- a/src/ai_company/engine/parallel.py +++ b/src/ai_company/engine/parallel.py @@ -183,7 +183,7 @@ async def execute_group( ) from release_error if task_error is not None: - raise task_error from task_error + raise task_error result = self._build_result( group, diff --git a/src/ai_company/engine/routing/service.py b/src/ai_company/engine/routing/service.py index 33c02d3f55..8feb7e6c42 100644 --- a/src/ai_company/engine/routing/service.py +++ b/src/ai_company/engine/routing/service.py @@ -72,6 +72,20 @@ def route( """ plan = decomposition_result.plan + if parent_task.id != plan.parent_task_id: + msg = ( + f"parent_task.id {parent_task.id!r} does not " + f"match 
plan.parent_task_id " + f"{plan.parent_task_id!r}" + ) + logger.warning( + TASK_ROUTING_FAILED, + parent_task_id=parent_task.id, + plan_parent_task_id=plan.parent_task_id, + error=msg, + ) + raise ValueError(msg) + logger.info( TASK_ROUTING_STARTED, parent_task_id=plan.parent_task_id, @@ -114,27 +128,8 @@ def _do_route( Returns: Routing result with decisions and unroutable subtask IDs. - - Raises: - ValueError: If parent_task.id does not match - plan.parent_task_id. """ plan = decomposition_result.plan - - if parent_task.id != plan.parent_task_id: - msg = ( - f"parent_task.id {parent_task.id!r} does not " - f"match plan.parent_task_id " - f"{plan.parent_task_id!r}" - ) - logger.warning( - TASK_ROUTING_FAILED, - parent_task_id=parent_task.id, - plan_parent_task_id=plan.parent_task_id, - error=msg, - ) - raise ValueError(msg) - topology = self._topology_selector.select(parent_task, plan) decisions: list[RoutingDecision] = [] diff --git a/src/ai_company/observability/events/communication.py b/src/ai_company/observability/events/communication.py index 88da0caf94..5e0b5d3d33 100644 --- a/src/ai_company/observability/events/communication.py +++ b/src/ai_company/observability/events/communication.py @@ -52,11 +52,6 @@ COMM_RECEIVE_SHUTDOWN: Final[str] = "communication.receive.shutdown" COMM_RECEIVE_UNSUBSCRIBED: Final[str] = "communication.receive.unsubscribed" -# Unsubscribe sentinel -COMM_UNSUBSCRIBE_SENTINEL_FAILED: Final[str] = ( - "communication.unsubscribe.sentinel_failed" -) - # Validation COMM_MESSENGER_INVALID_AGENT: Final[str] = "communication.messenger.invalid_agent" COMM_SEND_DIRECT_INVALID: Final[str] = "communication.message.send_direct_invalid" diff --git a/tests/unit/communication/conflict_resolution/test_helpers.py b/tests/unit/communication/conflict_resolution/test_helpers.py index 5aa706a632..079a287934 100644 --- a/tests/unit/communication/conflict_resolution/test_helpers.py +++ b/tests/unit/communication/conflict_resolution/test_helpers.py @@ -8,6 
+8,9 @@ find_position_or_raise, pick_highest_seniority, ) +from ai_company.communication.delegation.hierarchy import ( + HierarchyResolver, # noqa: TC001 +) from ai_company.communication.errors import ConflictStrategyError from ai_company.core.enums import SeniorityLevel @@ -126,3 +129,62 @@ def test_equal_seniority_first_wins(self) -> None: ) best = pick_highest_seniority(conflict) assert best.agent_id == "first" + + def test_hierarchy_tiebreak_closer_to_root_wins( + self, + hierarchy: HierarchyResolver, + ) -> None: + """When seniority is equal, the agent closer to root wins.""" + conflict = make_conflict( + positions=( + make_position( + agent_id="jr_dev", + level=SeniorityLevel.SENIOR, + position="Deep agent", + ), + make_position( + agent_id="backend_lead", + level=SeniorityLevel.SENIOR, + position="Shallow agent", + ), + ), + ) + best = pick_highest_seniority(conflict, hierarchy=hierarchy) + # backend_lead is closer to root (fewer ancestors) than jr_dev + assert best.agent_id == "backend_lead" + + def test_hierarchy_tiebreak_equal_depth_keeps_incumbent( + self, + hierarchy: HierarchyResolver, + ) -> None: + """When seniority and depth are equal, incumbent wins.""" + conflict = make_conflict( + positions=( + make_position( + agent_id="sr_dev", + level=SeniorityLevel.SENIOR, + position="First", + ), + make_position( + agent_id="jr_dev", + level=SeniorityLevel.SENIOR, + position="Second", + ), + ), + ) + best = pick_highest_seniority(conflict, hierarchy=hierarchy) + # Same depth under backend_lead — incumbent (sr_dev) kept + assert best.agent_id == "sr_dev" + + +@pytest.mark.unit +class TestFindLosersWinnerValidation: + def test_winner_not_in_positions_raises(self) -> None: + """Raises when winning agent is not among conflict positions.""" + conflict = make_conflict() + resolution = make_resolution(winning_agent_id="nonexistent") + with pytest.raises( + ConflictStrategyError, + match="not found in conflict positions", + ): + find_losers(conflict, resolution) 
diff --git a/tests/unit/communication/delegation/test_hierarchy.py b/tests/unit/communication/delegation/test_hierarchy.py index f1db5a5c33..d8ae42bc8c 100644 --- a/tests/unit/communication/delegation/test_hierarchy.py +++ b/tests/unit/communication/delegation/test_hierarchy.py @@ -371,6 +371,12 @@ def test_reverse_agent_is_ancestor(self) -> None: == "backend_lead" ) + def test_same_agent_returns_self(self) -> None: + """Same agent as both arguments returns that agent.""" + company = _make_company(departments=(_eng_department(),)) + resolver = HierarchyResolver(company) + assert resolver.get_lowest_common_manager("sr_dev", "sr_dev") == "sr_dev" + def test_cross_department_agents_under_same_root(self) -> None: """If eng/qa share a root, that root is LCM.""" company = _make_company(departments=(_eng_department(), _qa_department())) diff --git a/tests/unit/communication/meeting/test_models.py b/tests/unit/communication/meeting/test_models.py index 9b0601ee36..b3788f70e0 100644 --- a/tests/unit/communication/meeting/test_models.py +++ b/tests/unit/communication/meeting/test_models.py @@ -388,6 +388,18 @@ def test_budget_exhausted_with_minutes_rejected(self) -> None: token_budget=2000, ) + def test_blank_error_message_rejected(self) -> None: + """Whitespace-only error_message is rejected via NotBlankStr.""" + with pytest.raises(ValidationError): + MeetingRecord( + meeting_id="m-1", + meeting_type_name="standup", + protocol_type=MeetingProtocolType.ROUND_ROBIN, + status=MeetingStatus.FAILED, + error_message=" ", + token_budget=2000, + ) + def test_frozen(self) -> None: record = MeetingRecord( meeting_id="m-1", @@ -398,3 +410,58 @@ def test_frozen(self) -> None: ) with pytest.raises(ValidationError): record.status = MeetingStatus.COMPLETED # type: ignore[misc] + + +@pytest.mark.unit +class TestMeetingMinutesTokenAggregates: + """Tests for MeetingMinutes token aggregate validation.""" + + def test_mismatched_input_tokens_rejected(self) -> None: + """total_input_tokens != 
sum of contributions raises.""" + contrib = MeetingContribution( + agent_id="agent-a", + content="Input", + phase=MeetingPhase.ROUND_ROBIN_TURN, + turn_number=0, + input_tokens=10, + output_tokens=20, + timestamp=_NOW, + ) + with pytest.raises(ValidationError, match="total_input_tokens"): + MeetingMinutes( + meeting_id="m-1", + protocol_type=MeetingProtocolType.ROUND_ROBIN, + leader_id="leader", + participant_ids=("agent-a",), + agenda=MeetingAgenda(title="Test"), + contributions=(contrib,), + total_input_tokens=999, + total_output_tokens=20, + started_at=_NOW, + ended_at=_LATER, + ) + + def test_mismatched_output_tokens_rejected(self) -> None: + """total_output_tokens != sum of contributions raises.""" + contrib = MeetingContribution( + agent_id="agent-a", + content="Input", + phase=MeetingPhase.ROUND_ROBIN_TURN, + turn_number=0, + input_tokens=10, + output_tokens=20, + timestamp=_NOW, + ) + with pytest.raises(ValidationError, match="total_output_tokens"): + MeetingMinutes( + meeting_id="m-1", + protocol_type=MeetingProtocolType.ROUND_ROBIN, + leader_id="leader", + participant_ids=("agent-a",), + agenda=MeetingAgenda(title="Test"), + contributions=(contrib,), + total_input_tokens=10, + total_output_tokens=999, + started_at=_NOW, + ended_at=_LATER, + ) diff --git a/tests/unit/communication/meeting/test_orchestrator.py b/tests/unit/communication/meeting/test_orchestrator.py index a5fc46b0d9..6e03b3effd 100644 --- a/tests/unit/communication/meeting/test_orchestrator.py +++ b/tests/unit/communication/meeting/test_orchestrator.py @@ -124,6 +124,21 @@ async def test_leader_in_participants_raises( token_budget=2000, ) + async def test_duplicate_participants_raises( + self, + simple_agenda: MeetingAgenda, + ) -> None: + orchestrator = _make_orchestrator() + with pytest.raises(MeetingParticipantError, match="Duplicate participant"): + await orchestrator.run_meeting( + meeting_type_name="standup", + protocol_config=MeetingProtocolConfig(), + agenda=simple_agenda, + 
leader_id="leader", + participant_ids=("agent-a", "agent-b", "agent-a"), + token_budget=2000, + ) + async def test_unregistered_protocol_raises( self, simple_agenda: MeetingAgenda, diff --git a/tests/unit/communication/meeting/test_parsing.py b/tests/unit/communication/meeting/test_parsing.py new file mode 100644 index 0000000000..c1797a2249 --- /dev/null +++ b/tests/unit/communication/meeting/test_parsing.py @@ -0,0 +1,132 @@ +"""Tests for meeting LLM response parsing helpers.""" + +import pytest + +from ai_company.communication.meeting._parsing import ( + parse_action_items, + parse_decisions, +) + +pytestmark = pytest.mark.timeout(30) + + +@pytest.mark.unit +class TestParseDecisions: + def test_numbered_list(self) -> None: + text = ( + "# Decisions\n1. Use async for all I/O\n2. Adopt event sourcing pattern\n" + ) + result = parse_decisions(text) + assert result == ( + "Use async for all I/O", + "Adopt event sourcing pattern", + ) + + def test_bulleted_list(self) -> None: + text = "## Decisions\n- Refactor the module\n- Add integration tests\n" + result = parse_decisions(text) + assert result == ( + "Refactor the module", + "Add integration tests", + ) + + def test_colon_header(self) -> None: + text = "Decisions:\n- First decision\n- Second decision\n" + result = parse_decisions(text) + assert result == ("First decision", "Second decision") + + def test_no_decisions_section(self) -> None: + text = "This is a summary with no decisions section." + result = parse_decisions(text) + assert result == () + + def test_empty_string(self) -> None: + result = parse_decisions("") + assert result == () + + def test_section_stops_at_next_header(self) -> None: + text = "# Decisions\n1. Do X\n# Action Items\n1. 
Assign Y\n" + result = parse_decisions(text) + assert result == ("Do X",) + + def test_asterisk_bullets(self) -> None: + text = "# Decisions\n* Star item\n" + result = parse_decisions(text) + assert result == ("Star item",) + + def test_whitespace_only_items_skipped(self) -> None: + text = "# Decisions\n1. \n2. Real decision\n" + result = parse_decisions(text) + assert result == ("Real decision",) + + def test_case_insensitive_header(self) -> None: + text = "# DECISIONS\n- Case test\n" + result = parse_decisions(text) + assert result == ("Case test",) + + +@pytest.mark.unit +class TestParseActionItems: + def test_simple_action_items(self) -> None: + text = "# Action Items\n1. Write unit tests\n2. Update documentation\n" + result = parse_action_items(text) + assert len(result) == 2 + assert result[0].description == "Write unit tests" + assert result[0].assignee_id is None + assert result[1].description == "Update documentation" + + def test_with_assigned_to_syntax(self) -> None: + text = "# Action Items\n- Implement feature (assigned to alice)\n" + result = parse_action_items(text) + assert len(result) == 1 + assert result[0].description == "Implement feature" + assert result[0].assignee_id == "alice" + + def test_with_assignee_colon_syntax(self) -> None: + text = "# Action Items\n- Review PR assignee: bob\n" + result = parse_action_items(text) + assert len(result) == 1 + assert result[0].description == "Review PR" + assert result[0].assignee_id == "bob" + + def test_no_action_items_section(self) -> None: + text = "Summary without action items." + result = parse_action_items(text) + assert result == () + + def test_empty_string(self) -> None: + result = parse_action_items("") + assert result == () + + def test_empty_items_skipped(self) -> None: + text = "# Action Items\n1. \n2. 
Real item\n" + result = parse_action_items(text) + assert len(result) == 1 + assert result[0].description == "Real item" + + def test_section_stops_at_next_header(self) -> None: + text = "# Action Items\n- Do this\n# Notes\n- Not an action item\n" + result = parse_action_items(text) + assert len(result) == 1 + assert result[0].description == "Do this" + + def test_case_insensitive_header(self) -> None: + text = "ACTION ITEMS:\n- Task one\n" + result = parse_action_items(text) + assert len(result) == 1 + + +@pytest.mark.unit +class TestParseDecisionsAndActions: + def test_both_sections_in_one_text(self) -> None: + text = ( + "# Decisions\n" + "1. Approve the design\n" + "# Action Items\n" + "1. Implement prototype (assigned to charlie)\n" + ) + decisions = parse_decisions(text) + actions = parse_action_items(text) + assert decisions == ("Approve the design",) + assert len(actions) == 1 + assert actions[0].assignee_id == "charlie" diff --git a/tests/unit/communication/test_bus_memory.py b/tests/unit/communication/test_bus_memory.py index df0b0b110c..71c23ee53a 100644 --- a/tests/unit/communication/test_bus_memory.py +++ b/tests/unit/communication/test_bus_memory.py @@ -288,6 +288,32 @@ async def unsubscriber() -> None: assert received == [None] + @pytest.mark.unit + async def test_unsubscribe_wakes_multiple_blocked_receivers(self) -> None: + """Unsubscribing wakes all concurrent blocked receive() calls.""" + bus = InMemoryMessageBus(config=_make_config()) + await bus.start() + await bus.subscribe("#general", "agent-a") + + received: list[object] = [] + + async def receiver() -> None: + result = await bus.receive("#general", "agent-a") + received.append(result) + + async def unsubscriber() -> None: + await asyncio.sleep(0.05) + await bus.unsubscribe("#general", "agent-a") + + async with asyncio.TaskGroup() as tg: + tg.create_task(receiver()) + tg.create_task(receiver()) + tg.create_task(receiver()) + tg.create_task(unsubscriber()) + + assert len(received) == 3 + 
assert all(r is None for r in received) + # ── Publish & Receive ──────────────────────────────────────────── diff --git a/tests/unit/engine/test_decomposition_models.py b/tests/unit/engine/test_decomposition_models.py index 4184e406aa..b1c6ce808f 100644 --- a/tests/unit/engine/test_decomposition_models.py +++ b/tests/unit/engine/test_decomposition_models.py @@ -227,6 +227,26 @@ def test_task_count_mismatch_rejected(self) -> None: dependency_edges=(), ) + @pytest.mark.unit + def test_task_id_mismatch_rejected(self) -> None: + """Result rejects matching count but different IDs.""" + plan = DecompositionPlan( + parent_task_id="task-1", + subtasks=( + SubtaskDefinition(id="sub-1", title="A", description="A desc"), + SubtaskDefinition(id="sub-2", title="B", description="B desc"), + ), + ) + with pytest.raises(ValueError, match="do not match plan subtask IDs"): + DecompositionResult( + plan=plan, + created_tasks=( + _make_result_task("sub-1"), + _make_result_task("sub-99"), + ), + dependency_edges=(), + ) + @pytest.mark.unit def test_unknown_edge_ids_rejected(self) -> None: """Result rejects edges referencing unknown task IDs.""" diff --git a/tests/unit/engine/test_decomposition_service.py b/tests/unit/engine/test_decomposition_service.py index 5c114fe9f3..f52abde4fe 100644 --- a/tests/unit/engine/test_decomposition_service.py +++ b/tests/unit/engine/test_decomposition_service.py @@ -270,6 +270,23 @@ def get_strategy_name(self) -> str: with pytest.raises(RuntimeError, match="strategy boom"): await service.decompose_task(task, ctx) + @pytest.mark.unit + async def test_decompose_propagates_dependencies(self) -> None: + """Subtask dependencies propagate to created Task objects.""" + task = _make_task() + plan = _make_plan() + strategy = ManualDecompositionStrategy(plan) + classifier = TaskStructureClassifier() + service = DecompositionService(strategy, classifier) + ctx = DecompositionContext() + + result = await service.decompose_task(task, ctx) + + tasks_by_id = {t.id: t 
for t in result.created_tasks} + assert tasks_by_id["sub-1"].dependencies == () + assert tasks_by_id["sub-2"].dependencies == ("sub-1",) + assert tasks_by_id["sub-3"].dependencies == ("sub-2",) + @pytest.mark.unit def test_rollup_status_delegates(self) -> None: """rollup_status delegates to StatusRollup.compute.""" diff --git a/tests/unit/engine/test_routing_models.py b/tests/unit/engine/test_routing_models.py index e25fcce094..3a12c9d4bc 100644 --- a/tests/unit/engine/test_routing_models.py +++ b/tests/unit/engine/test_routing_models.py @@ -135,6 +135,43 @@ def test_overlap_rejected( unroutable=("sub-1",), ) + @pytest.mark.unit + def test_duplicate_decision_ids_rejected( + self, sample_agent_with_personality: AgentIdentity + ) -> None: + """Duplicate subtask IDs within decisions are rejected.""" + candidate = RoutingCandidate( + agent_identity=sample_agent_with_personality, + score=0.5, + reason="Match", + ) + decision_a = RoutingDecision( + subtask_id="sub-1", + selected_candidate=candidate, + topology=CoordinationTopology.SAS, + ) + decision_b = RoutingDecision( + subtask_id="sub-1", + selected_candidate=candidate, + topology=CoordinationTopology.SAS, + ) + with pytest.raises(ValueError, match="Duplicate subtask IDs in decisions"): + RoutingResult( + parent_task_id="task-1", + decisions=(decision_a, decision_b), + unroutable=(), + ) + + @pytest.mark.unit + def test_duplicate_unroutable_ids_rejected(self) -> None: + """Duplicate subtask IDs within unroutable are rejected.""" + with pytest.raises(ValueError, match="Duplicate subtask IDs in unroutable"): + RoutingResult( + parent_task_id="task-1", + decisions=(), + unroutable=("sub-1", "sub-1"), + ) + class TestAutoTopologyConfig: """Tests for AutoTopologyConfig model.""" diff --git a/tests/unit/engine/test_routing_service.py b/tests/unit/engine/test_routing_service.py index eb15a5a8b9..a6d0ebddb9 100644 --- a/tests/unit/engine/test_routing_service.py +++ b/tests/unit/engine/test_routing_service.py @@ -293,6 
+293,19 @@ def test_parent_task_id_in_result(self) -> None: result = service.route(decomp, (), task) assert result.parent_task_id == "task-route-1" + @pytest.mark.unit + def test_parent_task_id_mismatch_raises(self) -> None: + """ValueError when parent_task.id != plan.parent_task_id.""" + scorer = AgentTaskScorer() + selector = TopologySelector() + service = TaskRoutingService(scorer, selector) + + task = _make_task("task-wrong-id") + decomp = _make_decomposition_result("task-route-1") + + with pytest.raises(ValueError, match="does not match"): + service.route(decomp, (), task) + @pytest.mark.unit def test_exception_propagates(self) -> None: """Exceptions from _do_route are logged and re-raised.""" From 1317beae37c953681057b7bb4323c35f82eeb7fe Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Sun, 8 Mar 2026 13:46:20 +0100 Subject: [PATCH 3/3] fix: address pre-PR review findings for post-merge feedback - Fix regex patterns in meeting _parsing.py to prevent false header matches on list items and capture multi-line list items - Change MEETING_BUDGET_EXHAUSTED to MEETING_VALIDATION_FAILED for negative token validation in _token_tracker.py - Add assignee validation against meeting participants in all 3 protocols (prompt injection defense) - Fix _waiters decrement in bus_memory.py to avoid orphan entries after unsubscribe - Add empty-contributions-must-have-zero-totals validation in MeetingMinutes - Use MEETING_INTERNAL_ERROR event constant instead of dynamic f-string event names - Upgrade except* log level from debug to warning in parallel.py - Add _known_agents existence check in hierarchy get_lowest_common_manager - Import _MIN_POSITIONS from models instead of redefining in conflict resolution service - Update DESIGN_SPEC.md and README.md for accuracy - Add/update tests for all changes --- DESIGN_SPEC.md | 2 +- README.md | 2 +- src/ai_company/communication/bus_memory.py | 18 ++++++++---- .../conflict_resolution/service.py | 3 
+- .../communication/delegation/hierarchy.py | 9 +++++- .../communication/meeting/_parsing.py | 21 ++++++++++---- .../communication/meeting/_token_tracker.py | 3 +- .../communication/meeting/models.py | 6 ++++ .../communication/meeting/orchestrator.py | 3 +- .../communication/meeting/position_papers.py | 29 +++++++++++++++---- .../communication/meeting/round_robin.py | 17 ++++++++--- .../meeting/structured_phases.py | 29 +++++++++++++++---- src/ai_company/engine/decomposition/models.py | 4 +++ src/ai_company/engine/parallel.py | 2 +- .../observability/events/meeting.py | 3 ++ .../delegation/test_hierarchy.py | 6 ++++ .../unit/communication/meeting/test_models.py | 29 +++++++++++++++++++ .../communication/meeting/test_prompts.py | 20 +++++++++++++ .../unit/engine/test_decomposition_models.py | 5 +++- .../unit/engine/test_decomposition_rollup.py | 7 ++++- 20 files changed, 182 insertions(+), 36 deletions(-) diff --git a/DESIGN_SPEC.md b/DESIGN_SPEC.md index 0896a3afcd..b3a6dd5d8c 100644 --- a/DESIGN_SPEC.md +++ b/DESIGN_SPEC.md @@ -2412,7 +2412,7 @@ ai-company/ │ │ ├── config.py # Communication config │ │ ├── conflict_resolution/ # Conflict resolution subsystem (§5.6) │ │ │ ├── __init__.py # Package exports -│ │ │ ├── _helpers.py # Shared utility (find_losers, build_dissent_records) +│ │ │ ├── _helpers.py # Shared utility (find_losers, pick_highest_seniority) │ │ │ ├── authority_strategy.py # AuthorityResolver (Strategy 1) │ │ │ ├── config.py # ConflictResolutionConfig, DebateConfig, HybridConfig │ │ │ ├── debate_strategy.py # DebateResolver (Strategy 2) diff --git a/README.md b/README.md index 01f651d469..5e22bfe5bf 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ AI Company lets you spin up a virtual organization staffed entirely by AI agents ## Status -**M4: Multi-Agent** in progress (M0 Tooling, M1 Config & Core, M2 Providers — all done; M3 Single Agent in progress). See [DESIGN_SPEC.md](DESIGN_SPEC.md) for the full high-level specification. 
+**M3: Single Agent** and **M4: Multi-Agent** in progress (M0 Tooling, M1 Config & Core, M2 Providers — all done). See [DESIGN_SPEC.md](DESIGN_SPEC.md) for the full high-level specification. ## Tech Stack diff --git a/src/ai_company/communication/bus_memory.py b/src/ai_company/communication/bus_memory.py index 846bb898fc..c2a3f593de 100644 --- a/src/ai_company/communication/bus_memory.py +++ b/src/ai_company/communication/bus_memory.py @@ -464,12 +464,20 @@ async def receive( try: result = await self._await_with_shutdown(queue, timeout) finally: - # Decrement outside the lock: in asyncio cooperative - # multitasking, this dict mutation is atomic between await - # points. The asymmetry with the lock-guarded increment - # is intentional — the decrement must happen after + # Decrement outside the lock: no ``await`` separates the + # read and write of ``_waiters``, so no other coroutine + # can interleave in a single-threaded asyncio event loop. + # The asymmetry with the lock-guarded increment is + # intentional — the decrement must happen after # _await_with_shutdown completes. - self._waiters[key] = max(0, self._waiters.get(key, 0) - 1) + current = self._waiters.get(key) + if current is None: + # Key was removed (e.g. by unsubscribe); don't recreate. 
+ pass + elif current <= 1: + self._waiters.pop(key, None) + else: + self._waiters[key] = current - 1 if result is None: await self._log_receive_null(channel_name, subscriber_id, timeout) return result diff --git a/src/ai_company/communication/conflict_resolution/service.py b/src/ai_company/communication/conflict_resolution/service.py index 67be30b118..f35bdf39d9 100644 --- a/src/ai_company/communication/conflict_resolution/service.py +++ b/src/ai_company/communication/conflict_resolution/service.py @@ -15,6 +15,7 @@ ConflictResolutionConfig, ) from ai_company.communication.conflict_resolution.models import ( + _MIN_POSITIONS, Conflict, ConflictPosition, ConflictResolution, @@ -43,8 +44,6 @@ logger = get_logger(__name__) -_MIN_POSITIONS = 2 - class ConflictResolutionService: """Orchestrates conflict detection, resolution, and audit. diff --git a/src/ai_company/communication/delegation/hierarchy.py b/src/ai_company/communication/delegation/hierarchy.py index b609389722..75fdd1a306 100644 --- a/src/ai_company/communication/delegation/hierarchy.py +++ b/src/ai_company/communication/delegation/hierarchy.py @@ -32,7 +32,7 @@ class HierarchyResolver: HierarchyResolutionError: If a cycle is detected. """ - __slots__ = ("_reports_of", "_supervisor_of") + __slots__ = ("_known_agents", "_reports_of", "_supervisor_of") def __init__(self, company: Company) -> None: supervisor_of: dict[str, str] = {} @@ -79,6 +79,11 @@ def __init__(self, company: Company) -> None: self._reports_of: MappingProxyType[str, tuple[str, ...]] = MappingProxyType( {k: tuple(v) for k, v in reports_of.items()} ) + self._known_agents: frozenset[str] = frozenset( + set(supervisor_of.keys()) + | set(supervisor_of.values()) + | set(reports_of.keys()) + ) logger.debug( DELEGATION_HIERARCHY_BUILT, @@ -229,6 +234,8 @@ def get_lowest_common_manager( common manager exists. 
""" if agent_a == agent_b: + if agent_a not in self._known_agents: + return None return agent_a ancestors_a = self.get_ancestors(agent_a) ancestors_b_set = set(self.get_ancestors(agent_b)) diff --git a/src/ai_company/communication/meeting/_parsing.py b/src/ai_company/communication/meeting/_parsing.py index 6de9d5824e..d70c6c11bc 100644 --- a/src/ai_company/communication/meeting/_parsing.py +++ b/src/ai_company/communication/meeting/_parsing.py @@ -22,13 +22,14 @@ re.IGNORECASE | re.MULTILINE, ) _ANY_HEADER_RE = re.compile( - r"^#+\s+\S|^\S.*:\s*$", + r"^#+\s+\S|^(?!\s*(?:\d+[\.\)]\s|-\s|\*\s|\u2022\s))\S.*:\s*$", re.MULTILINE, ) -# List item patterns (numbered or bulleted) +# List item patterns (numbered or bulleted), capturing continuation lines _LIST_ITEM_RE = re.compile( - r"^[^\S\n]*(?:\d+[\.\)][^\S\n]*|-[^\S\n]*|\*[^\S\n]*|\u2022[^\S\n]*)(.+)", + r"^[^\S\n]*(?:\d+[\.\)][^\S\n]*|-[^\S\n]*|\*[^\S\n]*|\u2022[^\S\n]*)" + r"(.+(?:\n(?![^\S\n]*(?:\d+[\.\)]\s|-\s|\*\s|\u2022\s)|#+\s|\S.*:\s*$).+)*)", re.MULTILINE, ) @@ -82,11 +83,16 @@ def parse_decisions(summary_text: str) -> tuple[str, ...]: """ section = _extract_section(summary_text, _DECISIONS_HEADER_RE) if not section: + logger.debug( + "meeting.parsing.no_section", + section="decisions", + ) return () decisions: list[str] = [] for match in _LIST_ITEM_RE.finditer(section): - text = match.group(1).strip() + # Join continuation lines into a single string + text = " ".join(match.group(1).split()) if text: decisions.append(text) @@ -135,11 +141,16 @@ def parse_action_items( """ section = _extract_section(summary_text, _ACTION_ITEMS_HEADER_RE) if not section: + logger.debug( + "meeting.parsing.no_section", + section="action_items", + ) return () items: list[ActionItem] = [] for match in _LIST_ITEM_RE.finditer(section): - raw_text = match.group(1).strip() + # Join continuation lines into a single string + raw_text = " ".join(match.group(1).split()) if not raw_text: continue diff --git 
a/src/ai_company/communication/meeting/_token_tracker.py b/src/ai_company/communication/meeting/_token_tracker.py index c32942a901..13e312f87e 100644 --- a/src/ai_company/communication/meeting/_token_tracker.py +++ b/src/ai_company/communication/meeting/_token_tracker.py @@ -15,6 +15,7 @@ from ai_company.observability import get_logger from ai_company.observability.events.meeting import ( MEETING_BUDGET_EXHAUSTED, + MEETING_VALIDATION_FAILED, ) logger = get_logger(__name__) @@ -75,7 +76,7 @@ def record(self, input_tokens: int, output_tokens: int) -> None: f"output_tokens={output_tokens}" ) logger.warning( - MEETING_BUDGET_EXHAUSTED, + MEETING_VALIDATION_FAILED, error=msg, input_tokens=input_tokens, output_tokens=output_tokens, diff --git a/src/ai_company/communication/meeting/models.py b/src/ai_company/communication/meeting/models.py index ad057955ad..79bc28d86a 100644 --- a/src/ai_company/communication/meeting/models.py +++ b/src/ai_company/communication/meeting/models.py @@ -242,6 +242,12 @@ def _validate_token_aggregates(self) -> Self: zero totals). """ if not self.contributions: + if self.total_input_tokens != 0 or self.total_output_tokens != 0: + msg = ( + "total_input_tokens and total_output_tokens must " + "be 0 when contributions are empty" + ) + raise ValueError(msg) return self sum_input = sum(c.input_tokens for c in self.contributions) sum_output = sum(c.output_tokens for c in self.contributions) diff --git a/src/ai_company/communication/meeting/orchestrator.py b/src/ai_company/communication/meeting/orchestrator.py index e233611c88..91ddc82866 100644 --- a/src/ai_company/communication/meeting/orchestrator.py +++ b/src/ai_company/communication/meeting/orchestrator.py @@ -5,7 +5,6 @@ from action items, and records audit trail entries. 
""" -import copy from collections import Counter from collections.abc import Mapping # noqa: TC003 from types import MappingProxyType @@ -96,7 +95,7 @@ def __init__( ) -> None: self._protocol_registry: MappingProxyType[ MeetingProtocolType, MeetingProtocol - ] = MappingProxyType(copy.deepcopy(dict(protocol_registry))) + ] = MappingProxyType(dict(protocol_registry)) self._agent_caller = agent_caller self._task_creator = task_creator self._records: list[MeetingRecord] = [] diff --git a/src/ai_company/communication/meeting/position_papers.py b/src/ai_company/communication/meeting/position_papers.py index 487caccdbe..43384d9560 100644 --- a/src/ai_company/communication/meeting/position_papers.py +++ b/src/ai_company/communication/meeting/position_papers.py @@ -34,6 +34,7 @@ MEETING_AGENT_CALLED, MEETING_AGENT_RESPONDED, MEETING_CONTRIBUTION_RECORDED, + MEETING_INTERNAL_ERROR, MEETING_PHASE_COMPLETED, MEETING_PHASE_STARTED, MEETING_SUMMARY_GENERATED, @@ -69,8 +70,12 @@ def _build_synthesis_prompt( parts.append("") parts.append( "Please synthesize these position papers. Identify areas of " - "agreement, conflicts, and produce a list of decisions and " - "action items with assignees." + "agreement and conflicts, then produce your output using " + "exactly these section headers:\n\n" + "Decisions:\n" + "1. \n\n" + "Action Items:\n" + "- (assigned to )" ) return "\n".join(parts) @@ -152,7 +157,13 @@ async def run( # noqa: PLR0913 synthesis_text = synthesis_contribution.content decisions = parse_decisions(synthesis_text) - action_items = parse_action_items(synthesis_text) + raw_action_items = parse_action_items(synthesis_text) + allowed_assignees = set(participant_ids) | {leader_id} + action_items = tuple( + item + for item in raw_action_items + if item.assignee_id is None or item.assignee_id in allowed_assignees + ) logger.debug( MEETING_TOKENS_RECORDED, @@ -281,11 +292,19 @@ async def _collect_paper( # on any task failure, so reaching this point means all succeeded. 
if not all(r is not None for r in results): msg = f"Expected {n} position papers but some slots are None" - logger.error(msg, meeting_id=meeting_id) + logger.error( + MEETING_INTERNAL_ERROR, + error=msg, + meeting_id=meeting_id, + ) raise RuntimeError(msg) if not all(c is not None for c in contrib_results): msg = f"Expected {n} contributions but some slots are None" - logger.error(msg, meeting_id=meeting_id) + logger.error( + MEETING_INTERNAL_ERROR, + error=msg, + meeting_id=meeting_id, + ) raise RuntimeError(msg) papers: list[tuple[str, str]] = list(results) # type: ignore[arg-type] paper_contributions: list[MeetingContribution] = list(contrib_results) # type: ignore[arg-type] diff --git a/src/ai_company/communication/meeting/round_robin.py b/src/ai_company/communication/meeting/round_robin.py index dce8ed7b24..0cb8377aba 100644 --- a/src/ai_company/communication/meeting/round_robin.py +++ b/src/ai_company/communication/meeting/round_robin.py @@ -74,9 +74,12 @@ def _build_summary_prompt( parts.extend(transcript) parts.append("") parts.append( - "Please summarize this meeting. List the key decisions made " - "and any action items with assignees. Format decisions as a " - "numbered list and action items as a bulleted list." + "Please summarize this meeting using exactly these section " + "headers:\n\n" + "Decisions:\n" + "1. 
\n\n" + "Action Items:\n" + "- (assigned to )" ) return "\n".join(parts) @@ -163,7 +166,13 @@ async def run( # noqa: PLR0913 ) all_contributions = (*contributions, summary_contribution) decisions = parse_decisions(summary) - action_items = parse_action_items(summary) + raw_action_items = parse_action_items(summary) + allowed_assignees = set(participant_ids) | {leader_id} + action_items = tuple( + item + for item in raw_action_items + if item.assignee_id is None or item.assignee_id in allowed_assignees + ) elif self._config.leader_summarizes and tracker.is_exhausted: logger.warning( MEETING_SUMMARY_SKIPPED, diff --git a/src/ai_company/communication/meeting/structured_phases.py b/src/ai_company/communication/meeting/structured_phases.py index c8cb5b2d17..eb5f24f03f 100644 --- a/src/ai_company/communication/meeting/structured_phases.py +++ b/src/ai_company/communication/meeting/structured_phases.py @@ -41,6 +41,7 @@ MEETING_BUDGET_EXHAUSTED, MEETING_CONFLICT_DETECTED, MEETING_CONTRIBUTION_RECORDED, + MEETING_INTERNAL_ERROR, MEETING_PHASE_COMPLETED, MEETING_PHASE_STARTED, MEETING_SUMMARY_GENERATED, @@ -133,9 +134,11 @@ def _build_synthesis_prompt( parts.append("") parts.append( "As the meeting leader, synthesize all inputs and discussion " - "into final decisions and action items. List decisions as a " - "numbered list and action items as a bulleted list with " - "assignees." + "into your output using exactly these section headers:\n\n" + "Decisions:\n" + "1. 
\n\n" + "Action Items:\n" + "- (assigned to )" ) return "\n".join(parts) @@ -277,7 +280,13 @@ async def run( # noqa: PLR0913 ) decisions = parse_decisions(summary) - action_items = parse_action_items(summary) + raw_action_items = parse_action_items(summary) + allowed_assignees = set(participant_ids) | {leader_id} + action_items = tuple( + item + for item in raw_action_items + if item.assignee_id is None or item.assignee_id in allowed_assignees + ) logger.debug( MEETING_TOKENS_RECORDED, @@ -400,11 +409,19 @@ async def _collect_input( # on any task failure, so reaching this point means all succeeded. if not all(r is not None for r in result_inputs): msg = f"Expected {num_participants} inputs but some slots are None" - logger.error(msg, meeting_id=meeting_id) + logger.error( + MEETING_INTERNAL_ERROR, + error=msg, + meeting_id=meeting_id, + ) raise RuntimeError(msg) if not all(c is not None for c in result_contributions): msg = f"Expected {num_participants} contributions but some slots are None" - logger.error(msg, meeting_id=meeting_id) + logger.error( + MEETING_INTERNAL_ERROR, + error=msg, + meeting_id=meeting_id, + ) raise RuntimeError(msg) inputs: list[tuple[str, str]] = list(result_inputs) # type: ignore[arg-type] input_contributions: list[MeetingContribution] = list( diff --git a/src/ai_company/engine/decomposition/models.py b/src/ai_company/engine/decomposition/models.py index 38f42877a3..dafbbf8673 100644 --- a/src/ai_company/engine/decomposition/models.py +++ b/src/ai_company/engine/decomposition/models.py @@ -186,6 +186,10 @@ class SubtaskStatusRollup(BaseModel): these. The ``derived_parent_status`` treats any such remainder as work still pending (IN_PROGRESS). + When all subtasks are in terminal states but with a mix of + completed and cancelled, ``derived_parent_status`` returns + ``CANCELLED`` (some work was abandoned). + Attributes: parent_task_id: ID of the parent task. total: Total number of subtasks. 
diff --git a/src/ai_company/engine/parallel.py b/src/ai_company/engine/parallel.py index 56cc2e4481..1603dd6723 100644 --- a/src/ai_company/engine/parallel.py +++ b/src/ai_company/engine/parallel.py @@ -244,7 +244,7 @@ async def _run_task_group( # TaskGroup wraps exceptions in ExceptionGroup when # _run_guarded re-raises (fail_fast enabled). # Individual errors already logged in _record_error_outcome. - logger.debug( + logger.warning( PARALLEL_GROUP_SUPPRESSED, error=f"ExceptionGroup suppressed: {eg!r}", group_id=group.group_id, diff --git a/src/ai_company/observability/events/meeting.py b/src/ai_company/observability/events/meeting.py index 34fe5d2ca7..9cb0a33fff 100644 --- a/src/ai_company/observability/events/meeting.py +++ b/src/ai_company/observability/events/meeting.py @@ -38,3 +38,6 @@ # Token tracking MEETING_TOKENS_RECORDED: Final[str] = "meeting.tokens.recorded" + +# Internal invariant violations +MEETING_INTERNAL_ERROR: Final[str] = "meeting.internal.error" diff --git a/tests/unit/communication/delegation/test_hierarchy.py b/tests/unit/communication/delegation/test_hierarchy.py index d8ae42bc8c..c7f829ceda 100644 --- a/tests/unit/communication/delegation/test_hierarchy.py +++ b/tests/unit/communication/delegation/test_hierarchy.py @@ -377,6 +377,12 @@ def test_same_agent_returns_self(self) -> None: resolver = HierarchyResolver(company) assert resolver.get_lowest_common_manager("sr_dev", "sr_dev") == "sr_dev" + def test_same_unknown_agent_returns_none(self) -> None: + """Unknown agent passed as both arguments returns None.""" + company = _make_company(departments=(_eng_department(),)) + resolver = HierarchyResolver(company) + assert resolver.get_lowest_common_manager("unknown", "unknown") is None + def test_cross_department_agents_under_same_root(self) -> None: """If eng/qa share a root, that root is LCM.""" company = _make_company(departments=(_eng_department(), _qa_department())) diff --git a/tests/unit/communication/meeting/test_models.py 
b/tests/unit/communication/meeting/test_models.py index b3788f70e0..53b3184b6f 100644 --- a/tests/unit/communication/meeting/test_models.py +++ b/tests/unit/communication/meeting/test_models.py @@ -189,7 +189,17 @@ def test_basic_creation(self) -> None: assert minutes.conflicts_detected is False def test_total_tokens_computed(self) -> None: + contrib = MeetingContribution( + agent_id="agent-a", + content="Input", + phase=MeetingPhase.ROUND_ROBIN_TURN, + turn_number=0, + input_tokens=100, + output_tokens=50, + timestamp=_NOW, + ) minutes = self._make_minutes( + contributions=(contrib,), total_input_tokens=100, total_output_tokens=50, ) @@ -465,3 +475,22 @@ def test_mismatched_output_tokens_rejected(self) -> None: started_at=_NOW, ended_at=_LATER, ) + + def test_empty_contributions_non_zero_totals_rejected(self) -> None: + """Empty contributions with non-zero totals raises.""" + with pytest.raises( + ValidationError, + match="must be 0 when contributions are empty", + ): + MeetingMinutes( + meeting_id="m-1", + protocol_type=MeetingProtocolType.ROUND_ROBIN, + leader_id="leader", + participant_ids=("agent-a",), + agenda=MeetingAgenda(title="Test"), + contributions=(), + total_input_tokens=100, + total_output_tokens=0, + started_at=_NOW, + ended_at=_LATER, + ) diff --git a/tests/unit/communication/meeting/test_prompts.py b/tests/unit/communication/meeting/test_prompts.py index cb32283179..f9cbe4af56 100644 --- a/tests/unit/communication/meeting/test_prompts.py +++ b/tests/unit/communication/meeting/test_prompts.py @@ -72,3 +72,23 @@ def test_items_with_descriptions_use_em_dash(self) -> None: agenda = MeetingAgenda(title="Design", items=items) result = build_agenda_prompt(agenda) assert "1. 
Auth — OAuth flow" in result + + def test_items_with_presenter_id(self) -> None: + """Presenter ID is included in the formatted prompt.""" + items = ( + MeetingAgendaItem( + title="API Design", + description="REST endpoints", + presenter_id="lead-dev", + ), + ) + agenda = MeetingAgenda(title="Review", items=items) + result = build_agenda_prompt(agenda) + assert "(presenter: lead-dev)" in result + + def test_items_without_presenter_id(self) -> None: + """No presenter tag when presenter_id is None.""" + items = (MeetingAgendaItem(title="Topic"),) + agenda = MeetingAgenda(title="Sync", items=items) + result = build_agenda_prompt(agenda) + assert "presenter:" not in result diff --git a/tests/unit/engine/test_decomposition_models.py b/tests/unit/engine/test_decomposition_models.py index b1c6ce808f..3caeac4860 100644 --- a/tests/unit/engine/test_decomposition_models.py +++ b/tests/unit/engine/test_decomposition_models.py @@ -237,7 +237,10 @@ def test_task_id_mismatch_rejected(self) -> None: SubtaskDefinition(id="sub-2", title="B", description="B desc"), ), ) - with pytest.raises(ValueError, match="do not match plan subtask IDs"): + with pytest.raises( + ValueError, + match=r"missing=\['sub-2'\].*extra=\['sub-99'\]", + ): DecompositionResult( plan=plan, created_tasks=( diff --git a/tests/unit/engine/test_decomposition_rollup.py b/tests/unit/engine/test_decomposition_rollup.py index 4e471b1bbc..346e5b9bbb 100644 --- a/tests/unit/engine/test_decomposition_rollup.py +++ b/tests/unit/engine/test_decomposition_rollup.py @@ -58,10 +58,15 @@ def test_blocked_no_in_progress(self) -> None: @pytest.mark.unit def test_empty_statuses(self) -> None: - """Empty statuses -> derived CREATED.""" + """Empty statuses -> derived CREATED with all counts zero.""" rollup = StatusRollup.compute("task-1", ()) assert rollup.derived_parent_status == TaskStatus.CREATED assert rollup.total == 0 + assert rollup.completed == 0 + assert rollup.failed == 0 + assert rollup.in_progress == 0 + assert 
rollup.blocked == 0 + assert rollup.cancelled == 0 @pytest.mark.unit def test_pending_work(self) -> None: