4 changes: 2 additions & 2 deletions CLAUDE.md
@@ -122,7 +122,7 @@ src/synthorg/
meeting/ # Meeting protocol (round-robin, position papers, structured phases), scheduler (frequency, participant resolver), orchestrator
config/ # YAML company config loading and validation
core/ # Shared domain models, base classes, and resilience config (RetryConfig, RateLimiterConfig)
engine/ # Agent orchestration, execution loops, parallel execution, task decomposition, routing, task assignment, centralized single-writer task state engine (TaskEngine), task lifecycle, recovery, shutdown, workspace isolation, coordination (multi-agent pipeline: TopologyDispatcher protocol, 4 dispatchers — SAS/centralized/decentralized/context-dependent, wave execution, workspace lifecycle integration, CoordinationSectionConfig company config bridge, build_coordinator factory), coordination error classification, prompt policy validation, checkpoint recovery (checkpoint/, per-turn persistence, heartbeat detection, CheckpointRecoveryStrategy), approval gate (escalation detection, context parking/resume, EscalationInfo/ResumePayload models), stagnation detection (stagnation/, StagnationDetector protocol, ToolRepetitionDetector, dual-signal analysis, corrective prompt injection), agent runtime state (AgentRuntimeState, lightweight per-agent execution status for dashboard queries and recovery)
engine/ # Agent orchestration, execution loops, parallel execution, task decomposition, routing, task assignment, centralized single-writer task state engine (TaskEngine), task lifecycle, recovery, shutdown, workspace isolation, coordination (multi-agent pipeline: TopologyDispatcher protocol, 4 dispatchers — SAS/centralized/decentralized/context-dependent, wave execution, workspace lifecycle integration, CoordinationSectionConfig company config bridge, build_coordinator factory), coordination error classification, prompt policy validation, checkpoint recovery (checkpoint/, per-turn persistence, heartbeat detection, CheckpointRecoveryStrategy), approval gate (escalation detection, context parking/resume, EscalationInfo/ResumePayload models), stagnation detection (stagnation/, StagnationDetector protocol, ToolRepetitionDetector, dual-signal analysis, corrective prompt injection), agent runtime state (AgentRuntimeState, lightweight per-agent execution status for dashboard queries and recovery), context budget management (context_budget.py, ContextBudgetIndicator, fill estimation, token estimation protocol in token_estimation.py), conversation compaction (compaction/, CompactionCallback type alias, CompactionConfig, CompressionMetadata, oldest-turns summarizer)
hr/ # HR engine: hiring, firing, onboarding, offboarding, agent registry, performance tracking (task metrics, collaboration scoring, LLM calibration sampling, collaboration overrides, trend detection), promotion/demotion (criteria evaluation, approval strategies, model mapping)
memory/ # Persistent agent memory (pluggable MemoryBackend protocol), backends/ (Mem0 adapter: backends/mem0/), retrieval pipeline (ranking, RRF fusion, injection, context formatting, non-inferable filtering), shared org memory (org/), consolidation/archival (consolidation/)
persistence/ # Operational data persistence — pluggable PersistenceBackend protocol, SQLite initial, SettingsRepository (namespaced settings CRUD) (see Memory & Persistence design page)
@@ -196,7 +196,7 @@ site/ # Astro landing page (synthorg.io)
- **Every module** with business logic MUST have: `from synthorg.observability import get_logger` then `logger = get_logger(__name__)`
- **Never** use `import logging` / `logging.getLogger()` / `print()` in application code
- **Variable name**: always `logger` (not `_logger`, not `log`)
- **Event names**: always use constants from the domain-specific module under `synthorg.observability.events` (e.g., `API_REQUEST_STARTED` from `events.api`, `TOOL_INVOKE_START` from `events.tool`, `GIT_COMMAND_START` from `events.git`). Each domain has its own module — see `src/synthorg/observability/events/` for the full inventory of constants. Import directly: `from synthorg.observability.events.<domain> import EVENT_CONSTANT`
- **Event names**: always use constants from the domain-specific module under `synthorg.observability.events` (e.g., `API_REQUEST_STARTED` from `events.api`, `TOOL_INVOKE_START` from `events.tool`, `GIT_COMMAND_START` from `events.git`, `CONTEXT_BUDGET_FILL_UPDATED` from `events.context_budget`). Each domain has its own module — see `src/synthorg/observability/events/` for the full inventory of constants. Import directly: `from synthorg.observability.events.<domain> import EVENT_CONSTANT`
- **Structured kwargs**: always `logger.info(EVENT, key=value)` — never `logger.info("msg %s", val)`
- **All error paths** must log at WARNING or ERROR with context before raising
- **All state transitions** must log at INFO
79 changes: 79 additions & 0 deletions docs/design/engine.md
@@ -584,6 +584,85 @@ sorted per-turn for order-independent comparison.

---

## Context Budget Management

Agents running long tasks fill their LLM context window with no visibility into
how much room remains. The context budget system tracks fill levels, injects
soft indicators into system prompts, and compacts conversations at turn
boundaries.

### Context Fill Tracking

`AgentContext` carries three context-budget fields:

- `context_fill_tokens` — estimated tokens in the full context (system prompt +
conversation + tool definitions)
- `context_capacity_tokens` — the model's `max_context_tokens` from
`ModelCapabilities`, or `None` when unknown
- `context_fill_percent` — computed percentage (`fill / capacity * 100`),
`None` when capacity is unknown

Fill is re-estimated after each turn via `update_context_fill()` in
`context_budget.py`, using the `PromptTokenEstimator` protocol (default:
`DefaultTokenEstimator` at `len(text) // 4`).
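
The fill arithmetic above can be sketched as follows. This is a minimal illustration, assuming a character-based heuristic; `estimate_tokens` and `fill_percent` are hypothetical names and not the actual `context_budget.py` API.

```python
from __future__ import annotations


def estimate_tokens(text: str) -> int:
    """Rough heuristic from the text above: about four characters per token."""
    return len(text) // 4


def fill_percent(fill_tokens: int, capacity_tokens: int | None) -> float | None:
    """Return fill as a percentage, or None when the capacity is unknown."""
    if capacity_tokens is None:
        return None
    return fill_tokens / capacity_tokens * 100


# Hypothetical usage: a 400-character prompt counts as roughly 100 tokens.
print(estimate_tokens("a" * 400))
print(fill_percent(12_450, 16_000))
print(fill_percent(12_450, None))
```

Returning `None` rather than `0.0` for an unknown capacity keeps "no data" distinguishable from "empty context", which matches the optional fields listed above.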

### Soft Budget Indicators

`ContextBudgetIndicator` is injected into the system prompt via
`_SECTION_CONTEXT_BUDGET`:

```text
[Context: 12,450/16,000 tokens (78%) | 0 archived blocks]
```

The indicator is set at initial prompt build time. The `archived_blocks` count
is derived from `CompressionMetadata.compactions_performed`.
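
One way the indicator line could be rendered is sketched below. `format_indicator` is a hypothetical helper, and the fallback format used when capacity is unknown is an assumption; the source only shows the known-capacity form.

```python
from __future__ import annotations


def format_indicator(
    fill_tokens: int,
    capacity_tokens: int | None,
    archived_blocks: int,
) -> str:
    """Render a soft budget indicator in the style shown above."""
    if capacity_tokens is None:
        # Assumed fallback: omit the capacity and the percentage entirely.
        return f"[Context: {fill_tokens:,} tokens | {archived_blocks} archived blocks]"
    percent = fill_tokens / capacity_tokens * 100
    return (
        f"[Context: {fill_tokens:,}/{capacity_tokens:,} tokens "
        f"({percent:.0f}%) | {archived_blocks} archived blocks]"
    )


print(format_indicator(12_450, 16_000, 0))
# [Context: 12,450/16,000 tokens (78%) | 0 archived blocks]
```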

### Compaction Hook

`CompactionCallback` is a type alias (`Callable[[AgentContext], Coroutine[Any,
Any, AgentContext | None]]`) wired into both `ReactLoop` and `PlanExecuteLoop` via
their constructors, following the same injection pattern as `checkpoint_callback`,
`stagnation_detector`, and `approval_gate`.

The default implementation (`make_compaction_callback` in
`compaction/summarizer.py`) archives oldest conversation turns into a summary
message when `context_fill_percent` exceeds a configurable threshold (default
80%).

`CompactionConfig` controls:

| Field | Default | Description |
|-------|---------|-------------|
| `fill_threshold_percent` | `80.0` | Fill percentage that triggers compaction |
| `min_messages_to_compact` | `4` | Minimum messages before compaction is allowed |
| `preserve_recent_turns` | `3` | Recent turn pairs to keep uncompressed |

Compaction errors are logged but never propagated — compaction is advisory,
not critical.
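
The interplay of the three settings can be sketched with plain data structures. This is an illustrative approximation, not the code in `compaction/summarizer.py`: `SketchConfig` and `compact_oldest` are hypothetical names, messages are plain strings, a turn pair is assumed to be two messages, and the threshold is treated as strict ("exceeds").

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class SketchConfig:
    """Stand-in for CompactionConfig with the documented defaults."""

    fill_threshold_percent: float = 80.0
    min_messages_to_compact: int = 4
    preserve_recent_turns: int = 3


def compact_oldest(
    messages: list[str],
    fill_percent: float | None,
    cfg: SketchConfig = SketchConfig(),
) -> list[str] | None:
    """Return a compacted message list, or None when compaction does not apply."""
    if fill_percent is None or fill_percent <= cfg.fill_threshold_percent:
        return None  # capacity unknown, or fill has not exceeded the threshold
    if len(messages) < cfg.min_messages_to_compact:
        return None  # too few messages to be worth compacting
    keep = cfg.preserve_recent_turns * 2  # assumption: one turn pair = 2 messages
    if len(messages) <= keep:
        return None  # everything falls inside the preserved window
    archived, recent = messages[:-keep], messages[-keep:]
    summary = f"[Archived {len(archived)} earlier messages]"
    return [summary, *recent]


history = [f"m{i}" for i in range(10)]
print(compact_oldest(history, 85.0))
```

Returning `None` for "nothing to do" mirrors the callback contract described above, so a caller can keep the existing context untouched.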

### Compressed Checkpoint Recovery

`CompressionMetadata` is persisted on `AgentContext` and serialized into
checkpoint JSON. On resume, `deserialize_and_reconcile()` detects compressed
checkpoints and includes compression-aware information in the reconciliation
message:

```text
Execution resumed from checkpoint at turn 8. Note: conversation was
previously compacted (archived 12 turns). Previous error: ...
```

### Loop Integration

- **ReactLoop**: compaction checked after stagnation detection, at turn
boundaries (between completed turns)
- **PlanExecuteLoop**: compaction checked within step execution at turn
boundaries, before stagnation detection

Both loops use the shared `invoke_compaction()` helper from `loop_helpers.py`.
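
A shared helper of this kind might look roughly like the sketch below, which combines the optional callback with the "logged but never propagated" rule. `invoke_compaction_sketch` is a hypothetical stand-in for the real `invoke_compaction()`, whose signature is not shown here, and the stdlib logger is used only to keep the example self-contained (the project itself mandates `get_logger`).

```python
import asyncio
import logging
from collections.abc import Awaitable, Callable
from typing import Optional, TypeVar

Ctx = TypeVar("Ctx")
# Stand-in logger for this sketch only.
sketch_logger = logging.getLogger("compaction_sketch")


async def invoke_compaction_sketch(
    ctx: Ctx,
    callback: Optional[Callable[[Ctx], Awaitable[Optional[Ctx]]]],
) -> Ctx:
    """Run the optional compaction callback; never let its errors escape."""
    if callback is None:
        return ctx  # compaction not configured for this loop
    try:
        compacted = await callback(ctx)
    except Exception:
        # Advisory feature: log the failure and continue with the old context.
        sketch_logger.warning("compaction failed; continuing", exc_info=True)
        return ctx
    # None means "no compaction needed", so keep the existing context.
    return ctx if compacted is None else compacted


async def drop_oldest(history: list[str]) -> list[str]:
    """Toy callback: keep only the two most recent entries."""
    return history[-2:]


print(asyncio.run(invoke_compaction_sketch(["a", "b", "c"], drop_oldest)))
# ['b', 'c']
```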

---

## Agent Crash Recovery

When an agent execution fails unexpectedly (unhandled exception, OOM, process
4 changes: 2 additions & 2 deletions docs/roadmap/open-questions.md
@@ -9,7 +9,7 @@ Numbers are stable identifiers — resolved questions are removed without renumb
| # | Question | Impact | Notes |
|---|----------|--------|-------|
| 1 | How deep should agent personality affect output? | Medium | Too deep leads to inconsistency; too shallow makes all agents feel the same. |
| 3 | How to handle context window limits for long tasks? | High | Agents may lose track of complex multi-file changes. |
| 3 | ~~How to handle context window limits for long tasks?~~ | ~~High~~ | **Partially resolved**: context budget management (#416) provides fill tracking, soft indicators, and oldest-turns compaction. Remaining: LLM-based summarization, tiktoken estimator, AgentEngine wiring. |
| 4 | Should agents be able to create/modify other agents? | Medium | For example, a CTO "hires" a developer by creating a new agent config. |
| 6 | What metrics define "good" agent performance? | Medium | Needed for HR/hiring/firing decisions. |
| 8 | Optimal message bus for local-first architecture? | Medium | asyncio queues vs Redis vs embedded broker. |
@@ -21,7 +21,7 @@ Numbers are stable identifiers — resolved questions are removed without renumb

| Risk | Severity | Mitigation |
|------|----------|------------|
| Context window exhaustion on complex tasks | High | Memory summarization, task decomposition, working memory management. |
| Context window exhaustion on complex tasks | Medium | **Partially mitigated**: context budget management (#416) tracks fill, injects indicators, and compacts at turn boundaries. Remaining: LLM-based summarization for higher-quality summaries. |
| Cost explosion from agent loops | High | Budget hard stops, loop detection, max iterations per task. |
| Agent quality degradation with cheap models | Medium | Quality gates, minimum model requirements per task type. |
| Third-party library breaking changes | Medium | Pin versions, integration tests, abstraction layers. |
6 changes: 4 additions & 2 deletions src/synthorg/engine/__init__.py
@@ -135,8 +135,6 @@
StepStatus,
)
from synthorg.engine.prompt import (
DefaultTokenEstimator,
PromptTokenEstimator,
SystemPrompt,
build_system_prompt,
)
@@ -185,6 +183,10 @@
UpdateTaskMutation,
)
from synthorg.engine.task_execution import StatusTransition, TaskExecution
from synthorg.engine.token_estimation import (
DefaultTokenEstimator,
PromptTokenEstimator,
)
from synthorg.engine.workspace import (
MergeConflict,
MergeOrchestrator,
27 changes: 21 additions & 6 deletions src/synthorg/engine/checkpoint/resume.py
@@ -69,14 +69,23 @@ def deserialize_and_reconcile(
)
raise

compression = checkpoint_ctx.compression_metadata
compaction_note = (
f"Note: conversation was previously compacted "
f"(archived {compression.archived_turns} turns). "
if compression is not None
else ""
)
reconciliation_content = (
f"Execution resumed from checkpoint at turn "
f"{checkpoint_ctx.turn_count}. {compaction_note}"
f"Previous error: {error_message}. "
"Review progress and continue."
)

reconciliation_msg = ChatMessage(
role=MessageRole.SYSTEM,
content=(
f"Execution resumed from checkpoint at turn "
f"{checkpoint_ctx.turn_count}. Previous error: "
f"{error_message}. "
"Review progress and continue."
),
content=reconciliation_content,
)
logger.debug(
CHECKPOINT_RECOVERY_RECONCILIATION,
@@ -117,13 +126,15 @@ def make_loop_with_callback( # noqa: PLR0913
checkpoint_callback=callback,
approval_gate=loop.approval_gate,
stagnation_detector=loop.stagnation_detector,
compaction_callback=loop.compaction_callback,
)
if isinstance(loop, PlanExecuteLoop):
return PlanExecuteLoop(
config=loop.config,
checkpoint_callback=callback,
approval_gate=loop.approval_gate,
stagnation_detector=loop.stagnation_detector,
compaction_callback=loop.compaction_callback,
)
logger.warning(
CHECKPOINT_UNSUPPORTED_LOOP,
@@ -158,6 +169,8 @@ async def cleanup_checkpoint_artifacts(
execution_id=execution_id,
deleted_count=count,
)
except MemoryError, RecursionError:
raise
except Exception:
logger.warning(
CHECKPOINT_DELETE_FAILED,
@@ -173,6 +186,8 @@ async def cleanup_checkpoint_artifacts(
HEARTBEAT_DELETED,
execution_id=execution_id,
)
except MemoryError, RecursionError:
raise
except Exception:
logger.warning(
HEARTBEAT_DELETE_FAILED,
18 changes: 18 additions & 0 deletions src/synthorg/engine/compaction/__init__.py
@@ -0,0 +1,18 @@
"""Context compaction subpackage.

Provides a pluggable compaction hook for execution loops that
compresses older conversation turns when the context window
fill level exceeds a configurable threshold.
"""

from synthorg.engine.compaction.models import (
    CompactionConfig,
    CompressionMetadata,
)
from synthorg.engine.compaction.protocol import CompactionCallback

__all__ = [
    "CompactionCallback",
    "CompactionConfig",
    "CompressionMetadata",
]
73 changes: 73 additions & 0 deletions src/synthorg/engine/compaction/models.py
@@ -0,0 +1,73 @@
"""Compaction configuration and result models.

All models are frozen Pydantic models following the project's
immutability convention.
"""

from pydantic import BaseModel, ConfigDict, Field


class CompactionConfig(BaseModel):
    """Configuration for context compaction behavior.

    Attributes:
        fill_threshold_percent: Context fill percentage that triggers
            compaction (e.g. 80.0 means compact when 80% full).
        min_messages_to_compact: Minimum number of conversation
            messages required before compaction is allowed.
        preserve_recent_turns: Number of recent turn pairs to keep
            uncompressed after compaction.
    """

    model_config = ConfigDict(frozen=True)

    fill_threshold_percent: float = Field(
        default=80.0,
        gt=0.0,
        le=100.0,
        description="Fill percentage that triggers compaction",
    )
    min_messages_to_compact: int = Field(
        default=4,
        ge=2,
        description="Minimum messages before compaction is allowed",
    )
    preserve_recent_turns: int = Field(
        default=3,
        ge=1,
        description="Recent turn pairs to keep uncompressed",
    )


class CompressionMetadata(BaseModel):
    """Metadata about conversation compression on an ``AgentContext``.

    Attached to ``AgentContext.compression_metadata`` when conversation
    compaction has occurred, enabling compressed checkpoint recovery.

    Attributes:
        compression_point: Turn number at which compaction occurred.
        archived_turns: Number of turns that were archived.
        summary_tokens: Token count of the summary message.
        compactions_performed: Total number of compactions so far.
    """

    model_config = ConfigDict(frozen=True)

    compression_point: int = Field(
        ge=0,
        description="Turn number at which compaction occurred",
    )
    archived_turns: int = Field(
        ge=0,
        description="Number of turns archived",
    )
    summary_tokens: int = Field(
        ge=0,
        description="Token count of the summary message",
    )
    compactions_performed: int = Field(
        default=1,
        ge=1,
        description="Total compactions performed so far",
    )
24 changes: 24 additions & 0 deletions src/synthorg/engine/compaction/protocol.py
@@ -0,0 +1,24 @@
"""Compaction callback type alias.

Follows the ``CheckpointCallback`` pattern — a simple callable type
alias rather than a protocol class, since the callback has a single
responsibility with no configuration methods.
"""

from collections.abc import Callable, Coroutine
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from synthorg.engine.context import AgentContext

CompactionCallback = Callable[
    ["AgentContext"],
    Coroutine[Any, Any, "AgentContext | None"],
]
"""Async callback invoked at turn boundaries to compress conversation.

Receives the current ``AgentContext`` and returns either:

- A new ``AgentContext`` with compressed conversation (compaction ran).
- ``None`` to signal no compaction was needed or possible.
"""