From e515bf166ef5d9346bee09c3767ea2d376e9a894 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Sat, 7 Mar 2026 19:15:20 +0100 Subject: [PATCH 1/5] feat: implement parallel agent execution (#22) Add ParallelExecutor for concurrent agent execution using asyncio.TaskGroup with error isolation, concurrency limits, resource locking, and progress tracking. Follows the ToolInvoker.invoke_all() pattern. New modules: - parallel_models: AgentAssignment, ParallelExecutionGroup, AgentOutcome, ParallelExecutionResult, ParallelProgress (frozen Pydantic models) - resource_lock: ResourceLock protocol + InMemoryResourceLock - parallel: ParallelExecutor orchestrator - events/parallel: 10 event constants 54 new unit tests (24 models, 13 lock, 17 executor), 96.42% coverage. Co-Authored-By: Claude Opus 4.6 --- src/ai_company/engine/__init__.py | 22 + src/ai_company/engine/errors.py | 8 + src/ai_company/engine/parallel.py | 461 ++++++++++++++++ src/ai_company/engine/parallel_models.py | 238 +++++++++ src/ai_company/engine/resource_lock.py | 129 +++++ .../observability/events/parallel.py | 14 + tests/unit/engine/test_parallel.py | 491 ++++++++++++++++++ tests/unit/engine/test_parallel_models.py | 418 +++++++++++++++ tests/unit/engine/test_resource_lock.py | 111 ++++ tests/unit/observability/test_events.py | 1 + 10 files changed, 1893 insertions(+) create mode 100644 src/ai_company/engine/parallel.py create mode 100644 src/ai_company/engine/parallel_models.py create mode 100644 src/ai_company/engine/resource_lock.py create mode 100644 src/ai_company/observability/events/parallel.py create mode 100644 tests/unit/engine/test_parallel.py create mode 100644 tests/unit/engine/test_parallel_models.py create mode 100644 tests/unit/engine/test_resource_lock.py diff --git a/src/ai_company/engine/__init__.py b/src/ai_company/engine/__init__.py index 6de12cdab1..3dbd8c4abd 100644 --- a/src/ai_company/engine/__init__.py +++ b/src/ai_company/engine/__init__.py 
@@ -17,7 +17,9 @@ ExecutionStateError, LoopExecutionError, MaxTurnsExceededError, + ParallelExecutionError, PromptBuildError, + ResourceConflictError, ) from ai_company.engine.loop_protocol import ( BudgetChecker, @@ -28,6 +30,14 @@ TurnRecord, ) from ai_company.engine.metrics import TaskCompletionMetrics +from ai_company.engine.parallel import ParallelExecutor, ProgressCallback +from ai_company.engine.parallel_models import ( + AgentAssignment, + AgentOutcome, + ParallelExecutionGroup, + ParallelExecutionResult, + ParallelProgress, +) from ai_company.engine.plan_execute_loop import PlanExecuteLoop from ai_company.engine.plan_models import ( ExecutionPlan, @@ -47,6 +57,7 @@ RecoveryResult, RecoveryStrategy, ) +from ai_company.engine.resource_lock import InMemoryResourceLock, ResourceLock from ai_company.engine.run_result import AgentRunResult from ai_company.engine.shutdown import ( CleanupCallback, @@ -61,9 +72,11 @@ __all__ = [ "DEFAULT_MAX_TURNS", "ZERO_TOKEN_USAGE", + "AgentAssignment", "AgentContext", "AgentContextSnapshot", "AgentEngine", + "AgentOutcome", "AgentRunResult", "BudgetChecker", "BudgetExhaustedError", @@ -76,16 +89,25 @@ "ExecutionResult", "ExecutionStateError", "FailAndReassignStrategy", + "InMemoryResourceLock", "LoopExecutionError", "MaxTurnsExceededError", + "ParallelExecutionError", + "ParallelExecutionGroup", + "ParallelExecutionResult", + "ParallelExecutor", + "ParallelProgress", "PlanExecuteConfig", "PlanExecuteLoop", "PlanStep", + "ProgressCallback", "PromptBuildError", "PromptTokenEstimator", "ReactLoop", "RecoveryResult", "RecoveryStrategy", + "ResourceConflictError", + "ResourceLock", "ShutdownChecker", "ShutdownManager", "ShutdownResult", diff --git a/src/ai_company/engine/errors.py b/src/ai_company/engine/errors.py index 00058ab891..858c1b20be 100644 --- a/src/ai_company/engine/errors.py +++ b/src/ai_company/engine/errors.py @@ -37,3 +37,11 @@ class LoopExecutionError(EngineError): This exception is available for the engine layer 
above the loop to convert that result into a raised error when appropriate. """ + + +class ParallelExecutionError(EngineError): + """Raised when a parallel execution group encounters a fatal error.""" + + +class ResourceConflictError(EngineError): + """Raised when resource claims conflict between assignments.""" diff --git a/src/ai_company/engine/parallel.py b/src/ai_company/engine/parallel.py new file mode 100644 index 0000000000..741ebf0131 --- /dev/null +++ b/src/ai_company/engine/parallel.py @@ -0,0 +1,461 @@ +"""Parallel agent execution orchestrator. + +Coordinates multiple ``AgentEngine.run()`` calls in parallel using +structured concurrency (``asyncio.TaskGroup``), with error isolation, +concurrency limits, resource locking, and progress tracking. + +Follows the ``ToolInvoker.invoke_all()`` pattern from +``tools/invoker.py`` — ``TaskGroup`` + optional ``Semaphore`` + +``_run_guarded()`` error isolation. +""" + +import asyncio +import dataclasses +import time +from collections.abc import Callable +from typing import TYPE_CHECKING + +from ai_company.engine.errors import ParallelExecutionError, ResourceConflictError +from ai_company.engine.parallel_models import ( + AgentAssignment, + AgentOutcome, + ParallelExecutionGroup, + ParallelExecutionResult, + ParallelProgress, +) +from ai_company.engine.resource_lock import InMemoryResourceLock, ResourceLock +from ai_company.observability import get_logger + +if TYPE_CHECKING: + from ai_company.engine.agent_engine import AgentEngine + from ai_company.engine.run_result import AgentRunResult + from ai_company.engine.shutdown import ShutdownManager +from ai_company.observability.events.parallel import ( + PARALLEL_AGENT_COMPLETE, + PARALLEL_AGENT_ERROR, + PARALLEL_AGENT_START, + PARALLEL_GROUP_COMPLETE, + PARALLEL_GROUP_START, + PARALLEL_PROGRESS_UPDATE, + PARALLEL_VALIDATION_ERROR, +) + +logger = get_logger(__name__) + +ProgressCallback = Callable[[ParallelProgress], None] +"""Synchronous callback invoked on progress 
updates.""" + + +@dataclasses.dataclass +class _ProgressState: + """Mutable progress tracking — internal to ``execute_group()`` scope.""" + + group_id: str + total: int + completed: int = 0 + in_progress: int = 0 + succeeded: int = 0 + failed: int = 0 + + def snapshot(self) -> ParallelProgress: + """Create a frozen progress snapshot.""" + return ParallelProgress( + group_id=self.group_id, + total=self.total, + completed=self.completed, + in_progress=self.in_progress, + pending=self.total - self.completed - self.in_progress, + succeeded=self.succeeded, + failed=self.failed, + ) + + +class ParallelExecutor: + """Orchestrates concurrent agent execution. + + Composition over inheritance — takes an ``AgentEngine`` and + coordinates concurrent ``run()`` calls. + + Args: + engine: Agent execution engine. + shutdown_manager: Optional shutdown manager for task registration. + resource_lock: Optional resource lock for exclusive file access. + Defaults to ``InMemoryResourceLock`` if any assignments + declare resource claims. + progress_callback: Optional synchronous callback invoked on + progress updates. + """ + + def __init__( + self, + *, + engine: AgentEngine, + shutdown_manager: ShutdownManager | None = None, + resource_lock: ResourceLock | None = None, + progress_callback: ProgressCallback | None = None, + ) -> None: + self._engine = engine + self._shutdown_manager = shutdown_manager + self._resource_lock = resource_lock + self._progress_callback = progress_callback + + async def execute_group( + self, + group: ParallelExecutionGroup, + ) -> ParallelExecutionResult: + """Execute a parallel group of agent assignments. + + Args: + group: The execution group to run. + + Returns: + Result with all agent outcomes. + + Raises: + ResourceConflictError: If resource claims conflict between + assignments. + ParallelExecutionError: If fatal errors (MemoryError, + RecursionError) occurred during execution. 
+ """ + start = time.monotonic() + + logger.info( + PARALLEL_GROUP_START, + group_id=group.group_id, + agent_count=len(group.assignments), + max_concurrency=group.max_concurrency, + fail_fast=group.fail_fast, + ) + + lock = self._resolve_lock(group) + self._validate_resource_claims(group) + + if lock is not None: + await self._acquire_all_locks(group, lock) + + semaphore = ( + asyncio.Semaphore(group.max_concurrency) + if group.max_concurrency is not None + else None + ) + + outcomes: dict[str, AgentOutcome] = {} + fatal_errors: list[Exception] = [] + progress = _ProgressState( + group_id=group.group_id, + total=len(group.assignments), + ) + + try: + async with asyncio.TaskGroup() as tg: + for assignment in group.assignments: + tg.create_task( + self._run_guarded( + assignment=assignment, + group=group, + outcomes=outcomes, + fatal_errors=fatal_errors, + progress=progress, + semaphore=semaphore, + lock=lock, + ), + ) + except* Exception as eg: + # TaskGroup wraps exceptions in ExceptionGroup when + # fail_fast re-raises inside _run_guarded. + # Outcomes from completed tasks are already collected. 
+ logger.debug( + PARALLEL_GROUP_COMPLETE, + group_id=group.group_id, + note="TaskGroup exited with exceptions", + exception_count=len(eg.exceptions), + ) + + if lock is not None: + await self._release_all_locks(group, lock) + + duration = time.monotonic() - start + + result = ParallelExecutionResult( + group_id=group.group_id, + outcomes=tuple( + outcomes.get( + a.task_id, + AgentOutcome( + task_id=a.task_id, + agent_id=a.agent_id, + error="Cancelled due to fail_fast", + ), + ) + for a in group.assignments + ), + total_duration_seconds=duration, + ) + + logger.info( + PARALLEL_GROUP_COMPLETE, + group_id=group.group_id, + succeeded=result.agents_succeeded, + failed=result.agents_failed, + duration_seconds=duration, + ) + + if fatal_errors: + msg = ( + f"Parallel group {group.group_id!r} had " + f"{len(fatal_errors)} fatal error(s)" + ) + raise ParallelExecutionError(msg) from fatal_errors[0] + + return result + + async def _run_guarded( # noqa: PLR0913 + self, + *, + assignment: AgentAssignment, + group: ParallelExecutionGroup, + outcomes: dict[str, AgentOutcome], + fatal_errors: list[Exception], + progress: _ProgressState, + semaphore: asyncio.Semaphore | None, + lock: ResourceLock | None, + ) -> None: + """Execute a single agent, isolating errors from siblings. 
+ + Follows the ``ToolInvoker._run_guarded()`` pattern: + - ``MemoryError``/``RecursionError`` → collected in fatal_errors + - Regular ``Exception`` → stored as error outcome + - ``BaseException`` → propagates through TaskGroup + """ + task_id = assignment.task_id + agent_id = assignment.agent_id + + if not self._register_with_shutdown(task_id, agent_id, outcomes): + return + + try: + progress.in_progress += 1 + self._emit_progress(progress) + await self._execute_assignment( + assignment, + group, + outcomes, + progress, + semaphore, + ) + except (MemoryError, RecursionError) as exc: + fatal_errors.append(exc) + outcomes[task_id] = AgentOutcome( + task_id=task_id, + agent_id=agent_id, + error=f"Fatal: {type(exc).__name__}: {exc}", + ) + progress.failed += 1 + except Exception as exc: + self._record_error_outcome( + exc, + assignment, + group, + outcomes, + progress, + ) + if group.fail_fast: + raise + finally: + progress.in_progress = max(0, progress.in_progress - 1) + progress.completed += 1 + self._emit_progress(progress) + + if lock is not None: + for resource in assignment.resource_claims: + await lock.release(resource, agent_id) + + if self._shutdown_manager is not None: + self._shutdown_manager.unregister_task(task_id) + + def _register_with_shutdown( + self, + task_id: str, + agent_id: str, + outcomes: dict[str, AgentOutcome], + ) -> bool: + """Register with shutdown manager. 
Returns False if shutdown.""" + if self._shutdown_manager is None: + return True + asyncio_task = asyncio.current_task() + if asyncio_task is None: + return True + try: + self._shutdown_manager.register_task(task_id, asyncio_task) + except RuntimeError: + outcomes[task_id] = AgentOutcome( + task_id=task_id, + agent_id=agent_id, + error="Shutdown in progress", + ) + return False + return True + + async def _execute_assignment( + self, + assignment: AgentAssignment, + group: ParallelExecutionGroup, + outcomes: dict[str, AgentOutcome], + progress: _ProgressState, + semaphore: asyncio.Semaphore | None, + ) -> None: + """Run engine.run() with optional semaphore.""" + task_id = assignment.task_id + agent_id = assignment.agent_id + + logger.info( + PARALLEL_AGENT_START, + group_id=group.group_id, + agent_id=agent_id, + task_id=task_id, + ) + + if semaphore is not None: + await semaphore.acquire() + try: + run_result: AgentRunResult = await self._engine.run( + identity=assignment.identity, + task=assignment.task, + completion_config=assignment.completion_config, + max_turns=assignment.max_turns, + memory_messages=assignment.memory_messages, + timeout_seconds=assignment.timeout_seconds, + ) + outcomes[task_id] = AgentOutcome( + task_id=task_id, + agent_id=agent_id, + result=run_result, + ) + progress.succeeded += 1 + logger.info( + PARALLEL_AGENT_COMPLETE, + group_id=group.group_id, + agent_id=agent_id, + task_id=task_id, + success=True, + ) + finally: + if semaphore is not None: + semaphore.release() + + def _record_error_outcome( + self, + exc: Exception, + assignment: AgentAssignment, + group: ParallelExecutionGroup, + outcomes: dict[str, AgentOutcome], + progress: _ProgressState, + ) -> None: + """Record a failed agent outcome.""" + error_msg = f"{type(exc).__name__}: {exc}" + outcomes[assignment.task_id] = AgentOutcome( + task_id=assignment.task_id, + agent_id=assignment.agent_id, + error=error_msg, + ) + progress.failed += 1 + logger.warning( + PARALLEL_AGENT_ERROR, + 
group_id=group.group_id, + agent_id=assignment.agent_id, + task_id=assignment.task_id, + error=error_msg, + ) + + def _resolve_lock( + self, + group: ParallelExecutionGroup, + ) -> ResourceLock | None: + """Return the resource lock to use, if any claims exist.""" + has_claims = any(a.resource_claims for a in group.assignments) + if not has_claims: + return self._resource_lock + if self._resource_lock is not None: + return self._resource_lock + return InMemoryResourceLock() + + def _validate_resource_claims( + self, + group: ParallelExecutionGroup, + ) -> None: + """Check for overlapping resource claims between assignments. + + Raises: + ResourceConflictError: If two assignments claim the same + resource. + """ + seen: dict[str, str] = {} + for assignment in group.assignments: + for resource in assignment.resource_claims: + if resource in seen: + other = seen[resource] + msg = ( + f"Resource conflict: {resource!r} claimed by " + f"both agent {other!r} and {assignment.agent_id!r}" + ) + logger.warning( + PARALLEL_VALIDATION_ERROR, + group_id=group.group_id, + error=msg, + ) + raise ResourceConflictError(msg) + seen[resource] = assignment.agent_id + + async def _acquire_all_locks( + self, + group: ParallelExecutionGroup, + lock: ResourceLock, + ) -> None: + """Acquire resource locks for all assignments.""" + for assignment in group.assignments: + for resource in assignment.resource_claims: + acquired = await lock.acquire( + resource, + assignment.agent_id, + ) + if not acquired: + holder = lock.holder_of(resource) + msg = f"Failed to acquire lock on {resource!r}: held by {holder!r}" + logger.warning( + PARALLEL_VALIDATION_ERROR, + group_id=group.group_id, + error=msg, + ) + # Release any locks already acquired for this group + await self._release_all_locks(group, lock) + raise ResourceConflictError(msg) + + async def _release_all_locks( + self, + group: ParallelExecutionGroup, + lock: ResourceLock, + ) -> None: + """Release all resource locks for all assignments.""" + 
for assignment in group.assignments: + await lock.release_all(assignment.agent_id) + + def _emit_progress(self, state: _ProgressState) -> None: + """Emit a progress update via the callback, if configured.""" + if self._progress_callback is None: + return + snapshot = state.snapshot() + logger.debug( + PARALLEL_PROGRESS_UPDATE, + group_id=snapshot.group_id, + total=snapshot.total, + completed=snapshot.completed, + in_progress=snapshot.in_progress, + pending=snapshot.pending, + ) + try: + self._progress_callback(snapshot) + except Exception: + logger.exception( + PARALLEL_PROGRESS_UPDATE, + error="Progress callback raised", + ) diff --git a/src/ai_company/engine/parallel_models.py b/src/ai_company/engine/parallel_models.py new file mode 100644 index 0000000000..0f2c59d2bf --- /dev/null +++ b/src/ai_company/engine/parallel_models.py @@ -0,0 +1,238 @@ +"""Parallel execution models. + +Frozen Pydantic models for describing parallel agent assignments, +their outcomes, and execution group metadata. +""" + +from collections import Counter + +from pydantic import BaseModel, ConfigDict, Field, computed_field, model_validator + +from ai_company.core.agent import AgentIdentity # noqa: TC001 +from ai_company.core.task import Task # noqa: TC001 +from ai_company.core.types import NotBlankStr # noqa: TC001 +from ai_company.engine.context import DEFAULT_MAX_TURNS +from ai_company.engine.run_result import AgentRunResult # noqa: TC001 +from ai_company.providers.models import ( + ChatMessage, # noqa: TC001 + CompletionConfig, # noqa: TC001 +) + + +class AgentAssignment(BaseModel): + """A single agent-task pairing for parallel execution. + + Attributes: + identity: Agent to run. + task: Task to execute. + completion_config: Optional LLM completion configuration override. + max_turns: Maximum execution turns. + timeout_seconds: Optional wall-clock timeout for this agent. + memory_messages: Pre-loaded memory messages for the agent. 
+ resource_claims: File paths requiring exclusive access. + """ + + model_config = ConfigDict(frozen=True) + + identity: AgentIdentity = Field(description="Agent to run") + task: Task = Field(description="Task to execute") + completion_config: CompletionConfig | None = Field( + default=None, + description="Optional LLM completion configuration override", + ) + max_turns: int = Field( + default=DEFAULT_MAX_TURNS, + ge=1, + description="Maximum execution turns", + ) + timeout_seconds: float | None = Field( + default=None, + gt=0, + description="Optional wall-clock timeout for this agent", + ) + memory_messages: tuple[ChatMessage, ...] = Field( + default=(), + description="Pre-loaded memory messages", + ) + resource_claims: tuple[str, ...] = Field( + default=(), + description="File paths requiring exclusive access", + ) + + @computed_field( # type: ignore[prop-decorator] + description="Agent identifier string", + ) + @property + def agent_id(self) -> str: + """Agent identifier (string form of UUID).""" + return str(self.identity.id) + + @computed_field( # type: ignore[prop-decorator] + description="Task identifier string", + ) + @property + def task_id(self) -> str: + """Task identifier.""" + return self.task.id + + +class ParallelExecutionGroup(BaseModel): + """A group of agent assignments to execute in parallel. + + Attributes: + group_id: Unique group identifier. + assignments: Agent-task pairings (non-empty). + max_concurrency: Max simultaneous runs (None = unlimited). + fail_fast: Cancel remaining assignments on first failure. + """ + + model_config = ConfigDict(frozen=True) + + group_id: NotBlankStr = Field( + description="Unique group identifier", + ) + assignments: tuple[AgentAssignment, ...] 
= Field( + description="Agent-task pairings", + ) + max_concurrency: int | None = Field( + default=None, + ge=1, + description="Max simultaneous runs (None = unlimited)", + ) + fail_fast: bool = Field( + default=False, + description="Cancel remaining on first failure", + ) + + @model_validator(mode="after") + def _validate_assignments(self) -> ParallelExecutionGroup: + if not self.assignments: + msg = "assignments must contain at least one entry" + raise ValueError(msg) + + task_ids = [a.task_id for a in self.assignments] + task_counts = Counter(task_ids) + dupes = sorted(tid for tid, c in task_counts.items() if c > 1) + if dupes: + msg = f"Duplicate task IDs in assignments: {dupes}" + raise ValueError(msg) + + agent_ids = [a.agent_id for a in self.assignments] + agent_counts = Counter(agent_ids) + dupes = sorted(aid for aid, c in agent_counts.items() if c > 1) + if dupes: + msg = f"Duplicate agent IDs in assignments: {dupes}" + raise ValueError(msg) + + return self + + +class AgentOutcome(BaseModel): + """Outcome of a single agent execution within a parallel group. + + Attributes: + task_id: Task identifier. + agent_id: Agent identifier. + result: Present if execution produced a result. + error: Present if the agent crashed before producing a result. 
+ """ + + model_config = ConfigDict(frozen=True) + + task_id: NotBlankStr = Field(description="Task identifier") + agent_id: NotBlankStr = Field(description="Agent identifier") + result: AgentRunResult | None = Field( + default=None, + description="Present if execution produced a result", + ) + error: str | None = Field( + default=None, + description="Present if agent crashed before producing result", + ) + + @computed_field( # type: ignore[prop-decorator] + description="Whether the agent completed successfully", + ) + @property + def is_success(self) -> bool: + """True when result is present and successful.""" + return self.result is not None and self.result.is_success + + +class ParallelExecutionResult(BaseModel): + """Result of a complete parallel execution group. + + Attributes: + group_id: Group identifier. + outcomes: Tuple of agent outcomes. + total_duration_seconds: Wall-clock duration of the group. + """ + + model_config = ConfigDict(frozen=True) + + group_id: NotBlankStr = Field(description="Group identifier") + outcomes: tuple[AgentOutcome, ...] 
= Field( + description="Tuple of agent outcomes", + ) + total_duration_seconds: float = Field( + ge=0.0, + description="Wall-clock duration of the group execution", + ) + + @computed_field( # type: ignore[prop-decorator] + description="Total cost in USD across all agents", + ) + @property + def total_cost_usd(self) -> float: + """Sum of costs from all outcomes with results.""" + return sum( + o.result.total_cost_usd for o in self.outcomes if o.result is not None + ) + + @computed_field( # type: ignore[prop-decorator] + description="Number of agents that succeeded", + ) + @property + def agents_succeeded(self) -> int: + """Count of successful agent outcomes.""" + return sum(1 for o in self.outcomes if o.is_success) + + @computed_field( # type: ignore[prop-decorator] + description="Number of agents that failed", + ) + @property + def agents_failed(self) -> int: + """Count of failed agent outcomes.""" + return sum(1 for o in self.outcomes if not o.is_success) + + @computed_field( # type: ignore[prop-decorator] + description="Whether all agents completed successfully", + ) + @property + def all_succeeded(self) -> bool: + """True when every outcome is a success.""" + return all(o.is_success for o in self.outcomes) + + +class ParallelProgress(BaseModel): + """Point-in-time snapshot of parallel execution progress. + + Attributes: + group_id: Group identifier. + total: Total number of assignments. + completed: Number of assignments finished (success or failure). + in_progress: Number of assignments currently running. + pending: Number of assignments not yet started. + succeeded: Number of successful completions. + failed: Number of failed completions. 
+ """ + + model_config = ConfigDict(frozen=True) + + group_id: NotBlankStr = Field(description="Group identifier") + total: int = Field(ge=0, description="Total assignments") + completed: int = Field(ge=0, description="Finished assignments") + in_progress: int = Field(ge=0, description="Currently running") + pending: int = Field(ge=0, description="Not yet started") + succeeded: int = Field(ge=0, description="Successful completions") + failed: int = Field(ge=0, description="Failed completions") diff --git a/src/ai_company/engine/resource_lock.py b/src/ai_company/engine/resource_lock.py new file mode 100644 index 0000000000..c52b05b60a --- /dev/null +++ b/src/ai_company/engine/resource_lock.py @@ -0,0 +1,129 @@ +"""Resource locking for parallel agent execution. + +Provides exclusive access to shared resources (e.g. file paths) so that +concurrent agents do not clobber each other's work. + +The ``ResourceLock`` protocol is pluggable; ``InMemoryResourceLock`` is +the default single-process implementation using ``asyncio.Lock``. +""" + +import asyncio +from typing import Protocol, runtime_checkable + +from ai_company.observability import get_logger +from ai_company.observability.events.parallel import ( + PARALLEL_LOCK_ACQUIRED, + PARALLEL_LOCK_CONFLICT, + PARALLEL_LOCK_RELEASED, +) + +logger = get_logger(__name__) + + +@runtime_checkable +class ResourceLock(Protocol): + """Protocol for exclusive resource locking. + + Resources are identified by string keys (typically file paths). + Holders are identified by agent IDs. + """ + + async def acquire(self, resource: str, holder: str) -> bool: + """Attempt to acquire exclusive access to *resource*. + + Returns ``True`` if the lock was acquired (or was already held + by the same holder). Returns ``False`` if another holder owns + the lock. + """ + ... + + async def release(self, resource: str, holder: str) -> None: + """Release a lock on *resource*. + + No-op if the resource is not locked or is held by a different + holder. 
+ """ + ... + + async def release_all(self, holder: str) -> int: + """Release all locks held by *holder*. + + Returns the number of locks released. + """ + ... + + def is_locked(self, resource: str) -> bool: + """Return ``True`` if *resource* is currently locked.""" + ... + + def holder_of(self, resource: str) -> str | None: + """Return the holder of *resource*, or ``None`` if unlocked.""" + ... + + +class InMemoryResourceLock: + """In-memory resource lock using ``asyncio.Lock`` for mutual exclusion. + + Suitable for single-process deployments. All mutations are guarded + by an internal ``asyncio.Lock`` to ensure correctness under + concurrent access from multiple ``asyncio.Task`` instances. + """ + + def __init__(self) -> None: + self._locks: dict[str, str] = {} + self._mutex = asyncio.Lock() + + async def acquire(self, resource: str, holder: str) -> bool: + """Attempt to acquire exclusive access to *resource*.""" + async with self._mutex: + current = self._locks.get(resource) + if current is None: + self._locks[resource] = holder + logger.debug( + PARALLEL_LOCK_ACQUIRED, + resource=resource, + holder=holder, + ) + return True + if current == holder: + return True + logger.debug( + PARALLEL_LOCK_CONFLICT, + resource=resource, + holder=holder, + current_holder=current, + ) + return False + + async def release(self, resource: str, holder: str) -> None: + """Release a lock on *resource* if held by *holder*.""" + async with self._mutex: + current = self._locks.get(resource) + if current is not None and current == holder: + del self._locks[resource] + logger.debug( + PARALLEL_LOCK_RELEASED, + resource=resource, + holder=holder, + ) + + async def release_all(self, holder: str) -> int: + """Release all locks held by *holder*.""" + async with self._mutex: + to_release = [r for r, h in self._locks.items() if h == holder] + for resource in to_release: + del self._locks[resource] + logger.debug( + PARALLEL_LOCK_RELEASED, + resource=resource, + holder=holder, + ) + return 
len(to_release) + + def is_locked(self, resource: str) -> bool: + """Return ``True`` if *resource* is currently locked.""" + return resource in self._locks + + def holder_of(self, resource: str) -> str | None: + """Return the holder of *resource*, or ``None``.""" + return self._locks.get(resource) diff --git a/src/ai_company/observability/events/parallel.py b/src/ai_company/observability/events/parallel.py new file mode 100644 index 0000000000..aa082dd280 --- /dev/null +++ b/src/ai_company/observability/events/parallel.py @@ -0,0 +1,14 @@ +"""Parallel execution event constants.""" + +from typing import Final + +PARALLEL_GROUP_START: Final[str] = "execution.parallel.group_start" +PARALLEL_GROUP_COMPLETE: Final[str] = "execution.parallel.group_complete" +PARALLEL_AGENT_START: Final[str] = "execution.parallel.agent_start" +PARALLEL_AGENT_COMPLETE: Final[str] = "execution.parallel.agent_complete" +PARALLEL_AGENT_ERROR: Final[str] = "execution.parallel.agent_error" +PARALLEL_LOCK_ACQUIRED: Final[str] = "execution.parallel.lock_acquired" +PARALLEL_LOCK_RELEASED: Final[str] = "execution.parallel.lock_released" +PARALLEL_LOCK_CONFLICT: Final[str] = "execution.parallel.lock_conflict" +PARALLEL_PROGRESS_UPDATE: Final[str] = "execution.parallel.progress_update" +PARALLEL_VALIDATION_ERROR: Final[str] = "execution.parallel.validation_error" diff --git a/tests/unit/engine/test_parallel.py b/tests/unit/engine/test_parallel.py new file mode 100644 index 0000000000..5da367a4df --- /dev/null +++ b/tests/unit/engine/test_parallel.py @@ -0,0 +1,491 @@ +"""Tests for the ParallelExecutor.""" + +import asyncio +from datetime import date +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from ai_company.core.agent import AgentIdentity, ModelConfig, PersonalityConfig +from ai_company.core.enums import ( + Complexity, + Priority, + SeniorityLevel, + TaskStatus, + TaskType, +) +from ai_company.core.task import Task +from ai_company.engine.context import AgentContext +from 
ai_company.engine.errors import ParallelExecutionError, ResourceConflictError +from ai_company.engine.loop_protocol import ExecutionResult, TerminationReason +from ai_company.engine.parallel import ParallelExecutor +from ai_company.engine.parallel_models import ( + AgentAssignment, + ParallelExecutionGroup, + ParallelProgress, +) +from ai_company.engine.prompt import SystemPrompt +from ai_company.engine.resource_lock import InMemoryResourceLock +from ai_company.engine.run_result import AgentRunResult +from ai_company.engine.shutdown import ShutdownManager + + +def _make_identity( + name: str = "test-agent", +) -> AgentIdentity: + return AgentIdentity( + name=name, + role="engineer", + department="engineering", + level=SeniorityLevel.MID, + hiring_date=date(2026, 1, 15), + personality=PersonalityConfig(traits=("analytical",)), + model=ModelConfig( + provider="test-provider", + model_id="test-small-001", + ), + ) + + +def _make_task(title: str = "test-task") -> Task: + return Task( + id=f"task-{title}", + title=title, + description="A test task", + type=TaskType.DEVELOPMENT, + priority=Priority.MEDIUM, + project="test-project", + created_by="tester", + assigned_to="test-agent", + status=TaskStatus.ASSIGNED, + estimated_complexity=Complexity.SIMPLE, + ) + + +def _make_run_result( + identity: AgentIdentity, + task: Task, + reason: TerminationReason = TerminationReason.COMPLETED, +) -> AgentRunResult: + ctx = AgentContext.from_identity(identity, task=task) + error_msg = "test error" if reason == TerminationReason.ERROR else None + execution_result = ExecutionResult( + context=ctx, + termination_reason=reason, + error_message=error_msg, + ) + return AgentRunResult( + execution_result=execution_result, + system_prompt=SystemPrompt( + content="test", + template_version="1.0", + estimated_tokens=1, + sections=("identity",), + metadata={"agent_id": str(identity.id)}, + ), + duration_seconds=0.5, + agent_id=str(identity.id), + task_id=task.id, + ) + + +def _make_assignment( + 
name: str = "agent", + title: str = "task", + **kwargs: object, +) -> AgentAssignment: + return AgentAssignment( + identity=_make_identity(name), + task=_make_task(title), + **kwargs, # type: ignore[arg-type] + ) + + +def _make_group( + *assignments: AgentAssignment, + group_id: str = "test-group", + **kwargs: object, +) -> ParallelExecutionGroup: + if not assignments: + assignments = (_make_assignment(),) + return ParallelExecutionGroup( + group_id=group_id, + assignments=assignments, + **kwargs, # type: ignore[arg-type] + ) + + +def _mock_engine( + side_effect: object = None, +) -> MagicMock: + """Create a mock AgentEngine with an async run method.""" + engine = MagicMock() + engine.run = AsyncMock(side_effect=side_effect) + return engine + + +@pytest.mark.unit +class TestParallelExecutorConstruction: + """ParallelExecutor construction.""" + + def test_minimal(self) -> None: + engine = _mock_engine() + executor = ParallelExecutor(engine=engine) + assert executor is not None + + def test_with_all_options(self) -> None: + engine = _mock_engine() + sm = ShutdownManager() + lock = InMemoryResourceLock() + cb = MagicMock() + executor = ParallelExecutor( + engine=engine, + shutdown_manager=sm, + resource_lock=lock, + progress_callback=cb, + ) + assert executor is not None + + +@pytest.mark.unit +class TestParallelExecutorSingleAgent: + """Single-agent parallel execution (degenerate case).""" + + async def test_single_success(self) -> None: + a = _make_assignment("a1", "t1") + run_result = _make_run_result(a.identity, a.task) + engine = _mock_engine(side_effect=[run_result]) + executor = ParallelExecutor(engine=engine) + group = _make_group(a) + + result = await executor.execute_group(group) + + assert result.group_id == "test-group" + assert len(result.outcomes) == 1 + assert result.all_succeeded is True + assert result.total_duration_seconds > 0 + engine.run.assert_awaited_once() + + async def test_single_failure(self) -> None: + a = _make_assignment("a1", "t1") + 
engine = _mock_engine( + side_effect=RuntimeError("agent crashed"), + ) + executor = ParallelExecutor(engine=engine) + group = _make_group(a) + + result = await executor.execute_group(group) + + assert len(result.outcomes) == 1 + assert result.all_succeeded is False + assert result.outcomes[0].error is not None + assert "agent crashed" in result.outcomes[0].error + + +@pytest.mark.unit +class TestParallelExecutorMultipleAgents: + """Multiple agents running in parallel.""" + + async def test_two_agents_both_succeed(self) -> None: + a1 = _make_assignment("a1", "t1") + a2 = _make_assignment("a2", "t2") + r1 = _make_run_result(a1.identity, a1.task) + r2 = _make_run_result(a2.identity, a2.task) + engine = _mock_engine(side_effect=[r1, r2]) + executor = ParallelExecutor(engine=engine) + group = _make_group(a1, a2) + + result = await executor.execute_group(group) + + assert result.agents_succeeded == 2 + assert result.agents_failed == 0 + assert result.all_succeeded is True + assert engine.run.await_count == 2 + + async def test_one_fails_one_succeeds(self) -> None: + a1 = _make_assignment("a1", "t1") + a2 = _make_assignment("a2", "t2") + r1 = _make_run_result(a1.identity, a1.task) + + call_count = 0 + + async def side_effect(**kwargs: object) -> AgentRunResult: + nonlocal call_count + call_count += 1 + if call_count == 1: + return r1 + msg = "agent 2 crashed" + raise RuntimeError(msg) + + engine = _mock_engine() + engine.run = AsyncMock(side_effect=side_effect) + executor = ParallelExecutor(engine=engine) + group = _make_group(a1, a2) + + result = await executor.execute_group(group) + + assert result.agents_succeeded + result.agents_failed == 2 + # At least one succeeded, at least one failed + assert result.agents_failed >= 1 + assert result.all_succeeded is False + + +@pytest.mark.unit +class TestParallelExecutorConcurrencyLimit: + """max_concurrency semaphore behavior.""" + + async def test_concurrency_limited(self) -> None: + """Verify semaphore limits concurrent 
executions.""" + max_concurrent = 0 + current_concurrent = 0 + lock = asyncio.Lock() + + async def track_concurrency(**kwargs: object) -> AgentRunResult: + nonlocal max_concurrent, current_concurrent + async with lock: + current_concurrent += 1 + max_concurrent = max(max_concurrent, current_concurrent) + await asyncio.sleep(0.05) + async with lock: + current_concurrent -= 1 + identity = kwargs.get("identity") + task = kwargs.get("task") + return _make_run_result(identity, task) # type: ignore[arg-type] + + assignments = [_make_assignment(f"a{i}", f"t{i}") for i in range(4)] + engine = _mock_engine() + engine.run = AsyncMock(side_effect=track_concurrency) + executor = ParallelExecutor(engine=engine) + group = _make_group( + *assignments, + max_concurrency=2, + ) + + result = await executor.execute_group(group) + + assert result.all_succeeded is True + assert max_concurrent <= 2 + + +@pytest.mark.unit +class TestParallelExecutorFailFast: + """fail_fast cancellation behavior.""" + + async def test_fail_fast_cancels_siblings(self) -> None: + a1 = _make_assignment("a1", "t1") + a2 = _make_assignment("a2", "t2") + a3 = _make_assignment("a3", "t3") + + call_count = 0 + + async def side_effect(**kwargs: object) -> AgentRunResult: + nonlocal call_count + call_count += 1 + if call_count == 1: + msg = "first agent failed" + raise RuntimeError(msg) + # Others take longer (would be cancelled) + await asyncio.sleep(10) + identity = kwargs.get("identity") + task = kwargs.get("task") + return _make_run_result(identity, task) # type: ignore[arg-type] + + engine = _mock_engine() + engine.run = AsyncMock(side_effect=side_effect) + executor = ParallelExecutor(engine=engine) + group = _make_group(a1, a2, a3, fail_fast=True) + + result = await executor.execute_group(group) + + assert result.agents_failed >= 1 + # Some outcomes should be cancellation errors + cancel_outcomes = [ + o for o in result.outcomes if o.error and "fail_fast" in o.error + ] + assert len(cancel_outcomes) >= 1 + + 
+@pytest.mark.unit +class TestParallelExecutorResourceLocking: + """Resource claim and lock behavior.""" + + async def test_non_conflicting_claims(self) -> None: + a1 = _make_assignment( + "a1", + "t1", + resource_claims=("src/a.py",), + ) + a2 = _make_assignment( + "a2", + "t2", + resource_claims=("src/b.py",), + ) + r1 = _make_run_result(a1.identity, a1.task) + r2 = _make_run_result(a2.identity, a2.task) + engine = _mock_engine(side_effect=[r1, r2]) + lock = InMemoryResourceLock() + executor = ParallelExecutor( + engine=engine, + resource_lock=lock, + ) + group = _make_group(a1, a2) + + result = await executor.execute_group(group) + + assert result.all_succeeded is True + + async def test_conflicting_claims_raises(self) -> None: + a1 = _make_assignment( + "a1", + "t1", + resource_claims=("src/shared.py",), + ) + a2 = _make_assignment( + "a2", + "t2", + resource_claims=("src/shared.py",), + ) + engine = _mock_engine() + lock = InMemoryResourceLock() + executor = ParallelExecutor( + engine=engine, + resource_lock=lock, + ) + group = _make_group(a1, a2) + + with pytest.raises(ResourceConflictError): + await executor.execute_group(group) + + async def test_locks_released_after_execution(self) -> None: + a1 = _make_assignment( + "a1", + "t1", + resource_claims=("src/a.py",), + ) + r1 = _make_run_result(a1.identity, a1.task) + engine = _mock_engine(side_effect=[r1]) + lock = InMemoryResourceLock() + executor = ParallelExecutor( + engine=engine, + resource_lock=lock, + ) + group = _make_group(a1) + + await executor.execute_group(group) + + assert not lock.is_locked("src/a.py") + + async def test_locks_released_on_error(self) -> None: + a1 = _make_assignment( + "a1", + "t1", + resource_claims=("src/a.py",), + ) + engine = _mock_engine( + side_effect=RuntimeError("crash"), + ) + lock = InMemoryResourceLock() + executor = ParallelExecutor( + engine=engine, + resource_lock=lock, + ) + group = _make_group(a1) + + await executor.execute_group(group) + + assert not 
lock.is_locked("src/a.py") + + +@pytest.mark.unit +class TestParallelExecutorProgress: + """Progress callback invocation.""" + + async def test_progress_callback_called(self) -> None: + a1 = _make_assignment("a1", "t1") + r1 = _make_run_result(a1.identity, a1.task) + engine = _mock_engine(side_effect=[r1]) + progress_updates: list[ParallelProgress] = [] + + def on_progress(p: ParallelProgress) -> None: + progress_updates.append(p) + + executor = ParallelExecutor( + engine=engine, + progress_callback=on_progress, + ) + group = _make_group(a1) + + await executor.execute_group(group) + + assert len(progress_updates) >= 1 + # Final update should show completion + final = progress_updates[-1] + assert final.completed == 1 + assert final.total == 1 + + async def test_progress_tracks_multiple_agents(self) -> None: + a1 = _make_assignment("a1", "t1") + a2 = _make_assignment("a2", "t2") + r1 = _make_run_result(a1.identity, a1.task) + r2 = _make_run_result(a2.identity, a2.task) + engine = _mock_engine(side_effect=[r1, r2]) + progress_updates: list[ParallelProgress] = [] + + def on_progress(p: ParallelProgress) -> None: + progress_updates.append(p) + + executor = ParallelExecutor( + engine=engine, + progress_callback=on_progress, + ) + group = _make_group(a1, a2) + + await executor.execute_group(group) + + assert len(progress_updates) >= 2 + final = progress_updates[-1] + assert final.completed == 2 + assert final.total == 2 + + +@pytest.mark.unit +class TestParallelExecutorShutdown: + """Shutdown manager integration.""" + + async def test_shutdown_manager_integration(self) -> None: + a1 = _make_assignment("a1", "t1") + r1 = _make_run_result(a1.identity, a1.task) + engine = _mock_engine(side_effect=[r1]) + sm = ShutdownManager() + executor = ParallelExecutor( + engine=engine, + shutdown_manager=sm, + ) + group = _make_group(a1) + + result = await executor.execute_group(group) + + assert result.all_succeeded is True + + +@pytest.mark.unit +class 
TestParallelExecutorFatalErrors: + """Fatal error (MemoryError/RecursionError) handling.""" + + async def test_memory_error_propagates(self) -> None: + a1 = _make_assignment("a1", "t1") + engine = _mock_engine(side_effect=MemoryError("OOM")) + executor = ParallelExecutor(engine=engine) + group = _make_group(a1) + + with pytest.raises(ParallelExecutionError, match="fatal"): + await executor.execute_group(group) + + async def test_recursion_error_propagates(self) -> None: + a1 = _make_assignment("a1", "t1") + engine = _mock_engine(side_effect=RecursionError("stack")) + executor = ParallelExecutor(engine=engine) + group = _make_group(a1) + + with pytest.raises(ParallelExecutionError, match="fatal"): + await executor.execute_group(group) diff --git a/tests/unit/engine/test_parallel_models.py b/tests/unit/engine/test_parallel_models.py new file mode 100644 index 0000000000..052625726e --- /dev/null +++ b/tests/unit/engine/test_parallel_models.py @@ -0,0 +1,418 @@ +"""Tests for parallel execution models.""" + +from datetime import date + +import pytest +from pydantic import ValidationError + +from ai_company.core.agent import AgentIdentity, ModelConfig, PersonalityConfig +from ai_company.core.enums import ( + Complexity, + Priority, + SeniorityLevel, + TaskStatus, + TaskType, +) +from ai_company.core.task import Task +from ai_company.engine.context import DEFAULT_MAX_TURNS +from ai_company.engine.loop_protocol import ( + ExecutionResult, + TerminationReason, +) +from ai_company.engine.parallel_models import ( + AgentAssignment, + AgentOutcome, + ParallelExecutionGroup, + ParallelExecutionResult, + ParallelProgress, +) +from ai_company.engine.prompt import SystemPrompt +from ai_company.engine.run_result import AgentRunResult +from ai_company.providers.enums import MessageRole +from ai_company.providers.models import ChatMessage + + +def _make_identity( + name: str = "test-agent", + **kwargs: object, +) -> AgentIdentity: + defaults: dict[str, object] = { + "role": 
"engineer", + "department": "engineering", + "level": SeniorityLevel.MID, + "hiring_date": date(2026, 1, 15), + "personality": PersonalityConfig(traits=("analytical",)), + "model": ModelConfig( + provider="test-provider", + model_id="test-small-001", + ), + } + defaults.update(kwargs) + return AgentIdentity(name=name, **defaults) # type: ignore[arg-type] + + +def _make_task( + title: str = "test-task", + **kwargs: object, +) -> Task: + defaults: dict[str, object] = { + "id": f"task-{title}", + "description": "A test task", + "type": TaskType.DEVELOPMENT, + "priority": Priority.MEDIUM, + "project": "test-project", + "created_by": "tester", + "assigned_to": "test-agent", + "status": TaskStatus.ASSIGNED, + "estimated_complexity": Complexity.SIMPLE, + } + defaults.update(kwargs) + return Task(title=title, **defaults) # type: ignore[arg-type] + + +def _make_assignment( + name: str = "agent", + title: str = "task", + **kwargs: object, +) -> AgentAssignment: + return AgentAssignment( + identity=_make_identity(name), + task=_make_task(title), + **kwargs, # type: ignore[arg-type] + ) + + +def _make_run_result( + agent_id: str = "agent-1", + task_id: str = "task-1", + reason: TerminationReason = TerminationReason.COMPLETED, +) -> AgentRunResult: + identity = _make_identity() + task = _make_task() + from ai_company.engine.context import AgentContext + + ctx = AgentContext.from_identity(identity, task=task) + error_msg = "test error" if reason == TerminationReason.ERROR else None + execution_result = ExecutionResult( + context=ctx, + termination_reason=reason, + error_message=error_msg, + ) + return AgentRunResult( + execution_result=execution_result, + system_prompt=SystemPrompt( + content="test", + template_version="1.0", + estimated_tokens=1, + sections=("identity",), + metadata={"agent_id": agent_id}, + ), + duration_seconds=1.0, + agent_id=agent_id, + task_id=task_id, + ) + + +@pytest.mark.unit +class TestAgentAssignment: + """AgentAssignment frozen model.""" + + def 
test_minimal_construction(self) -> None: + identity = _make_identity() + task = _make_task() + assignment = AgentAssignment(identity=identity, task=task) + + assert assignment.identity == identity + assert assignment.task == task + assert assignment.completion_config is None + assert assignment.max_turns == DEFAULT_MAX_TURNS + assert assignment.timeout_seconds is None + assert assignment.memory_messages == () + assert assignment.resource_claims == () + + def test_full_construction(self) -> None: + identity = _make_identity() + task = _make_task() + msg = ChatMessage(role=MessageRole.USER, content="hi") + assignment = AgentAssignment( + identity=identity, + task=task, + max_turns=5, + timeout_seconds=60.0, + memory_messages=(msg,), + resource_claims=("src/main.py", "README.md"), + ) + + assert assignment.max_turns == 5 + assert assignment.timeout_seconds == 60.0 + assert len(assignment.memory_messages) == 1 + assert assignment.resource_claims == ("src/main.py", "README.md") + + def test_frozen(self) -> None: + assignment = _make_assignment() + with pytest.raises(ValidationError): + assignment.max_turns = 10 # type: ignore[misc] + + def test_agent_id_property(self) -> None: + assignment = _make_assignment() + assert assignment.agent_id == str(assignment.identity.id) + + def test_task_id_property(self) -> None: + assignment = _make_assignment() + assert assignment.task_id == assignment.task.id + + +@pytest.mark.unit +class TestParallelExecutionGroup: + """ParallelExecutionGroup frozen model with validators.""" + + def test_minimal_construction(self) -> None: + a = _make_assignment("a1", "t1") + group = ParallelExecutionGroup( + group_id="grp-1", + assignments=(a,), + ) + + assert group.group_id == "grp-1" + assert len(group.assignments) == 1 + assert group.max_concurrency is None + assert group.fail_fast is False + + def test_full_construction(self) -> None: + a1 = _make_assignment("a1", "t1") + a2 = _make_assignment("a2", "t2") + group = ParallelExecutionGroup( + 
group_id="grp-2", + assignments=(a1, a2), + max_concurrency=2, + fail_fast=True, + ) + + assert len(group.assignments) == 2 + assert group.max_concurrency == 2 + assert group.fail_fast is True + + def test_frozen(self) -> None: + group = ParallelExecutionGroup( + group_id="grp-1", + assignments=(_make_assignment(),), + ) + with pytest.raises(ValidationError): + group.fail_fast = True # type: ignore[misc] + + def test_empty_assignments_rejected(self) -> None: + with pytest.raises(ValidationError, match="at least one"): + ParallelExecutionGroup( + group_id="grp", + assignments=(), + ) + + def test_duplicate_task_ids_rejected(self) -> None: + identity1 = _make_identity("a1") + identity2 = _make_identity("a2") + task = _make_task("shared") + a1 = AgentAssignment(identity=identity1, task=task) + a2 = AgentAssignment(identity=identity2, task=task) + + with pytest.raises(ValidationError, match=r"[Dd]uplicate.*task"): + ParallelExecutionGroup( + group_id="grp", + assignments=(a1, a2), + ) + + def test_duplicate_agent_ids_rejected(self) -> None: + identity = _make_identity("same") + t1 = _make_task("t1") + t2 = _make_task("t2") + a1 = AgentAssignment(identity=identity, task=t1) + a2 = AgentAssignment(identity=identity, task=t2) + + with pytest.raises(ValidationError, match=r"[Dd]uplicate.*agent"): + ParallelExecutionGroup( + group_id="grp", + assignments=(a1, a2), + ) + + def test_max_concurrency_zero_rejected(self) -> None: + with pytest.raises(ValidationError): + ParallelExecutionGroup( + group_id="grp", + assignments=(_make_assignment(),), + max_concurrency=0, + ) + + def test_max_concurrency_negative_rejected(self) -> None: + with pytest.raises(ValidationError): + ParallelExecutionGroup( + group_id="grp", + assignments=(_make_assignment(),), + max_concurrency=-1, + ) + + def test_blank_group_id_rejected(self) -> None: + with pytest.raises(ValidationError): + ParallelExecutionGroup( + group_id=" ", + assignments=(_make_assignment(),), + ) + + +@pytest.mark.unit +class 
TestAgentOutcome: + """AgentOutcome frozen model.""" + + def test_success_outcome(self) -> None: + result = _make_run_result() + outcome = AgentOutcome( + task_id="t1", + agent_id="a1", + result=result, + ) + + assert outcome.result is result + assert outcome.error is None + assert outcome.is_success is True + + def test_error_outcome(self) -> None: + outcome = AgentOutcome( + task_id="t1", + agent_id="a1", + error="something failed", + ) + + assert outcome.result is None + assert outcome.error == "something failed" + assert outcome.is_success is False + + def test_failed_run_result(self) -> None: + result = _make_run_result( + reason=TerminationReason.ERROR, + ) + outcome = AgentOutcome( + task_id="t1", + agent_id="a1", + result=result, + ) + + assert outcome.is_success is False + + def test_frozen(self) -> None: + outcome = AgentOutcome( + task_id="t1", + agent_id="a1", + error="x", + ) + with pytest.raises(ValidationError): + outcome.error = "y" # type: ignore[misc] + + +@pytest.mark.unit +class TestParallelExecutionResult: + """ParallelExecutionResult frozen model with computed fields.""" + + def test_all_succeeded(self) -> None: + o1 = AgentOutcome( + task_id="t1", + agent_id="a1", + result=_make_run_result(), + ) + o2 = AgentOutcome( + task_id="t2", + agent_id="a2", + result=_make_run_result(), + ) + result = ParallelExecutionResult( + group_id="grp", + outcomes=(o1, o2), + total_duration_seconds=5.0, + ) + + assert result.agents_succeeded == 2 + assert result.agents_failed == 0 + assert result.all_succeeded is True + + def test_partial_failure(self) -> None: + o1 = AgentOutcome( + task_id="t1", + agent_id="a1", + result=_make_run_result(), + ) + o2 = AgentOutcome( + task_id="t2", + agent_id="a2", + error="boom", + ) + result = ParallelExecutionResult( + group_id="grp", + outcomes=(o1, o2), + total_duration_seconds=3.0, + ) + + assert result.agents_succeeded == 1 + assert result.agents_failed == 1 + assert result.all_succeeded is False + + def 
test_total_cost_usd(self) -> None: + o1 = AgentOutcome( + task_id="t1", + agent_id="a1", + result=_make_run_result(), + ) + o2 = AgentOutcome( + task_id="t2", + agent_id="a2", + error="boom", + ) + result = ParallelExecutionResult( + group_id="grp", + outcomes=(o1, o2), + total_duration_seconds=3.0, + ) + + # Cost from error outcomes is 0, cost from success = context cost + assert result.total_cost_usd >= 0.0 + + def test_frozen(self) -> None: + result = ParallelExecutionResult( + group_id="grp", + outcomes=(), + total_duration_seconds=0.0, + ) + with pytest.raises(ValidationError): + result.group_id = "x" # type: ignore[misc] + + +@pytest.mark.unit +class TestParallelProgress: + """ParallelProgress frozen snapshot model.""" + + def test_construction(self) -> None: + progress = ParallelProgress( + group_id="grp", + total=4, + completed=2, + in_progress=1, + pending=1, + succeeded=2, + failed=0, + ) + + assert progress.total == 4 + assert progress.completed == 2 + assert progress.in_progress == 1 + assert progress.pending == 1 + assert progress.succeeded == 2 + assert progress.failed == 0 + + def test_frozen(self) -> None: + progress = ParallelProgress( + group_id="grp", + total=1, + completed=0, + in_progress=0, + pending=1, + succeeded=0, + failed=0, + ) + with pytest.raises(ValidationError): + progress.total = 10 # type: ignore[misc] diff --git a/tests/unit/engine/test_resource_lock.py b/tests/unit/engine/test_resource_lock.py new file mode 100644 index 0000000000..1111d99b5f --- /dev/null +++ b/tests/unit/engine/test_resource_lock.py @@ -0,0 +1,111 @@ +"""Tests for resource locking.""" + +import asyncio + +import pytest + +from ai_company.engine.resource_lock import InMemoryResourceLock, ResourceLock + + +@pytest.mark.unit +class TestResourceLockProtocol: + """ResourceLock protocol compliance.""" + + def test_in_memory_is_resource_lock(self) -> None: + lock = InMemoryResourceLock() + assert isinstance(lock, ResourceLock) + + +@pytest.mark.unit +class 
TestInMemoryResourceLock: + """InMemoryResourceLock implementation.""" + + async def test_acquire_uncontested(self) -> None: + lock = InMemoryResourceLock() + result = await lock.acquire("file.py", "agent-1") + assert result is True + + async def test_acquire_same_holder_idempotent(self) -> None: + lock = InMemoryResourceLock() + await lock.acquire("file.py", "agent-1") + result = await lock.acquire("file.py", "agent-1") + assert result is True + + async def test_acquire_different_holder_fails(self) -> None: + lock = InMemoryResourceLock() + await lock.acquire("file.py", "agent-1") + result = await lock.acquire("file.py", "agent-2") + assert result is False + + async def test_release_allows_reacquire(self) -> None: + lock = InMemoryResourceLock() + await lock.acquire("file.py", "agent-1") + await lock.release("file.py", "agent-1") + result = await lock.acquire("file.py", "agent-2") + assert result is True + + async def test_release_wrong_holder_noop(self) -> None: + lock = InMemoryResourceLock() + await lock.acquire("file.py", "agent-1") + await lock.release("file.py", "agent-2") + assert lock.is_locked("file.py") + assert lock.holder_of("file.py") == "agent-1" + + async def test_release_unlocked_noop(self) -> None: + lock = InMemoryResourceLock() + await lock.release("file.py", "agent-1") + assert not lock.is_locked("file.py") + + async def test_release_all(self) -> None: + lock = InMemoryResourceLock() + await lock.acquire("a.py", "agent-1") + await lock.acquire("b.py", "agent-1") + await lock.acquire("c.py", "agent-2") + + released = await lock.release_all("agent-1") + + assert released == 2 + assert not lock.is_locked("a.py") + assert not lock.is_locked("b.py") + assert lock.is_locked("c.py") + + async def test_release_all_none_held(self) -> None: + lock = InMemoryResourceLock() + released = await lock.release_all("agent-1") + assert released == 0 + + async def test_is_locked(self) -> None: + lock = InMemoryResourceLock() + assert not lock.is_locked("file.py") 
+ await lock.acquire("file.py", "agent-1") + assert lock.is_locked("file.py") + + async def test_holder_of(self) -> None: + lock = InMemoryResourceLock() + assert lock.holder_of("file.py") is None + await lock.acquire("file.py", "agent-1") + assert lock.holder_of("file.py") == "agent-1" + + async def test_concurrent_acquire_only_one_wins(self) -> None: + lock = InMemoryResourceLock() + results: list[bool] = [] + + async def try_acquire(holder: str) -> None: + result = await lock.acquire("file.py", holder) + results.append(result) + + async with asyncio.TaskGroup() as tg: + for i in range(5): + tg.create_task(try_acquire(f"agent-{i}")) + + assert results.count(True) == 1 + assert results.count(False) == 4 + + async def test_multiple_resources_independent(self) -> None: + lock = InMemoryResourceLock() + r1 = await lock.acquire("a.py", "agent-1") + r2 = await lock.acquire("b.py", "agent-2") + assert r1 is True + assert r2 is True + assert lock.holder_of("a.py") == "agent-1" + assert lock.holder_of("b.py") == "agent-2" diff --git a/tests/unit/observability/test_events.py b/tests/unit/observability/test_events.py index 8867614b5d..9b1173909a 100644 --- a/tests/unit/observability/test_events.py +++ b/tests/unit/observability/test_events.py @@ -108,6 +108,7 @@ def test_all_domain_modules_discovered(self) -> None: "correlation", "execution", "git", + "parallel", "personality", "prompt", "provider", From 32de5565ee25f31bfe73a856186ddaef6b700818 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Sat, 7 Mar 2026 19:40:57 +0100 Subject: [PATCH 2/5] refactor: harden parallel execution with review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-reviewed by 9 agents, 30 findings addressed: Source fixes: - Add AgentOutcome model_validator (result XOR error) - Make ParallelProgress.pending a @computed_field - Use NotBlankStr for resource_claims tuple - Use Self return type in 
_validate_assignments - Fix import ordering (events before TYPE_CHECKING) - Add fatal error logging (MemoryError/RecursionError) - Add shutdown rejection logging - Raise except* log level from DEBUG to INFO - Remove redundant per-task lock release (group-level only) - Wrap group-level lock release in try/except - Return None from _resolve_lock when no claims - Add wrong-holder warning in InMemoryResourceLock.release - Use nullcontext for semaphore pattern - Narrow _execute_assignment group parameter to group_id Documentation: - Update DESIGN_SPEC.md §15.3 (new files), §15.5 (conventions), §6.3 and §6.5 (implementation callouts) - Update CLAUDE.md engine package description Tests: - Add shutdown-in-progress rejection test - Add external lock holder conflict test - Add auto-created lock tests (success + conflict) - Add progress callback exception resilience test - Fix _make_run_result helper consistency - Fix test_total_cost_usd assertion - Add ParallelProgress computed pending tests - Add AgentOutcome both-none-rejected test --- CLAUDE.md | 2 +- DESIGN_SPEC.md | 9 ++- src/ai_company/engine/parallel.py | 92 +++++++++++++-------- src/ai_company/engine/parallel_models.py | 25 ++++-- src/ai_company/engine/resource_lock.py | 10 ++- tests/unit/engine/test_parallel.py | 97 +++++++++++++++++++++++ tests/unit/engine/test_parallel_models.py | 53 +++++++++---- 7 files changed, 231 insertions(+), 57 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 878cd642ff..86af5ab03a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -49,7 +49,7 @@ src/ai_company/ communication/ # Message bus (protocol + in-memory backend), dispatcher, messenger facade, channels config/ # YAML company config loading and validation core/ # Shared domain models and base classes - engine/ # Agent orchestration, execution loops, task lifecycle, recovery, and shutdown + engine/ # Agent orchestration, execution loops, parallel execution, task lifecycle, recovery, and shutdown memory/ # Persistent agent memory (memory 
layer TBD) observability/ # Structured logging, correlation tracking, log sinks providers/ # LLM provider abstraction (LiteLLM adapter) diff --git a/DESIGN_SPEC.md b/DESIGN_SPEC.md index 65b388c3b7..8714d02bd2 100644 --- a/DESIGN_SPEC.md +++ b/DESIGN_SPEC.md @@ -803,6 +803,8 @@ Task ───┤ ├──▶ Integration ──▶ QA └──▶ Backend Dev ──┘ ``` +> **Current state (M3):** `ParallelExecutor` (in `engine/parallel.py`) implements concurrent agent execution with `asyncio.TaskGroup`, configurable concurrency limits, resource locking for exclusive file access, and error isolation. Models in `engine/parallel_models.py`: `AgentAssignment`, `ParallelExecutionGroup`, `AgentOutcome`, `ParallelExecutionResult`, `ParallelProgress`. + #### Kanban Board ```text @@ -835,7 +837,7 @@ Tasks can be assigned through multiple strategies: The agent execution loop defines how an agent processes a task from start to finish. The framework provides multiple configurable loop architectures behind an `ExecutionLoop` protocol, making the system extensible. The default can vary by task complexity, and is configurable per agent or role. -> **Current state (M3):** ReAct (Loop 1) and Plan-and-Execute (Loop 2) are implemented. Hybrid loop and auto-selection are M4+. +> **Current state (M3):** ReAct (Loop 1) and Plan-and-Execute (Loop 2) are implemented. `ParallelExecutor` enables concurrent `AgentEngine.run()` calls with `TaskGroup` + Semaphore concurrency limits, resource locking, and error isolation (see §6.3). Hybrid loop and auto-selection are M4+. 
#### ExecutionLoop Protocol @@ -2340,6 +2342,9 @@ ai-company/ │ │ ├── cost_recording.py # Per-turn cost recording helpers │ │ ├── run_result.py # AgentRunResult outcome model │ │ ├── agent_engine.py # Agent execution engine +│ │ ├── parallel.py # Parallel agent executor (TaskGroup + Semaphore) +│ │ ├── parallel_models.py # AgentAssignment, ParallelExecutionGroup, AgentOutcome, ParallelExecutionResult +│ │ ├── resource_lock.py # ResourceLock protocol + InMemoryResourceLock │ │ ├── shutdown.py # Graceful shutdown strategy & manager │ │ ├── task_engine.py # Task routing & scheduling (M3-M4) │ │ ├── workflow_engine.py # Workflow orchestration (M4) @@ -2376,6 +2381,7 @@ ai-company/ │ │ │ ├── correlation.py # CORRELATION_* constants │ │ │ ├── execution.py # EXECUTION_* constants │ │ │ ├── git.py # GIT_* constants +│ │ │ ├── parallel.py # PARALLEL_* constants │ │ │ ├── personality.py # PERSONALITY_* constants │ │ │ ├── prompt.py # PROMPT_* constants │ │ │ ├── provider.py # PROVIDER_* constants @@ -2522,6 +2528,7 @@ These conventions were established during the M0–M2+ review cycle. **Adopted** | **Shared field groups** | Adopted (M2.5) | Extracted common field sets into base models (e.g. `_SpendingTotals`) | Prevents field duplication across spending summary models. `_SpendingTotals` provides shared aggregation fields; `AgentSpending`, `DepartmentSpending`, `PeriodSpending` extend it. | | **Event constants** | Adopted (per-domain) | Per-domain submodules under `events/` package (e.g. `events.provider`, `events.budget`). Import directly: `from ai_company.observability.events.<domain> import CONSTANT` | Split by domain for discoverability, co-location with domain logic, and reduced merge conflicts as constants grow. `__init__.py` serves as package marker with usage documentation; no re-exports. 
| | **Parallel tool execution** | Adopted (M2.5) | `asyncio.TaskGroup` in `ToolInvoker.invoke_all` with optional `max_concurrency` semaphore | Structured concurrency with proper cancellation semantics. Fatal errors collected via guarded wrapper and re-raised after all tasks complete. | +| **Parallel agent execution** | Adopted (M3) | `ParallelExecutor` coordinates concurrent `AgentEngine.run()` calls via `asyncio.TaskGroup` + optional `Semaphore` concurrency limit + `_run_guarded()` error isolation. `ResourceLock` protocol with `InMemoryResourceLock` for exclusive file-path claims. Progress tracking via `ProgressCallback`. Shutdown-aware via `ShutdownManager` task registration. Fail-fast mode re-raises first failure through `TaskGroup`. | Follows the `ToolInvoker.invoke_all()` pattern (parallel tool execution above). Composition over inheritance — wraps `AgentEngine`. Structured concurrency with proper cancellation. See §6.3 Parallel Execution. | | **Tool permission checking** | Adopted (M3) | `ToolPermissionChecker` enforces category-level gating based on `ToolAccessLevel` (sandboxed → restricted → standard → elevated, plus custom). Priority-based resolution: denied list → allowed list → level categories → deny. Case-insensitive name matching. `ToolInvoker` filters definitions for prompt and checks at invocation time. | Defense-in-depth: agents only see permitted tools in the LLM prompt, and invocations are re-checked at execution time. Explicit allow/deny lists provide per-agent overrides. See §11.1.1. | | **Tool sandboxing** | Adopted (M3, incremental) | File system tools use in-process `PathValidator` for workspace-scoped path validation (symlink resolution + containment check). `BaseFileSystemTool` ABC provides shared `ToolCategory.FILE_SYSTEM` and `PathValidator` integration — all file system tools extend this base. 
`SandboxBackend` protocol with `SubprocessSandbox` implemented — git tools accept optional `SandboxBackend` injection and delegate subprocess management to it (env filtering, workspace enforcement, timeout + process-group kill). `DockerSandbox` planned for code_runner, terminal, web, and database tools. `K8sSandbox` planned for future container deployments. Config-driven per-category backend selection planned for engine wiring. | File system tools use defence-in-depth path validation; subprocess sandbox provides lightweight isolation for git tools; heavier Docker/K8s isolation reserved for higher-risk tool categories (code execution, network). See §11.1.2. | | **Crash recovery** | Adopted (M3) | Pluggable `RecoveryStrategy` protocol. M3: `FailAndReassignStrategy` (catch at engine boundary, log snapshot, mark FAILED / eligible for reassignment). M4/M5: `CheckpointStrategy` (persist `AgentContext` per turn, resume from last checkpoint). | Immutable `model_copy` pattern makes checkpoint serialization trivial to add later. Fail-and-reassign is sufficient for short MVP tasks. See §6.6. 
| diff --git a/src/ai_company/engine/parallel.py b/src/ai_company/engine/parallel.py index 741ebf0131..fe672f47ec 100644 --- a/src/ai_company/engine/parallel.py +++ b/src/ai_company/engine/parallel.py @@ -13,6 +13,7 @@ import dataclasses import time from collections.abc import Callable +from contextlib import nullcontext from typing import TYPE_CHECKING from ai_company.engine.errors import ParallelExecutionError, ResourceConflictError @@ -25,11 +26,6 @@ ) from ai_company.engine.resource_lock import InMemoryResourceLock, ResourceLock from ai_company.observability import get_logger - -if TYPE_CHECKING: - from ai_company.engine.agent_engine import AgentEngine - from ai_company.engine.run_result import AgentRunResult - from ai_company.engine.shutdown import ShutdownManager from ai_company.observability.events.parallel import ( PARALLEL_AGENT_COMPLETE, PARALLEL_AGENT_ERROR, @@ -40,10 +36,19 @@ PARALLEL_VALIDATION_ERROR, ) +if TYPE_CHECKING: + from ai_company.engine.agent_engine import AgentEngine + from ai_company.engine.run_result import AgentRunResult + from ai_company.engine.shutdown import ShutdownManager + logger = get_logger(__name__) ProgressCallback = Callable[[ParallelProgress], None] -"""Synchronous callback invoked on progress updates.""" +"""Synchronous callback invoked on progress updates. + +Called directly (not awaited) from the executor's event loop; +must not block. +""" @dataclasses.dataclass @@ -64,7 +69,6 @@ def snapshot(self) -> ParallelProgress: total=self.total, completed=self.completed, in_progress=self.in_progress, - pending=self.total - self.completed - self.in_progress, succeeded=self.succeeded, failed=self.failed, ) @@ -157,22 +161,30 @@ async def execute_group( fatal_errors=fatal_errors, progress=progress, semaphore=semaphore, - lock=lock, ), ) except* Exception as eg: # TaskGroup wraps exceptions in ExceptionGroup when # fail_fast re-raises inside _run_guarded. # Outcomes from completed tasks are already collected. 
- logger.debug( + exc_summaries = [f"{type(e).__name__}: {e}" for e in eg.exceptions] + logger.info( PARALLEL_GROUP_COMPLETE, group_id=group.group_id, note="TaskGroup exited with exceptions", exception_count=len(eg.exceptions), + exceptions=exc_summaries, ) if lock is not None: - await self._release_all_locks(group, lock) + try: + await self._release_all_locks(group, lock) + except Exception: + logger.exception( + PARALLEL_GROUP_COMPLETE, + error="Failed to release resource locks", + group_id=group.group_id, + ) duration = time.monotonic() - start @@ -218,13 +230,13 @@ async def _run_guarded( # noqa: PLR0913 fatal_errors: list[Exception], progress: _ProgressState, semaphore: asyncio.Semaphore | None, - lock: ResourceLock | None, ) -> None: """Execute a single agent, isolating errors from siblings. Follows the ``ToolInvoker._run_guarded()`` pattern: - ``MemoryError``/``RecursionError`` → collected in fatal_errors - - Regular ``Exception`` → stored as error outcome + - Regular ``Exception`` → stored as error outcome; + re-raised when ``fail_fast`` is enabled - ``BaseException`` → propagates through TaskGroup """ task_id = assignment.task_id @@ -237,18 +249,26 @@ async def _run_guarded( # noqa: PLR0913 progress.in_progress += 1 self._emit_progress(progress) await self._execute_assignment( - assignment, - group, - outcomes, - progress, - semaphore, + assignment=assignment, + group_id=group.group_id, + outcomes=outcomes, + progress=progress, + semaphore=semaphore, ) except (MemoryError, RecursionError) as exc: + error_msg = f"Fatal: {type(exc).__name__}: {exc}" + logger.exception( + PARALLEL_AGENT_ERROR, + group_id=group.group_id, + agent_id=agent_id, + task_id=task_id, + error=error_msg, + ) fatal_errors.append(exc) outcomes[task_id] = AgentOutcome( task_id=task_id, agent_id=agent_id, - error=f"Fatal: {type(exc).__name__}: {exc}", + error=error_msg, ) progress.failed += 1 except Exception as exc: @@ -266,10 +286,6 @@ async def _run_guarded( # noqa: PLR0913 
progress.completed += 1 self._emit_progress(progress) - if lock is not None: - for resource in assignment.resource_claims: - await lock.release(resource, agent_id) - if self._shutdown_manager is not None: self._shutdown_manager.unregister_task(task_id) @@ -287,7 +303,13 @@ def _register_with_shutdown( return True try: self._shutdown_manager.register_task(task_id, asyncio_task) - except RuntimeError: + except RuntimeError as exc: + logger.warning( + PARALLEL_AGENT_ERROR, + agent_id=agent_id, + task_id=task_id, + error=f"Failed to register with shutdown manager: {exc}", + ) outcomes[task_id] = AgentOutcome( task_id=task_id, agent_id=agent_id, @@ -298,8 +320,9 @@ def _register_with_shutdown( async def _execute_assignment( self, + *, assignment: AgentAssignment, - group: ParallelExecutionGroup, + group_id: str, outcomes: dict[str, AgentOutcome], progress: _ProgressState, semaphore: asyncio.Semaphore | None, @@ -310,14 +333,13 @@ async def _execute_assignment( logger.info( PARALLEL_AGENT_START, - group_id=group.group_id, + group_id=group_id, agent_id=agent_id, task_id=task_id, ) - if semaphore is not None: - await semaphore.acquire() - try: + ctx = semaphore if semaphore is not None else nullcontext() + async with ctx: run_result: AgentRunResult = await self._engine.run( identity=assignment.identity, task=assignment.task, @@ -334,14 +356,11 @@ async def _execute_assignment( progress.succeeded += 1 logger.info( PARALLEL_AGENT_COMPLETE, - group_id=group.group_id, + group_id=group_id, agent_id=agent_id, task_id=task_id, success=True, ) - finally: - if semaphore is not None: - semaphore.release() def _record_error_outcome( self, @@ -371,10 +390,15 @@ def _resolve_lock( self, group: ParallelExecutionGroup, ) -> ResourceLock | None: - """Return the resource lock to use, if any claims exist.""" + """Return the resource lock to use, or ``None`` if not needed. + + When no assignments declare resource claims, returns ``None`` + (no locking needed). 
When claims exist, falls back to + ``InMemoryResourceLock()`` if no lock was injected. + """ has_claims = any(a.resource_claims for a in group.assignments) if not has_claims: - return self._resource_lock + return None if self._resource_lock is not None: return self._resource_lock return InMemoryResourceLock() diff --git a/src/ai_company/engine/parallel_models.py b/src/ai_company/engine/parallel_models.py index 0f2c59d2bf..52354228b3 100644 --- a/src/ai_company/engine/parallel_models.py +++ b/src/ai_company/engine/parallel_models.py @@ -5,6 +5,7 @@ """ from collections import Counter +from typing import Self from pydantic import BaseModel, ConfigDict, Field, computed_field, model_validator @@ -54,7 +55,7 @@ class AgentAssignment(BaseModel): default=(), description="Pre-loaded memory messages", ) - resource_claims: tuple[str, ...] = Field( + resource_claims: tuple[NotBlankStr, ...] = Field( default=(), description="File paths requiring exclusive access", ) @@ -105,7 +106,7 @@ class ParallelExecutionGroup(BaseModel): ) @model_validator(mode="after") - def _validate_assignments(self) -> ParallelExecutionGroup: + def _validate_assignments(self) -> Self: if not self.assignments: msg = "assignments must contain at least one entry" raise ValueError(msg) @@ -150,6 +151,13 @@ class AgentOutcome(BaseModel): description="Present if agent crashed before producing result", ) + @model_validator(mode="after") + def _validate_result_or_error(self) -> Self: + if self.result is None and self.error is None: + msg = "Either result or error must be set" + raise ValueError(msg) + return self + @computed_field( # type: ignore[prop-decorator] description="Whether the agent completed successfully", ) @@ -202,7 +210,7 @@ def agents_succeeded(self) -> int: ) @property def agents_failed(self) -> int: - """Count of failed agent outcomes.""" + """Count of non-successful outcomes (includes cancelled).""" return sum(1 for o in self.outcomes if not o.is_success) @computed_field( # type: 
ignore[prop-decorator] @@ -222,7 +230,7 @@ class ParallelProgress(BaseModel): total: Total number of assignments. completed: Number of assignments finished (success or failure). in_progress: Number of assignments currently running. - pending: Number of assignments not yet started. + pending: Derived: ``total - completed - in_progress`` (clamped >= 0). succeeded: Number of successful completions. failed: Number of failed completions. """ @@ -233,6 +241,13 @@ class ParallelProgress(BaseModel): total: int = Field(ge=0, description="Total assignments") completed: int = Field(ge=0, description="Finished assignments") in_progress: int = Field(ge=0, description="Currently running") - pending: int = Field(ge=0, description="Not yet started") succeeded: int = Field(ge=0, description="Successful completions") failed: int = Field(ge=0, description="Failed completions") + + @computed_field( # type: ignore[prop-decorator] + description="Not yet started", + ) + @property + def pending(self) -> int: + """Assignments not yet started.""" + return max(0, self.total - self.completed - self.in_progress) diff --git a/src/ai_company/engine/resource_lock.py b/src/ai_company/engine/resource_lock.py index c52b05b60a..9ef0ac2b40 100644 --- a/src/ai_company/engine/resource_lock.py +++ b/src/ai_company/engine/resource_lock.py @@ -99,13 +99,21 @@ async def release(self, resource: str, holder: str) -> None: """Release a lock on *resource* if held by *holder*.""" async with self._mutex: current = self._locks.get(resource) - if current is not None and current == holder: + if current == holder: del self._locks[resource] logger.debug( PARALLEL_LOCK_RELEASED, resource=resource, holder=holder, ) + elif current is not None: + logger.warning( + PARALLEL_LOCK_CONFLICT, + resource=resource, + holder=holder, + current_holder=current, + error="Release attempted by non-holder", + ) async def release_all(self, holder: str) -> int: """Release all locks held by *holder*.""" diff --git 
a/tests/unit/engine/test_parallel.py b/tests/unit/engine/test_parallel.py index 5da367a4df..551094e757 100644 --- a/tests/unit/engine/test_parallel.py +++ b/tests/unit/engine/test_parallel.py @@ -375,6 +375,65 @@ async def test_locks_released_after_execution(self) -> None: assert not lock.is_locked("src/a.py") + async def test_external_lock_holder_raises(self) -> None: + a1 = _make_assignment( + "a1", + "t1", + resource_claims=("src/shared.py",), + ) + engine = _mock_engine() + lock = InMemoryResourceLock() + await lock.acquire("src/shared.py", "external-agent") + executor = ParallelExecutor( + engine=engine, + resource_lock=lock, + ) + group = _make_group(a1) + + with pytest.raises(ResourceConflictError): + await executor.execute_group(group) + + assert lock.holder_of("src/shared.py") == "external-agent" + + async def test_auto_creates_lock_for_claims(self) -> None: + a1 = _make_assignment( + "a1", + "t1", + resource_claims=("src/a.py",), + ) + a2 = _make_assignment( + "a2", + "t2", + resource_claims=("src/b.py",), + ) + r1 = _make_run_result(a1.identity, a1.task) + r2 = _make_run_result(a2.identity, a2.task) + engine = _mock_engine(side_effect=[r1, r2]) + executor = ParallelExecutor(engine=engine) + group = _make_group(a1, a2) + + result = await executor.execute_group(group) + + assert result.all_succeeded is True + + async def test_auto_created_lock_detects_conflicts(self) -> None: + a1 = _make_assignment( + "a1", + "t1", + resource_claims=("src/shared.py",), + ) + a2 = _make_assignment( + "a2", + "t2", + resource_claims=("src/shared.py",), + ) + engine = _mock_engine() + executor = ParallelExecutor(engine=engine) + group = _make_group(a1, a2) + + with pytest.raises(ResourceConflictError): + await executor.execute_group(group) + async def test_locks_released_on_error(self) -> None: a1 = _make_assignment( "a1", @@ -423,6 +482,25 @@ def on_progress(p: ParallelProgress) -> None: assert final.completed == 1 assert final.total == 1 + async def 
test_progress_callback_exception_swallowed(self) -> None: + a1 = _make_assignment("a1", "t1") + r1 = _make_run_result(a1.identity, a1.task) + engine = _mock_engine(side_effect=[r1]) + + def bad_callback(p: ParallelProgress) -> None: + msg = "callback error" + raise ValueError(msg) + + executor = ParallelExecutor( + engine=engine, + progress_callback=bad_callback, + ) + group = _make_group(a1) + + result = await executor.execute_group(group) + + assert result.all_succeeded is True + async def test_progress_tracks_multiple_agents(self) -> None: a1 = _make_assignment("a1", "t1") a2 = _make_assignment("a2", "t2") @@ -452,6 +530,25 @@ def on_progress(p: ParallelProgress) -> None: class TestParallelExecutorShutdown: """Shutdown manager integration.""" + async def test_shutdown_in_progress_rejected(self) -> None: + a1 = _make_assignment("a1", "t1") + engine = _mock_engine() + sm = ShutdownManager() + sm.register_task = MagicMock( # type: ignore[method-assign] + side_effect=RuntimeError("Shutdown in progress"), + ) + executor = ParallelExecutor( + engine=engine, + shutdown_manager=sm, + ) + group = _make_group(a1) + + result = await executor.execute_group(group) + + assert result.all_succeeded is False + assert result.outcomes[0].error == "Shutdown in progress" + engine.run.assert_not_awaited() + async def test_shutdown_manager_integration(self) -> None: a1 = _make_assignment("a1", "t1") r1 = _make_run_result(a1.identity, a1.task) diff --git a/tests/unit/engine/test_parallel_models.py b/tests/unit/engine/test_parallel_models.py index 052625726e..f1be606451 100644 --- a/tests/unit/engine/test_parallel_models.py +++ b/tests/unit/engine/test_parallel_models.py @@ -14,7 +14,7 @@ TaskType, ) from ai_company.core.task import Task -from ai_company.engine.context import DEFAULT_MAX_TURNS +from ai_company.engine.context import DEFAULT_MAX_TURNS, AgentContext from ai_company.engine.loop_protocol import ( ExecutionResult, TerminationReason, @@ -83,14 +83,12 @@ def _make_assignment( 
def _make_run_result( - agent_id: str = "agent-1", - task_id: str = "task-1", + identity: AgentIdentity | None = None, + task: Task | None = None, reason: TerminationReason = TerminationReason.COMPLETED, ) -> AgentRunResult: - identity = _make_identity() - task = _make_task() - from ai_company.engine.context import AgentContext - + identity = identity or _make_identity() + task = task or _make_task() ctx = AgentContext.from_identity(identity, task=task) error_msg = "test error" if reason == TerminationReason.ERROR else None execution_result = ExecutionResult( @@ -105,11 +103,11 @@ def _make_run_result( template_version="1.0", estimated_tokens=1, sections=("identity",), - metadata={"agent_id": agent_id}, + metadata={"agent_id": str(identity.id)}, ), duration_seconds=1.0, - agent_id=agent_id, - task_id=task_id, + agent_id=str(identity.id), + task_id=task.id, ) @@ -296,6 +294,10 @@ def test_failed_run_result(self) -> None: assert outcome.is_success is False + def test_both_none_rejected(self) -> None: + with pytest.raises(ValidationError, match="result or error"): + AgentOutcome(task_id="t1", agent_id="a1") + def test_frozen(self) -> None: outcome = AgentOutcome( task_id="t1", @@ -369,8 +371,9 @@ def test_total_cost_usd(self) -> None: total_duration_seconds=3.0, ) - # Cost from error outcomes is 0, cost from success = context cost - assert result.total_cost_usd >= 0.0 + # Error outcome contributes no cost, success contributes its cost + assert o1.result is not None + assert result.total_cost_usd == o1.result.total_cost_usd def test_frozen(self) -> None: result = ParallelExecutionResult( @@ -392,7 +395,6 @@ def test_construction(self) -> None: total=4, completed=2, in_progress=1, - pending=1, succeeded=2, failed=0, ) @@ -400,17 +402,38 @@ def test_construction(self) -> None: assert progress.total == 4 assert progress.completed == 2 assert progress.in_progress == 1 - assert progress.pending == 1 + assert progress.pending == 1 # computed: 4 - 2 - 1 assert 
progress.succeeded == 2 assert progress.failed == 0 + def test_pending_computed(self) -> None: + progress = ParallelProgress( + group_id="grp", + total=5, + completed=1, + in_progress=2, + succeeded=1, + failed=0, + ) + assert progress.pending == 2 # 5 - 1 - 2 + + def test_pending_clamped_to_zero(self) -> None: + progress = ParallelProgress( + group_id="grp", + total=1, + completed=1, + in_progress=1, + succeeded=1, + failed=0, + ) + assert progress.pending == 0 # max(0, 1 - 1 - 1) + def test_frozen(self) -> None: progress = ParallelProgress( group_id="grp", total=1, completed=0, in_progress=0, - pending=1, succeeded=0, failed=0, ) From 3b040ba0a40df70d28cea3624fba9b1ba8100f71 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Sat, 7 Mar 2026 19:45:11 +0100 Subject: [PATCH 3/5] fix: prevent flaky TaskFactory by pinning empty dependencies Polyfactory could randomly generate a task ID matching a dependency, causing sporadic self-dependency validation failures. 
Co-Authored-By: Claude Opus 4.6 --- tests/unit/core/conftest.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/core/conftest.py b/tests/unit/core/conftest.py index f9610bc680..703ff4263f 100644 --- a/tests/unit/core/conftest.py +++ b/tests/unit/core/conftest.py @@ -171,6 +171,7 @@ class TaskFactory(ModelFactory[Task]): __model__ = Task status = TaskStatus.CREATED assigned_to = None + dependencies = () deadline = None From e67dc064722745f1fdf78fbf045dd3f5bfc60880 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Sat, 7 Mar 2026 20:10:51 +0100 Subject: [PATCH 4/5] refactor: harden parallel execution with review findings Address 23 review items from local agents (code-reviewer, type-design, silent-failure-hunter, logging-audit, docs-consistency, issue-resolution) and external reviewers (CodeRabbit, Gemini, Copilot, Greptile): - Wrap lock release in try/finally for CancelledError safety - Add CancelledError handler with proper outcome recording - Track progress for shutdown-rejected tasks - Extract _run_task_group, _build_result, _record_fatal_outcome methods - Persist auto-created InMemoryResourceLock on self._resource_lock - Reorder finally: unregister_task before _emit_progress - Add ERROR log before fatal ParallelExecutionError raise - Remove duplicate PARALLEL_GROUP_COMPLETE log from except* block - Add duplicate resource claims validator on AgentAssignment - Enforce mutual exclusivity (result XOR error) on AgentOutcome - Add cross-field count validators on ParallelProgress - Update DESIGN_SPEC.md: add ParallelProgress, plan_parsing.py, company.py events, fix fail_fast wording, add M3 clarification - Add parametrized numeric constraint tests - Add outcome pairing assertions in multi-agent test - Fix fail_fast test to handle CancelledError outcomes Co-Authored-By: Claude Opus 4.6 --- DESIGN_SPEC.md | 8 +- src/ai_company/engine/parallel.py | 216 ++++++++++++++-------- src/ai_company/engine/parallel_models.py | 
39 +++- tests/unit/engine/test_parallel.py | 14 +- tests/unit/engine/test_parallel_models.py | 102 ++++++++-- 5 files changed, 279 insertions(+), 100 deletions(-) diff --git a/DESIGN_SPEC.md b/DESIGN_SPEC.md index 8714d02bd2..98f5c1a918 100644 --- a/DESIGN_SPEC.md +++ b/DESIGN_SPEC.md @@ -803,7 +803,7 @@ Task ───┤ ├──▶ Integration ──▶ QA └──▶ Backend Dev ──┘ ``` -> **Current state (M3):** `ParallelExecutor` (in `engine/parallel.py`) implements concurrent agent execution with `asyncio.TaskGroup`, configurable concurrency limits, resource locking for exclusive file access, and error isolation. Models in `engine/parallel_models.py`: `AgentAssignment`, `ParallelExecutionGroup`, `AgentOutcome`, `ParallelExecutionResult`, `ParallelProgress`. +> **Current state (M3):** `ParallelExecutor` (in `engine/parallel.py`) implements concurrent agent execution with `asyncio.TaskGroup`, configurable concurrency limits, resource locking for exclusive file access, error isolation, and progress tracking. While M3 primarily targets single-agent execution, parallel coordination is implemented here as prerequisite infrastructure for M4 multi-agent workflows. Models in `engine/parallel_models.py`: `AgentAssignment`, `ParallelExecutionGroup`, `AgentOutcome`, `ParallelExecutionResult`, `ParallelProgress`. 
#### Kanban Board @@ -2337,13 +2337,14 @@ ai-company/ │ │ ├── react_loop.py # ReAct loop implementation │ │ ├── plan_models.py # Plan step, plan, and plan-execute config models │ │ ├── plan_execute_loop.py # Plan-and-Execute loop implementation +│ │ ├── plan_parsing.py # Plan extraction from LLM responses (JSON + text fallback) │ │ ├── loop_helpers.py # Shared stateless helpers for all loop implementations │ │ ├── recovery.py # Crash recovery strategies (RecoveryStrategy protocol) │ │ ├── cost_recording.py # Per-turn cost recording helpers │ │ ├── run_result.py # AgentRunResult outcome model │ │ ├── agent_engine.py # Agent execution engine │ │ ├── parallel.py # Parallel agent executor (TaskGroup + Semaphore) -│ │ ├── parallel_models.py # AgentAssignment, ParallelExecutionGroup, AgentOutcome, ParallelExecutionResult +│ │ ├── parallel_models.py # AgentAssignment, ParallelExecutionGroup, AgentOutcome, ParallelExecutionResult, ParallelProgress │ │ ├── resource_lock.py # ResourceLock protocol + InMemoryResourceLock │ │ ├── shutdown.py # Graceful shutdown strategy & manager │ │ ├── task_engine.py # Task routing & scheduling (M3-M4) @@ -2376,6 +2377,7 @@ ai-company/ │ │ ├── events/ # Per-domain event constants │ │ │ ├── __init__.py # Package marker with usage docs; no re-exports │ │ │ ├── budget.py # BUDGET_* constants +│ │ │ ├── company.py # COMPANY_* constants │ │ │ ├── communication.py # COMM_* constants │ │ │ ├── config.py # CONFIG_* constants │ │ │ ├── correlation.py # CORRELATION_* constants @@ -2528,7 +2530,7 @@ These conventions were established during the M0–M2+ review cycle. **Adopted** | **Shared field groups** | Adopted (M2.5) | Extracted common field sets into base models (e.g. `_SpendingTotals`) | Prevents field duplication across spending summary models. `_SpendingTotals` provides shared aggregation fields; `AgentSpending`, `DepartmentSpending`, `PeriodSpending` extend it. 
| | **Event constants** | Adopted (per-domain) | Per-domain submodules under `events/` package (e.g. `events.provider`, `events.budget`). Import directly: `from ai_company.observability.events.<domain> import CONSTANT` | Split by domain for discoverability, co-location with domain logic, and reduced merge conflicts as constants grow. `__init__.py` serves as package marker with usage documentation; no re-exports. | | **Parallel tool execution** | Adopted (M2.5) | `asyncio.TaskGroup` in `ToolInvoker.invoke_all` with optional `max_concurrency` semaphore | Structured concurrency with proper cancellation semantics. Fatal errors collected via guarded wrapper and re-raised after all tasks complete. | -| **Parallel agent execution** | Adopted (M3) | `ParallelExecutor` coordinates concurrent `AgentEngine.run()` calls via `asyncio.TaskGroup` + optional `Semaphore` concurrency limit + `_run_guarded()` error isolation. `ResourceLock` protocol with `InMemoryResourceLock` for exclusive file-path claims. Progress tracking via `ProgressCallback`. Shutdown-aware via `ShutdownManager` task registration. Fail-fast mode re-raises first failure through `TaskGroup`. | Follows the `ToolInvoker.invoke_all()` pattern (parallel tool execution above). Composition over inheritance — wraps `AgentEngine`. Structured concurrency with proper cancellation. See §6.3 Parallel Execution. | +| **Parallel agent execution** | Adopted (M3) | `ParallelExecutor` coordinates concurrent `AgentEngine.run()` calls via `asyncio.TaskGroup` + optional `Semaphore` concurrency limit + `_run_guarded()` error isolation. `ResourceLock` protocol with `InMemoryResourceLock` for exclusive file-path claims. Progress tracking via `ProgressCallback`. Shutdown-aware via `ShutdownManager` task registration. Fail-fast mode cancels sibling tasks on first failure; all errors are surfaced via `ParallelExecutionResult` outcomes. | Follows the `ToolInvoker.invoke_all()` pattern (parallel tool execution above).
Composition over inheritance — wraps `AgentEngine`. Structured concurrency with proper cancellation. See §6.3 Parallel Execution. | | **Tool permission checking** | Adopted (M3) | `ToolPermissionChecker` enforces category-level gating based on `ToolAccessLevel` (sandboxed → restricted → standard → elevated, plus custom). Priority-based resolution: denied list → allowed list → level categories → deny. Case-insensitive name matching. `ToolInvoker` filters definitions for prompt and checks at invocation time. | Defense-in-depth: agents only see permitted tools in the LLM prompt, and invocations are re-checked at execution time. Explicit allow/deny lists provide per-agent overrides. See §11.1.1. | | **Tool sandboxing** | Adopted (M3, incremental) | File system tools use in-process `PathValidator` for workspace-scoped path validation (symlink resolution + containment check). `BaseFileSystemTool` ABC provides shared `ToolCategory.FILE_SYSTEM` and `PathValidator` integration — all file system tools extend this base. `SandboxBackend` protocol with `SubprocessSandbox` implemented — git tools accept optional `SandboxBackend` injection and delegate subprocess management to it (env filtering, workspace enforcement, timeout + process-group kill). `DockerSandbox` planned for code_runner, terminal, web, and database tools. `K8sSandbox` planned for future container deployments. Config-driven per-category backend selection planned for engine wiring. | File system tools use defence-in-depth path validation; subprocess sandbox provides lightweight isolation for git tools; heavier Docker/K8s isolation reserved for higher-risk tool categories (code execution, network). See §11.1.2. | | **Crash recovery** | Adopted (M3) | Pluggable `RecoveryStrategy` protocol. M3: `FailAndReassignStrategy` (catch at engine boundary, log snapshot, mark FAILED / eligible for reassignment). M4/M5: `CheckpointStrategy` (persist `AgentContext` per turn, resume from last checkpoint). 
| Immutable `model_copy` pattern makes checkpoint serialization trivial to add later. Fail-and-reassign is sufficient for short MVP tasks. See §6.6. | diff --git a/src/ai_company/engine/parallel.py b/src/ai_company/engine/parallel.py index fe672f47ec..b37c74232a 100644 --- a/src/ai_company/engine/parallel.py +++ b/src/ai_company/engine/parallel.py @@ -47,7 +47,7 @@ """Synchronous callback invoked on progress updates. Called directly (not awaited) from the executor's event loop; -must not block. +must not block. Async functions will produce un-awaited coroutines. """ @@ -137,12 +137,6 @@ async def execute_group( if lock is not None: await self._acquire_all_locks(group, lock) - semaphore = ( - asyncio.Semaphore(group.max_concurrency) - if group.max_concurrency is not None - else None - ) - outcomes: dict[str, AgentOutcome] = {} fatal_errors: list[Exception] = [] progress = _ProgressState( @@ -151,57 +145,27 @@ async def execute_group( ) try: - async with asyncio.TaskGroup() as tg: - for assignment in group.assignments: - tg.create_task( - self._run_guarded( - assignment=assignment, - group=group, - outcomes=outcomes, - fatal_errors=fatal_errors, - progress=progress, - semaphore=semaphore, - ), - ) - except* Exception as eg: - # TaskGroup wraps exceptions in ExceptionGroup when - # fail_fast re-raises inside _run_guarded. - # Outcomes from completed tasks are already collected. 
- exc_summaries = [f"{type(e).__name__}: {e}" for e in eg.exceptions] - logger.info( - PARALLEL_GROUP_COMPLETE, - group_id=group.group_id, - note="TaskGroup exited with exceptions", - exception_count=len(eg.exceptions), - exceptions=exc_summaries, + await self._run_task_group( + group, + outcomes, + fatal_errors, + progress, ) + finally: + if lock is not None: + try: + await self._release_all_locks(group, lock) + except Exception: + logger.exception( + PARALLEL_GROUP_COMPLETE, + error="Failed to release resource locks", + group_id=group.group_id, + ) - if lock is not None: - try: - await self._release_all_locks(group, lock) - except Exception: - logger.exception( - PARALLEL_GROUP_COMPLETE, - error="Failed to release resource locks", - group_id=group.group_id, - ) - - duration = time.monotonic() - start - - result = ParallelExecutionResult( - group_id=group.group_id, - outcomes=tuple( - outcomes.get( - a.task_id, - AgentOutcome( - task_id=a.task_id, - agent_id=a.agent_id, - error="Cancelled due to fail_fast", - ), - ) - for a in group.assignments - ), - total_duration_seconds=duration, + result = self._build_result( + group, + outcomes, + time.monotonic() - start, ) logger.info( @@ -209,7 +173,7 @@ async def execute_group( group_id=group.group_id, succeeded=result.agents_succeeded, failed=result.agents_failed, - duration_seconds=duration, + duration_seconds=result.total_duration_seconds, ) if fatal_errors: @@ -217,10 +181,49 @@ async def execute_group( f"Parallel group {group.group_id!r} had " f"{len(fatal_errors)} fatal error(s)" ) + logger.error( + PARALLEL_AGENT_ERROR, + group_id=group.group_id, + fatal_error_count=len(fatal_errors), + error=msg, + ) raise ParallelExecutionError(msg) from fatal_errors[0] return result + async def _run_task_group( + self, + group: ParallelExecutionGroup, + outcomes: dict[str, AgentOutcome], + fatal_errors: list[Exception], + progress: _ProgressState, + ) -> None: + """Run all assignments via TaskGroup.""" + semaphore = ( + 
asyncio.Semaphore(group.max_concurrency) + if group.max_concurrency is not None + else None + ) + try: + async with asyncio.TaskGroup() as tg: + for assignment in group.assignments: + tg.create_task( + self._run_guarded( + assignment=assignment, + group=group, + outcomes=outcomes, + fatal_errors=fatal_errors, + progress=progress, + semaphore=semaphore, + ), + ) + except* Exception: # noqa: S110 + # TaskGroup wraps exceptions in ExceptionGroup when + # fail_fast re-raises inside _run_guarded. + # Outcomes from completed tasks are already collected; + # individual errors logged in _record_error_outcome. + pass + async def _run_guarded( # noqa: PLR0913 self, *, @@ -237,12 +240,16 @@ async def _run_guarded( # noqa: PLR0913 - ``MemoryError``/``RecursionError`` → collected in fatal_errors - Regular ``Exception`` → stored as error outcome; re-raised when ``fail_fast`` is enabled + - ``CancelledError`` → stored as cancelled outcome, re-raised - ``BaseException`` → propagates through TaskGroup """ task_id = assignment.task_id agent_id = assignment.agent_id if not self._register_with_shutdown(task_id, agent_id, outcomes): + progress.completed += 1 + progress.failed += 1 + self._emit_progress(progress) return try: @@ -256,21 +263,14 @@ async def _run_guarded( # noqa: PLR0913 semaphore=semaphore, ) except (MemoryError, RecursionError) as exc: - error_msg = f"Fatal: {type(exc).__name__}: {exc}" - logger.exception( - PARALLEL_AGENT_ERROR, - group_id=group.group_id, - agent_id=agent_id, - task_id=task_id, - error=error_msg, - ) - fatal_errors.append(exc) - outcomes[task_id] = AgentOutcome( - task_id=task_id, - agent_id=agent_id, - error=error_msg, + self._record_fatal_outcome( + exc, + assignment, + group, + outcomes, + fatal_errors, + progress, ) - progress.failed += 1 except Exception as exc: self._record_error_outcome( exc, @@ -281,21 +281,34 @@ async def _run_guarded( # noqa: PLR0913 ) if group.fail_fast: raise + except asyncio.CancelledError: + outcomes[task_id] = 
AgentOutcome( + task_id=task_id, + agent_id=agent_id, + error="Cancelled", + ) + progress.failed += 1 + raise finally: progress.in_progress = max(0, progress.in_progress - 1) progress.completed += 1 - self._emit_progress(progress) if self._shutdown_manager is not None: self._shutdown_manager.unregister_task(task_id) + self._emit_progress(progress) + def _register_with_shutdown( self, task_id: str, agent_id: str, outcomes: dict[str, AgentOutcome], ) -> bool: - """Register with shutdown manager. Returns False if shutdown.""" + """Register with shutdown manager. + + Returns ``False`` and records an error outcome if shutdown + is already in progress. + """ if self._shutdown_manager is None: return True asyncio_task = asyncio.current_task() @@ -327,7 +340,7 @@ async def _execute_assignment( progress: _ProgressState, semaphore: asyncio.Semaphore | None, ) -> None: - """Run engine.run() with optional semaphore.""" + """Run ``engine.run()`` under optional semaphore and record outcome.""" task_id = assignment.task_id agent_id = assignment.agent_id @@ -386,6 +399,55 @@ def _record_error_outcome( error=error_msg, ) + def _record_fatal_outcome( # noqa: PLR0913 + self, + exc: BaseException, + assignment: AgentAssignment, + group: ParallelExecutionGroup, + outcomes: dict[str, AgentOutcome], + fatal_errors: list[Exception], + progress: _ProgressState, + ) -> None: + """Record a fatal error outcome (MemoryError/RecursionError).""" + error_msg = f"Fatal: {type(exc).__name__}: {exc}" + logger.exception( + PARALLEL_AGENT_ERROR, + group_id=group.group_id, + agent_id=assignment.agent_id, + task_id=assignment.task_id, + error=error_msg, + ) + fatal_errors.append(exc) # type: ignore[arg-type] + outcomes[assignment.task_id] = AgentOutcome( + task_id=assignment.task_id, + agent_id=assignment.agent_id, + error=error_msg, + ) + progress.failed += 1 + + def _build_result( + self, + group: ParallelExecutionGroup, + outcomes: dict[str, AgentOutcome], + duration: float, + ) -> 
ParallelExecutionResult: + """Build execution result, filling cancelled outcomes.""" + return ParallelExecutionResult( + group_id=group.group_id, + outcomes=tuple( + outcomes.get( + a.task_id, + AgentOutcome( + task_id=a.task_id, + agent_id=a.agent_id, + error="Cancelled due to fail_fast", + ), + ) + for a in group.assignments + ), + total_duration_seconds=duration, + ) + def _resolve_lock( self, group: ParallelExecutionGroup, @@ -394,14 +456,15 @@ def _resolve_lock( When no assignments declare resource claims, returns ``None`` (no locking needed). When claims exist, falls back to - ``InMemoryResourceLock()`` if no lock was injected. + a shared ``InMemoryResourceLock()`` if no lock was injected. """ has_claims = any(a.resource_claims for a in group.assignments) if not has_claims: return None if self._resource_lock is not None: return self._resource_lock - return InMemoryResourceLock() + self._resource_lock = InMemoryResourceLock() + return self._resource_lock def _validate_resource_claims( self, @@ -450,7 +513,6 @@ async def _acquire_all_locks( group_id=group.group_id, error=msg, ) - # Release any locks already acquired for this group await self._release_all_locks(group, lock) raise ResourceConflictError(msg) diff --git a/src/ai_company/engine/parallel_models.py b/src/ai_company/engine/parallel_models.py index 52354228b3..c066edafe2 100644 --- a/src/ai_company/engine/parallel_models.py +++ b/src/ai_company/engine/parallel_models.py @@ -30,7 +30,7 @@ class AgentAssignment(BaseModel): max_turns: Maximum execution turns. timeout_seconds: Optional wall-clock timeout for this agent. memory_messages: Pre-loaded memory messages for the agent. - resource_claims: File paths requiring exclusive access. + resource_claims: File paths requiring exclusive access (unique). """ model_config = ConfigDict(frozen=True) @@ -57,9 +57,21 @@ class AgentAssignment(BaseModel): ) resource_claims: tuple[NotBlankStr, ...] 
= Field( default=(), - description="File paths requiring exclusive access", + description="File paths requiring exclusive access (unique)", ) + @model_validator(mode="after") + def _validate_resource_claims_unique(self) -> Self: + if len(self.resource_claims) != len(set(self.resource_claims)): + dupes = sorted( + r + for r in set(self.resource_claims) + if self.resource_claims.count(r) > 1 + ) + msg = f"Duplicate resource claims: {dupes}" + raise ValueError(msg) + return self + @computed_field( # type: ignore[prop-decorator] description="Agent identifier string", ) @@ -134,8 +146,9 @@ class AgentOutcome(BaseModel): Attributes: task_id: Task identifier. agent_id: Agent identifier. - result: Present if execution produced a result. - error: Present if the agent crashed before producing a result. + result: Present if execution completed (success or failure). + error: Present if the agent failed, was cancelled, or could + not execute. Mutually exclusive with ``result``. """ model_config = ConfigDict(frozen=True) @@ -144,17 +157,17 @@ class AgentOutcome(BaseModel): agent_id: NotBlankStr = Field(description="Agent identifier") result: AgentRunResult | None = Field( default=None, - description="Present if execution produced a result", + description="Present if execution completed", ) error: str | None = Field( default=None, - description="Present if agent crashed before producing result", + description="Present if agent failed, was cancelled, or could not execute", ) @model_validator(mode="after") def _validate_result_or_error(self) -> Self: - if self.result is None and self.error is None: - msg = "Either result or error must be set" + if (self.result is None) == (self.error is None): + msg = "Exactly one of result or error must be set" raise ValueError(msg) return self @@ -244,6 +257,16 @@ class ParallelProgress(BaseModel): succeeded: int = Field(ge=0, description="Successful completions") failed: int = Field(ge=0, description="Failed completions") + 
@model_validator(mode="after") + def _validate_counts(self) -> Self: + if self.completed + self.in_progress > self.total: + msg = "completed + in_progress must not exceed total" + raise ValueError(msg) + if self.succeeded + self.failed > self.completed: + msg = "succeeded + failed must not exceed completed" + raise ValueError(msg) + return self + @computed_field( # type: ignore[prop-decorator] description="Not yet started", ) diff --git a/tests/unit/engine/test_parallel.py b/tests/unit/engine/test_parallel.py index 551094e757..8c700404d6 100644 --- a/tests/unit/engine/test_parallel.py +++ b/tests/unit/engine/test_parallel.py @@ -201,6 +201,10 @@ async def test_two_agents_both_succeed(self) -> None: assert result.agents_failed == 0 assert result.all_succeeded is True assert engine.run.await_count == 2 + # Verify outcome pairing + outcome_pairs = sorted((o.agent_id, o.task_id) for o in result.outcomes) + expected_pairs = sorted((str(a.identity.id), a.task.id) for a in (a1, a2)) + assert outcome_pairs == expected_pairs async def test_one_fails_one_succeeds(self) -> None: a1 = _make_assignment("a1", "t1") @@ -300,7 +304,7 @@ async def side_effect(**kwargs: object) -> AgentRunResult: assert result.agents_failed >= 1 # Some outcomes should be cancellation errors cancel_outcomes = [ - o for o in result.outcomes if o.error and "fail_fast" in o.error + o for o in result.outcomes if o.error and "cancel" in o.error.lower() ] assert len(cancel_outcomes) >= 1 @@ -537,9 +541,11 @@ async def test_shutdown_in_progress_rejected(self) -> None: sm.register_task = MagicMock( # type: ignore[method-assign] side_effect=RuntimeError("Shutdown in progress"), ) + progress_updates: list[ParallelProgress] = [] executor = ParallelExecutor( engine=engine, shutdown_manager=sm, + progress_callback=progress_updates.append, ) group = _make_group(a1) @@ -548,6 +554,12 @@ async def test_shutdown_in_progress_rejected(self) -> None: assert result.all_succeeded is False assert result.outcomes[0].error 
== "Shutdown in progress" engine.run.assert_not_awaited() + # Progress must be tracked even for rejected tasks + assert len(progress_updates) >= 1 + final = progress_updates[-1] + assert final.completed == 1 + assert final.failed == 1 + assert final.pending == 0 async def test_shutdown_manager_integration(self) -> None: a1 = _make_assignment("a1", "t1") diff --git a/tests/unit/engine/test_parallel_models.py b/tests/unit/engine/test_parallel_models.py index f1be606451..f0f2d2cd34 100644 --- a/tests/unit/engine/test_parallel_models.py +++ b/tests/unit/engine/test_parallel_models.py @@ -151,6 +151,32 @@ def test_frozen(self) -> None: with pytest.raises(ValidationError): assignment.max_turns = 10 # type: ignore[misc] + @pytest.mark.parametrize( + ("field", "value"), + [ + ("max_turns", 0), + ("max_turns", -1), + ("timeout_seconds", 0), + ("timeout_seconds", -5.0), + ], + ) + def test_invalid_numeric_constraints( + self, + field: str, + value: object, + ) -> None: + with pytest.raises(ValidationError): + _make_assignment(**{field: value}) + + def test_duplicate_resource_claims_rejected(self) -> None: + with pytest.raises( + ValidationError, + match=r"[Dd]uplicate.*resource", + ): + _make_assignment( + resource_claims=("src/a.py", "src/a.py"), + ) + def test_agent_id_property(self) -> None: assignment = _make_assignment() assert assignment.agent_id == str(assignment.identity.id) @@ -295,9 +321,25 @@ def test_failed_run_result(self) -> None: assert outcome.is_success is False def test_both_none_rejected(self) -> None: - with pytest.raises(ValidationError, match="result or error"): + with pytest.raises( + ValidationError, + match="Exactly one of result or error", + ): AgentOutcome(task_id="t1", agent_id="a1") + def test_both_set_rejected(self) -> None: + result = _make_run_result() + with pytest.raises( + ValidationError, + match="Exactly one of result or error", + ): + AgentOutcome( + task_id="t1", + agent_id="a1", + result=result, + error="also has error", + ) + def 
test_frozen(self) -> None: outcome = AgentOutcome( task_id="t1", @@ -417,16 +459,54 @@ def test_pending_computed(self) -> None: ) assert progress.pending == 2 # 5 - 1 - 2 - def test_pending_clamped_to_zero(self) -> None: - progress = ParallelProgress( - group_id="grp", - total=1, - completed=1, - in_progress=1, - succeeded=1, - failed=0, - ) - assert progress.pending == 0 # max(0, 1 - 1 - 1) + def test_completed_plus_in_progress_exceeds_total_rejected( + self, + ) -> None: + with pytest.raises( + ValidationError, + match=r"completed.*in_progress.*must not exceed total", + ): + ParallelProgress( + group_id="grp", + total=1, + completed=1, + in_progress=1, + succeeded=1, + failed=0, + ) + + def test_succeeded_plus_failed_exceeds_completed_rejected( + self, + ) -> None: + with pytest.raises( + ValidationError, + match=r"succeeded.*failed.*must not exceed completed", + ): + ParallelProgress( + group_id="grp", + total=5, + completed=1, + in_progress=0, + succeeded=1, + failed=1, + ) + + @pytest.mark.parametrize( + "field", + ["total", "completed", "in_progress", "succeeded", "failed"], + ) + def test_negative_values_rejected(self, field: str) -> None: + kwargs: dict[str, object] = { + "group_id": "grp", + "total": 1, + "completed": 0, + "in_progress": 0, + "succeeded": 0, + "failed": 0, + } + kwargs[field] = -1 + with pytest.raises(ValidationError): + ParallelProgress(**kwargs) # type: ignore[arg-type] def test_frozen(self) -> None: progress = ParallelProgress( From d06f71b1a9057cf187413e3da4d949aaf1cd563a Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Sat, 7 Mar 2026 20:32:42 +0100 Subject: [PATCH 5/5] fix: address round-2 review findings from CodeRabbit and Greptile - Move lock acquisition inside try/finally to prevent leak on partial acquire - Check run_result.is_success before counting as succeeded in progress - Use execution-scoped lock holder (group_id:task_id) for cross-group safety - Change lock-release failure event 
to PARALLEL_VALIDATION_ERROR - Narrow _record_fatal_outcome exc type to MemoryError | RecursionError - Scope auto-created lock locally instead of mutating self - Add result.task_id/agent_id matching in AgentOutcome validator - Make _make_assignment positional-only (fixes mypy arg-type) - Fix AgentOutcome test fixture IDs to match _make_run_result Co-Authored-By: Claude Opus 4.6 --- src/ai_company/engine/parallel.py | 35 +++++---- src/ai_company/engine/parallel_models.py | 13 ++++ tests/unit/engine/test_parallel_models.py | 87 +++++++++++++++++------ 3 files changed, 98 insertions(+), 37 deletions(-) diff --git a/src/ai_company/engine/parallel.py b/src/ai_company/engine/parallel.py index b37c74232a..fadc23a34b 100644 --- a/src/ai_company/engine/parallel.py +++ b/src/ai_company/engine/parallel.py @@ -134,9 +134,6 @@ async def execute_group( lock = self._resolve_lock(group) self._validate_resource_claims(group) - if lock is not None: - await self._acquire_all_locks(group, lock) - outcomes: dict[str, AgentOutcome] = {} fatal_errors: list[Exception] = [] progress = _ProgressState( @@ -145,6 +142,8 @@ async def execute_group( ) try: + if lock is not None: + await self._acquire_all_locks(group, lock) await self._run_task_group( group, outcomes, @@ -157,7 +156,7 @@ async def execute_group( await self._release_all_locks(group, lock) except Exception: logger.exception( - PARALLEL_GROUP_COMPLETE, + PARALLEL_VALIDATION_ERROR, error="Failed to release resource locks", group_id=group.group_id, ) @@ -366,13 +365,17 @@ async def _execute_assignment( agent_id=agent_id, result=run_result, ) - progress.succeeded += 1 + success = run_result.is_success + if success: + progress.succeeded += 1 + else: + progress.failed += 1 logger.info( PARALLEL_AGENT_COMPLETE, group_id=group_id, agent_id=agent_id, task_id=task_id, - success=True, + success=success, ) def _record_error_outcome( @@ -401,7 +404,7 @@ def _record_error_outcome( def _record_fatal_outcome( # noqa: PLR0913 self, - exc: 
BaseException, + exc: MemoryError | RecursionError, assignment: AgentAssignment, group: ParallelExecutionGroup, outcomes: dict[str, AgentOutcome], @@ -417,7 +420,7 @@ def _record_fatal_outcome( # noqa: PLR0913 task_id=assignment.task_id, error=error_msg, ) - fatal_errors.append(exc) # type: ignore[arg-type] + fatal_errors.append(exc) outcomes[assignment.task_id] = AgentOutcome( task_id=assignment.task_id, agent_id=assignment.agent_id, @@ -463,8 +466,7 @@ def _resolve_lock( return None if self._resource_lock is not None: return self._resource_lock - self._resource_lock = InMemoryResourceLock() - return self._resource_lock + return InMemoryResourceLock() def _validate_resource_claims( self, @@ -500,14 +502,18 @@ async def _acquire_all_locks( ) -> None: """Acquire resource locks for all assignments.""" for assignment in group.assignments: + holder_id = f"{group.group_id}:{assignment.task_id}" for resource in assignment.resource_claims: acquired = await lock.acquire( resource, - assignment.agent_id, + holder_id, ) if not acquired: - holder = lock.holder_of(resource) - msg = f"Failed to acquire lock on {resource!r}: held by {holder!r}" + current_holder = lock.holder_of(resource) + msg = ( + f"Failed to acquire lock on {resource!r}: " + f"held by {current_holder!r}" + ) logger.warning( PARALLEL_VALIDATION_ERROR, group_id=group.group_id, @@ -523,7 +529,8 @@ async def _release_all_locks( ) -> None: """Release all resource locks for all assignments.""" for assignment in group.assignments: - await lock.release_all(assignment.agent_id) + holder_id = f"{group.group_id}:{assignment.task_id}" + await lock.release_all(holder_id) def _emit_progress(self, state: _ProgressState) -> None: """Emit a progress update via the callback, if configured.""" diff --git a/src/ai_company/engine/parallel_models.py b/src/ai_company/engine/parallel_models.py index c066edafe2..7312303e62 100644 --- a/src/ai_company/engine/parallel_models.py +++ b/src/ai_company/engine/parallel_models.py @@ -169,6 
+169,19 @@ def _validate_result_or_error(self) -> Self: if (self.result is None) == (self.error is None): msg = "Exactly one of result or error must be set" raise ValueError(msg) + if self.result is not None: + if self.result.task_id != self.task_id: + msg = ( + f"result.task_id {self.result.task_id!r} " + f"must match task_id {self.task_id!r}" + ) + raise ValueError(msg) + if self.result.agent_id != self.agent_id: + msg = ( + f"result.agent_id {self.result.agent_id!r} " + f"must match agent_id {self.agent_id!r}" + ) + raise ValueError(msg) return self @computed_field( # type: ignore[prop-decorator] diff --git a/tests/unit/engine/test_parallel_models.py b/tests/unit/engine/test_parallel_models.py index f0f2d2cd34..59c4b73de6 100644 --- a/tests/unit/engine/test_parallel_models.py +++ b/tests/unit/engine/test_parallel_models.py @@ -73,6 +73,7 @@ def _make_task( def _make_assignment( name: str = "agent", title: str = "task", + /, **kwargs: object, ) -> AgentAssignment: return AgentAssignment( @@ -286,10 +287,12 @@ class TestAgentOutcome: """AgentOutcome frozen model.""" def test_success_outcome(self) -> None: - result = _make_run_result() + identity = _make_identity() + task = _make_task() + result = _make_run_result(identity, task) outcome = AgentOutcome( - task_id="t1", - agent_id="a1", + task_id=task.id, + agent_id=str(identity.id), result=result, ) @@ -309,17 +312,45 @@ def test_error_outcome(self) -> None: assert outcome.is_success is False def test_failed_run_result(self) -> None: - result = _make_run_result( - reason=TerminationReason.ERROR, - ) + identity = _make_identity() + task = _make_task() + result = _make_run_result(identity, task, reason=TerminationReason.ERROR) outcome = AgentOutcome( - task_id="t1", - agent_id="a1", + task_id=task.id, + agent_id=str(identity.id), result=result, ) assert outcome.is_success is False + def test_mismatched_task_id_rejected(self) -> None: + identity = _make_identity() + task = _make_task() + result = 
_make_run_result(identity, task) + with pytest.raises( + ValidationError, + match=r"result\.task_id.*must match task_id", + ): + AgentOutcome( + task_id="wrong-task-id", + agent_id=str(identity.id), + result=result, + ) + + def test_mismatched_agent_id_rejected(self) -> None: + identity = _make_identity() + task = _make_task() + result = _make_run_result(identity, task) + with pytest.raises( + ValidationError, + match=r"result\.agent_id.*must match agent_id", + ): + AgentOutcome( + task_id=task.id, + agent_id="wrong-agent-id", + result=result, + ) + def test_both_none_rejected(self) -> None: with pytest.raises( ValidationError, @@ -328,14 +359,16 @@ def test_both_none_rejected(self) -> None: AgentOutcome(task_id="t1", agent_id="a1") def test_both_set_rejected(self) -> None: - result = _make_run_result() + identity = _make_identity() + task = _make_task() + result = _make_run_result(identity, task) with pytest.raises( ValidationError, match="Exactly one of result or error", ): AgentOutcome( - task_id="t1", - agent_id="a1", + task_id=task.id, + agent_id=str(identity.id), result=result, error="also has error", ) @@ -355,15 +388,19 @@ class TestParallelExecutionResult: """ParallelExecutionResult frozen model with computed fields.""" def test_all_succeeded(self) -> None: + i1, t1 = _make_identity("a1"), _make_task("t1") + i2, t2 = _make_identity("a2"), _make_task("t2") + r1 = _make_run_result(i1, t1) + r2 = _make_run_result(i2, t2) o1 = AgentOutcome( - task_id="t1", - agent_id="a1", - result=_make_run_result(), + task_id=t1.id, + agent_id=str(i1.id), + result=r1, ) o2 = AgentOutcome( - task_id="t2", - agent_id="a2", - result=_make_run_result(), + task_id=t2.id, + agent_id=str(i2.id), + result=r2, ) result = ParallelExecutionResult( group_id="grp", @@ -376,10 +413,12 @@ def test_all_succeeded(self) -> None: assert result.all_succeeded is True def test_partial_failure(self) -> None: + i1, t1 = _make_identity("a1"), _make_task("t1") + r1 = _make_run_result(i1, t1) o1 = 
AgentOutcome( - task_id="t1", - agent_id="a1", - result=_make_run_result(), + task_id=t1.id, + agent_id=str(i1.id), + result=r1, ) o2 = AgentOutcome( task_id="t2", @@ -397,10 +436,12 @@ def test_partial_failure(self) -> None: assert result.all_succeeded is False def test_total_cost_usd(self) -> None: + i1, t1 = _make_identity("a1"), _make_task("t1") + r1 = _make_run_result(i1, t1) o1 = AgentOutcome( - task_id="t1", - agent_id="a1", - result=_make_run_result(), + task_id=t1.id, + agent_id=str(i1.id), + result=r1, ) o2 = AgentOutcome( task_id="t2",