Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ src/synthorg/
core/ # Shared domain models, base classes, and resilience config (RetryConfig, RateLimiterConfig)
engine/ # Agent orchestration, execution loops, parallel execution, task decomposition, routing, task assignment, centralized single-writer task state engine (TaskEngine), task lifecycle, recovery, shutdown, workspace isolation, coordination (multi-agent pipeline: TopologyDispatcher protocol, 4 dispatchers — SAS/centralized/decentralized/context-dependent, wave execution, workspace lifecycle integration, CoordinationSectionConfig company config bridge, build_coordinator factory), coordination error classification, prompt policy validation, checkpoint recovery (checkpoint/, per-turn persistence, heartbeat detection, CheckpointRecoveryStrategy), approval gate (escalation detection, context parking/resume, EscalationInfo/ResumePayload models), stagnation detection (stagnation/, StagnationDetector protocol, ToolRepetitionDetector, dual-signal analysis, corrective prompt injection), agent runtime state (AgentRuntimeState, lightweight per-agent execution status for dashboard queries and recovery), context budget management (context_budget.py, ContextBudgetIndicator, fill estimation, token estimation protocol in token_estimation.py), conversation compaction (compaction/, CompactionCallback type alias, CompactionConfig, CompressionMetadata, oldest-turns summarizer)
hr/ # HR engine: hiring, firing, onboarding, offboarding, agent registry, performance tracking (task metrics, collaboration scoring, LLM calibration sampling, collaboration overrides, trend detection), promotion/demotion (criteria evaluation, approval strategies, model mapping)
memory/ # Persistent agent memory (pluggable MemoryBackend protocol), backends/ (Mem0 adapter: backends/mem0/), retrieval pipeline (ranking, RRF fusion, injection, context formatting, non-inferable filtering), shared org memory (org/), consolidation/archival (consolidation/)
memory/ # Persistent agent memory (pluggable MemoryBackend protocol), backends/ (Mem0 adapter: backends/mem0/), retrieval pipeline (ranking, RRF fusion, injection, context formatting, non-inferable filtering), shared org memory (org/), consolidation/archival (consolidation/, dual-mode density-aware archival: DensityClassifier, AbstractiveSummarizer, ExtractivePreserver, DualModeConsolidationStrategy)
persistence/ # Operational data persistence — pluggable PersistenceBackend protocol, SQLite initial, SettingsRepository (namespaced settings CRUD) (see Memory & Persistence design page)
observability/ # Structured logging, correlation tracking, log sinks
providers/ # LLM provider abstraction (LiteLLM adapter)
Expand Down
14 changes: 14 additions & 0 deletions docs/api/memory.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,17 @@ Persistent agent memory — protocol, retrieval pipeline, shared org memory, con
::: synthorg.memory.consolidation.strategy

::: synthorg.memory.consolidation.service

::: synthorg.memory.consolidation.retention

::: synthorg.memory.consolidation.archival

::: synthorg.memory.consolidation.simple_strategy

::: synthorg.memory.consolidation.density

::: synthorg.memory.consolidation.extractive

::: synthorg.memory.consolidation.abstractive

::: synthorg.memory.consolidation.dual_mode_strategy
2 changes: 1 addition & 1 deletion docs/architecture/tech-stack.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ These conventions are used throughout the codebase. For full details on each, se
| **LLM call analytics** | Adopted | Proxy metrics (`turns_per_task`, `tokens_per_task`) and data models for call categorization, coordination metrics, and orchestration ratio. |
| **Cost tiers and quota tracking** | Adopted | Configurable `CostTierDefinition` with merge/override semantics. `QuotaTracker` enforces per-provider request/token quotas with window-based rotation. |
| **Shared org memory** | Adopted | `OrgMemoryBackend` protocol with `HybridPromptRetrievalBackend`. Seniority-based write access control. Core policies in system prompts; extended facts retrieved on demand. |
| **Memory consolidation** | Adopted | `ConsolidationStrategy` protocol with deduplication + summarization. `RetentionEnforcer` for age-based cleanup. `ArchivalStore` for cold storage. |
| **Memory consolidation** | Adopted | `ConsolidationStrategy` protocol with simple (deduplication + summarization) and dual-mode (density-aware: abstractive LLM summary for sparse content, extractive preservation for dense content) strategies. `RetentionEnforcer` for age-based cleanup. `ArchivalStore` for cold storage with deterministic index-based restore. |
| **State coordination** | Adopted | Centralized single-writer `TaskEngine` with `asyncio.Queue`. Agents submit requests; engine applies `model_validate` / `with_transition` sequentially and publishes snapshots. |
| **Workspace isolation** | Adopted | Pluggable `WorkspaceIsolationStrategy` protocol. Default: git worktrees with sequential merge on completion. |
| **Graceful shutdown** | Adopted | Pluggable `ShutdownStrategy` protocol with cooperative 30-second timeout. Force-cancel after timeout with `INTERRUPTED` status. |
Expand Down
31 changes: 30 additions & 1 deletion docs/design/memory.md
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,36 @@ models in `memory/consolidation/config.py`:
|--------|---------|
| `ConsolidationConfig` | Top-level: `max_memories_per_agent` limit, nested `retention` and `archival` sub-configs |
| `RetentionConfig` | Per-category `RetentionRule` tuples (category + retention_days), optional `default_retention_days` fallback |
| `ArchivalConfig` | Enables/disables archival of consolidated entries to `ArchivalStore` |
| `ArchivalConfig` | Enables/disables archival of consolidated entries to `ArchivalStore`, nested `DualModeConfig` |
| `DualModeConfig` | Density-aware dual-mode archival: threshold, summarization model, anchor/fact limits |

