Merged
15 commits
194ef10
feat: add centralized single-writer TaskEngine (#204)
Aureliolo Mar 12, 2026
ef4e7df
fix: harden TaskEngine error handling, types, and test coverage
Aureliolo Mar 12, 2026
03f14d4
fix: address 24 PR review items from local agents and external reviewers
Aureliolo Mar 12, 2026
c92fed8
fix: harden TaskEngine error handling, test structure, and API correc…
Aureliolo Mar 12, 2026
3254142
fix: remove trailing blank lines in task_engine_helpers (ruff format)
Aureliolo Mar 12, 2026
38006d5
fix: pin escalation_paths to empty tuple in RootConfigFactory to prev…
Aureliolo Mar 12, 2026
e894134
fix: address PR #325 review feedback — extract modules, harden error …
Aureliolo Mar 12, 2026
7ec0ab7
fix: address 22 PR review items and add 33 new tests for TaskEngine c…
Aureliolo Mar 12, 2026
d1adc05
fix: map TaskVersionConflictError to 409, fix docstrings, add 15 tests
Aureliolo Mar 12, 2026
ce3c594
fix: pre-PR review — 34 findings from 10 agents, fix code scanning al…
Aureliolo Mar 12, 2026
4462bc2
fix: resolve merge conflict in pages-preview.yml, suppress zizmor alert
Aureliolo Mar 12, 2026
327681e
fix: address 14 PR review items and add 15 tests for TaskEngine coverage
Aureliolo Mar 12, 2026
4d4f759
fix: address 14 PR review items and add 15 tests for TaskEngine coverage
Aureliolo Mar 12, 2026
f1b28ae
fix: resolve 5 PR review findings (runtime bugs, future leaks, consis…
Aureliolo Mar 12, 2026
d23952a
fix: list_tasks pagination total, dead PydanticValidationError handle…
Aureliolo Mar 12, 2026
3 changes: 3 additions & 0 deletions .github/workflows/pages-preview.yml
@@ -88,6 +88,9 @@ jobs:
echo "head_sha=$HEAD_SHA" >> "$GITHUB_OUTPUT"
echo "same_repo=$SAME_REPO" >> "$GITHUB_OUTPUT"

# zizmor: ignore[untrusted-checkout] — intentional: preview builds require
# PR code. Mitigated: no secrets in this job, contents:read + pull-requests:read
# only, persist-credentials:false. Deploy job uses artifacts, not checkout.
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
with:
persist-credentials: false
4 changes: 2 additions & 2 deletions CLAUDE.md
@@ -92,7 +92,7 @@ src/ai_company/
communication/ # Message bus, dispatcher, messenger, channels, delegation, loop prevention, conflict resolution, meeting protocol
config/ # YAML company config loading and validation
core/ # Shared domain models, base classes, and resilience config (RetryConfig, RateLimiterConfig)
-engine/ # Agent orchestration, execution loops, parallel execution, task decomposition, routing, task assignment, task lifecycle, recovery, shutdown, workspace isolation, coordination error classification, and prompt policy validation
+engine/ # Agent orchestration, execution loops, parallel execution, task decomposition, routing, task assignment, centralized single-writer task state engine (TaskEngine), task lifecycle, recovery, shutdown, workspace isolation, coordination error classification, and prompt policy validation
hr/ # HR engine: hiring, firing, onboarding, offboarding, agent registry, performance tracking (task metrics, collaboration scoring, trend detection), promotion/demotion (criteria evaluation, approval strategies, model mapping)
memory/ # Persistent agent memory (Mem0 initial, custom stack future — see Decision Log), retrieval pipeline (ranking, injection, context formatting, non-inferable filtering), shared org memory (org/), consolidation/archival (consolidation/)
persistence/ # Operational data persistence — pluggable PersistenceBackend protocol, SQLite initial (see Memory & Persistence design page)
@@ -127,7 +127,7 @@ src/ai_company/
- **Every module** with business logic MUST have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`
- **Never** use `import logging` / `logging.getLogger()` / `print()` in application code
- **Variable name**: always `logger` (not `_logger`, not `log`)
-- **Event names**: always use constants from the domain-specific module under `ai_company.observability.events` (e.g. `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`, `CFO_ANOMALY_DETECTED` from `events.cfo`, `CONFLICT_DETECTED` from `events.conflict`, `MEETING_STARTED` from `events.meeting`, `CLASSIFICATION_START` from `events.classification`, `CONSOLIDATION_START` from `events.consolidation`, `ORG_MEMORY_QUERY_START` from `events.org_memory`, `API_REQUEST_STARTED` from `events.api`, `CODE_RUNNER_EXECUTE_START` from `events.code_runner`, `DOCKER_EXECUTE_START` from `events.docker`, `MCP_INVOKE_START` from `events.mcp`, `SECURITY_EVALUATE_START` from `events.security`, `HR_HIRING_REQUEST_CREATED` from `events.hr`, `PERF_METRIC_RECORDED` from `events.performance`, `TRUST_EVALUATE_START` from `events.trust`, `PROMOTION_EVALUATE_START` from `events.promotion`, `PROMPT_BUILD_START` from `events.prompt`, `MEMORY_RETRIEVAL_START` from `events.memory`, `AUTONOMY_ACTION_AUTO_APPROVED` from `events.autonomy`, `TIMEOUT_POLICY_EVALUATED` from `events.timeout`, `PERSISTENCE_AUDIT_ENTRY_SAVED` from `events.persistence`). Import directly: `from ai_company.observability.events.<domain> import EVENT_CONSTANT`
+- **Event names**: always use constants from the domain-specific module under `ai_company.observability.events` (e.g. `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`, `CFO_ANOMALY_DETECTED` from `events.cfo`, `CONFLICT_DETECTED` from `events.conflict`, `MEETING_STARTED` from `events.meeting`, `CLASSIFICATION_START` from `events.classification`, `CONSOLIDATION_START` from `events.consolidation`, `ORG_MEMORY_QUERY_START` from `events.org_memory`, `API_REQUEST_STARTED` from `events.api`, `CODE_RUNNER_EXECUTE_START` from `events.code_runner`, `DOCKER_EXECUTE_START` from `events.docker`, `MCP_INVOKE_START` from `events.mcp`, `SECURITY_EVALUATE_START` from `events.security`, `HR_HIRING_REQUEST_CREATED` from `events.hr`, `PERF_METRIC_RECORDED` from `events.performance`, `TRUST_EVALUATE_START` from `events.trust`, `PROMOTION_EVALUATE_START` from `events.promotion`, `PROMPT_BUILD_START` from `events.prompt`, `MEMORY_RETRIEVAL_START` from `events.memory`, `AUTONOMY_ACTION_AUTO_APPROVED` from `events.autonomy`, `TIMEOUT_POLICY_EVALUATED` from `events.timeout`, `PERSISTENCE_AUDIT_ENTRY_SAVED` from `events.persistence`, `TASK_ENGINE_STARTED` from `events.task_engine`). Import directly: `from ai_company.observability.events.<domain> import EVENT_CONSTANT`
- **Structured kwargs**: always `logger.info(EVENT, key=value)` — never `logger.info("msg %s", val)`
- **All error paths** must log at WARNING or ERROR with context before raising
- **All state transitions** must log at INFO
2 changes: 1 addition & 1 deletion docs/architecture/tech-stack.md
@@ -119,7 +119,7 @@ These conventions are used throughout the codebase. For full details on each, se
| **Cost tiers and quota tracking** | Adopted | Configurable `CostTierDefinition` with merge/override semantics. `QuotaTracker` enforces per-provider request/token quotas with window-based rotation. |
| **Shared org memory** | Adopted | `OrgMemoryBackend` protocol with `HybridPromptRetrievalBackend`. Seniority-based write access control. Core policies in system prompts; extended facts retrieved on demand. |
| **Memory consolidation** | Adopted | `ConsolidationStrategy` protocol with deduplication + summarization. `RetentionEnforcer` for age-based cleanup. `ArchivalStore` for cold storage. |
-| **State coordination** | Planned | Centralized single-writer `TaskEngine` with `asyncio.Queue`. Agents submit requests; engine applies `model_copy(update=...)` sequentially and publishes snapshots. |
+| **State coordination** | Adopted | Centralized single-writer `TaskEngine` with `asyncio.Queue`. Agents submit requests; engine applies `model_validate` / `with_transition` sequentially and publishes snapshots. |
| **Workspace isolation** | Adopted | Pluggable `WorkspaceIsolationStrategy` protocol. Default: git worktrees with sequential merge on completion. |
| **Graceful shutdown** | Adopted | Pluggable `ShutdownStrategy` protocol with cooperative 30-second timeout. Force-cancel after timeout with `INTERRUPTED` status. |
| **Template inheritance** | Adopted | `extends` field triggers parent resolution at render time with deep merge by field type. Circular chain detection included. |
66 changes: 64 additions & 2 deletions docs/design/engine.md
@@ -166,6 +166,68 @@ exceptions on failure; scoring-based strategies return

---

## TaskEngine — Centralized State Coordination

All task state mutations flow through a single-writer `TaskEngine` that owns the
authoritative task state. This eliminates race conditions when multiple agents
attempt concurrent transitions on the same task.

### Architecture

```text
Agent / API ──submit()──▶ asyncio.Queue ──▶ _processing_loop ──▶ Persistence
├──▶ Version tracking (optimistic concurrency)
└──▶ Snapshot publishing (MessageBus)
```

- **Single writer**: A background `asyncio.Task` consumes `TaskMutation`
requests sequentially from an `asyncio.Queue`.
- **Immutable-style updates**: Each mutation constructs a new `Task` instance
from the previous one (for example via
`Task.model_validate({**task.model_dump(), **updates})` or
`Task.with_transition(...)`); the existing instance is never mutated.
- **Optimistic concurrency**: Per-task version counters held in-memory
(volatile). An unknown task is seeded at version 1 on first access —
this is a heuristic baseline, **not** loaded from persistence. Version
tracking resets on engine restart; durable persistence of versions is a
future enhancement. Callers can pass `expected_version` to detect stale
writes; on mismatch the engine returns a failed `TaskMutationResult`
with `error_code="version_conflict"`. Convenience methods raise
`TaskVersionConflictError`.
- **Read-through**: `get_task()` and `list_tasks()` bypass the queue and
read directly from persistence — safe because TaskEngine is the sole writer.
- **Snapshot publishing**: On success, a `TaskStateChanged` event is published
to the message bus for downstream consumers (WebSocket bridge, audit, etc.).
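The single-writer and optimistic-concurrency bullets above can be sketched with the standard library alone. This is a minimal illustration, not the project's `TaskEngine`: `MiniEngine`, `Result`, the in-memory `_tasks` dict (standing in for persistence), and the `"not_found"` / `"internal"` error codes are all hypothetical names, and a frozen dataclass stands in for the Pydantic `Task` model. Snapshot publishing is omitted.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Task:
    id: str
    title: str
    status: str = "created"


@dataclass(frozen=True)
class Result:
    success: bool
    task: Task | None = None
    version: int = 0
    error_code: str | None = None


class MiniEngine:
    """Hypothetical single-writer engine; only _loop ever mutates state."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._tasks: dict[str, Task] = {}    # stand-in for persistence
        self._versions: dict[str, int] = {}  # volatile, lost on restart
        self._worker: asyncio.Task | None = None

    def start(self) -> None:
        self._worker = asyncio.get_running_loop().create_task(self._loop())

    async def stop(self) -> None:
        await self._queue.join()
        self._worker.cancel()
        await asyncio.gather(self._worker, return_exceptions=True)

    async def submit(self, task_id, updates, expected_version=None) -> Result:
        # Callers never touch state; they enqueue a request and await a future.
        future = asyncio.get_running_loop().create_future()
        await self._queue.put((task_id, updates, expected_version, future))
        return await future

    async def _loop(self) -> None:
        while True:
            task_id, updates, expected, future = await self._queue.get()
            try:
                future.set_result(self._apply(task_id, updates, expected))
            except Exception:
                future.set_result(Result(False, error_code="internal"))
            finally:
                self._queue.task_done()

    def _apply(self, task_id, updates, expected) -> Result:
        current = self._tasks.get(task_id)
        if current is None:
            return Result(False, error_code="not_found")
        version = self._versions.setdefault(task_id, 1)  # seed at version 1
        if expected is not None and expected != version:
            return Result(False, error_code="version_conflict")
        new_task = replace(current, **updates)  # new instance, old one untouched
        self._tasks[task_id] = new_task
        self._versions[task_id] = version + 1
        return Result(True, new_task, version + 1)


async def demo() -> tuple[Result, Result]:
    engine = MiniEngine()
    engine._tasks["t1"] = Task("t1", "write docs")
    engine.start()
    # Both callers read version 1; the queue serializes them, so one loses.
    first, second = await asyncio.gather(
        engine.submit("t1", {"status": "assigned"}, expected_version=1),
        engine.submit("t1", {"status": "cancelled"}, expected_version=1),
    )
    await engine.stop()
    return first, second


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because requests are applied strictly in queue order, the second submission sees the bumped version and fails with `version_conflict` instead of silently overwriting the first.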

### Mutation Types

| Mutation | Description |
|----------|-------------|
| `CreateTaskMutation` | Generates a unique ID, persists, and returns the new task. |
| `UpdateTaskMutation` | Applies field updates with immutable-field rejection (`id`, `status`, `created_by`) and re-validates via `model_validate`. |
| `TransitionTaskMutation` | Validates status transition via `Task.with_transition()`, supports field overrides. |
| `DeleteTaskMutation` | Removes from persistence and clears version tracking. |
| `CancelTaskMutation` | Shortcut for transition to `CANCELLED`. |
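As a rough sketch of the `UpdateTaskMutation` row, the snippet below rejects updates to the immutable fields listed in the table and rebuilds the task so validation runs again. It is an assumption-laden stand-in: a frozen dataclass with `__post_init__` plays the role of the Pydantic model and its `model_validate` step, and `apply_update`, `IMMUTABLE_FIELDS`, and the blank-title rule are hypothetical.

```python
from dataclasses import asdict, dataclass

# Field names taken from the table above; the constant name is hypothetical.
IMMUTABLE_FIELDS = frozenset({"id", "status", "created_by"})


@dataclass(frozen=True)
class Task:
    id: str
    title: str
    status: str
    created_by: str

    def __post_init__(self) -> None:
        # Stand-in for model validation; the rule itself is illustrative.
        if not self.title.strip():
            raise ValueError("title must not be blank")


def apply_update(task: Task, updates: dict) -> Task:
    """Return a new, re-validated Task; the original is never mutated."""
    rejected = sorted(IMMUTABLE_FIELDS.intersection(updates))
    if rejected:
        raise ValueError(f"immutable fields cannot be updated: {rejected}")
    # Merge old values with the updates and construct a fresh instance,
    # which re-runs validation on the combined data.
    return Task(**{**asdict(task), **updates})
```

Status changes are rejected here on purpose: per the table, they go through `TransitionTaskMutation` so that transition rules are always checked.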

### Error Handling

- **Typed errors**: `TaskNotFoundError` and `TaskVersionConflictError` provide
precise failure classification — API controllers catch these directly instead
of parsing error strings.
- **Error sanitization**: Internal exception details (SQL paths, stack traces)
are replaced with a generic message before reaching callers.
- **Queue full**: `TaskEngineQueueFullError` signals backpressure when the
queue is at capacity.
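One way to picture typed errors plus sanitization is a small mapping from exception type to API response. The exception classes below are local stand-ins that reuse the names from this section. The 409 mapping follows the commit message in this PR; the 404 and 503 codes, the `to_response` helper, and the generic message text are assumptions for illustration.

```python
class TaskEngineError(Exception):
    """Hypothetical common base for the typed engine errors."""


class TaskNotFoundError(TaskEngineError):
    pass


class TaskVersionConflictError(TaskEngineError):
    pass


class TaskEngineQueueFullError(TaskEngineError):
    pass


# Assumed wording; the real generic message is not shown in this diff.
GENERIC_MESSAGE = "Internal error while processing task mutation"


def to_response(exc: Exception) -> tuple[int, str]:
    """Map an exception to (status, message) without parsing error strings."""
    if isinstance(exc, TaskNotFoundError):
        return 404, str(exc)  # assumed status code
    if isinstance(exc, TaskVersionConflictError):
        return 409, str(exc)  # per commit d1adc05
    if isinstance(exc, TaskEngineQueueFullError):
        return 503, "Task engine is at capacity; retry later"  # assumed
    # Anything else may carry SQL paths or stack details: never echo it.
    return 500, GENERIC_MESSAGE
```

Matching on type keeps controllers independent of message wording, and the fallback branch ensures unexpected internals never leak to callers.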

### Lifecycle

- **start()**: Spawns the background processing task.
- **stop()**: Sets `_running = False`, drains the queue within a configurable
timeout, then cancels. Abandoned futures receive a failure result.
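The drain-then-cancel behavior of `stop()` might look roughly like the sketch below. It assumes queue items are `(payload, future)` pairs and uses plain dicts as results; `worker_loop`, `stop`, `drain_timeout`, and the 0.2 s simulated write are hypothetical, and the `_running` flag is left out for brevity.

```python
import asyncio

STOPPED = {"success": False, "error": "engine stopped"}


async def worker_loop(queue: asyncio.Queue) -> None:
    while True:
        payload, future = await queue.get()
        try:
            await asyncio.sleep(0.2)  # simulated persistence write
            future.set_result({"success": True, "payload": payload})
        except asyncio.CancelledError:
            # The in-flight request must not leave its caller hanging.
            if not future.done():
                future.set_result(STOPPED)
            raise
        finally:
            queue.task_done()


async def stop(queue: asyncio.Queue, worker: asyncio.Task,
               drain_timeout: float) -> None:
    try:
        # Give queued work a bounded chance to finish.
        await asyncio.wait_for(queue.join(), drain_timeout)
    except asyncio.TimeoutError:
        pass
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    # Whatever is still queued was abandoned: resolve with a failure result.
    while not queue.empty():
        _, future = queue.get_nowait()
        if not future.done():
            future.set_result(STOPPED)


async def demo() -> list[dict]:
    queue: asyncio.Queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    futures = [loop.create_future() for _ in range(3)]
    for index, future in enumerate(futures):
        queue.put_nowait((index, future))
    worker = asyncio.create_task(worker_loop(queue))
    await stop(queue, worker, drain_timeout=0.3)
    return [future.result() for future in futures]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a 0.3 s drain window and 0.2 s per item, the first request completes while the in-flight and still-queued requests both resolve with a failure result rather than waiting forever.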

---

## Agent Execution Loop

The agent execution loop defines how an agent processes a task from start to
@@ -346,7 +408,7 @@ async run(
alone when no enforcer is configured.
8. **Delegate to loop** -- calls `ExecutionLoop.execute()` with context,
provider, tool invoker, budget checker, and completion config. If
-`timeout_seconds` is set, wraps the call in `asyncio.wait_for`; on expiry
+`timeout_seconds` is set, wraps the call in `asyncio.wait`; on expiry
the run returns with `TerminationReason.ERROR` but cost recording and
post-execution processing still occur.
9. **Record costs** -- records accumulated `TokenUsage` to `CostTracker` (if
@@ -619,7 +681,7 @@ These are complementary systems handling different types of shared state:

| State Type | Coordination | Mechanism |
|-----------|-------------|-----------|
-| Framework state (tasks, assignments, budget) | Centralized single-writer (`TaskEngine`) | `model_copy(update=...)` via async queue |
+| Framework state (tasks, assignments, budget) | Centralized single-writer (`TaskEngine`) | `model_validate` / `with_transition` via async queue |
| Code and files (agent work output) | Workspace isolation (`WorkspaceIsolationStrategy`) | Git worktrees / branches |
| Agent memory (personal) | Per-agent ownership | Each agent owns its memory exclusively |
| Org memory (shared knowledge) | Single-writer (`OrgMemoryBackend`) | `OrgMemoryBackend` protocol with role-based write access control |