diff --git a/.claude/skills/aurelio-review-pr/SKILL.md b/.claude/skills/aurelio-review-pr/SKILL.md index 3d6f2fb75a..0066693ecb 100644 --- a/.claude/skills/aurelio-review-pr/SKILL.md +++ b/.claude/skills/aurelio-review-pr/SKILL.md @@ -250,37 +250,46 @@ Collect all findings with their severity/confidence scores. ## Phase 4: Fetch external reviewer feedback -Fetch from three GitHub API sources **in parallel** using `gh api`: +**CRITICAL: Fetch ALL reviewers — do NOT filter by known bot names.** The set of external reviewers varies per repo and can include any combination of bots (CodeRabbit, Gemini, Copilot, Greptile, etc.) and human reviewers. Always fetch unfiltered results and categorize by author from the response. + +**CRITICAL: Wait for all bots to finish processing.** Before triaging, check if any bot reviewer is still processing (e.g. CodeRabbit's "Currently processing" placeholder, or a review with an empty body). If a bot appears to still be processing: +1. Poll every 30 seconds for up to 3 minutes (6 checks) +2. If still not ready after 3 minutes, proceed without it and mark its coverage as "pending" in the triage table +3. After implementing fixes and pushing, re-check for the bot's feedback in Phase 9 + +Fetch from three GitHub API sources **in parallel** using `gh api` — **always unfiltered** (no `select(.user.login == ...)` filtering): 1. **Review submissions** (top-level review bodies): ```bash - gh api repos/OWNER/REPO/pulls/NUMBER/reviews --paginate + gh api repos/OWNER/REPO/pulls/NUMBER/reviews --paginate --jq '.[] | {author: .user.login, state: .state, body: (.body // "")}' ``` - Extract: author, state, body. + Extract: author, state, body. List ALL unique authors to identify every reviewer. **CRITICAL: Parse review bodies for outside-diff-range comments.** Some reviewers (e.g. CodeRabbit) embed actionable comments inside `
` blocks in the review body when the affected lines are outside the PR's diff range. Look for patterns like "Outside diff range comments (N)" and extract each embedded comment's file path, line range, severity, and description. These are just as important as inline comments — do NOT skip them. 2. **Inline review comments** (comments on specific lines): ```bash - gh api repos/OWNER/REPO/pulls/NUMBER/comments --paginate + gh api repos/OWNER/REPO/pulls/NUMBER/comments --paginate --jq '.[] | {author: .user.login, path: .path, line: .line, body: (.body // "")}' ``` - Extract: author, file path, line number, body. + Extract: author, file path, line number, body. **Include ALL authors.** 3. **Issue-level comments** (general PR comments, e.g. CodeRabbit walkthrough): ```bash - gh api repos/OWNER/REPO/issues/NUMBER/comments --paginate + gh api repos/OWNER/REPO/issues/NUMBER/comments --paginate --jq '.[] | {author: .user.login, body: (.body // "")}' ``` - Extract: author, body (look for actionable items, not just summaries). + Extract: author, body (look for actionable items, not just summaries). **Include ALL authors.** + +After fetching, **enumerate all unique external reviewers** found across all three sources and report the list to the user before triaging. This ensures no reviewer is accidentally missed. -**Important:** Use `gh api` with `--jq` for filtering. Keep it simple and robust — no complex Python scripts to parse JSON. +**Important:** Use `gh api` with `--jq` for filtering fields only (not filtering authors). Keep it simple and robust — no complex Python scripts to parse JSON. -**Important:** When review bodies are large (e.g. CodeRabbit's review with embedded outside-diff comments), fetch the **full body** without truncation. Use `head -c` with a generous limit (e.g. 15000 chars) rather than `--jq '.body[0:500]'` truncation. Outside-diff comments are typically at the top of the review body. +**Important:** When review bodies are large (e.g. CodeRabbit's review with embedded outside-diff comments), fetch the **full body** without truncation. Outside-diff comments are typically at the top of the review body. ## Phase 5: Consolidate and triage diff --git a/CLAUDE.md b/CLAUDE.md index 38e95ab841..bf36e2414b 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -48,7 +48,7 @@ src/ai_company/ communication/ # Inter-agent message bus and channels config/ # YAML company config loading and validation core/ # Shared domain models and base classes - engine/ # Agent execution engine and task lifecycle + engine/ # Agent orchestration, execution loops, and task lifecycle memory/ # Persistent agent memory (memory layer TBD) observability/ # Structured logging, correlation tracking, log sinks providers/ # LLM provider abstraction (LiteLLM adapter) diff --git a/DESIGN_SPEC.md b/DESIGN_SPEC.md index eb875b351d..76f3a04ffa 100644 --- a/DESIGN_SPEC.md +++ b/DESIGN_SPEC.md @@ -151,7 +151,7 @@ Every agent has a comprehensive identity. At the design level, agent data splits - **Config (immutable)**: identity, personality, skills, model preferences, tool permissions, authority. Defined at hire time, changed only by explicit reconfiguration. Represented as frozen Pydantic models. - **Runtime state (mutable-via-copy)**: current status, active task, conversation history, execution metrics. Evolves during agent operation. Represented as Pydantic models using `model_copy(update=...)` for state transitions — never mutated in place. -> **Current state (M2):** Only the config layer exists as `AgentIdentity` (frozen Pydantic model in `core/agent.py`). The runtime state layer will be introduced in M3 when the agent execution engine is implemented. All identifier/name fields use `NotBlankStr` (from `core.types`) for automatic whitespace rejection; optional identifier fields use `NotBlankStr | None`; tuple fields use `tuple[NotBlankStr, ...]` for per-element validation. +> **Current state (M3):** Both layers are implemented. Config layer: `AgentIdentity` (frozen, in `core/agent.py`). Runtime state layer: `TaskExecution`, `AgentContext`, `AgentContextSnapshot` (frozen + `model_copy`, in `engine/`). `AgentEngine` orchestrates execution via `run()`. All identifier/name fields use `NotBlankStr` (from `core.types`) for automatic whitespace rejection; optional identifier fields use `NotBlankStr | None`; tuple fields use `tuple[NotBlankStr, ...]` for per-element validation. ```yaml # --- Current (M2): Config layer — AgentIdentity (frozen) --- @@ -911,6 +911,38 @@ hybrid: > **Auto-selection (optional):** When `execution_loop: "auto"`, the framework selects the loop based on `estimated_complexity`: simple → ReAct, medium → Plan-and-Execute, complex/epic → Hybrid. Configurable via `auto_loop_rules` — a mapping of complexity thresholds to loop implementations (e.g., `{simple_max_tokens: 500, medium_max_tokens: 3000}` with corresponding loop assignments). +#### AgentEngine Orchestrator + +`AgentEngine` (in `engine/agent_engine.py`) is the top-level entry point for running an agent on a task. It composes the execution loop with prompt construction, context management, tool invocation, and cost tracking into a single `run()` call. + +**`async run(identity, task, completion_config?, max_turns?, memory_messages?) -> AgentRunResult`** + +Pipeline steps: + +1. **Validate inputs** — agent must be `ACTIVE`, task must be `ASSIGNED` or `IN_PROGRESS`. Raises `ExecutionStateError` on violation. +2. **Build system prompt** — calls `build_system_prompt()` with agent identity, task, and available tool definitions. +3. **Create context** — `AgentContext.from_identity()` with the configured `max_turns`. +4. **Seed conversation** — injects system prompt, optional memory messages, and formatted task instruction as initial messages. +5. **Transition task** — `ASSIGNED` → `IN_PROGRESS` (pass-through if already `IN_PROGRESS`). +6. **Prepare tools and budget** — creates `ToolInvoker` from registry and `BudgetChecker` from task budget limit. +7. **Delegate to loop** — calls `ExecutionLoop.execute()` with context, provider, tool invoker, budget checker, and completion config. +8. **Record costs** — records accumulated `TokenUsage` to `CostTracker` (if available). Cost recording failures are logged but do not affect the result. +9. **Return result** — wraps `ExecutionResult` in `AgentRunResult` with engine-level metadata. + +Error handling: `MemoryError` and `RecursionError` propagate unconditionally. All other exceptions are caught and wrapped in an `AgentRunResult` with `TerminationReason.ERROR`. + +Constructor accepts: `provider` (required), `execution_loop` (defaults to `ReactLoop`), `tool_registry`, `cost_tracker`. The `run()` method also accepts `memory_messages` — optional working memory to inject between the system prompt and task instruction (memory retrieval is M5; the engine provides the injection hook). + +Logs structured events under the `execution.engine.*` namespace (10 constants in `events/execution.py`): creation, start, prompt built, completion, errors, invalid input, task transitions, and cost recording outcomes. + +**`AgentRunResult`** — frozen Pydantic model wrapping `ExecutionResult` with engine metadata: + +- `execution_result` — outcome from the execution loop +- `system_prompt` — the `SystemPrompt` used for this run +- `duration_seconds` — wall-clock run time +- `agent_id`, `task_id` — identifiers +- Computed fields: `termination_reason`, `total_turns`, `total_cost_usd`, `is_success` + ### 6.6 Agent Crash Recovery When an agent execution fails unexpectedly (unhandled exception, OOM, process kill), the framework needs a recovery mechanism. Recovery strategies are implemented behind a `RecoveryStrategy` protocol, making the system pluggable — new strategies can be added without modifying existing ones. @@ -2114,7 +2146,7 @@ ai-company/ │ │ ├── artifact.py # Produced work items │ │ ├── role.py # Role model │ │ └── role_catalog.py # Role catalog -│ ├── engine/ # Core engines (M3+) +│ ├── engine/ # Agent orchestration, execution loops, and task lifecycle │ │ ├── errors.py # Engine error hierarchy │ │ ├── prompt.py # System prompt builder │ │ ├── prompt_template.py # System prompt Jinja2 templates @@ -2122,7 +2154,8 @@ ai-company/ │ │ ├── context.py # AgentContext + AgentContextSnapshot │ │ ├── loop_protocol.py # ExecutionLoop protocol + result models │ │ ├── react_loop.py # ReAct loop implementation -│ │ ├── agent_engine.py # Agent execution engine (M3) +│ │ ├── run_result.py # AgentRunResult outcome model +│ │ ├── agent_engine.py # Agent execution engine │ │ ├── task_engine.py # Task routing & scheduling (M3-M4) │ │ ├── workflow_engine.py # Workflow orchestration (M4) │ │ ├── meeting_engine.py # Meeting coordination (M4) diff --git a/src/ai_company/budget/tracker.py b/src/ai_company/budget/tracker.py index c0ac49df49..4a0b98e31d 100644 --- a/src/ai_company/budget/tracker.py +++ b/src/ai_company/budget/tracker.py @@ -301,6 +301,8 @@ def _resolve_department(self, agent_id: str) -> str | None: return None try: return self._department_resolver(agent_id) + except MemoryError, RecursionError: + raise except Exception as exc: logger.warning( BUDGET_DEPARTMENT_RESOLVE_FAILED, diff --git a/src/ai_company/engine/__init__.py b/src/ai_company/engine/__init__.py index 25df804937..7e4c6a775a 100644 --- a/src/ai_company/engine/__init__.py +++ b/src/ai_company/engine/__init__.py @@ -1,9 +1,11 @@ """Agent execution engine. -Re-exports the public API for system prompt construction, -runtime execution state, execution loops, and engine errors. +Re-exports the public API for the agent orchestrator, run results, +system prompt construction, runtime execution state, execution loops, +and engine errors. """ +from ai_company.engine.agent_engine import AgentEngine from ai_company.engine.context import ( DEFAULT_MAX_TURNS, AgentContext, @@ -31,6 +33,7 @@ build_system_prompt, ) from ai_company.engine.react_loop import ReactLoop +from ai_company.engine.run_result import AgentRunResult from ai_company.engine.task_execution import StatusTransition, TaskExecution from ai_company.providers.models import ZERO_TOKEN_USAGE, add_token_usage @@ -39,6 +42,8 @@ "ZERO_TOKEN_USAGE", "AgentContext", "AgentContextSnapshot", + "AgentEngine", + "AgentRunResult", "BudgetChecker", "BudgetExhaustedError", "DefaultTokenEstimator", diff --git a/src/ai_company/engine/agent_engine.py b/src/ai_company/engine/agent_engine.py new file mode 100644 index 0000000000..5700359af1 --- /dev/null +++ b/src/ai_company/engine/agent_engine.py @@ -0,0 +1,586 @@ +"""Agent engine — top-level orchestrator. + +Ties together prompt construction, execution context, execution loop, +tool invocation, and budget tracking into a single ``run()`` entry point. +""" + +import time +from datetime import UTC, datetime +from typing import TYPE_CHECKING + +from ai_company.budget.cost_record import CostRecord +from ai_company.core.enums import AgentStatus, TaskStatus +from ai_company.engine.context import DEFAULT_MAX_TURNS, AgentContext +from ai_company.engine.errors import ExecutionStateError +from ai_company.engine.loop_protocol import ( + ExecutionResult, + TerminationReason, + TurnRecord, +) +from ai_company.engine.prompt import SystemPrompt, build_system_prompt +from ai_company.engine.react_loop import ReactLoop +from ai_company.engine.run_result import AgentRunResult +from ai_company.observability import get_logger +from ai_company.observability.events.execution import ( + EXECUTION_ENGINE_COMPLETE, + EXECUTION_ENGINE_COST_FAILED, + EXECUTION_ENGINE_COST_RECORDED, + EXECUTION_ENGINE_COST_SKIPPED, + EXECUTION_ENGINE_CREATED, + EXECUTION_ENGINE_ERROR, + EXECUTION_ENGINE_INVALID_INPUT, + EXECUTION_ENGINE_PROMPT_BUILT, + EXECUTION_ENGINE_START, + EXECUTION_ENGINE_TASK_TRANSITION, +) +from ai_company.providers.enums import MessageRole +from ai_company.providers.models import ChatMessage +from ai_company.tools.invoker import ToolInvoker + +if TYPE_CHECKING: + from ai_company.budget.tracker import CostTracker + from ai_company.core.agent import AgentIdentity + from ai_company.core.task import Task + from ai_company.engine.loop_protocol import BudgetChecker, ExecutionLoop + from ai_company.providers.models import CompletionConfig, ToolDefinition + from ai_company.providers.protocol import CompletionProvider + from ai_company.tools.registry import ToolRegistry + +logger = get_logger(__name__) + +_EXECUTABLE_STATUSES = frozenset({TaskStatus.ASSIGNED, TaskStatus.IN_PROGRESS}) +"""Task statuses the engine will accept for execution. + +CREATED tasks lack an assignee; terminal statuses (COMPLETED, CANCELLED) +and BLOCKED/IN_REVIEW are not executable. +""" + + +class AgentEngine: + """Top-level orchestrator for agent execution. + + Builds the system prompt, creates an execution context, delegates + to the configured ``ExecutionLoop``, and returns an ``AgentRunResult`` + with full metadata. + + Args: + provider: LLM completion provider (required). + execution_loop: Loop implementation. Defaults to ``ReactLoop()``. + tool_registry: Optional tools available to the agent. + cost_tracker: Optional cost recording service. When ``None``, + cost recording is skipped silently. + """ + + def __init__( + self, + *, + provider: CompletionProvider, + execution_loop: ExecutionLoop | None = None, + tool_registry: ToolRegistry | None = None, + cost_tracker: CostTracker | None = None, + ) -> None: + self._provider = provider + self._loop: ExecutionLoop = execution_loop or ReactLoop() + self._tool_registry = tool_registry + self._cost_tracker = cost_tracker + logger.debug( + EXECUTION_ENGINE_CREATED, + loop_type=self._loop.get_loop_type(), + has_tool_registry=self._tool_registry is not None, + has_cost_tracker=self._cost_tracker is not None, + ) + + async def run( + self, + *, + identity: AgentIdentity, + task: Task, + completion_config: CompletionConfig | None = None, + max_turns: int = DEFAULT_MAX_TURNS, + memory_messages: tuple[ChatMessage, ...] = (), + ) -> AgentRunResult: + """Execute an agent on a task. + + Args: + identity: Frozen agent identity card. + task: Task to execute (must be ASSIGNED or IN_PROGRESS). + completion_config: Optional per-run LLM config override. + max_turns: Maximum LLM turns allowed (must be >= 1). + memory_messages: Optional working memory messages to inject + between the system prompt and task instruction. + + Returns: + ``AgentRunResult`` with execution outcome and metadata. + All exceptions during execution (other than those listed + below) are caught and returned as an error result rather + than propagated. + + Raises: + ExecutionStateError: If pre-flight validation fails (agent + not ACTIVE or task not ASSIGNED/IN_PROGRESS). + ValueError: If ``max_turns`` is less than 1. + MemoryError: Re-raised unconditionally (non-recoverable). + RecursionError: Re-raised unconditionally (non-recoverable). + """ + agent_id = str(identity.id) + task_id = task.id + + if max_turns < 1: + msg = f"max_turns must be >= 1, got {max_turns}" + logger.warning( + EXECUTION_ENGINE_INVALID_INPUT, + agent_id=agent_id, + task_id=task_id, + reason=msg, + ) + raise ValueError(msg) + + self._validate_agent(identity, agent_id) + self._validate_task(task, agent_id, task_id) + + logger.info( + EXECUTION_ENGINE_START, + agent_id=agent_id, + task_id=task_id, + loop_type=self._loop.get_loop_type(), + max_turns=max_turns, + ) + + start = time.monotonic() + ctx: AgentContext | None = None + system_prompt: SystemPrompt | None = None + try: + ctx, system_prompt = self._prepare_context( + identity=identity, + task=task, + agent_id=agent_id, + task_id=task_id, + max_turns=max_turns, + memory_messages=memory_messages, + ) + return await self._execute( + identity=identity, + task=task, + agent_id=agent_id, + task_id=task_id, + completion_config=completion_config, + ctx=ctx, + system_prompt=system_prompt, + start=start, + ) + except MemoryError, RecursionError: + logger.error( + EXECUTION_ENGINE_ERROR, + agent_id=agent_id, + task_id=task_id, + error="non-recoverable error in run()", + exc_info=True, + ) + raise + except Exception as exc: + return self._handle_fatal_error( + exc=exc, + identity=identity, + task=task, + agent_id=agent_id, + task_id=task_id, + duration_seconds=time.monotonic() - start, + ctx=ctx, + system_prompt=system_prompt, + ) + + async def _execute( # noqa: PLR0913 + self, + *, + identity: AgentIdentity, + task: Task, + agent_id: str, + task_id: str, + completion_config: CompletionConfig | None, + ctx: AgentContext, + system_prompt: SystemPrompt, + start: float, + ) -> AgentRunResult: + """Run the execution loop, record costs, and build result.""" + budget_checker = _make_budget_checker(task) + tool_invoker = self._make_tool_invoker() + + logger.debug( + EXECUTION_ENGINE_PROMPT_BUILT, + agent_id=agent_id, + task_id=task_id, + estimated_tokens=system_prompt.estimated_tokens, + ) + + execution_result = await self._loop.execute( + context=ctx, + provider=self._provider, + tool_invoker=tool_invoker, + budget_checker=budget_checker, + completion_config=completion_config, + ) + duration = time.monotonic() - start + + await self._record_costs(execution_result, identity, agent_id, task_id) + + result = AgentRunResult( + execution_result=execution_result, + system_prompt=system_prompt, + duration_seconds=duration, + agent_id=agent_id, + task_id=task_id, + ) + self._log_completion(result, execution_result, agent_id, task_id, duration) + return result + + # ── Setup ──────────────────────────────────────────────────── + + def _prepare_context( # noqa: PLR0913 + self, + *, + identity: AgentIdentity, + task: Task, + agent_id: str, + task_id: str, + max_turns: int, + memory_messages: tuple[ChatMessage, ...], + ) -> tuple[AgentContext, SystemPrompt]: + """Build system prompt and prepare execution context.""" + tool_defs = self._get_tool_definitions() + system_prompt = build_system_prompt( + agent=identity, + task=task, + available_tools=tool_defs, + ) + + ctx = AgentContext.from_identity( + identity, + task=task, + max_turns=max_turns, + ) + ctx = ctx.with_message( + ChatMessage(role=MessageRole.SYSTEM, content=system_prompt.content), + ) + for msg in memory_messages: + ctx = ctx.with_message(msg) + ctx = ctx.with_message( + ChatMessage( + role=MessageRole.USER, + content=_format_task_instruction(task), + ), + ) + + ctx = self._transition_task_if_needed(ctx, agent_id, task_id) + return ctx, system_prompt + + # ── Validation ─────────────────────────────────────────────── + + def _validate_agent(self, identity: AgentIdentity, agent_id: str) -> None: + """Raise if agent is not ACTIVE.""" + if identity.status != AgentStatus.ACTIVE: + msg = ( + f"Agent {agent_id} has status {identity.status.value!r}; " + f"only 'active' agents can run tasks" + ) + logger.warning( + EXECUTION_ENGINE_INVALID_INPUT, + agent_id=agent_id, + reason=msg, + ) + raise ExecutionStateError(msg) + + def _validate_task( + self, + task: Task, + agent_id: str, + task_id: str, + ) -> None: + """Raise if task is not in an executable status.""" + if task.status not in _EXECUTABLE_STATUSES: + msg = ( + f"Task {task_id!r} has status {task.status.value!r}; " + f"only 'assigned' or 'in_progress' tasks can be executed" + ) + logger.warning( + EXECUTION_ENGINE_INVALID_INPUT, + agent_id=agent_id, + task_id=task_id, + reason=msg, + ) + raise ExecutionStateError(msg) + + # ── Helpers ────────────────────────────────────────────────── + + def _get_tool_definitions(self) -> tuple[ToolDefinition, ...]: + """Extract tool definitions from the registry for prompt building.""" + if self._tool_registry is None: + return () + return self._tool_registry.to_definitions() + + def _transition_task_if_needed( + self, + ctx: AgentContext, + agent_id: str, + task_id: str, + ) -> AgentContext: + """Transition ASSIGNED -> IN_PROGRESS; pass through IN_PROGRESS.""" + if ( + ctx.task_execution is not None + and ctx.task_execution.status == TaskStatus.ASSIGNED + ): + ctx = ctx.with_task_transition( + TaskStatus.IN_PROGRESS, + reason="Engine starting execution", + ) + logger.info( + EXECUTION_ENGINE_TASK_TRANSITION, + agent_id=agent_id, + task_id=task_id, + from_status=TaskStatus.ASSIGNED.value, + to_status=TaskStatus.IN_PROGRESS.value, + ) + return ctx + + def _make_tool_invoker(self) -> ToolInvoker | None: + """Create a ToolInvoker from the registry, or None.""" + if self._tool_registry is None: + return None + return ToolInvoker(self._tool_registry) + + def _log_completion( + self, + result: AgentRunResult, + execution_result: ExecutionResult, + agent_id: str, + task_id: str, + duration: float, + ) -> None: + """Log structured completion event for the finished run.""" + logger.info( + EXECUTION_ENGINE_COMPLETE, + agent_id=agent_id, + task_id=task_id, + termination_reason=result.termination_reason.value, + total_turns=result.total_turns, + total_tokens=execution_result.context.accumulated_cost.total_tokens, + duration_seconds=duration, + cost_usd=result.total_cost_usd, + ) + + async def _record_costs( + self, + result: ExecutionResult, + identity: AgentIdentity, + agent_id: str, + task_id: str, + ) -> None: + """Record per-turn costs to the CostTracker if available. + + Each turn produces its own ``CostRecord``, preserving per-call + granularity. Turns with zero cost and zero tokens are skipped. + + Recording failures for regular exceptions are logged but do not + affect the execution result. ``MemoryError`` and + ``RecursionError`` propagate unconditionally as non-recoverable + system errors. + """ + if self._cost_tracker is None: + logger.debug( + EXECUTION_ENGINE_COST_SKIPPED, + agent_id=agent_id, + task_id=task_id, + reason="no cost tracker configured", + ) + return + + tracker = self._cost_tracker + + for turn in result.turns: + # Skip only when provably nothing happened (zero cost and + # zero tokens); a turn with tokens but zero cost (e.g., a + # free-tier provider) is still recorded. + if ( + turn.cost_usd <= 0.0 + and turn.input_tokens == 0 + and turn.output_tokens == 0 + ): + logger.debug( + EXECUTION_ENGINE_COST_SKIPPED, + agent_id=agent_id, + task_id=task_id, + turn_number=turn.turn_number, + reason="zero cost and zero tokens", + ) + continue + + record = CostRecord( + agent_id=agent_id, + task_id=task_id, + provider=identity.model.provider, + model=identity.model.model_id, + input_tokens=turn.input_tokens, + output_tokens=turn.output_tokens, + cost_usd=turn.cost_usd, + timestamp=datetime.now(UTC), + ) + await self._submit_cost( + record, + turn, + agent_id, + task_id, + tracker=tracker, + ) + + async def _submit_cost( + self, + record: CostRecord, + turn: TurnRecord, + agent_id: str, + task_id: str, + *, + tracker: CostTracker, + ) -> None: + """Submit a cost record to the tracker, logging failures.""" + try: + await tracker.record(record) + except MemoryError, RecursionError: + logger.error( + EXECUTION_ENGINE_COST_FAILED, + agent_id=agent_id, + task_id=task_id, + error="non-recoverable error in cost recording", + exc_info=True, + ) + raise + except Exception as exc: + logger.exception( + EXECUTION_ENGINE_COST_FAILED, + agent_id=agent_id, + task_id=task_id, + error=f"{type(exc).__name__}: {exc}", + cost_usd=turn.cost_usd, + input_tokens=turn.input_tokens, + output_tokens=turn.output_tokens, + ) + return + + logger.info( + EXECUTION_ENGINE_COST_RECORDED, + agent_id=agent_id, + task_id=task_id, + cost_usd=turn.cost_usd, + ) + + def _handle_fatal_error( # noqa: PLR0913 + self, + *, + exc: Exception, + identity: AgentIdentity, + task: Task, + agent_id: str, + task_id: str, + duration_seconds: float, + ctx: AgentContext | None = None, + system_prompt: SystemPrompt | None = None, + ) -> AgentRunResult: + """Build an error ``AgentRunResult`` when the execution pipeline fails. + + When ``ctx`` and ``system_prompt`` are provided (i.e. context + preparation succeeded before the failure), they are preserved in + the error result so that accumulated state (conversation, task + transition) is not lost. + + If constructing the error result itself fails, the original + exception is re-raised so it is never silently lost. + """ + error_msg = f"{type(exc).__name__}: {exc}" + logger.exception( + EXECUTION_ENGINE_ERROR, + agent_id=agent_id, + task_id=task_id, + error=error_msg, + ) + + try: + error_ctx = ctx or AgentContext.from_identity(identity, task=task) + error_execution = ExecutionResult( + context=error_ctx, + termination_reason=TerminationReason.ERROR, + error_message=error_msg, + ) + error_prompt = system_prompt or SystemPrompt( + content="", + template_version="error", + estimated_tokens=0, + sections=(), + metadata={ + "agent_id": agent_id, + "name": identity.name, + "role": identity.role, + "department": identity.department, + "level": identity.level.value, + }, + ) + return AgentRunResult( + execution_result=error_execution, + system_prompt=error_prompt, + duration_seconds=duration_seconds, + agent_id=agent_id, + task_id=task_id, + ) + except MemoryError, RecursionError: + logger.error( + EXECUTION_ENGINE_ERROR, + agent_id=agent_id, + task_id=task_id, + error="non-recoverable error while building error result", + exc_info=True, + ) + raise + except Exception as build_exc: + logger.exception( + EXECUTION_ENGINE_ERROR, + agent_id=agent_id, + task_id=task_id, + error=f"Failed to build error result: {build_exc}", + original_error=error_msg, + ) + raise exc from None + + +def _format_task_instruction(task: Task) -> str: + """Format a task into a user message for the initial conversation.""" + parts = [f"# Task: {task.title}", "", task.description] + + if task.acceptance_criteria: + parts.append("") + parts.append("## Acceptance Criteria") + parts.extend(f"- {c.description}" for c in task.acceptance_criteria) + + if task.budget_limit > 0: + parts.append("") + parts.append(f"**Budget limit:** ${task.budget_limit:.2f} USD") + + if task.deadline: + parts.append("") + parts.append(f"**Deadline:** {task.deadline}") + + return "\n".join(parts) + + +def _make_budget_checker(task: Task) -> BudgetChecker | None: + """Create a budget checker if the task has a positive budget limit. + + The returned callable returns ``True`` when accumulated cost meets + or exceeds the limit (budget exhausted), ``False`` otherwise. + Returns ``None`` when there is no positive budget limit. + """ + if task.budget_limit <= 0: + return None + + limit = task.budget_limit + + def _check(ctx: AgentContext) -> bool: + return ctx.accumulated_cost.cost_usd >= limit + + return _check diff --git a/src/ai_company/engine/prompt.py b/src/ai_company/engine/prompt.py index 2bb797493f..2b90495de7 100644 --- a/src/ai_company/engine/prompt.py +++ b/src/ai_company/engine/prompt.py @@ -7,8 +7,8 @@ from ai_company.engine.prompt import build_system_prompt - prompt = build_system_prompt(agent=agent_identity) - print(prompt.content) + prompt = build_system_prompt(agent=agent_identity, task=task) + prompt.content # rendered system prompt string """ from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable @@ -217,6 +217,15 @@ def build_system_prompt( # noqa: PLR0913 ) except PromptBuildError: raise # Already logged by inner functions. + except MemoryError, RecursionError: + logger.error( + PROMPT_BUILD_ERROR, + agent_id=str(agent.id), + agent_name=agent.name, + error="non-recoverable error building prompt", + exc_info=True, + ) + raise except Exception as exc: logger.exception( PROMPT_BUILD_ERROR, diff --git a/src/ai_company/engine/run_result.py b/src/ai_company/engine/run_result.py new file mode 100644 index 0000000000..426bdb2d0f --- /dev/null +++ b/src/ai_company/engine/run_result.py @@ -0,0 +1,84 @@ +"""Agent run result model. + +Frozen Pydantic model wrapping ``ExecutionResult`` with outer metadata +from the engine layer (system prompt, wall-clock duration, agent/task IDs). +""" + +from pydantic import BaseModel, ConfigDict, Field, computed_field + +from ai_company.core.types import NotBlankStr # noqa: TC001 +from ai_company.engine.loop_protocol import ( + ExecutionResult, + TerminationReason, +) +from ai_company.engine.prompt import SystemPrompt # noqa: TC001 + + +class AgentRunResult(BaseModel): + """Immutable result of a complete agent engine run. + + Wraps the ``ExecutionResult`` from the loop with engine-level + metadata: system prompt, wall-clock duration, and agent/task IDs. + + Attributes: + execution_result: Outcome from the execution loop. + system_prompt: System prompt used for this run. + duration_seconds: Wall-clock run time in seconds. + agent_id: Agent identifier (string form of UUID). + task_id: Task identifier (always set currently; ``None`` + reserved for future taskless runs). + """ + + model_config = ConfigDict(frozen=True) + + execution_result: ExecutionResult = Field( + description="Outcome from the execution loop", + ) + system_prompt: SystemPrompt = Field( + description="System prompt used for this run", + ) + duration_seconds: float = Field( + ge=0.0, + description="Wall-clock run time in seconds", + ) + agent_id: NotBlankStr = Field(description="Agent identifier") + task_id: NotBlankStr | None = Field( + default=None, + description="Task identifier, or None for future taskless runs", + ) + + # mypy does not yet model Pydantic's @computed_field + @property + # combination correctly; the ignores are safe — Pydantic enforces + # the return type at runtime. + + @computed_field( # type: ignore[prop-decorator] + description="Why the execution terminated", + ) + @property + def termination_reason(self) -> TerminationReason: + """Why the execution loop terminated.""" + return self.execution_result.termination_reason + + @computed_field( # type: ignore[prop-decorator] + description="Total LLM turns completed", + ) + @property + def total_turns(self) -> int: + """Number of turns completed during execution.""" + return len(self.execution_result.turns) + + @computed_field( # type: ignore[prop-decorator] + description="Total cost in USD", + ) + @property + def total_cost_usd(self) -> float: + """Accumulated cost from the execution context.""" + return self.execution_result.context.accumulated_cost.cost_usd + + @computed_field( # type: ignore[prop-decorator] + description="Whether the run completed successfully", + ) + @property + def is_success(self) -> bool: + """True when termination reason is COMPLETED.""" + return self.termination_reason == TerminationReason.COMPLETED diff --git a/src/ai_company/observability/events/execution.py b/src/ai_company/observability/events/execution.py index 74e27e763a..ab3977221d 100644 --- a/src/ai_company/observability/events/execution.py +++ b/src/ai_company/observability/events/execution.py @@ -21,3 +21,14 @@ EXECUTION_LOOP_TERMINATED: Final[str] = "execution.loop.terminated" EXECUTION_LOOP_BUDGET_EXHAUSTED: Final[str] = "execution.loop.budget_exhausted" EXECUTION_LOOP_ERROR: Final[str] = "execution.loop.error" + +EXECUTION_ENGINE_CREATED: Final[str] = "execution.engine.created" +EXECUTION_ENGINE_START: Final[str] = "execution.engine.start" +EXECUTION_ENGINE_PROMPT_BUILT: Final[str] = "execution.engine.prompt_built" +EXECUTION_ENGINE_COMPLETE: Final[str] = "execution.engine.complete" +EXECUTION_ENGINE_ERROR: Final[str] = "execution.engine.error" +EXECUTION_ENGINE_INVALID_INPUT: Final[str] = "execution.engine.invalid_input" +EXECUTION_ENGINE_TASK_TRANSITION: Final[str] = "execution.engine.task_transition" +EXECUTION_ENGINE_COST_RECORDED: Final[str] = "execution.engine.cost_recorded" +EXECUTION_ENGINE_COST_SKIPPED: Final[str] = "execution.engine.cost_skipped" +EXECUTION_ENGINE_COST_FAILED: Final[str] = "execution.engine.cost_failed" diff --git a/tests/integration/engine/__init__.py b/tests/integration/engine/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/integration/engine/test_agent_engine_integration.py b/tests/integration/engine/test_agent_engine_integration.py new file mode 100644 index 0000000000..878a11403b --- /dev/null +++ b/tests/integration/engine/test_agent_engine_integration.py @@ -0,0 +1,208 @@ +"""Integration test: AgentEngine -> ReactLoop -> tool calls -> result. + +Demonstrates the full execution pipeline with a real ToolRegistry, +real ReactLoop, and a mock provider that returns tool calls. +""" + +from datetime import date +from typing import TYPE_CHECKING, Any +from uuid import uuid4 + +import pytest + +from ai_company.core.agent import AgentIdentity, ModelConfig, PersonalityConfig +from ai_company.core.enums import ( + Priority, + SeniorityLevel, + TaskStatus, + TaskType, +) +from ai_company.core.task import Task +from ai_company.engine.agent_engine import AgentEngine +from ai_company.engine.loop_protocol import TerminationReason +from ai_company.providers.enums import FinishReason +from ai_company.providers.models import ( + ChatMessage, + CompletionConfig, + CompletionResponse, + StreamChunk, + TokenUsage, + ToolCall, + ToolDefinition, +) +from ai_company.tools.base import BaseTool, ToolExecutionResult +from ai_company.tools.registry import ToolRegistry + +if TYPE_CHECKING: + from collections.abc import AsyncIterator + + from ai_company.providers.capabilities import ModelCapabilities + +pytestmark = [pytest.mark.integration, pytest.mark.timeout(30)] + + +class UppercaseTool(BaseTool): + """Test tool that uppercases input text.""" + + async def execute(self, *, arguments: dict[str, Any]) -> ToolExecutionResult: + """Uppercase the 'text' argument.""" + text = arguments.get("text", "") + return ToolExecutionResult(content=text.upper()) + + +class _ToolCallingProvider: + """Mock provider that issues a tool call on the first turn. + + Turn 1: Returns a tool call for the 'uppercase' tool. + Turn 2: Returns a final text response incorporating the tool result. + """ + + def __init__(self) -> None: + self._call_count = 0 + + async def complete( + self, + messages: list[ChatMessage], + model: str, + *, + tools: list[ToolDefinition] | None = None, + config: CompletionConfig | None = None, + ) -> CompletionResponse: + """Return tool call on turn 1, text response on turn 2.""" + self._call_count += 1 + + if self._call_count == 1: + return CompletionResponse( + content="", + finish_reason=FinishReason.TOOL_USE, + usage=TokenUsage( + input_tokens=50, + output_tokens=20, + cost_usd=0.005, + ), + model="test-model-001", + tool_calls=( + ToolCall( + id="call-001", + name="uppercase", + arguments={"text": "hello world"}, + ), + ), + ) + + return CompletionResponse( + content="The uppercased text is: HELLO WORLD", + finish_reason=FinishReason.STOP, + usage=TokenUsage( + input_tokens=80, + output_tokens=30, + cost_usd=0.008, + ), + model="test-model-001", + ) + + async def stream( + self, + messages: list[ChatMessage], + model: str, + *, + tools: list[ToolDefinition] | None = None, + config: CompletionConfig | None = None, + ) -> AsyncIterator[StreamChunk]: + """Not implemented for this test.""" + msg = "stream not supported" + raise NotImplementedError(msg) + + async def get_model_capabilities(self, model: str) -> ModelCapabilities: + """Return minimal capabilities.""" + from ai_company.providers.capabilities import ModelCapabilities + + return ModelCapabilities( + model_id=model, + provider="test-provider", + supports_tools=True, + supports_streaming=False, + max_context_tokens=8192, + max_output_tokens=4096, + cost_per_1k_input=0.01, + cost_per_1k_output=0.03, + ) + + +class TestAgentEngineToolCallIntegration: + """Full pipeline: AgentEngine -> ReactLoop -> tool execution -> result.""" + + async def test_full_tool_call_loop(self) -> None: + """Agent makes a tool call, gets result, produces final answer.""" + identity = AgentIdentity( + id=uuid4(), + name="Test Agent", + role="Developer", + department="Engineering", + level=SeniorityLevel.MID, + hiring_date=date(2026, 1, 15), + personality=PersonalityConfig( + traits=("analytical",), + ), + model=ModelConfig( + provider="test-provider", + model_id="test-model-001", + ), + ) + task = Task( + id="task-integration", + title="Uppercase a string", + description="Use the uppercase tool to convert text.", + type=TaskType.DEVELOPMENT, + priority=Priority.MEDIUM, + project="proj-001", + created_by="manager", + assigned_to=str(identity.id), + status=TaskStatus.ASSIGNED, + ) + + tool = UppercaseTool( + name="uppercase", + description="Converts text to uppercase.", + parameters_schema={ + "type": "object", + "properties": { + "text": {"type": "string", "description": "Text to uppercase"}, + }, + "required": ["text"], + }, + ) + registry = ToolRegistry([tool]) + provider = _ToolCallingProvider() + + engine = AgentEngine( + provider=provider, + tool_registry=registry, + ) + + result = await engine.run( + identity=identity, + task=task, + max_turns=5, + ) + + # Verify successful completion + assert result.is_success is True + assert result.termination_reason == TerminationReason.COMPLETED + assert result.total_turns == 2 # tool call turn + final answer turn + + # Verify cost was accumulated across turns + assert result.total_cost_usd > 0 + assert result.duration_seconds > 0 + + # Verify the tool was actually called (result in conversation) + conversation = result.execution_result.context.conversation + tool_results = [m for m in conversation if m.tool_result is not None] + assert len(tool_results) == 1 + assert tool_results[0].tool_result is not None + assert tool_results[0].tool_result.content == "HELLO WORLD" + + # Verify task transitioned to IN_PROGRESS + te = result.execution_result.context.task_execution + assert te is not None + assert te.status == TaskStatus.IN_PROGRESS diff --git a/tests/unit/engine/conftest.py b/tests/unit/engine/conftest.py index 33bdf4b3d2..51a25f8e76 100644 --- a/tests/unit/engine/conftest.py +++ b/tests/unit/engine/conftest.py @@ -28,6 +28,7 @@ from ai_company.engine.context import AgentContext from ai_company.engine.task_execution import TaskExecution from ai_company.providers.capabilities import ModelCapabilities +from ai_company.providers.enums import FinishReason from ai_company.providers.models import ( ChatMessage, CompletionConfig, @@ -237,6 +238,27 @@ async def get_model_capabilities(self, model: str) -> ModelCapabilities: ) +def make_completion_response( + *, + content: str = "Done.", + finish_reason: FinishReason = FinishReason.STOP, + input_tokens: int = 100, + output_tokens: int = 50, + cost_usd: float = 0.01, +) -> CompletionResponse: + """Build a simple CompletionResponse for tests.""" + return CompletionResponse( + content=content, + finish_reason=finish_reason, + usage=TokenUsage( + input_tokens=input_tokens, + output_tokens=output_tokens, + cost_usd=cost_usd, + ), + model="test-model-001", + ) + + @pytest.fixture def mock_provider_factory() -> type[MockCompletionProvider]: """Expose MockCompletionProvider class for test construction.""" diff --git a/tests/unit/engine/test_agent_engine.py b/tests/unit/engine/test_agent_engine.py new file mode 100644 index 0000000000..e587eb6293 --- /dev/null +++ b/tests/unit/engine/test_agent_engine.py @@ -0,0 +1,722 @@ +"""Unit tests for AgentEngine orchestrator.""" + +import copy +from typing import TYPE_CHECKING, Any +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from ai_company.budget.tracker import CostTracker +from ai_company.core.agent import AgentIdentity # noqa: TC001 +from ai_company.core.enums import AgentStatus, Priority, TaskStatus, TaskType +from ai_company.core.task import Task +from ai_company.engine.agent_engine import AgentEngine +from ai_company.engine.context import AgentContext +from ai_company.engine.errors import ExecutionStateError +from ai_company.engine.loop_protocol import ( + ExecutionResult, + TerminationReason, + TurnRecord, +) +from ai_company.engine.run_result import AgentRunResult +from ai_company.providers.enums import FinishReason + +if TYPE_CHECKING: + from .conftest import MockCompletionProvider + +from .conftest import make_completion_response as _make_completion_response + + +@pytest.mark.unit +class TestAgentEngineBasicRun: + """Happy path: identity + task -> successful result with metadata.""" + + async def test_basic_run_returns_result( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + response = _make_completion_response() + provider = mock_provider_factory([response]) + engine = AgentEngine(provider=provider) + + result = await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + + assert isinstance(result, AgentRunResult) + assert result.agent_id == str(sample_agent_with_personality.id) + assert result.task_id == sample_task_with_criteria.id + + async def test_basic_run_is_success( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + response = _make_completion_response() + provider = mock_provider_factory([response]) + engine = AgentEngine(provider=provider) + + result = await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + + assert result.is_success is True + assert result.termination_reason == TerminationReason.COMPLETED + + +@pytest.mark.unit +class TestAgentEngineSystemPrompt: + """System prompt is built and included in result.""" + + async def test_system_prompt_in_result( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + response = _make_completion_response() + provider = mock_provider_factory([response]) + engine = AgentEngine(provider=provider) + + result = await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + + assert result.system_prompt.content + assert "identity" in result.system_prompt.sections + assert result.system_prompt.metadata["agent_id"] == str( + sample_agent_with_personality.id, + ) + + +@pytest.mark.unit +class TestAgentEngineTaskTransition: + """ASSIGNED -> IN_PROGRESS transition on start.""" + + async def test_assigned_transitions_to_in_progress( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + assert sample_task_with_criteria.status == TaskStatus.ASSIGNED + response = _make_completion_response() + provider = mock_provider_factory([response]) + engine = AgentEngine(provider=provider) + + result = await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + + # The context inside the result should show IN_PROGRESS + te = result.execution_result.context.task_execution + assert te is not None + assert te.status == TaskStatus.IN_PROGRESS + + +@pytest.mark.unit +class TestAgentEngineAlreadyInProgress: + """IN_PROGRESS task runs without transition.""" + + async def test_in_progress_accepted( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + task_ip = sample_task_with_criteria.with_transition(TaskStatus.IN_PROGRESS) + response = _make_completion_response() + provider = mock_provider_factory([response]) + engine = AgentEngine(provider=provider) + + result = await engine.run( + identity=sample_agent_with_personality, + task=task_ip, + ) + + assert result.is_success is True + te = result.execution_result.context.task_execution + assert te is not None + assert te.status == TaskStatus.IN_PROGRESS + + +@pytest.mark.unit +class TestAgentEngineInvalidInput: + """Inactive agent, invalid task status -> error.""" + + async def test_inactive_agent_raises( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + inactive = sample_agent_with_personality.model_copy( + update={"status": AgentStatus.ON_LEAVE}, + ) + provider = mock_provider_factory([]) + engine = AgentEngine(provider=provider) + + with pytest.raises(ExecutionStateError, match="on_leave"): + await engine.run(identity=inactive, task=sample_task_with_criteria) + + async def test_terminated_agent_raises( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + terminated = sample_agent_with_personality.model_copy( + update={"status": AgentStatus.TERMINATED}, + ) + provider = mock_provider_factory([]) + engine = AgentEngine(provider=provider) + + with pytest.raises(ExecutionStateError, match="terminated"): + await engine.run(identity=terminated, task=sample_task_with_criteria) + + async def test_completed_task_raises( + self, + sample_agent_with_personality: AgentIdentity, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """A task already COMPLETED cannot be executed.""" + completed_task = Task( + id="task-done", + title="Already done", + description="This task is completed.", + type=TaskType.DEVELOPMENT, + priority=Priority.MEDIUM, + project="proj-001", + created_by="manager", + assigned_to="someone", + status=TaskStatus.COMPLETED, + ) + provider = mock_provider_factory([]) + engine = AgentEngine(provider=provider) + + with pytest.raises(ExecutionStateError, match="completed"): + await engine.run( + identity=sample_agent_with_personality, + task=completed_task, + ) + + async def test_created_task_raises( + self, + sample_agent_with_personality: AgentIdentity, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """A task still in CREATED status (unassigned) cannot be executed.""" + created_task = Task( + id="task-new", + title="New task", + description="Unassigned task.", + type=TaskType.DEVELOPMENT, + project="proj-001", + created_by="manager", + status=TaskStatus.CREATED, + ) + provider = mock_provider_factory([]) + engine = AgentEngine(provider=provider) + + with pytest.raises(ExecutionStateError, match="created"): + await engine.run( + identity=sample_agent_with_personality, + task=created_task, + ) + + async def test_blocked_task_raises( + self, + sample_agent_with_personality: AgentIdentity, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """A BLOCKED task cannot be executed.""" + blocked_task = Task( + id="task-blocked", + title="Blocked task", + description="This task is blocked.", + type=TaskType.DEVELOPMENT, + project="proj-001", + created_by="manager", + assigned_to="someone", + status=TaskStatus.BLOCKED, + ) + provider = mock_provider_factory([]) + engine = AgentEngine(provider=provider) + + with pytest.raises(ExecutionStateError, match="blocked"): + await engine.run( + identity=sample_agent_with_personality, + task=blocked_task, + ) + + +@pytest.mark.unit +class TestAgentEngineMaxTurnsBoundary: + """max_turns=1 is the minimum valid value.""" + + async def test_max_turns_one_succeeds( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """max_turns=1 allows exactly one LLM turn.""" + response = _make_completion_response() + provider = mock_provider_factory([response]) + engine = AgentEngine(provider=provider) + + result = await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + max_turns=1, + ) + + assert result.is_success is True + assert result.total_turns == 1 + + +@pytest.mark.unit +class TestAgentEngineWithTools: + """Tools passed through to loop, tool calls work.""" + + async def test_tools_from_registry( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + from ai_company.tools.base import BaseTool, ToolExecutionResult + from ai_company.tools.registry import ToolRegistry + + class EchoTool(BaseTool): + async def execute( + self, + *, + arguments: dict[str, Any], + ) -> ToolExecutionResult: + return ToolExecutionResult(content=str(arguments)) + + registry = ToolRegistry([EchoTool(name="echo", description="Echoes input.")]) + response = _make_completion_response() + provider = mock_provider_factory([response]) + engine = AgentEngine( + provider=provider, + tool_registry=registry, + ) + + result = await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + + assert result.is_success is True + # System prompt should include tools section + assert "tools" in result.system_prompt.sections + + +@pytest.mark.unit +class TestAgentEngineBudgetChecker: + """Budget limit creates checker, exhaustion terminates.""" + + async def test_budget_checker_passed_and_terminates( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """Budget limit > 0 creates checker and passes it to the loop.""" + ctx = AgentContext.from_identity( + sample_agent_with_personality, + task=sample_task_with_criteria, + ) + mock_result = ExecutionResult( + context=ctx, + termination_reason=TerminationReason.BUDGET_EXHAUSTED, + turns=(), + ) + mock_loop = MagicMock() + mock_loop.execute = AsyncMock(return_value=mock_result) + mock_loop.get_loop_type = MagicMock(return_value="react") + + provider = mock_provider_factory([]) + engine = AgentEngine( + provider=provider, + execution_loop=mock_loop, + ) + + result = await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + + call_kwargs = mock_loop.execute.call_args.kwargs + assert call_kwargs["budget_checker"] is not None + assert result.termination_reason == TerminationReason.BUDGET_EXHAUSTED + assert result.is_success is False + + async def test_no_budget_limit_no_checker( + self, + sample_agent_with_personality: AgentIdentity, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """Task with budget_limit=0 should not create a budget checker.""" + task = Task( + id="task-no-budget", + title="No budget limit", + description="A task with no budget.", + type=TaskType.DEVELOPMENT, + project="proj-001", + created_by="manager", + assigned_to="someone", + budget_limit=0.0, + status=TaskStatus.ASSIGNED, + ) + response = _make_completion_response(cost_usd=100.0) + provider = mock_provider_factory([response]) + engine = AgentEngine(provider=provider) + + result = await engine.run( + identity=sample_agent_with_personality, + task=task, + ) + + # Without budget checker, should complete normally + assert result.termination_reason == TerminationReason.COMPLETED + + +@pytest.mark.unit +class TestAgentEngineCostRecording: + """CostTracker.record() called with correct data.""" + + async def test_cost_recorded( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + tracker = CostTracker() + response = _make_completion_response(cost_usd=0.05) + provider = mock_provider_factory([response]) + engine = AgentEngine( + provider=provider, + cost_tracker=tracker, + ) + + await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + + count = await tracker.get_record_count() + assert count == 1 + total = await tracker.get_total_cost() + assert total > 0 + + async def test_no_cost_recorded_without_tracker( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """No error when cost_tracker is None.""" + response = _make_completion_response() + provider = mock_provider_factory([response]) + engine = AgentEngine(provider=provider) + + result = await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + + assert result.is_success is True + + async def test_zero_cost_not_recorded( + self, + sample_agent_with_personality: AgentIdentity, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """CostTracker present but zero cost/tokens -> no record created.""" + tracker = CostTracker() + task = Task( + id="task-free", + title="Free task", + description="Zero cost run.", + type=TaskType.DEVELOPMENT, + project="proj-001", + created_by="manager", + assigned_to="someone", + status=TaskStatus.ASSIGNED, + ) + response = _make_completion_response( + cost_usd=0.0, + input_tokens=0, + output_tokens=0, + ) + provider = mock_provider_factory([response]) + engine = AgentEngine(provider=provider, cost_tracker=tracker) + + await engine.run(identity=sample_agent_with_personality, task=task) + + count = await tracker.get_record_count() + assert count == 0 + + async def test_free_provider_tokens_recorded( + self, + sample_agent_with_personality: AgentIdentity, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """Free provider: cost=0 but tokens>0 -> record IS created.""" + tracker = CostTracker() + task = Task( + id="task-free-tokens", + title="Free with tokens", + description="Zero cost but nonzero tokens.", + type=TaskType.DEVELOPMENT, + project="proj-001", + created_by="manager", + assigned_to="someone", + status=TaskStatus.ASSIGNED, + ) + response = _make_completion_response( + cost_usd=0.0, + input_tokens=5, + output_tokens=2, + ) + provider = mock_provider_factory([response]) + engine = AgentEngine(provider=provider, cost_tracker=tracker) + + await engine.run(identity=sample_agent_with_personality, task=task) + + count = await tracker.get_record_count() + assert count == 1 + + async def test_cost_tracker_failure_preserves_result( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """CostTracker.record() failure does not affect execution result.""" + tracker = MagicMock() + tracker.record = AsyncMock(side_effect=RuntimeError("DB write failed")) + response = _make_completion_response(cost_usd=0.05) + provider = mock_provider_factory([response]) + engine = AgentEngine(provider=provider, cost_tracker=tracker) + + result = await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + + assert result.is_success is True + assert result.termination_reason == TerminationReason.COMPLETED + + +@pytest.mark.unit +class TestAgentEngineCompletionConfig: + """completion_config is forwarded to the execution loop.""" + + async def test_completion_config_forwarded( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """A provided CompletionConfig reaches the execution loop.""" + ctx = AgentContext.from_identity( + sample_agent_with_personality, + task=sample_task_with_criteria, + ) + mock_result = ExecutionResult( + context=ctx, + termination_reason=TerminationReason.COMPLETED, + turns=( + TurnRecord( + turn_number=1, + input_tokens=10, + output_tokens=5, + cost_usd=0.001, + finish_reason=FinishReason.STOP, + ), + ), + ) + mock_loop = MagicMock() + mock_loop.execute = AsyncMock(return_value=mock_result) + mock_loop.get_loop_type = MagicMock(return_value="custom") + + config = MagicMock() + provider = mock_provider_factory([]) + engine = AgentEngine( + provider=provider, + execution_loop=mock_loop, + ) + + await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + completion_config=config, + ) + + call_kwargs = mock_loop.execute.call_args.kwargs + assert call_kwargs["completion_config"] is config + + +@pytest.mark.unit +class TestAgentEngineMaxTurns: + """max_turns parameter is forwarded to the execution context.""" + + async def test_max_turns_forwarded( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """Custom max_turns value is propagated to the context.""" + response = _make_completion_response() + provider = mock_provider_factory([response]) + engine = AgentEngine(provider=provider) + + result = await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + max_turns=5, + ) + + assert result.execution_result.context.max_turns == 5 + + +@pytest.mark.unit +class TestAgentEngineDuration: + """duration_seconds > 0 in result.""" + + async def test_duration_is_positive( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + response = _make_completion_response() + provider = mock_provider_factory([response]) + engine = AgentEngine(provider=provider) + + result = await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + + assert result.duration_seconds > 0 + + +@pytest.mark.unit +class TestAgentEngineDefaultLoop: + """No loop specified -> ReactLoop used.""" + + async def test_default_is_react_loop( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + response = _make_completion_response() + provider = mock_provider_factory([response]) + engine = AgentEngine(provider=provider) + + result = await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + # Verify through observable behavior: the result metadata + # confirms the default loop ran successfully + assert result.is_success is True + + async def test_custom_loop_used( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """A custom ExecutionLoop is used when provided.""" + ctx = AgentContext.from_identity( + sample_agent_with_personality, + task=sample_task_with_criteria, + ) + mock_result = ExecutionResult( + context=ctx, + termination_reason=TerminationReason.COMPLETED, + turns=( + TurnRecord( + turn_number=1, + input_tokens=10, + output_tokens=5, + cost_usd=0.001, + finish_reason=FinishReason.STOP, + ), + ), + ) + mock_loop = MagicMock() + mock_loop.execute = AsyncMock(return_value=mock_result) + mock_loop.get_loop_type = MagicMock(return_value="custom") + + provider = mock_provider_factory([]) + engine = AgentEngine( + provider=provider, + execution_loop=mock_loop, + ) + + result = await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + + assert result.is_success is True + mock_loop.execute.assert_awaited_once() + + +@pytest.mark.unit +class TestAgentEngineImmutability: + """Original identity/task unchanged after run.""" + + async def test_identity_unchanged( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + identity_before = copy.deepcopy(sample_agent_with_personality) + response = _make_completion_response() + provider = mock_provider_factory([response]) + engine = AgentEngine(provider=provider) + + await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + + assert sample_agent_with_personality == identity_before + + async def test_task_unchanged( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + task_before = copy.deepcopy(sample_task_with_criteria) + response = _make_completion_response() + provider = mock_provider_factory([response]) + engine = AgentEngine(provider=provider) + + await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + + # Original task status should still be ASSIGNED + assert sample_task_with_criteria.status == TaskStatus.ASSIGNED + assert sample_task_with_criteria == task_before diff --git a/tests/unit/engine/test_agent_engine_errors.py b/tests/unit/engine/test_agent_engine_errors.py new file mode 100644 index 0000000000..0838dab740 --- /dev/null +++ b/tests/unit/engine/test_agent_engine_errors.py @@ -0,0 +1,352 @@ +"""Unit tests for AgentEngine error handling and edge cases.""" + +from typing import TYPE_CHECKING +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from ai_company.core.agent import AgentIdentity # noqa: TC001 +from ai_company.core.task import Task # noqa: TC001 +from ai_company.engine.agent_engine import AgentEngine +from ai_company.engine.context import AgentContext +from ai_company.engine.loop_protocol import ( + ExecutionResult, + TerminationReason, + TurnRecord, +) +from ai_company.providers.enums import FinishReason, MessageRole +from ai_company.providers.models import ChatMessage + +if TYPE_CHECKING: + from .conftest import MockCompletionProvider + +from .conftest import make_completion_response as _make_completion_response + + +@pytest.mark.unit +class TestAgentEngineErrorHandling: + """Provider exceptions -> error result (not crash).""" + + async def test_provider_error_returns_error_result( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + ) -> None: + provider = MagicMock() + provider.complete = AsyncMock(side_effect=RuntimeError("LLM is down")) + engine = AgentEngine(provider=provider) + + result = await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + + # The error should be caught at either the loop or engine level + assert result.termination_reason == TerminationReason.ERROR + assert result.is_success is False + + async def test_prompt_build_error_returns_error_result( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + provider = mock_provider_factory([]) + engine = AgentEngine(provider=provider) + + with patch( + "ai_company.engine.agent_engine.build_system_prompt", + side_effect=RuntimeError("template broken"), + ): + result = await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + + assert result.termination_reason == TerminationReason.ERROR + assert "template broken" in (result.execution_result.error_message or "") + + +@pytest.mark.unit +class TestAgentEngineNonRecoverable: + """MemoryError/RecursionError propagate.""" + + async def test_memory_error_propagates( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + provider = mock_provider_factory([]) + engine = AgentEngine(provider=provider) + + with ( + patch( + "ai_company.engine.agent_engine.build_system_prompt", + side_effect=MemoryError("out of memory"), + ), + pytest.raises(MemoryError), + ): + await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + + async def test_recursion_error_propagates( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + provider = mock_provider_factory([]) + engine = AgentEngine(provider=provider) + + with ( + patch( + "ai_company.engine.agent_engine.build_system_prompt", + side_effect=RecursionError("too deep"), + ), + pytest.raises(RecursionError), + ): + await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + + +@pytest.mark.unit +class TestAgentEngineMaxTurnsValidation: + """max_turns < 1 raises ValueError at the engine boundary.""" + + async def test_zero_max_turns_raises( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + provider = mock_provider_factory([]) + engine = AgentEngine(provider=provider) + + with pytest.raises(ValueError, match="max_turns must be >= 1"): + await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + max_turns=0, + ) + + async def test_negative_max_turns_raises( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + provider = mock_provider_factory([]) + engine = AgentEngine(provider=provider) + + with pytest.raises(ValueError, match="max_turns must be >= 1"): + await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + max_turns=-5, + ) + + +@pytest.mark.unit +class TestAgentEngineCostRecordingNonRecoverable: + """MemoryError/RecursionError in _record_costs propagate unconditionally.""" + + async def test_memory_error_in_cost_recording_propagates( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """MemoryError from CostTracker.record() is not swallowed.""" + tracker = MagicMock() + tracker.record = AsyncMock(side_effect=MemoryError("OOM in tracker")) + response = _make_completion_response(cost_usd=0.05) + provider = mock_provider_factory([response]) + engine = AgentEngine(provider=provider, cost_tracker=tracker) + + with pytest.raises(MemoryError, match="OOM in tracker"): + await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + + async def test_recursion_error_in_cost_recording_propagates( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """RecursionError from CostTracker.record() is not swallowed.""" + tracker = MagicMock() + tracker.record = AsyncMock( + side_effect=RecursionError("infinite in tracker"), + ) + response = _make_completion_response(cost_usd=0.05) + provider = mock_provider_factory([response]) + engine = AgentEngine(provider=provider, cost_tracker=tracker) + + with pytest.raises(RecursionError, match="infinite in tracker"): + await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + + +@pytest.mark.unit +class TestAgentEngineFatalErrorResult: + """_handle_fatal_error result has correct structure.""" + + async def test_error_result_has_error_message( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """Errors in _handle_fatal_error path produce template_version='error'.""" + provider = mock_provider_factory([]) + engine = AgentEngine(provider=provider) + + with patch( + "ai_company.engine.agent_engine.build_system_prompt", + side_effect=RuntimeError("LLM is down"), + ): + result = await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + + assert result.termination_reason == TerminationReason.ERROR + assert result.is_success is False + assert "LLM is down" in (result.execution_result.error_message or "") + assert result.agent_id == str(sample_agent_with_personality.id) + assert result.task_id == sample_task_with_criteria.id + assert result.system_prompt.template_version == "error" + assert result.duration_seconds > 0 + + async def test_handle_fatal_error_secondary_failure_raises_original( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """If _handle_fatal_error itself fails, original exception is raised.""" + provider = mock_provider_factory([]) + engine = AgentEngine(provider=provider) + + with ( + patch( + "ai_company.engine.agent_engine.build_system_prompt", + side_effect=RuntimeError("original error"), + ), + patch( + "ai_company.engine.agent_engine.AgentContext.from_identity", + side_effect=ValueError("secondary failure"), + ), + pytest.raises(RuntimeError, match="original error"), + ): + await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + + +@pytest.mark.unit +class TestAgentEngineFatalErrorNonRecoverable: + """MemoryError/RecursionError in _handle_fatal_error build path propagate.""" + + async def test_memory_error_in_error_build_propagates( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """MemoryError during error-result construction propagates directly.""" + provider = mock_provider_factory([]) + engine = AgentEngine(provider=provider) + + with ( + patch( + "ai_company.engine.agent_engine.build_system_prompt", + side_effect=RuntimeError("trigger fatal path"), + ), + patch( + "ai_company.engine.agent_engine.AgentContext.from_identity", + side_effect=MemoryError("OOM in error build"), + ), + pytest.raises(MemoryError, match="OOM in error build"), + ): + await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + ) + + +@pytest.mark.unit +class TestAgentEngineMemoryMessages: + """Working memory messages injected into conversation.""" + + async def test_memory_messages_in_context( + self, + sample_agent_with_personality: AgentIdentity, + sample_task_with_criteria: Task, + mock_provider_factory: type[MockCompletionProvider], + ) -> None: + """Memory messages appear between system prompt and task instruction.""" + ctx = AgentContext.from_identity( + sample_agent_with_personality, + task=sample_task_with_criteria, + ) + mock_result = ExecutionResult( + context=ctx, + termination_reason=TerminationReason.COMPLETED, + turns=( + TurnRecord( + turn_number=1, + input_tokens=10, + output_tokens=5, + cost_usd=0.001, + finish_reason=FinishReason.STOP, + ), + ), + ) + mock_loop = MagicMock() + mock_loop.execute = AsyncMock(return_value=mock_result) + mock_loop.get_loop_type = MagicMock(return_value="react") + + memory = ( + ChatMessage(role=MessageRole.USER, content="Previous context A"), + ChatMessage(role=MessageRole.ASSISTANT, content="Previous response B"), + ) + provider = mock_provider_factory([]) + engine = AgentEngine(provider=provider, execution_loop=mock_loop) + + await engine.run( + identity=sample_agent_with_personality, + task=sample_task_with_criteria, + memory_messages=memory, + ) + + # Verify context passed to loop has memory messages + call_kwargs = mock_loop.execute.call_args.kwargs + conversation = call_kwargs["context"].conversation + contents = [m.content for m in conversation] + # System prompt is first, then memory messages, then task instruction + assert "Previous context A" in contents + assert "Previous response B" in contents + # Memory messages should appear after system prompt (index 0) + sys_idx = next( + i for i, m in enumerate(conversation) if m.role == MessageRole.SYSTEM + ) + mem_idx = next( + i for i, m in enumerate(conversation) if m.content == "Previous context A" + ) + task_idx = next( + i + for i, m in enumerate(conversation) + if m.role == MessageRole.USER and "# Task:" in m.content + ) + assert sys_idx < mem_idx < task_idx diff --git a/tests/unit/engine/test_run_result.py b/tests/unit/engine/test_run_result.py new file mode 100644 index 0000000000..888d33d2fd --- /dev/null +++ b/tests/unit/engine/test_run_result.py @@ -0,0 +1,425 @@ +"""Unit tests for AgentRunResult model and _format_task_instruction helper.""" + +from datetime import date +from uuid import uuid4 + +import pytest +from pydantic import ValidationError + +from ai_company.core.agent import AgentIdentity, ModelConfig +from ai_company.core.enums import Priority, SeniorityLevel, TaskStatus, TaskType +from ai_company.core.task import Task +from ai_company.engine.agent_engine import ( + _format_task_instruction, + _make_budget_checker, +) +from ai_company.engine.context import AgentContext +from ai_company.engine.loop_protocol import ( + ExecutionResult, + TerminationReason, + TurnRecord, +) +from ai_company.engine.prompt import SystemPrompt +from ai_company.engine.run_result import AgentRunResult +from ai_company.providers.enums import FinishReason +from ai_company.providers.models import TokenUsage + + +def _test_identity() -> AgentIdentity: + """Create a minimal AgentIdentity for standalone result tests.""" + return AgentIdentity( + id=uuid4(), + name="Test Agent", + role="Developer", + department="Engineering", + level=SeniorityLevel.MID, + hiring_date=date(2026, 1, 15), + model=ModelConfig(provider="test-provider", model_id="test-model-001"), + ) + + +def _make_run_result( # noqa: PLR0913 + *, + termination_reason: TerminationReason = TerminationReason.COMPLETED, + turns: tuple[TurnRecord, ...] = (), + cost_usd: float = 0.05, + error_message: str | None = None, + agent_id: str = "agent-001", + task_id: str = "task-001", + duration_seconds: float = 1.5, +) -> AgentRunResult: + """Build an AgentRunResult directly for focused unit tests.""" + identity = _test_identity() + ctx = AgentContext.from_identity(identity) + # Apply the desired accumulated cost + ctx = ctx.model_copy( + update={ + "accumulated_cost": TokenUsage( + input_tokens=100, + output_tokens=50, + cost_usd=cost_usd, + ), + }, + ) + execution = ExecutionResult( + context=ctx, + termination_reason=termination_reason, + turns=turns, + error_message=error_message, + ) + prompt = SystemPrompt( + content="Test prompt", + template_version="1.0", + estimated_tokens=10, + sections=("identity",), + metadata={"agent_id": agent_id}, + ) + return AgentRunResult( + execution_result=execution, + system_prompt=prompt, + duration_seconds=duration_seconds, + agent_id=agent_id, + task_id=task_id, + ) + + +@pytest.mark.unit +class TestAgentRunResultFrozen: + """AgentRunResult is frozen — field reassignment raises.""" + + def test_frozen_execution_result(self) -> None: + result = _make_run_result() + with pytest.raises(ValidationError): + result.execution_result = None # type: ignore[assignment,misc] + + def test_frozen_duration(self) -> None: + result = _make_run_result() + with pytest.raises(ValidationError): + result.duration_seconds = 999.0 # type: ignore[misc] + + def test_frozen_agent_id(self) -> None: + result = _make_run_result() + with pytest.raises(ValidationError): + result.agent_id = "other" # type: ignore[misc] + + +@pytest.mark.unit +class TestAgentRunResultComputedFields: + """Computed fields delegate correctly to execution_result.""" + + def test_termination_reason_completed(self) -> None: + result = _make_run_result(termination_reason=TerminationReason.COMPLETED) + assert result.termination_reason == TerminationReason.COMPLETED + + def test_termination_reason_error(self) -> None: + result = _make_run_result( + termination_reason=TerminationReason.ERROR, + error_message="something failed", + ) + assert result.termination_reason == TerminationReason.ERROR + + def test_total_turns_zero(self) -> None: + result = _make_run_result(turns=()) + assert result.total_turns == 0 + + def test_total_turns_multiple(self) -> None: + turns = tuple( + TurnRecord( + turn_number=i, + input_tokens=10, + output_tokens=5, + cost_usd=0.001, + finish_reason=FinishReason.STOP, + ) + for i in range(1, 4) + ) + result = _make_run_result(turns=turns) + assert result.total_turns == 3 + + def test_total_cost_usd(self) -> None: + result = _make_run_result(cost_usd=0.123) + assert result.total_cost_usd == pytest.approx(0.123) + + def test_is_success_true(self) -> None: + result = _make_run_result(termination_reason=TerminationReason.COMPLETED) + assert result.is_success is True + + def test_is_success_false_on_error(self) -> None: + result = _make_run_result( + termination_reason=TerminationReason.ERROR, + error_message="err", + ) + assert result.is_success is False + + def test_is_success_false_on_max_turns(self) -> None: + result = _make_run_result(termination_reason=TerminationReason.MAX_TURNS) + assert result.is_success is False + + def test_is_success_false_on_budget(self) -> None: + result = _make_run_result( + termination_reason=TerminationReason.BUDGET_EXHAUSTED, + ) + assert result.is_success is False + + +@pytest.mark.unit +class TestAgentRunResultValidation: + """Field validation on AgentRunResult.""" + + def test_negative_duration_rejected(self) -> None: + with pytest.raises(ValidationError): + _make_run_result(duration_seconds=-1.0) + + def test_blank_agent_id_rejected(self) -> None: + with pytest.raises(ValidationError): + _make_run_result(agent_id=" ") + + def test_task_id_none_allowed(self) -> None: + """task_id=None is valid for future taskless runs.""" + identity = _test_identity() + ctx = AgentContext.from_identity(identity) + execution = ExecutionResult( + context=ctx, + termination_reason=TerminationReason.COMPLETED, + ) + prompt = SystemPrompt( + content="", + template_version="1.0", + estimated_tokens=0, + sections=(), + metadata={}, + ) + result = AgentRunResult( + execution_result=execution, + system_prompt=prompt, + duration_seconds=0.0, + agent_id="agent-001", + task_id=None, + ) + assert result.task_id is None + + +@pytest.mark.unit +class TestFormatTaskInstruction: + """Test _format_task_instruction helper.""" + + def test_basic_format(self, sample_task_with_criteria: Task) -> None: + result = _format_task_instruction(sample_task_with_criteria) + + assert "# Task: Implement authentication module" in result + assert "JWT-based authentication" in result + assert "## Acceptance Criteria" in result + assert "- Login endpoint returns JWT token" in result + assert "$5.00 USD" in result + + def test_deadline_included(self, sample_task_with_criteria: Task) -> None: + result = _format_task_instruction(sample_task_with_criteria) + assert "**Deadline:** 2026-04-01T00:00:00" in result + + def test_deadline_has_blank_line_separator( + self, + sample_task_with_criteria: Task, + ) -> None: + """Deadline block has a blank line before it (consistent with budget).""" + result = _format_task_instruction(sample_task_with_criteria) + lines = result.split("\n") + for i, line in enumerate(lines): + if line.startswith("**Deadline:**"): + assert lines[i - 1] == "", "Expected blank line before deadline" + break + else: + pytest.fail("Deadline line not found in output") + + def test_no_criteria_no_budget(self) -> None: + task = Task( + id="task-simple", + title="Simple task", + description="Do the thing.", + type=TaskType.DEVELOPMENT, + project="proj-001", + created_by="manager", + assigned_to="someone", + status=TaskStatus.ASSIGNED, + ) + result = _format_task_instruction(task) + + assert "# Task: Simple task" in result + assert "Do the thing." in result + assert "Acceptance Criteria" not in result + assert "Budget" not in result + + def test_deadline_only_no_budget(self) -> None: + """Deadline-only task still gets blank-line separator.""" + task = Task( + id="task-deadline", + title="Deadline task", + description="Has deadline only.", + type=TaskType.DEVELOPMENT, + project="proj-001", + created_by="manager", + assigned_to="someone", + status=TaskStatus.ASSIGNED, + budget_limit=0.0, + deadline="2026-06-01T00:00:00+00:00", + ) + result = _format_task_instruction(task) + + assert "**Deadline:**" in result + assert "Budget" not in result + lines = result.split("\n") + for i, line in enumerate(lines): + if line.startswith("**Deadline:**"): + assert lines[i - 1] == "", "Expected blank line before deadline" + break + + def test_budget_only_no_deadline(self) -> None: + task = Task( + id="task-budget", + title="Budget task", + description="Has budget only.", + type=TaskType.DEVELOPMENT, + priority=Priority.MEDIUM, + project="proj-001", + created_by="manager", + assigned_to="someone", + status=TaskStatus.ASSIGNED, + budget_limit=10.0, + ) + result = _format_task_instruction(task) + + assert "$10.00 USD" in result + assert "Deadline" not in result + + +@pytest.mark.unit +class TestMakeBudgetChecker: + """Test _make_budget_checker closure logic.""" + + def test_returns_none_for_zero_budget(self) -> None: + task = Task( + id="task-free", + title="Free", + description="No budget.", + type=TaskType.DEVELOPMENT, + project="proj-001", + created_by="manager", + assigned_to="someone", + status=TaskStatus.ASSIGNED, + budget_limit=0.0, + ) + assert _make_budget_checker(task) is None + + def test_returns_none_for_default_budget(self) -> None: + """Default budget_limit (0.0) returns None.""" + task = Task( + id="task-default", + title="Default", + description="Default budget.", + type=TaskType.DEVELOPMENT, + project="proj-001", + created_by="manager", + assigned_to="someone", + status=TaskStatus.ASSIGNED, + ) + assert _make_budget_checker(task) is None + + def test_returns_callable_for_positive_budget(self) -> None: + task = Task( + id="task-b", + title="Budgeted", + description="Has budget.", + type=TaskType.DEVELOPMENT, + project="proj-001", + created_by="manager", + assigned_to="someone", + status=TaskStatus.ASSIGNED, + budget_limit=5.0, + ) + checker = _make_budget_checker(task) + assert checker is not None + assert callable(checker) + + def test_checker_returns_false_under_budget(self) -> None: + task = Task( + id="task-b", + title="Budgeted", + description="Has budget.", + type=TaskType.DEVELOPMENT, + project="proj-001", + created_by="manager", + assigned_to="someone", + status=TaskStatus.ASSIGNED, + budget_limit=5.0, + ) + checker = _make_budget_checker(task) + assert checker is not None + + identity = _test_identity() + ctx = AgentContext.from_identity(identity) + ctx = ctx.model_copy( + update={ + "accumulated_cost": TokenUsage( + input_tokens=100, + output_tokens=50, + cost_usd=4.99, + ), + }, + ) + assert checker(ctx) is False + + def test_checker_returns_true_at_exact_budget(self) -> None: + """Boundary: cost_usd == limit returns True (>= comparison).""" + task = Task( + id="task-b", + title="Budgeted", + description="Has budget.", + type=TaskType.DEVELOPMENT, + project="proj-001", + created_by="manager", + assigned_to="someone", + status=TaskStatus.ASSIGNED, + budget_limit=5.0, + ) + checker = _make_budget_checker(task) + assert checker is not None + + identity = _test_identity() + ctx = AgentContext.from_identity(identity) + ctx = ctx.model_copy( + update={ + "accumulated_cost": TokenUsage( + input_tokens=100, + output_tokens=50, + cost_usd=5.0, + ), + }, + ) + assert checker(ctx) is True + + def test_checker_returns_true_over_budget(self) -> None: + task = Task( + id="task-b", + title="Budgeted", + description="Has budget.", + type=TaskType.DEVELOPMENT, + project="proj-001", + created_by="manager", + assigned_to="someone", + status=TaskStatus.ASSIGNED, + budget_limit=5.0, + ) + checker = _make_budget_checker(task) + assert checker is not None + + identity = _test_identity() + ctx = AgentContext.from_identity(identity) + ctx = ctx.model_copy( + update={ + "accumulated_cost": TokenUsage( + input_tokens=100, + output_tokens=50, + cost_usd=5.01, + ), + }, + ) + assert checker(ctx) is True