#### Dual-Mode Archival

When `ArchivalConfig.dual_mode.enabled` is `True`, consolidation classifies content density before
choosing an archival mode. This prevents catastrophic information loss from naively summarizing
dense content (code, structured data, identifiers). Based on research: Memex
([arXiv:2603.04257](https://arxiv.org/abs/2603.04257)) and KV Cache Attention Matching
([arXiv:2602.16284](https://arxiv.org/abs/2602.16284)).

| Density | Archival Mode | Method |
|---------|--------------|--------|
| Sparse (conversational, narrative) | `ABSTRACTIVE` | LLM-generated summary via `AbstractiveSummarizer` |
| Dense (code, structured data, IDs) | `EXTRACTIVE` | Verbatim key-fact extraction + start/mid/end anchors via `ExtractivePreserver` |

**Classification** is heuristic-based (`DensityClassifier`), using five weighted signals: code
patterns, structured data markers, identifier density, numeric density, and line structure. No LLM
is needed for classification — only for abstractive summarization. Groups are classified by
majority vote: if most entries in a category group are dense, the group uses extractive mode.

**Deterministic restore**: When entries are archived, the service builds an `archival_index`
(mapping `original_id` → `archival_id`) on `ConsolidationResult`. Agents can use this index to
call `ArchivalStore.restore(agent_id, entry_id)` directly by ID, bypassing semantic search.

| Model | Purpose |
|-------|---------|
| `ArchivalMode` | Enum: `ABSTRACTIVE` or `EXTRACTIVE` |
| `ArchivalModeAssignment` | Maps a removed entry ID to its archival mode (set by strategy) |
| `ArchivalIndexEntry` | Maps original entry ID to archival store ID (built by service) |

!!! abstract "Scope Note"

Expand Down
6 changes: 6 additions & 0 deletions src/synthorg/memory/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
MemoryStorageConfig,
)
from synthorg.memory.consolidation import (
ArchivalMode,
ArchivalStore,
ConsolidationConfig,
ConsolidationResult,
ConsolidationStrategy,
ContentDensity,
DualModeConsolidationStrategy,
MemoryConsolidationService,
RetentionEnforcer,
SimpleConsolidationStrategy,
Expand Down Expand Up @@ -70,13 +73,16 @@
from synthorg.memory.shared import SharedKnowledgeStore

__all__ = [
"ArchivalMode",
"ArchivalStore",
"CompanyMemoryConfig",
"ConsolidationConfig",
"ConsolidationResult",
"ConsolidationStrategy",
"ContentDensity",
"ContextInjectionStrategy",
"DefaultTokenEstimator",
"DualModeConsolidationStrategy",
"FusionStrategy",
"InjectionPoint",
"InjectionStrategy",
Expand Down
19 changes: 19 additions & 0 deletions src/synthorg/memory/consolidation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,24 @@
``synthorg.memory.consolidation`` directly.
"""

from synthorg.memory.consolidation.abstractive import AbstractiveSummarizer
from synthorg.memory.consolidation.archival import ArchivalStore
from synthorg.memory.consolidation.config import (
ArchivalConfig,
ConsolidationConfig,
DualModeConfig,
RetentionConfig,
)
from synthorg.memory.consolidation.density import ContentDensity, DensityClassifier
from synthorg.memory.consolidation.dual_mode_strategy import (
DualModeConsolidationStrategy,
)
from synthorg.memory.consolidation.extractive import ExtractivePreserver
from synthorg.memory.consolidation.models import (
ArchivalEntry,
ArchivalIndexEntry,
ArchivalMode,
ArchivalModeAssignment,
ConsolidationResult,
RetentionRule,
)
Expand All @@ -23,12 +33,21 @@
from synthorg.memory.consolidation.strategy import ConsolidationStrategy

__all__ = [
"AbstractiveSummarizer",
"ArchivalConfig",
"ArchivalEntry",
"ArchivalIndexEntry",
"ArchivalMode",
"ArchivalModeAssignment",
"ArchivalStore",
"ConsolidationConfig",
"ConsolidationResult",
"ConsolidationStrategy",
"ContentDensity",
"DensityClassifier",
"DualModeConfig",
"DualModeConsolidationStrategy",
"ExtractivePreserver",
"MemoryConsolidationService",
"RetentionConfig",
"RetentionEnforcer",
Expand Down
172 changes: 172 additions & 0 deletions src/synthorg/memory/consolidation/abstractive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
"""Abstractive summarizer for sparse memory content.

Uses an LLM (via ``CompletionProvider``) to generate concise summaries
of conversational/narrative memory content. Falls back to truncation
if the LLM call fails.
"""

import asyncio

from synthorg.core.types import NotBlankStr # noqa: TC001
from synthorg.memory.models import MemoryEntry # noqa: TC001
from synthorg.observability import get_logger
from synthorg.observability.events.consolidation import (
DUAL_MODE_ABSTRACTIVE_FALLBACK,
DUAL_MODE_ABSTRACTIVE_SUMMARY,
)
from synthorg.providers.enums import MessageRole
from synthorg.providers.errors import ProviderError
from synthorg.providers.models import ChatMessage, CompletionConfig
from synthorg.providers.protocol import CompletionProvider # noqa: TC001

logger = get_logger(__name__)

_TRUNCATE_LENGTH = 200

_SYSTEM_PROMPT = (
"You are a memory consolidation assistant. Summarize the following "
"memory content concisely, preserving key decisions, events, and "
"learnings. Be factual, specific, and brief."
)


def _truncate_fallback(content: str) -> str:
"""Truncate content as a fallback when LLM summarization fails."""
if len(content) <= _TRUNCATE_LENGTH:
return content
return content[:_TRUNCATE_LENGTH] + "..."


class AbstractiveSummarizer:
"""LLM-based abstractive summarizer for sparse content.

Uses a ``CompletionProvider`` to generate concise summaries of
conversational/narrative memory content. Falls back to truncation
if the LLM call fails with a retryable error.

Args:
provider: Completion provider for LLM calls.
model: Model identifier to use for summarization.
max_summary_tokens: Maximum tokens for the summary response.
temperature: Sampling temperature for summarization.

Raises:
ValueError: If ``model`` is empty or whitespace-only.
"""

def __init__(
self,
*,
provider: CompletionProvider,
model: NotBlankStr,
max_summary_tokens: int = 200,
temperature: float = 0.3,
) -> None:
if not model or not model.strip():
msg = "model must be a non-blank string"
raise ValueError(msg)
self._provider = provider
self._model = model
self._config = CompletionConfig(
temperature=temperature,
max_tokens=max_summary_tokens,
)

async def summarize(self, content: str) -> str:
"""Generate an abstractive summary of the given content.

Falls back to truncation if the LLM call fails with a
retryable error or returns empty content. Non-retryable
provider errors (authentication, invalid model) propagate.

Args:
content: The sparse/conversational text to summarize.

Returns:
Summary text.
"""
try:
messages = [
ChatMessage(role=MessageRole.SYSTEM, content=_SYSTEM_PROMPT),
ChatMessage(role=MessageRole.USER, content=content),
]
response = await self._provider.complete(
messages,
self._model,
config=self._config,
)
if response.content and response.content.strip():
logger.debug(
DUAL_MODE_ABSTRACTIVE_SUMMARY,
content_length=len(content),
summary_length=len(response.content),
model=self._model,
)
return response.content.strip()
except MemoryError, RecursionError:
raise
except ProviderError as exc:
if not exc.is_retryable:
logger.warning(
DUAL_MODE_ABSTRACTIVE_FALLBACK,
content_length=len(content),
error=str(exc),
error_type=type(exc).__name__,
retryable=False,
)
raise
Comment thread
coderabbitai[bot] marked this conversation as resolved.
logger.warning(
DUAL_MODE_ABSTRACTIVE_FALLBACK,
content_length=len(content),
error=str(exc),
error_type=type(exc).__name__,
)
return _truncate_fallback(content)
except Exception as exc:
logger.warning(
DUAL_MODE_ABSTRACTIVE_FALLBACK,
content_length=len(content),
error=str(exc),
error_type=type(exc).__name__,
)
return _truncate_fallback(content)

# Fallback: empty/whitespace-only LLM response
logger.debug(
DUAL_MODE_ABSTRACTIVE_FALLBACK,
content_length=len(content),
reason="empty_response",
)
return _truncate_fallback(content)

async def summarize_batch(
self,
entries: tuple[MemoryEntry, ...],
) -> tuple[tuple[NotBlankStr, str], ...]:
"""Summarize multiple entries concurrently.

Each entry is summarized independently via ``asyncio.TaskGroup``.
Failures for individual entries fall back to truncation without
aborting the batch.

Args:
entries: Memory entries to summarize.

Returns:
Tuple of ``(entry_id, summary)`` pairs in input order.
"""
if not entries:
return ()

results: dict[NotBlankStr, str] = {}
async with asyncio.TaskGroup() as tg:
tasks: dict[NotBlankStr, asyncio.Task[str]] = {}
for entry in entries:
tasks[entry.id] = tg.create_task(
self.summarize(entry.content),
)

for entry_id, task in tasks.items():
results[entry_id] = task.result()

return tuple((entry.id, results[entry.id]) for entry in entries)
Loading
Loading