diff --git a/CLAUDE.md b/CLAUDE.md index 073f716227..b57d12bdca 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -90,7 +90,7 @@ See `web/CLAUDE.md` for the full component inventory, design token rules, and po - **Every module** with business logic MUST have: `from synthorg.observability import get_logger` then `logger = get_logger(__name__)` - **Never** use `import logging` / `logging.getLogger()` / `print()` in application code (exception: `observability/setup.py`, `observability/sinks.py`, `observability/syslog_handler.py`, and `observability/http_handler.py` may use stdlib `logging` and `print(..., file=sys.stderr)` for handler construction, bootstrap, and error reporting code that runs before or during logging system configuration) - **Variable name**: always `logger` (not `_logger`, not `log`) -- **Event names**: always use constants from the domain-specific module under `synthorg.observability.events` (e.g., `API_REQUEST_STARTED` from `events.api`, `TOOL_INVOKE_START` from `events.tool`, `GIT_COMMAND_START` from `events.git`, `CONTEXT_BUDGET_FILL_UPDATED` from `events.context_budget`, `BACKUP_STARTED` from `events.backup`, `SETUP_COMPLETED` from `events.setup`, `ROUTING_CANDIDATE_SELECTED` from `events.routing`, `SHIPPING_HTTP_BATCH_SENT` from `events.shipping`, `EVAL_REPORT_COMPUTED` from `events.evaluation`, `PROMPT_PROFILE_SELECTED` from `events.prompt`, `PROCEDURAL_MEMORY_START` from `events.procedural_memory`, `PERF_LLM_JUDGE_STARTED` from `events.performance`, `TASK_ENGINE_OBSERVER_FAILED` from `events.task_engine`, `WORKFLOW_EXEC_COMPLETED` from `events.workflow_execution`, `BLUEPRINT_INSTANTIATE_START` from `events.blueprint`, `WORKFLOW_DEF_ROLLED_BACK` from `events.workflow_definition`, `WORKFLOW_VERSION_SAVED` from `events.workflow_version`, `MEMORY_FINE_TUNE_STARTED`, `MEMORY_SELF_EDIT_TOOL_EXECUTE`, `MEMORY_SELF_EDIT_CORE_READ`, `MEMORY_SELF_EDIT_CORE_WRITE`, `MEMORY_SELF_EDIT_CORE_WRITE_REJECTED`, `MEMORY_SELF_EDIT_ARCHIVAL_SEARCH`, 
`MEMORY_SELF_EDIT_ARCHIVAL_WRITE`, `MEMORY_SELF_EDIT_RECALL_READ`, `MEMORY_SELF_EDIT_RECALL_WRITE`, `MEMORY_SELF_EDIT_WRITE_FAILED` from `events.memory`, `REPORTING_GENERATION_STARTED` from `events.reporting`, `RISK_BUDGET_SCORE_COMPUTED` from `events.risk_budget`, `LLM_STRATEGY_SYNTHESIZED` and `DISTILLATION_CAPTURED` from `events.consolidation`, `MEMORY_DIVERSITY_RERANKED`, `MEMORY_DIVERSITY_RERANK_FAILED`, and `MEMORY_REFORMULATION_ROUND` from `events.memory`, `NOTIFICATION_DISPATCHED` and `NOTIFICATION_DISPATCH_FAILED` from `events.notification`, `QUALITY_STEP_CLASSIFIED` from `events.quality`, `HEALTH_TICKET_EMITTED` from `events.health`, `TRAJECTORY_SCORING_START` from `events.trajectory`, `COORD_METRICS_AMDAHL_COMPUTED` from `events.coordination_metrics`, `WEB_REQUEST_START` and `WEB_SSRF_BLOCKED` from `events.web`, `DB_QUERY_START` and `DB_WRITE_BLOCKED` from `events.database`, `TERMINAL_COMMAND_START` and `TERMINAL_COMMAND_BLOCKED` from `events.terminal`, `SUB_CONSTRAINT_RESOLVED` and `SUB_CONSTRAINT_DENIED` from `events.sub_constraint`, `VERSION_SAVED` and `VERSION_SNAPSHOT_FAILED` from `events.versioning`, `ANALYTICS_AGGREGATION_COMPUTED` and `ANALYTICS_RETRY_RATE_ALERT` from `events.analytics`, `CALL_CLASSIFICATION_COMPUTED` from `events.call_classification`, `QUOTA_THRESHOLD_ALERT` and `QUOTA_POLL_FAILED` from `events.quota`, `CONFLICT_DEBATE_EVALUATOR_FAILED` from `events.conflict`, `DELEGATION_LOOP_CIRCUIT_BACKOFF` and `DELEGATION_LOOP_CIRCUIT_PERSIST_FAILED` from `events.delegation`, `MEETING_EVENT_COOLDOWN_SKIPPED` and `MEETING_TASKS_CAPPED` from `events.meeting`, `PERSISTENCE_CIRCUIT_BREAKER_SAVED`, `PERSISTENCE_CIRCUIT_BREAKER_SAVE_FAILED`, `PERSISTENCE_CIRCUIT_BREAKER_LOADED`, `PERSISTENCE_CIRCUIT_BREAKER_LOAD_FAILED`, `PERSISTENCE_CIRCUIT_BREAKER_DELETED`, and `PERSISTENCE_CIRCUIT_BREAKER_DELETE_FAILED` from `events.persistence`). Each domain has its own module -- see `src/synthorg/observability/events/` for the full inventory of constants. 
Import directly: `from synthorg.observability.events.<domain> import EVENT_CONSTANT` +- **Event names**: always use constants from the domain-specific module under `synthorg.observability.events` (e.g., `API_REQUEST_STARTED` from `events.api`, `TOOL_INVOKE_START` from `events.tool`, `GIT_COMMAND_START` from `events.git`, `CONTEXT_BUDGET_FILL_UPDATED`, `CONTEXT_BUDGET_COMPACTION_STARTED`, `CONTEXT_BUDGET_COMPACTION_COMPLETED`, `CONTEXT_BUDGET_COMPACTION_FAILED`, `CONTEXT_BUDGET_COMPACTION_SKIPPED`, `CONTEXT_BUDGET_COMPACTION_FALLBACK`, `CONTEXT_BUDGET_INDICATOR_INJECTED`, `CONTEXT_BUDGET_AGENT_COMPACTION_REQUESTED`, `CONTEXT_BUDGET_EPISTEMIC_MARKERS_PRESERVED` from `events.context_budget`, `BACKUP_STARTED` from `events.backup`, `SETUP_COMPLETED` from `events.setup`, `ROUTING_CANDIDATE_SELECTED` from `events.routing`, `SHIPPING_HTTP_BATCH_SENT` from `events.shipping`, `EVAL_REPORT_COMPUTED` from `events.evaluation`, `PROMPT_PROFILE_SELECTED` from `events.prompt`, `PROCEDURAL_MEMORY_START` from `events.procedural_memory`, `PERF_LLM_JUDGE_STARTED` from `events.performance`, `TASK_ENGINE_OBSERVER_FAILED` from `events.task_engine`, `WORKFLOW_EXEC_COMPLETED` from `events.workflow_execution`, `BLUEPRINT_INSTANTIATE_START` from `events.blueprint`, `WORKFLOW_DEF_ROLLED_BACK` from `events.workflow_definition`, `WORKFLOW_VERSION_SAVED` from `events.workflow_version`, `MEMORY_FINE_TUNE_STARTED`, `MEMORY_SELF_EDIT_TOOL_EXECUTE`, `MEMORY_SELF_EDIT_CORE_READ`, `MEMORY_SELF_EDIT_CORE_WRITE`, `MEMORY_SELF_EDIT_CORE_WRITE_REJECTED`, `MEMORY_SELF_EDIT_ARCHIVAL_SEARCH`, `MEMORY_SELF_EDIT_ARCHIVAL_WRITE`, `MEMORY_SELF_EDIT_RECALL_READ`, `MEMORY_SELF_EDIT_RECALL_WRITE`, `MEMORY_SELF_EDIT_WRITE_FAILED` from `events.memory`, `REPORTING_GENERATION_STARTED` from `events.reporting`, `RISK_BUDGET_SCORE_COMPUTED` from `events.risk_budget`, `LLM_STRATEGY_SYNTHESIZED` and `DISTILLATION_CAPTURED` from `events.consolidation`, `MEMORY_DIVERSITY_RERANKED`, `MEMORY_DIVERSITY_RERANK_FAILED`, and 
`MEMORY_REFORMULATION_ROUND` from `events.memory`, `NOTIFICATION_DISPATCHED` and `NOTIFICATION_DISPATCH_FAILED` from `events.notification`, `QUALITY_STEP_CLASSIFIED` from `events.quality`, `HEALTH_TICKET_EMITTED` from `events.health`, `TRAJECTORY_SCORING_START` from `events.trajectory`, `COORD_METRICS_AMDAHL_COMPUTED` from `events.coordination_metrics`, `COORDINATION_STARTED`, `COORDINATION_COMPLETED`, `COORDINATION_FAILED`, `COORDINATION_PHASE_STARTED`, `COORDINATION_PHASE_COMPLETED`, `COORDINATION_PHASE_FAILED`, `COORDINATION_WAVE_STARTED`, `COORDINATION_WAVE_COMPLETED`, `COORDINATION_TOPOLOGY_RESOLVED`, `COORDINATION_CLEANUP_STARTED`, `COORDINATION_CLEANUP_COMPLETED`, `COORDINATION_CLEANUP_FAILED`, `COORDINATION_WAVE_BUILT`, `COORDINATION_FACTORY_BUILT`, and `COORDINATION_ATTRIBUTION_BUILT` from `events.coordination`, `WEB_REQUEST_START` and `WEB_SSRF_BLOCKED` from `events.web`, `DB_QUERY_START` and `DB_WRITE_BLOCKED` from `events.database`, `TERMINAL_COMMAND_START` and `TERMINAL_COMMAND_BLOCKED` from `events.terminal`, `SUB_CONSTRAINT_RESOLVED` and `SUB_CONSTRAINT_DENIED` from `events.sub_constraint`, `VERSION_SAVED` and `VERSION_SNAPSHOT_FAILED` from `events.versioning`, `ANALYTICS_AGGREGATION_COMPUTED` and `ANALYTICS_RETRY_RATE_ALERT` from `events.analytics`, `CALL_CLASSIFICATION_COMPUTED` from `events.call_classification`, `QUOTA_THRESHOLD_ALERT` and `QUOTA_POLL_FAILED` from `events.quota`, `CONFLICT_DEBATE_EVALUATOR_FAILED` from `events.conflict`, `DELEGATION_LOOP_CIRCUIT_BACKOFF` and `DELEGATION_LOOP_CIRCUIT_PERSIST_FAILED` from `events.delegation`, `MEETING_EVENT_COOLDOWN_SKIPPED` and `MEETING_TASKS_CAPPED` from `events.meeting`, `PERSISTENCE_CIRCUIT_BREAKER_SAVED`, `PERSISTENCE_CIRCUIT_BREAKER_SAVE_FAILED`, `PERSISTENCE_CIRCUIT_BREAKER_LOADED`, `PERSISTENCE_CIRCUIT_BREAKER_LOAD_FAILED`, `PERSISTENCE_CIRCUIT_BREAKER_DELETED`, and `PERSISTENCE_CIRCUIT_BREAKER_DELETE_FAILED` from `events.persistence`). 
Each domain has its own module -- see `src/synthorg/observability/events/` for the full inventory of constants. Import directly: `from synthorg.observability.events.<domain> import EVENT_CONSTANT` - **Structured kwargs**: always `logger.info(EVENT, key=value)` -- never `logger.info("msg %s", val)` - **All error paths** must log at WARNING or ERROR with context before raising - **All state transitions** must log at INFO diff --git a/docs/design/engine.md b/docs/design/engine.md index 1a04655aea..787c86649d 100644 --- a/docs/design/engine.md +++ b/docs/design/engine.md @@ -1446,6 +1446,28 @@ groups and routing decisions into `ParallelExecutionGroup` instances. Subtasks without routing decisions are skipped. Empty waves (all subtasks unroutable) are dropped. +#### Per-Agent Attribution + +After the pipeline completes, `build_agent_contributions()` in +`coordination/attribution.py` produces a `tuple[AgentContribution, ...]` from +routing decisions and wave outcomes: + +- **`AgentContribution`** -- frozen Pydantic model recording each agent's + `contribution_score` (0.0--1.0), `failure_attribution` classification, and + optional `evidence` excerpt. +- **`CoordinationResultWithAttribution`** -- wrapper pairing the frozen + `CoordinationResult` with the contribution tuple. The `avg_contribution_score` + computed field provides a quick aggregate. +- **Failure attribution categories** -- `"direct"` (agent's own failure), + `"upstream_contamination"` (bad input from another agent), + `"coordination_overhead"` (system-initiated: budget, shutdown, parking), + `"quality_gate"` (failed quality check). +- **Integration** -- contributions are fed into `PerformanceTracker + .record_coordination_contributions()` for trend analysis. The async + method `record_task_metric()` guards writes behind an `asyncio.Lock`; + `record_coordination_contributions()` is synchronous (no await points) + so dict operations are atomic within the single-threaded event loop. 
+ --- ## ACG Vocabulary Cross-Reference @@ -1466,7 +1488,7 @@ external audiences; use SynthOrg terms in implementation discussions. | ACG Template | `CompanyConfig` + company YAML | Partial | ACG is graph-level; SynthOrg operates at org-level | | Realized Graph | `AgentContext` + `TaskExecution` + `CoordinationResult` | Strong | Runtime execution state | | Execution Trace | `TurnRecord` tuple + observability events (82+ constants) | Strong | SynthOrg's trace is richer than ACG baseline | -| Nodes | LLM calls (`call_provider`), tool invocations, validation checks | Partial | No formal node typing yet | +| Nodes | LLM calls (`call_provider`), tool invocations, validation checks | Strong | Typed via `NodeType` enum on `TurnRecord.node_types` | | Edges | `SubtaskDefinition.dependencies`, `DecompositionPlan` DAG | Strong | Multi-agent; implicit in single-agent loops | | Scheduling Policies | `AutoLoopConfig` + `select_loop_type()` + `CoordinationConfig` | Strong | Loop selector + topology selection | | Conditional Branching | HybridLoop replan, PlanExecuteLoop step checks | Partial | Not expressed as graph-level conditionals | diff --git a/scripts/run_affected_tests.py b/scripts/run_affected_tests.py index f0405120ff..f3034a1579 100644 --- a/scripts/run_affected_tests.py +++ b/scripts/run_affected_tests.py @@ -148,7 +148,18 @@ def _affected_test_dirs(changed: list[str]) -> tuple[list[str], bool]: def _run_pytest(paths: list[str]) -> int: - """Run pytest with the given paths.""" + """Run pytest with the given paths. + + Uses ``--dist loadscope`` instead of pyproject.toml's default + ``worksteal`` to group tests by module, preventing xdist worker + crashes from repeated heavy fixture teardown/setup (Litestar + TestClient, SQLite connections) when individual tests are + scattered across workers during full-suite runs. 
+ + ``--max-worker-restart=0`` disables worker restarts to avoid a + known xdist scheduler KeyError when the loadscope scheduler + tries to reassign work to a restarted worker with a new id. + """ cmd = [ sys.executable, "-m", @@ -158,6 +169,9 @@ def _run_pytest(paths: list[str]) -> int: "unit", "-n", "8", + "--dist", + "loadscope", + "--max-worker-restart=0", "-q", ] result = subprocess.run(cmd, cwd=_REPO_ROOT, check=False) diff --git a/src/synthorg/api/controllers/coordination.py b/src/synthorg/api/controllers/coordination.py index ee45fb58fb..4820dda879 100644 --- a/src/synthorg/api/controllers/coordination.py +++ b/src/synthorg/api/controllers/coordination.py @@ -42,6 +42,9 @@ from synthorg.api.state import AppState from synthorg.core.agent import AgentIdentity from synthorg.core.task import Task + from synthorg.engine.coordination.attribution import ( + CoordinationResultWithAttribution, + ) logger = get_logger(__name__) @@ -171,7 +174,7 @@ async def coordinate_task( agent_count=len(agents), ) - result = await self._execute( + attributed = await self._execute( app_state, request, context, @@ -188,7 +191,10 @@ async def coordinate_task( ) currency = DEFAULT_CURRENCY return ApiResponse( - data=_map_result_to_response(result, currency=currency), + data=_map_result_to_response( + attributed.result, + currency=currency, + ), ) async def _get_task( @@ -239,10 +245,10 @@ async def _execute( request: Request[Any, Any, Any], context: CoordinationContext, task_id: str, - ) -> CoordinationResult: + ) -> CoordinationResultWithAttribution: """Run coordination and publish WS events.""" try: - result = await app_state.coordinator.coordinate(context) + attributed = await app_state.coordinator.coordinate(context) except CoordinationPhaseError as exc: logger.warning( API_COORDINATION_FAILED, @@ -276,9 +282,12 @@ async def _execute( ) raise + result = attributed.result + is_success = attributed.is_success + ws_event_type = ( WsEventType.COORDINATION_COMPLETED - if 
result.is_success + if is_success else WsEventType.COORDINATION_FAILED ) _publish_ws_event( @@ -287,22 +296,22 @@ async def _execute( { "task_id": task_id, "topology": result.topology.value, - "is_success": result.is_success, + "is_success": is_success, "total_duration_seconds": result.total_duration_seconds, }, ) log_event = ( - API_COORDINATION_COMPLETED if result.is_success else API_COORDINATION_FAILED + API_COORDINATION_COMPLETED if is_success else API_COORDINATION_FAILED ) - log_fn = logger.info if result.is_success else logger.warning + log_fn = logger.info if is_success else logger.warning log_fn( log_event, task_id=task_id, topology=result.topology.value, - is_success=result.is_success, + is_success=is_success, total_duration_seconds=result.total_duration_seconds, ) - return result + return attributed async def _resolve_agents( self, @@ -326,11 +335,20 @@ async def _resolve_agents( registry = app_state.agent_registry if data.agent_names is not None: - results = await asyncio.gather( - *(registry.get_by_name(name) for name in data.agent_names) - ) + names = data.agent_names + results: list[AgentIdentity | None] = [None] * len(names) + async with asyncio.TaskGroup() as tg: + for idx, name in enumerate(names): + + async def _resolve( + i: int = idx, + n: str = name, + ) -> None: + results[i] = await registry.get_by_name(n) + + tg.create_task(_resolve()) agents: list[AgentIdentity] = [] - for name, agent in zip(data.agent_names, results, strict=True): + for name, agent in zip(names, results, strict=True): if agent is None: logger.warning( API_COORDINATION_AGENT_RESOLVE_FAILED, diff --git a/src/synthorg/engine/__init__.py b/src/synthorg/engine/__init__.py index b80272288b..99adc57b50 100644 --- a/src/synthorg/engine/__init__.py +++ b/src/synthorg/engine/__init__.py @@ -51,12 +51,14 @@ AgentContextSnapshot, ) from synthorg.engine.coordination import ( + AgentContribution, CentralizedDispatcher, ContextDependentDispatcher, CoordinationConfig, CoordinationContext, 
CoordinationPhaseResult, CoordinationResult, + CoordinationResultWithAttribution, CoordinationWave, DecentralizedDispatcher, DispatchResult, @@ -225,6 +227,7 @@ "AgentAssignment", "AgentContext", "AgentContextSnapshot", + "AgentContribution", "AgentEngine", "AgentOutcome", "AgentRunResult", @@ -256,6 +259,7 @@ "CoordinationPhaseError", "CoordinationPhaseResult", "CoordinationResult", + "CoordinationResultWithAttribution", "CoordinationWave", "CostOptimizedAssignmentStrategy", "CreateTaskData", diff --git a/src/synthorg/engine/agent_engine.py b/src/synthorg/engine/agent_engine.py index fd4cac94a3..42c2fdc83d 100644 --- a/src/synthorg/engine/agent_engine.py +++ b/src/synthorg/engine/agent_engine.py @@ -117,9 +117,11 @@ from synthorg.core.agent import AgentIdentity from synthorg.core.task import Task from synthorg.engine.compaction import CompactionCallback + from synthorg.engine.coordination.attribution import ( + CoordinationResultWithAttribution, + ) from synthorg.engine.coordination.models import ( CoordinationContext, - CoordinationResult, ) from synthorg.engine.coordination.service import MultiAgentCoordinator from synthorg.engine.hybrid_models import HybridLoopConfig @@ -452,14 +454,14 @@ def coordinator(self) -> MultiAgentCoordinator | None: async def coordinate( self, context: CoordinationContext, - ) -> CoordinationResult: + ) -> CoordinationResultWithAttribution: """Delegate to the multi-agent coordinator. Args: context: Coordination context with task, agents, and config. Returns: - Coordination result with all phase outcomes. + Coordination result with per-agent attribution data. Raises: ExecutionStateError: If no coordinator is configured. diff --git a/src/synthorg/engine/compaction/epistemic.py b/src/synthorg/engine/compaction/epistemic.py new file mode 100644 index 0000000000..3cb69132e3 --- /dev/null +++ b/src/synthorg/engine/compaction/epistemic.py @@ -0,0 +1,138 @@ +"""Epistemic marker detection for compaction summaries. 
+ +Detects reasoning markers (hedging, reconsideration, uncertainty, +verification, correction) in assistant messages. Messages with +high marker density are preserved during compaction to maintain +reasoning chain integrity. + +Reference: arXiv:2603.24472 -- removing epistemic markers degrades +accuracy by up to 63% on complex reasoning tasks. +""" + +import re + +from synthorg.core.enums import Complexity +from synthorg.observability import get_logger + +logger = get_logger(__name__) + +# Precompiled case-insensitive word-boundary patterns grouped by type. +_HEDGING = re.compile(r"\b(wait|hmm|hm|ah)\b", re.IGNORECASE) +_RECONSIDERATION = re.compile( + r"\b(actually|let me reconsider|on second thought|I was wrong)\b", + re.IGNORECASE, +) +_UNCERTAINTY = re.compile( + r"\b(perhaps|alternatively|I'm not sure|uncertain)\b", + re.IGNORECASE, +) +_VERIFICATION = re.compile( + r"\b(check|verify|double-check|let me verify)\b", + re.IGNORECASE, +) +_CORRECTION = re.compile( + r"\b(but wait|actually no|hold on)\b", + re.IGNORECASE, +) + +EPISTEMIC_PATTERNS: tuple[re.Pattern[str], ...] = ( + _HEDGING, + _RECONSIDERATION, + _UNCERTAINTY, + _VERIFICATION, + _CORRECTION, +) + +# Sentence-splitting regex: split on period, question mark, exclamation, +# or newline followed by optional whitespace. +_SENTENCE_SPLIT = re.compile(r"[.?!]\s+|\n+") + +# Complexity levels that use the low-threshold (>= 1 marker). +_HIGH_COMPLEXITY = frozenset({Complexity.COMPLEX, Complexity.EPIC}) + +# Marker count thresholds per complexity tier. +_COMPLEX_THRESHOLD = 1 +_SIMPLE_THRESHOLD = 3 + + +def count_epistemic_markers(text: str) -> int: + """Count distinct epistemic pattern matches in text. + + Each pattern is counted once regardless of how many times it + matches within the text. + + Args: + text: Input text to scan. + + Returns: + Number of distinct patterns that matched (0 to 5). 
+ """ + return sum(1 for p in EPISTEMIC_PATTERNS if p.search(text)) + + +def should_preserve_message( + text: str, + complexity: Complexity, +) -> bool: + """Decide whether a message should be preserved during compaction. + + Uses complexity-adaptive thresholds: + - COMPLEX/EPIC: preserve if >= 1 epistemic marker + - SIMPLE/MEDIUM: preserve if >= 3 epistemic markers + + Args: + text: Message content. + complexity: Task complexity level. + + Returns: + True if the message should be preserved verbatim. + """ + count = count_epistemic_markers(text) + threshold = ( + _COMPLEX_THRESHOLD if complexity in _HIGH_COMPLEXITY else _SIMPLE_THRESHOLD + ) + return count >= threshold + + +def extract_marker_sentences( + text: str, + *, + max_chars: int = 200, +) -> str: + """Extract sentences containing epistemic markers. + + Splits text on sentence boundaries (period, question mark, + exclamation, newline) and collects sentences where at least + one epistemic pattern matches. Truncates the result to + *max_chars*. + + Args: + text: Input text. + max_chars: Maximum character length of the result. + + Returns: + Joined marker-containing sentences, truncated if needed. + """ + sentences = _SENTENCE_SPLIT.split(text) + marker_sentences: list[str] = [] + total_len = 0 + + for sentence in sentences: + stripped = sentence.strip() + if not stripped: + continue + if any(p.search(stripped) for p in EPISTEMIC_PATTERNS): + sep_len = 2 if marker_sentences else 0 + if total_len + sep_len + len(stripped) > max_chars: + # If this is the first sentence and it exceeds max_chars, + # include a truncated version rather than returning empty. 
+ if not marker_sentences: + marker_sentences.append(stripped[:max_chars]) + break + marker_sentences.append(stripped) + total_len += sep_len + len(stripped) + + if not marker_sentences: + return "" + + return "; ".join(marker_sentences) diff --git a/src/synthorg/engine/compaction/models.py b/src/synthorg/engine/compaction/models.py index 94b88ad39e..9c44f9c35f 100644 --- a/src/synthorg/engine/compaction/models.py +++ b/src/synthorg/engine/compaction/models.py @@ -4,19 +4,42 @@ immutability convention. """ -from pydantic import BaseModel, ConfigDict, Field +from typing import Self + +from pydantic import BaseModel, ConfigDict, Field, model_validator class CompactionConfig(BaseModel): """Configuration for context compaction behavior. + Two operating modes: + + **Standard** (``agent_controlled=False``): automatic compaction + triggers when context fill reaches ``fill_threshold_percent``. + + **Agent-controlled** (``agent_controlled=True``): agents manage + compaction via the ``compact_context`` tool. Automatic compaction + is deferred to ``safety_threshold_percent`` (which must be higher + than ``fill_threshold_percent``), giving agents headroom to decide + when and how to compact while retaining a safety net. + Attributes: fill_threshold_percent: Context fill percentage that triggers - compaction (e.g. 80.0 means compact when 80% full). + compaction in standard mode (e.g. 80.0 means compact when + 80% full). In agent-controlled mode this threshold is + NOT used for automatic compaction -- agents decide when to + compact below ``safety_threshold_percent``. min_messages_to_compact: Minimum number of conversation messages required before compaction is allowed. preserve_recent_turns: Number of recent turn pairs to keep uncompressed after compaction. + agent_controlled: Enable agent-initiated compaction via the + ``compact_context`` tool. + safety_threshold_percent: Auto-compaction threshold when + ``agent_controlled`` is ``True`` (safety net). 
Must be + greater than ``fill_threshold_percent``. + preserve_epistemic_markers: Detect and preserve epistemic + markers (hedging, reconsideration, etc.) in summaries. """ model_config = ConfigDict(frozen=True, allow_inf_nan=False) @@ -37,6 +60,40 @@ class CompactionConfig(BaseModel): ge=1, description="Recent turn pairs to keep uncompressed", ) + agent_controlled: bool = Field( + default=False, + description=( + "Enable agent-initiated compaction via compact_context tool. " + "When True, auto-compaction uses safety_threshold_percent." + ), + ) + safety_threshold_percent: float = Field( + default=95.0, + gt=0.0, + le=100.0, + description=( + "Auto-compaction threshold when agent_controlled=True (safety net)." + ), + ) + preserve_epistemic_markers: bool = Field( + default=True, + description=("Detect and preserve epistemic markers in compaction summaries."), + ) + + @model_validator(mode="after") + def _validate_safety_above_fill(self) -> Self: + """Safety threshold must exceed fill threshold when agent-controlled.""" + if ( + self.agent_controlled + and self.safety_threshold_percent <= self.fill_threshold_percent + ): + msg = ( + f"safety_threshold_percent ({self.safety_threshold_percent}) " + f"must be greater than fill_threshold_percent " + f"({self.fill_threshold_percent}) when agent_controlled=True" + ) + raise ValueError(msg) + return self class CompressionMetadata(BaseModel): diff --git a/src/synthorg/engine/compaction/summarizer.py b/src/synthorg/engine/compaction/summarizer.py index ee6bbf92cb..6cbc815bac 100644 --- a/src/synthorg/engine/compaction/summarizer.py +++ b/src/synthorg/engine/compaction/summarizer.py @@ -7,6 +7,11 @@ from typing import TYPE_CHECKING +from synthorg.core.enums import Complexity +from synthorg.engine.compaction.epistemic import ( + extract_marker_sentences, + should_preserve_message, +) from synthorg.engine.compaction.models import ( CompactionConfig, CompressionMetadata, @@ -67,6 +72,8 @@ def _do_compaction( ctx: AgentContext, 
config: CompactionConfig, estimator: PromptTokenEstimator, + *, + force: bool = False, ) -> AgentContext | None: """Core compaction logic. @@ -74,13 +81,20 @@ def _do_compaction( ctx: Current agent context. config: Compaction configuration. estimator: Token estimator. + force: Skip fill threshold check (for agent-initiated compaction). Returns: New compacted ``AgentContext`` or ``None`` if no compaction needed. """ fill_pct = ctx.context_fill_percent - if fill_pct is None or fill_pct < config.fill_threshold_percent: - return None + if not force: + effective_threshold = ( + config.safety_threshold_percent + if config.agent_controlled + else config.fill_threshold_percent + ) + if fill_pct is None or fill_pct < effective_threshold: + return None conversation = ctx.conversation if len(conversation) < config.min_messages_to_compact: @@ -98,6 +112,7 @@ def _do_compaction( execution_id=ctx.execution_id, fill_percent=fill_pct, message_count=len(conversation), + forced=force, ) split = _split_conversation(ctx, config) @@ -105,12 +120,15 @@ def _do_compaction( return None head, archivable, recent = split + task_complexity = _extract_task_complexity(ctx) compressed, metadata, summary_tokens = _compress( ctx, head, archivable, recent, estimator, + preserve_markers=config.preserve_epistemic_markers, + task_complexity=task_complexity, ) # Re-estimate fill with compressed conversation. Counts @@ -173,18 +191,26 @@ def _split_conversation( return head, archivable, recent -def _compress( +def _compress( # noqa: PLR0913 ctx: AgentContext, head: tuple[ChatMessage, ...], archivable: tuple[ChatMessage, ...], recent: tuple[ChatMessage, ...], estimator: PromptTokenEstimator, + *, + preserve_markers: bool, + task_complexity: Complexity, ) -> tuple[tuple[ChatMessage, ...], CompressionMetadata, int]: """Build compressed conversation and metadata. Returns ``(compressed_conversation, metadata, summary_tokens)``. 
""" - summary_text = _build_summary(archivable, ctx.execution_id) + summary_text = _build_summary( + archivable, + ctx.execution_id, + preserve_markers=preserve_markers, + task_complexity=task_complexity, + ) summary_msg = ChatMessage( role=MessageRole.SYSTEM, content=summary_text, @@ -206,33 +232,71 @@ def _compress( return compressed, metadata, summary_tokens +def _extract_task_complexity(ctx: AgentContext) -> Complexity: + """Extract task complexity from context, defaulting to COMPLEX.""" + task_exec = getattr(ctx, "task_execution", None) + if task_exec is not None: + task = getattr(task_exec, "task", None) + if task is not None: + complexity = getattr(task, "estimated_complexity", None) + if isinstance(complexity, Complexity): + return complexity + return Complexity.COMPLEX + + def _build_summary( messages: tuple[ChatMessage, ...], execution_id: str, + *, + preserve_markers: bool, + task_complexity: Complexity, ) -> str: - """Build a simple text summary from archived messages. + """Build a text summary from archived messages. - Concatenates sanitized assistant message content snippets into a - summary paragraph, capped at ``_MAX_SUMMARY_CHARS``. Each snippet - is redacted for file paths and URLs via ``sanitize_message``. + When ``preserve_markers`` is True, assistant messages with + epistemic markers (hedging, reconsideration, etc.) are preserved + as marker-containing sentences instead of being sanitized down + to 100-char snippets. Args: messages: The archived messages to summarize. execution_id: Execution identifier for log correlation. + preserve_markers: Whether to preserve epistemic markers. + task_complexity: Task complexity for marker thresholds. Returns: Summary text describing the archived conversation. 
""" snippets: list[str] = [] + preserved_count = 0 + for msg in messages: - if msg.role == MessageRole.ASSISTANT and msg.content: - cleaned = msg.content.replace("\n", " ").strip() - if cleaned: - snippet = sanitize_message(cleaned, max_length=100) - snippets.append(snippet) - - # Drop useless "details redacted" placeholders so the summary - # retains only meaningful content. + if msg.role != MessageRole.ASSISTANT or not msg.content: + continue + cleaned = msg.content.replace("\n", " ").strip() + if not cleaned: + continue + + # Check for epistemic markers worth preserving. + if preserve_markers and should_preserve_message( + cleaned, + task_complexity, + ): + marker_text = extract_marker_sentences(cleaned) + if marker_text: + sanitized_markers = sanitize_message( + marker_text, + max_length=max(len(marker_text), 1), + ) + snippets.append(sanitized_markers) + preserved_count += 1 + continue + + # Standard sanitized snippet. + snippet = sanitize_message(cleaned, max_length=100) + snippets.append(snippet) + + # Drop useless "details redacted" placeholders. useful = [s for s in snippets if s != "details redacted"] if not useful: logger.debug( @@ -247,4 +311,36 @@ def _build_summary( if len(joined) > _MAX_SUMMARY_CHARS: joined = joined[:_MAX_SUMMARY_CHARS] + "..." + if preserved_count > 0: + msg_word = "message" if preserved_count == 1 else "messages" + return ( + f"[Archived {len(messages)} messages. " + f"Epistemic markers preserved from " + f"{preserved_count} {msg_word}. " + f"Summary: {joined}]" + ) return f"[Archived {len(messages)} messages. Summary of prior work: {joined}]" + + +def force_compaction( + ctx: AgentContext, + config: CompactionConfig, + estimator: PromptTokenEstimator, +) -> AgentContext | None: + """Compact context without checking the fill threshold. + + Used when an agent explicitly requests compaction via the + ``compact_context`` tool. 
Delegates to ``_do_compaction`` + with ``force=True`` which skips the fill-threshold comparison + entirely while preserving all other checks (minimum message + count, recent turn preservation). + + Args: + ctx: Current agent context. + config: Compaction configuration. + estimator: Token estimator. + + Returns: + Compacted context, or ``None`` if too few messages. + """ + return _do_compaction(ctx, config, estimator, force=True) diff --git a/src/synthorg/engine/coordination/__init__.py b/src/synthorg/engine/coordination/__init__.py index 61422aa6a9..05784608da 100644 --- a/src/synthorg/engine/coordination/__init__.py +++ b/src/synthorg/engine/coordination/__init__.py @@ -5,6 +5,12 @@ dispatchers. """ +from synthorg.engine.coordination.attribution import ( + AgentContribution, + CoordinationResultWithAttribution, + FailureAttribution, + build_agent_contributions, +) from synthorg.engine.coordination.config import CoordinationConfig from synthorg.engine.coordination.dispatchers import ( CentralizedDispatcher, @@ -27,19 +33,23 @@ from synthorg.engine.coordination.service import MultiAgentCoordinator __all__ = [ + "AgentContribution", "CentralizedDispatcher", "ContextDependentDispatcher", "CoordinationConfig", "CoordinationContext", "CoordinationPhaseResult", "CoordinationResult", + "CoordinationResultWithAttribution", "CoordinationSectionConfig", "CoordinationWave", "DecentralizedDispatcher", "DispatchResult", + "FailureAttribution", "MultiAgentCoordinator", "SasDispatcher", "TopologyDispatcher", + "build_agent_contributions", "build_coordinator", "build_execution_waves", "select_dispatcher", diff --git a/src/synthorg/engine/coordination/attribution.py b/src/synthorg/engine/coordination/attribution.py new file mode 100644 index 0000000000..16ef784162 --- /dev/null +++ b/src/synthorg/engine/coordination/attribution.py @@ -0,0 +1,295 @@ +"""Structural credit assignment for coordinated multi-agent execution. 
+ +Provides per-agent contribution scoring and failure attribution +without modifying the frozen ``CoordinationResult``. The wrapper +``CoordinationResultWithAttribution`` pairs the original result +with attribution data built from routing decisions and wave outcomes. +""" + +from typing import TYPE_CHECKING, Literal, Protocol, Self + +from pydantic import BaseModel, ConfigDict, Field, computed_field, model_validator + +from synthorg.core.enums import FailureCategory +from synthorg.core.types import NotBlankStr +from synthorg.engine.coordination.models import ( # noqa: TC001 + CoordinationResult, + CoordinationWave, +) +from synthorg.engine.loop_protocol import TerminationReason +from synthorg.engine.recovery import infer_failure_category +from synthorg.engine.routing.models import RoutingResult # noqa: TC001 +from synthorg.observability import get_logger +from synthorg.observability.events.coordination import ( + COORDINATION_ATTRIBUTION_BUILT, +) + +logger = get_logger(__name__) + + +if TYPE_CHECKING: + + class _ExecutionResultLike(Protocol): + error_message: str | None + + class _AgentRunResultLike(Protocol): + is_success: bool + termination_reason: TerminationReason | None + execution_result: _ExecutionResultLike | None + + +FailureAttribution = Literal[ + "direct", + "upstream_contamination", + "coordination_overhead", + "quality_gate", +] + +_MAX_EVIDENCE_LENGTH = 500 + +# Map FailureCategory -> FailureAttribution for error-based outcomes. +_CATEGORY_TO_ATTRIBUTION: dict[FailureCategory, FailureAttribution] = { + FailureCategory.TOOL_FAILURE: "direct", + FailureCategory.STAGNATION: "direct", + FailureCategory.TIMEOUT: "direct", + FailureCategory.DELEGATION_FAILED: "direct", + FailureCategory.BUDGET_EXCEEDED: "coordination_overhead", + FailureCategory.QUALITY_GATE_FAILED: "quality_gate", + FailureCategory.UNKNOWN: "direct", +} + +# Map TerminationReason -> FailureAttribution for non-success runs. 
+_TERMINATION_TO_ATTRIBUTION: dict[TerminationReason, FailureAttribution] = { + TerminationReason.STAGNATION: "direct", + TerminationReason.BUDGET_EXHAUSTED: "coordination_overhead", + TerminationReason.MAX_TURNS: "coordination_overhead", + TerminationReason.ERROR: "direct", + TerminationReason.SHUTDOWN: "coordination_overhead", + TerminationReason.PARKED: "coordination_overhead", +} + + +class AgentContribution(BaseModel): + """Per-agent contribution to a coordinated task execution. + + Attributes: + agent_id: Identifier of the contributing agent. + subtask_id: Identifier of the subtask this agent executed. + contribution_score: Normalized score (0.0-1.0) reflecting + the agent's contribution quality. + failure_attribution: Classification of why the agent failed + (``None`` when the agent succeeded with score 1.0). + evidence: Truncated error message or evidence pointer + (``None`` when the agent succeeded). + """ + + model_config = ConfigDict(frozen=True, allow_inf_nan=False) + + agent_id: NotBlankStr = Field(description="Contributing agent") + subtask_id: NotBlankStr = Field(description="Subtask executed") + contribution_score: float = Field( + ge=0.0, + le=1.0, + description="Contribution quality (0.0-1.0)", + ) + failure_attribution: FailureAttribution | None = Field( + default=None, + description="Why the agent failed (None on success)", + ) + evidence: str | None = Field( + default=None, + max_length=_MAX_EVIDENCE_LENGTH, + description="Truncated error or evidence pointer", + ) + + @model_validator(mode="after") + def _validate_score_attribution_consistency(self) -> Self: + """Score < 1.0 requires failure_attribution; 1.0 forbids it.""" + if self.contribution_score < 1.0 and self.failure_attribution is None: + msg = ( + "failure_attribution must be set when " + f"contribution_score ({self.contribution_score}) < 1.0" + ) + raise ValueError(msg) + if self.contribution_score == 1.0 and self.failure_attribution is not None: + msg = "failure_attribution must be 
None when contribution_score is 1.0" + raise ValueError(msg) + return self + + +class CoordinationResultWithAttribution(BaseModel): + """Immutable wrapper pairing a CoordinationResult with attribution. + + Preserves the frozen ``CoordinationResult`` contract while adding + per-agent contribution data for structural credit assignment. + + Attributes: + result: The original coordination result (unmodified). + agent_contributions: Per-agent contribution records. + """ + + model_config = ConfigDict(frozen=True, allow_inf_nan=False) + + result: CoordinationResult = Field( + description="Original coordination result", + ) + agent_contributions: tuple[AgentContribution, ...] = Field( + default=(), + description="Per-agent contributions", + ) + + @computed_field( # type: ignore[prop-decorator] + description="Whether all phases succeeded", + ) + @property + def is_success(self) -> bool: + """Delegate to the wrapped result.""" + return self.result.is_success + + @computed_field( # type: ignore[prop-decorator] + description="Average contribution score across agents", + ) + @property + def avg_contribution_score(self) -> float: + """Average of contribution scores, 0.0 when empty.""" + if not self.agent_contributions: + return 0.0 + total = sum(c.contribution_score for c in self.agent_contributions) + return total / len(self.agent_contributions) + + +def build_agent_contributions( + routing_result: RoutingResult, + waves: tuple[CoordinationWave, ...], +) -> tuple[AgentContribution, ...]: + """Build contribution records from routing and execution data. + + Walks routing decisions to establish agent-to-subtask bindings, + then inspects wave outcomes to determine each agent's score and + failure classification. + + Args: + routing_result: Routing decisions mapping agents to subtasks. + waves: Executed coordination waves with outcomes. + + Returns: + Tuple of ``AgentContribution`` records, one per agent outcome. + """ + # Build lookups from routing decisions. 
+ # Primary: task_id -> subtask_id (direct match from execution outcome). + # Fallback: agent_id -> subtask_id (single-subtask agents only). + routed_subtask_ids: set[str] = set() + agent_to_subtask: dict[str, str] = {} + for decision in routing_result.decisions: + sid = str(decision.subtask_id) + aid = str(decision.selected_candidate.agent_identity.id) + routed_subtask_ids.add(sid) + agent_to_subtask[aid] = sid # last-write for multi-subtask agents + + contributions: list[AgentContribution] = [] + + for wave in waves: + if wave.execution_result is None: + continue + for outcome in wave.execution_result.outcomes: + tid = str(outcome.task_id) + aid = str(outcome.agent_id) + # Prefer task_id when it matches a routed subtask (handles + # multi-subtask agents correctly). Fall back to agent lookup. + if tid in routed_subtask_ids: + subtask_id = tid + else: + subtask_id = agent_to_subtask.get(aid, tid) + contrib = _score_outcome( + agent_id=str(outcome.agent_id), + subtask_id=subtask_id, + outcome_result=outcome.result, + outcome_error=outcome.error, + ) + contributions.append(contrib) + + result = tuple(contributions) + + if result: + success_count = sum(1 for c in result if c.contribution_score == 1.0) + avg_score = sum(c.contribution_score for c in result) / len(result) + logger.info( + COORDINATION_ATTRIBUTION_BUILT, + agent_count=len(result), + success_count=success_count, + avg_score=round(avg_score, 3), + ) + + return result + + +def _score_outcome( + *, + agent_id: str, + subtask_id: str, + outcome_result: object | None, + outcome_error: str | None, +) -> AgentContribution: + """Score a single agent outcome. + + Args: + agent_id: Agent identifier. + subtask_id: Subtask identifier. + outcome_result: AgentRunResult if execution completed, else None. + outcome_error: Error string if agent failed before execution. + + Returns: + An ``AgentContribution`` with score and attribution. + """ + # Case 1: Agent failed with an error string (no execution at all). 
+ if outcome_error is not None: + category = infer_failure_category(outcome_error) + attribution = _CATEGORY_TO_ATTRIBUTION.get(category, "direct") + return AgentContribution( + agent_id=NotBlankStr(agent_id), + subtask_id=NotBlankStr(subtask_id), + contribution_score=0.0, + failure_attribution=attribution, + evidence=outcome_error[:_MAX_EVIDENCE_LENGTH], + ) + + # Case 2: Execution completed -- check if successful. + # outcome_result conforms to _AgentRunResultLike; typed as object + # at runtime to avoid circular import. + if outcome_result is not None: + is_success = getattr(outcome_result, "is_success", False) + if is_success: + return AgentContribution( + agent_id=NotBlankStr(agent_id), + subtask_id=NotBlankStr(subtask_id), + contribution_score=1.0, + ) + + # Non-success termination. + termination_reason: TerminationReason | None = getattr( + outcome_result, "termination_reason", None + ) + failure_attr: FailureAttribution = "direct" + if termination_reason is not None: + failure_attr = _TERMINATION_TO_ATTRIBUTION.get(termination_reason, "direct") + exec_result = getattr(outcome_result, "execution_result", None) + error_text = "" + if exec_result is not None: + error_text = getattr(exec_result, "error_message", "") or "" + + return AgentContribution( + agent_id=NotBlankStr(agent_id), + subtask_id=NotBlankStr(subtask_id), + contribution_score=0.0, + failure_attribution=failure_attr, + evidence=error_text[:_MAX_EVIDENCE_LENGTH] or None, + ) + + # Should not reach here -- AgentOutcome requires result XOR error. 
+ return AgentContribution( + agent_id=NotBlankStr(agent_id), + subtask_id=NotBlankStr(subtask_id), + contribution_score=0.0, + failure_attribution="direct", + evidence="No result or error in outcome", + ) diff --git a/src/synthorg/engine/coordination/factory.py b/src/synthorg/engine/coordination/factory.py index 849126d0e0..b66f23ffe9 100644 --- a/src/synthorg/engine/coordination/factory.py +++ b/src/synthorg/engine/coordination/factory.py @@ -39,6 +39,7 @@ from synthorg.engine.workspace.config import WorkspaceIsolationConfig from synthorg.engine.workspace.protocol import WorkspaceIsolationStrategy from synthorg.engine.workspace.service import WorkspaceIsolationService + from synthorg.hr.performance.tracker import PerformanceTracker from synthorg.providers.protocol import CompletionProvider logger = get_logger(__name__) @@ -165,6 +166,7 @@ def build_coordinator( # noqa: PLR0913 workspace_strategy: WorkspaceIsolationStrategy | None = None, workspace_config: WorkspaceIsolationConfig | None = None, shutdown_manager: ShutdownManager | None = None, + performance_tracker: PerformanceTracker | None = None, ) -> MultiAgentCoordinator: """Build a fully wired :class:`MultiAgentCoordinator`. @@ -190,6 +192,8 @@ def build_coordinator( # noqa: PLR0913 workspace_strategy: Optional workspace isolation strategy. workspace_config: Optional workspace isolation config. shutdown_manager: Optional shutdown manager for the executor. + performance_tracker: Optional tracker for recording + per-agent coordination contributions. Returns: A fully constructed ``MultiAgentCoordinator``. 
@@ -215,6 +219,7 @@ def build_coordinator( # noqa: PLR0913 workspace_strategy, workspace_config ), task_engine=task_engine, + performance_tracker=performance_tracker, ) logger.debug( diff --git a/src/synthorg/engine/coordination/service.py b/src/synthorg/engine/coordination/service.py index 98374cb801..365599fbbd 100644 --- a/src/synthorg/engine/coordination/service.py +++ b/src/synthorg/engine/coordination/service.py @@ -10,6 +10,11 @@ from uuid import uuid4 from synthorg.core.enums import CoordinationTopology, TaskStatus +from synthorg.engine.coordination.attribution import ( + AgentContribution, + CoordinationResultWithAttribution, + build_agent_contributions, +) from synthorg.engine.coordination.dispatchers import ( DispatchResult, select_dispatcher, @@ -22,6 +27,7 @@ from synthorg.engine.task_engine_models import TransitionTaskMutation from synthorg.observability import get_logger from synthorg.observability.events.coordination import ( + COORDINATION_CLEANUP_FAILED, COORDINATION_COMPLETED, COORDINATION_FAILED, COORDINATION_PHASE_COMPLETED, @@ -43,6 +49,7 @@ from synthorg.engine.routing.service import TaskRoutingService from synthorg.engine.task_engine import TaskEngine from synthorg.engine.workspace.service import WorkspaceIsolationService + from synthorg.hr.performance.tracker import PerformanceTracker logger = get_logger(__name__) @@ -66,17 +73,20 @@ class MultiAgentCoordinator: parallel_executor: Executor for parallel agent runs. workspace_service: Optional workspace isolation service. task_engine: Optional task engine for parent status updates. + performance_tracker: Optional tracker for recording per-agent + coordination contributions. 
""" __slots__ = ( "_decomposition_service", "_parallel_executor", + "_performance_tracker", "_routing_service", "_task_engine", "_workspace_service", ) - def __init__( + def __init__( # noqa: PLR0913 self, *, decomposition_service: DecompositionService, @@ -84,17 +94,19 @@ def __init__( parallel_executor: ParallelExecutor, workspace_service: WorkspaceIsolationService | None = None, task_engine: TaskEngine | None = None, + performance_tracker: PerformanceTracker | None = None, ) -> None: self._decomposition_service = decomposition_service self._routing_service = routing_service self._parallel_executor = parallel_executor self._workspace_service = workspace_service self._task_engine = task_engine + self._performance_tracker = performance_tracker async def coordinate( self, context: CoordinationContext, - ) -> CoordinationResult: + ) -> CoordinationResultWithAttribution: """Run the full multi-agent coordination pipeline. Pipeline: @@ -105,12 +117,14 @@ async def coordinate( 5. Select dispatcher and execute waves. 6. Rollup subtask statuses. 7. Update parent task via TaskEngine (if provided). + 8. Build per-agent attribution from routing + outcomes. Args: context: Coordination context with task, agents, and config. Returns: - CoordinationResult with all phase outcomes. + CoordinationResultWithAttribution wrapping the result + with per-agent contribution data. Raises: CoordinationPhaseError: When a critical phase fails. @@ -195,7 +209,43 @@ async def coordinate( total_cost_usd=total_cost, ) - return result + # Post-pipeline: build per-agent attribution. + # Guard so attribution/tracker failures don't fail a completed run. + contributions: tuple[AgentContribution, ...] 
= () + try: + contributions = build_agent_contributions( + routing_result, + dispatch_result.waves, + ) + except MemoryError, RecursionError: + raise + except Exception as attr_exc: + logger.warning( + COORDINATION_CLEANUP_FAILED, + parent_task_id=task.id, + error=str(attr_exc), + context="post_completion_attribution_build", + ) + + if self._performance_tracker is not None and contributions: + try: + self._performance_tracker.record_coordination_contributions( + contributions, + ) + except MemoryError, RecursionError: + raise + except Exception as tracker_exc: + logger.warning( + COORDINATION_CLEANUP_FAILED, + parent_task_id=task.id, + error=str(tracker_exc), + context="post_completion_tracker_write", + ) + + return CoordinationResultWithAttribution( + result=result, + agent_contributions=contributions, + ) async def _phase_decompose( self, diff --git a/src/synthorg/engine/loop_helpers.py b/src/synthorg/engine/loop_helpers.py index 7a40822896..63da05a370 100644 --- a/src/synthorg/engine/loop_helpers.py +++ b/src/synthorg/engine/loop_helpers.py @@ -40,6 +40,7 @@ from .loop_protocol import ( BudgetChecker, ExecutionResult, + NodeType, ShutdownChecker, TerminationReason, TurnRecord, @@ -466,9 +467,15 @@ def make_turn_record( *, call_category: LLMCallCategory | None = None, provider_metadata: dict[str, object] | None = None, + extra_node_types: tuple[NodeType, ...] = (), ) -> TurnRecord: """Create a ``TurnRecord`` from a provider response. + Automatically derives ``LLM_CALL`` (always) and ``TOOL_INVOCATION`` + (when tool calls are present). Callers pass additional node types + via *extra_node_types* for checks that ran this turn (quality, + budget, stagnation). + Args: turn_number: 1-indexed turn number. response: Provider completion response. @@ -478,6 +485,8 @@ def make_turn_record( ``_synthorg_latency_ms``, ``_synthorg_cache_hit``, ``_synthorg_retry_count``, and ``_synthorg_retry_reason`` are extracted when present. 
+ extra_node_types: Additional node types beyond the + auto-derived LLM_CALL and TOOL_INVOCATION. """ md = provider_metadata or {} latency_ms_raw = md.get("_synthorg_latency_ms") @@ -501,6 +510,12 @@ def make_turn_record( if isinstance(retry_reason_raw, str): retry_reason = retry_reason_raw + # Auto-derive base node types from response content. + derived: list[NodeType] = [NodeType.LLM_CALL] + if response.tool_calls: + derived.append(NodeType.TOOL_INVOCATION) + node_types = tuple(derived) + extra_node_types + return TurnRecord( turn_number=turn_number, input_tokens=response.usage.input_tokens, @@ -514,6 +529,7 @@ def make_turn_record( cache_hit=cache_hit, retry_count=retry_count, retry_reason=retry_reason, + node_types=node_types, ) diff --git a/src/synthorg/engine/loop_protocol.py b/src/synthorg/engine/loop_protocol.py index 9c4a7f3cc2..62028599a2 100644 --- a/src/synthorg/engine/loop_protocol.py +++ b/src/synthorg/engine/loop_protocol.py @@ -25,6 +25,21 @@ from synthorg.tools.invoker import ToolInvoker +class NodeType(StrEnum): + """Type of computation node executed within a turn. + + Used for structural credit assignment and post-hoc trace analysis. + Each turn records which node types executed, enabling fine-grained + attribution of costs and failures. + """ + + LLM_CALL = "llm_call" + TOOL_INVOCATION = "tool_invocation" + QUALITY_CHECK = "quality_check" + BUDGET_CHECK = "budget_check" + STAGNATION_CHECK = "stagnation_check" + + class TerminationReason(StrEnum): """Why the execution loop terminated.""" @@ -56,6 +71,9 @@ class TurnRecord(BaseModel): cache_hit: Whether the provider served this turn from cache. retry_count: Number of retry attempts before success. retry_reason: Exception type name of the last retried error. + node_types: Node types that executed in this turn (e.g. + LLM_CALL, TOOL_INVOCATION). Defaults to empty for + deserialization of legacy data. success: Whether this turn completed without error or content filter (computed). 
""" @@ -98,6 +116,10 @@ class TurnRecord(BaseModel): default=None, description="Exception type name of the last retried error", ) + node_types: tuple[NodeType, ...] = Field( + default=(), + description="Node types that executed in this turn", + ) @model_validator(mode="after") def _validate_retry_consistency(self) -> Self: diff --git a/src/synthorg/hr/performance/tracker.py b/src/synthorg/hr/performance/tracker.py index e1f9075d47..0ee454c9d4 100644 --- a/src/synthorg/hr/performance/tracker.py +++ b/src/synthorg/hr/performance/tracker.py @@ -32,6 +32,7 @@ from pydantic import AwareDatetime from synthorg.core.task import AcceptanceCriterion + from synthorg.engine.coordination.attribution import AgentContribution from synthorg.hr.performance.collaboration_override_store import ( CollaborationOverrideStore, ) @@ -96,7 +97,9 @@ def __init__( # noqa: PLR0913 self._quality_override_store = quality_override_store self._task_metrics: dict[str, list[TaskMetricRecord]] = {} self._collab_metrics: dict[str, list[CollaborationMetricRecord]] = {} + self._contributions: dict[str, list[AgentContribution]] = {} self._background_tasks: set[asyncio.Task[None]] = set() + self._metrics_lock = asyncio.Lock() @staticmethod def _default_quality() -> QualityScoringStrategy: @@ -164,10 +167,11 @@ async def record_task_metric( Returns: The stored record. """ - agent_key = str(record.agent_id) - if agent_key not in self._task_metrics: - self._task_metrics[agent_key] = [] - self._task_metrics[agent_key].append(record) + async with self._metrics_lock: + agent_key = str(record.agent_id) + if agent_key not in self._task_metrics: + self._task_metrics[agent_key] = [] + self._task_metrics[agent_key].append(record) logger.info( PERF_METRIC_RECORDED, @@ -177,6 +181,30 @@ async def record_task_metric( ) return record + def record_coordination_contributions( + self, + contributions: tuple[AgentContribution, ...], + ) -> None: + """Store per-agent contributions from coordination. 
+ + Args: + contributions: Attribution records from a coordinated run. + """ + for contrib in contributions: + agent_key = str(contrib.agent_id) + self._contributions.setdefault(agent_key, []).append(contrib) + + if contributions: + logger.info( + PERF_METRIC_RECORDED, + contribution_count=len(contributions), + avg_score=round( + sum(c.contribution_score for c in contributions) + / len(contributions), + 3, + ), + ) + async def score_task_quality( self, *, diff --git a/src/synthorg/observability/events/context_budget.py b/src/synthorg/observability/events/context_budget.py index 2b3e9b0de2..650f3faf72 100644 --- a/src/synthorg/observability/events/context_budget.py +++ b/src/synthorg/observability/events/context_budget.py @@ -16,3 +16,11 @@ # Indicator injection CONTEXT_BUDGET_INDICATOR_INJECTED: Final[str] = "context_budget.indicator.injected" + +# Agent-controlled compaction +CONTEXT_BUDGET_AGENT_COMPACTION_REQUESTED: Final[str] = ( + "context_budget.agent_compaction.requested" +) +CONTEXT_BUDGET_EPISTEMIC_MARKERS_PRESERVED: Final[str] = ( + "context_budget.epistemic_markers.preserved" +) diff --git a/src/synthorg/observability/events/coordination.py b/src/synthorg/observability/events/coordination.py index c7170fdb93..8eeb835634 100644 --- a/src/synthorg/observability/events/coordination.py +++ b/src/synthorg/observability/events/coordination.py @@ -16,3 +16,4 @@ COORDINATION_CLEANUP_FAILED: Final[str] = "coordination.cleanup.failed" COORDINATION_WAVE_BUILT: Final[str] = "coordination.wave.built" COORDINATION_FACTORY_BUILT: Final[str] = "coordination.factory.built" +COORDINATION_ATTRIBUTION_BUILT: Final[str] = "coordination.attribution.built" diff --git a/src/synthorg/tools/context/__init__.py b/src/synthorg/tools/context/__init__.py new file mode 100644 index 0000000000..5a800fba9f --- /dev/null +++ b/src/synthorg/tools/context/__init__.py @@ -0,0 +1 @@ +"""Context management tools (compaction, etc.).""" diff --git a/src/synthorg/tools/context/compact_context.py 
b/src/synthorg/tools/context/compact_context.py new file mode 100644 index 0000000000..8fe0c6fdc9 --- /dev/null +++ b/src/synthorg/tools/context/compact_context.py @@ -0,0 +1,121 @@ +"""Agent-controlled context compaction tool. + +Allows agents to explicitly request context compaction when context +fill is high and reasoning clarity is critical. The tool signals +intent via metadata -- it does NOT mutate the frozen AgentContext +directly. The execution loop detects the directive and invokes +compaction at the turn boundary. +""" + +from copy import deepcopy +from types import MappingProxyType +from typing import Any + +from synthorg.core.enums import ToolCategory +from synthorg.engine.sanitization import sanitize_message +from synthorg.observability import get_logger +from synthorg.observability.events.context_budget import ( + CONTEXT_BUDGET_AGENT_COMPACTION_REQUESTED, +) +from synthorg.tools.base import BaseTool, ToolExecutionResult + +logger = get_logger(__name__) + +# Raw dict kept private for deepcopy at construction (MappingProxyType +# is not picklable). Public read-only view below. +_RAW_SCHEMA: dict[str, Any] = { + "type": "object", + "properties": { + "strategy": { + "type": "string", + "enum": ["summarize"], + "description": ( + "Compaction strategy. Currently only 'summarize' is supported." + ), + }, + "preserve_markers": { + "type": "boolean", + "default": True, + "description": ( + "Whether to preserve epistemic markers (wait, hmm, " + "actually, etc.) in the compaction summary." + ), + }, + "reason": { + "type": "string", + "minLength": 10, + "maxLength": 256, + "description": ( + "Brief explanation for why compaction is needed " + "now (e.g., 'context fill at 92 percent, need to " + "preserve reasoning clarity')." 
+ ), + }, + }, + "required": ["strategy", "reason"], + "additionalProperties": False, +} +_COMPACT_CONTEXT_SCHEMA: MappingProxyType[str, Any] = MappingProxyType(_RAW_SCHEMA) + + +class CompactContextTool(BaseTool): + """Signal context compaction to the execution loop. + + The tool validates arguments and returns a compaction directive + in ``ToolExecutionResult.metadata``. The execution loop detects + the directive and performs actual compaction at the turn boundary. + + This tool is stateless and safe to register unconditionally. + Compaction only triggers when ``CompactionConfig.agent_controlled`` + is enabled in the engine configuration. + """ + + def __init__(self) -> None: + super().__init__( + name="compact_context", + description=( + "Request context compaction when conversation has " + "grown large. Preserves recent turns and creates a " + "summary of older exchanges. Use when context fill " + "is high and accuracy on complex reasoning is " + "critical." + ), + parameters_schema=deepcopy(_RAW_SCHEMA), + category=ToolCategory.MEMORY, + ) + + async def execute( + self, + *, + arguments: dict[str, Any], + ) -> ToolExecutionResult: + """Signal compaction directive via metadata. + + Args: + arguments: Validated tool arguments (strategy, reason, + optionally preserve_markers). + + Returns: + Result with ``compaction_directive`` metadata key. + """ + strategy = arguments.get("strategy", "summarize") + reason = arguments.get("reason", "") + preserve_markers = arguments.get("preserve_markers", True) + sanitized_reason = sanitize_message(reason, max_length=256) + + logger.info( + CONTEXT_BUDGET_AGENT_COMPACTION_REQUESTED, + strategy=strategy, + preserve_markers=preserve_markers, + reason=sanitized_reason, + ) + + return ToolExecutionResult( + content=("Compaction directive accepted. 
Will execute at turn boundary."), + metadata={ + "compaction_directive": True, + "strategy": strategy, + "preserve_markers": preserve_markers, + "reason": sanitized_reason, + }, + ) diff --git a/src/synthorg/tools/factory.py b/src/synthorg/tools/factory.py index 2c46a9bdc9..dda34173ad 100644 --- a/src/synthorg/tools/factory.py +++ b/src/synthorg/tools/factory.py @@ -191,6 +191,10 @@ def build_default_tools( # noqa: PLR0913 logger.warning(TOOL_FACTORY_ERROR, error=msg) raise ValueError(msg) + from synthorg.tools.context.compact_context import ( # noqa: PLC0415 + CompactContextTool, + ) + all_tools: list[BaseTool] = [ *_build_file_system_tools(workspace=workspace), *_build_git_tools( @@ -202,6 +206,7 @@ def build_default_tools( # noqa: PLR0913 network_policy=web_network_policy, search_provider=web_search_provider, ), + CompactContextTool(), ] all_tools.extend( diff --git a/tests/integration/engine/test_coordination_wiring.py b/tests/integration/engine/test_coordination_wiring.py index 4abe6e9926..9fddf30b1b 100644 --- a/tests/integration/engine/test_coordination_wiring.py +++ b/tests/integration/engine/test_coordination_wiring.py @@ -222,6 +222,9 @@ async def test_build_coordinator_and_coordinate_via_api(self) -> None: # a task-id-aware manual strategy so decomposition succeeds from unittest.mock import AsyncMock + from synthorg.engine.coordination.attribution import ( + CoordinationResultWithAttribution, + ) from synthorg.engine.coordination.models import ( CoordinationPhaseResult, CoordinationResult, @@ -229,7 +232,7 @@ async def test_build_coordinator_and_coordinate_via_api(self) -> None: async def _mock_coordinate(context): # type: ignore[no-untyped-def] """Return a realistic result keyed to the actual task.""" - return CoordinationResult( + result = CoordinationResult( parent_task_id=context.task.id, topology=CoordinationTopology.SAS, phases=( @@ -247,6 +250,7 @@ async def _mock_coordinate(context): # type: ignore[no-untyped-def] total_duration_seconds=0.05, 
total_cost_usd=0.001, ) + return CoordinationResultWithAttribution(result=result) coordinator = AsyncMock() coordinator.coordinate.side_effect = _mock_coordinate diff --git a/tests/integration/tools/test_factory_integration.py b/tests/integration/tools/test_factory_integration.py index e008122dab..5fca10f79a 100644 --- a/tests/integration/tools/test_factory_integration.py +++ b/tests/integration/tools/test_factory_integration.py @@ -12,7 +12,7 @@ from synthorg.tools.git_tools import GitCloneTool from synthorg.tools.registry import ToolRegistry -_EXPECTED_TOOL_COUNT: int = 14 +_EXPECTED_TOOL_COUNT: int = 15 @pytest.mark.integration @@ -102,4 +102,9 @@ def test_factory_tools_form_valid_registry( """Factory output can be wrapped in ToolRegistry without errors.""" tools = build_default_tools(workspace=tmp_path) registry = ToolRegistry(tools) - assert len(list(registry.all_tools())) == _EXPECTED_TOOL_COUNT + all_tools = list(registry.all_tools()) + assert len(all_tools) == _EXPECTED_TOOL_COUNT + tool_names = {t.name for t in all_tools} + assert "compact_context" in tool_names + assert "read_file" in tool_names + assert "write_file" in tool_names diff --git a/tests/unit/api/controllers/test_coordination.py b/tests/unit/api/controllers/test_coordination.py index 6d60adcb6f..fe72c6161e 100644 --- a/tests/unit/api/controllers/test_coordination.py +++ b/tests/unit/api/controllers/test_coordination.py @@ -19,6 +19,9 @@ CoordinationTopology, SeniorityLevel, ) +from synthorg.engine.coordination.attribution import ( + CoordinationResultWithAttribution, +) from synthorg.engine.coordination.models import ( CoordinationPhaseResult, CoordinationResult, @@ -49,21 +52,22 @@ def _make_coordination_result( task_id: str = "task-001", *, is_success: bool = True, -) -> CoordinationResult: - """Build a minimal CoordinationResult for tests.""" +) -> CoordinationResultWithAttribution: + """Build a minimal CoordinationResultWithAttribution for tests.""" phase = CoordinationPhaseResult( 
phase="decompose", success=is_success, duration_seconds=0.1, error=None if is_success else "test error", ) - return CoordinationResult( + result = CoordinationResult( parent_task_id=task_id, topology=CoordinationTopology.SAS, phases=(phase,), total_duration_seconds=0.5, total_cost_usd=0.01, ) + return CoordinationResultWithAttribution(result=result) @pytest.fixture diff --git a/tests/unit/communication/conflict_resolution/test_debate_strategy.py b/tests/unit/communication/conflict_resolution/test_debate_strategy.py index f2d3b4ebc0..fbf05e9311 100644 --- a/tests/unit/communication/conflict_resolution/test_debate_strategy.py +++ b/tests/unit/communication/conflict_resolution/test_debate_strategy.py @@ -1,6 +1,7 @@ """Tests for the structured debate + judge resolution strategy.""" import pytest +import structlog from synthorg.communication.conflict_resolution.config import DebateConfig from synthorg.communication.conflict_resolution.debate_strategy import ( @@ -410,7 +411,6 @@ async def test_evaluator_exception_falls_back_to_authority( async def test_evaluator_exception_logs_event( self, hierarchy: HierarchyResolver, - capsys: pytest.CaptureFixture[str], ) -> None: """Evaluator failure is logged with the dedicated event.""" judge = RaisingJudgeEvaluator() @@ -429,9 +429,10 @@ async def test_evaluator_exception_logs_event( ), ), ) - await resolver.resolve(conflict) - captured = capsys.readouterr() - assert "conflict.debate.evaluator_failed" in captured.out + with structlog.testing.capture_logs() as logs: + await resolver.resolve(conflict) + events = [e["event"] for e in logs] + assert "conflict.debate.evaluator_failed" in events @pytest.mark.parametrize( "error_cls", diff --git a/tests/unit/engine/compaction/test_epistemic.py b/tests/unit/engine/compaction/test_epistemic.py new file mode 100644 index 0000000000..ec480a240a --- /dev/null +++ b/tests/unit/engine/compaction/test_epistemic.py @@ -0,0 +1,199 @@ +"""Tests for epistemic marker detection in compaction 
summaries.""" + +import pytest + +from synthorg.core.enums import Complexity +from synthorg.engine.compaction.epistemic import ( + count_epistemic_markers, + extract_marker_sentences, + should_preserve_message, +) + + +@pytest.mark.unit +class TestCountEpistemicMarkers: + """Tests for count_epistemic_markers function.""" + + def test_no_markers(self) -> None: + """Text with no markers returns 0.""" + text = "The sky is blue and the grass is green." + assert count_epistemic_markers(text) == 0 + + def test_one_marker_from_each_group(self) -> None: + """Count distinct pattern matches across all 5 groups.""" + # One hedging only: "hmm" + assert count_epistemic_markers("Hmm, that's odd.") == 1 + + # One reconsideration only: "on second thought" + assert count_epistemic_markers("On second thought, no.") == 1 + + # One uncertainty only: "perhaps" + assert count_epistemic_markers("Perhaps we should try.") == 1 + + # One verification only: "verify" + assert count_epistemic_markers("Let me verify this.") == 1 + + # One correction only: "hold on" + assert count_epistemic_markers("Hold on, that's wrong.") == 1 + + def test_multiple_from_same_group_counted_once(self) -> None: + """Multiple matches from same pattern group counted as 1.""" + text = "Wait, hmm, Hmm again. These are all hedging." + # All three are in the hedging group, so count is 1 + assert count_epistemic_markers(text) == 1 + + def test_mixed_markers(self) -> None: + """Multiple distinct patterns counted correctly.""" + text = "Wait, actually I was wrong. Let me verify this." + # "wait" (hedging), "actually" (reconsideration), "verify" (verification) + assert count_epistemic_markers(text) == 3 + + def test_case_insensitive_matching(self) -> None: + """Markers match regardless of case.""" + text1 = "Hmm, that looks odd." + text2 = "HMM, THAT LOOKS ODD." + text3 = "hMm, ThAt LoOkS oDd." 
+ assert count_epistemic_markers(text1) == count_epistemic_markers(text2) + assert count_epistemic_markers(text2) == count_epistemic_markers(text3) + assert count_epistemic_markers(text1) == 1 + + +@pytest.mark.unit +class TestShouldPreserveMessage: + """Tests for should_preserve_message function.""" + + def test_complex_with_one_marker_preserves(self) -> None: + """COMPLEX complexity: preserve if >= 1 marker.""" + text = "Wait, I need to think about this more." + assert should_preserve_message(text, Complexity.COMPLEX) is True + + def test_epic_with_one_marker_preserves(self) -> None: + """EPIC complexity: preserve if >= 1 marker.""" + text = "Hmm, let me reconsider this." + assert should_preserve_message(text, Complexity.EPIC) is True + + def test_simple_with_one_marker_does_not_preserve(self) -> None: + """SIMPLE complexity: preserve only if >= 3 markers.""" + text = "Wait, there's an issue." + assert should_preserve_message(text, Complexity.SIMPLE) is False + + def test_simple_with_three_markers_preserves(self) -> None: + """SIMPLE complexity: preserve if >= 3 markers.""" + text = "Wait, actually, let me verify this carefully." + # "wait" (hedging), "actually" (reconsideration), "verify" (verification) + assert should_preserve_message(text, Complexity.SIMPLE) is True + + def test_medium_with_two_markers_does_not_preserve(self) -> None: + """MEDIUM complexity: preserve only if >= 3 markers.""" + # "hmm" (hedging) + "perhaps" (uncertainty) = 2 groups + text = "Hmm, perhaps we should try a different approach." + assert should_preserve_message(text, Complexity.MEDIUM) is False + + def test_medium_with_three_markers_preserves(self) -> None: + """MEDIUM complexity: preserve if >= 3 markers.""" + text = "Wait, actually I was wrong. Let me verify." 
+ # "wait" (hedging), "actually" (reconsideration), "verify" (verification) + assert should_preserve_message(text, Complexity.MEDIUM) is True + + def test_no_markers_never_preserves(self) -> None: + """Text with no markers never preserves.""" + text = "This is a straightforward statement with no reasoning markers." + assert should_preserve_message(text, Complexity.COMPLEX) is False + assert should_preserve_message(text, Complexity.EPIC) is False + assert should_preserve_message(text, Complexity.SIMPLE) is False + assert should_preserve_message(text, Complexity.MEDIUM) is False + + +@pytest.mark.unit +class TestExtractMarkerSentences: + """Tests for extract_marker_sentences function.""" + + def test_no_markers_returns_empty_string(self) -> None: + """Text with no marker sentences returns empty string.""" + text = "This is a straightforward statement. No reasoning markers here." + result = extract_marker_sentences(text) + assert result == "" + + def test_extract_single_marker_sentence(self) -> None: + """Extract single sentence containing a marker.""" + text = ( + "The plan seems straightforward. " + "Wait, I just realized something. " + "Let me move forward." + ) + result = extract_marker_sentences(text) + assert "Wait, I just realized something" in result + + def test_extract_multiple_marker_sentences(self) -> None: + """Extract multiple sentences with markers and join with '; '.""" + text = ( + "First statement. " + "Wait, let me reconsider. " + "Some filler text here. " + "Actually, I was wrong. " + "More filler. " + "But hold on, there's another issue." 
+ ) + result = extract_marker_sentences(text) + assert "Wait, let me reconsider" in result + assert "Actually, I was wrong" in result + assert "But hold on, there's another issue" in result + # Check they're joined with "; " + assert "; " in result + + def test_truncates_at_max_chars(self) -> None: + """First marker sentence exceeding max_chars is truncated.""" + # Single long marker sentence that exceeds max_chars + long_marker = "Wait, " + "x" * 100 + " important" + text = long_marker + max_chars = 30 + result = extract_marker_sentences(text, max_chars=max_chars) + + # Should be truncated to exactly max_chars (first-sentence path) + assert len(result) == max_chars + assert result == long_marker[:max_chars] + + def test_truncates_multi_sentence_at_max_chars(self) -> None: + """Multiple short marker sentences stop accumulating at max_chars.""" + sentences = [ + "Wait, I need to think about something.", + "Actually, let me reconsider this completely.", + "Hmm, this is more complex than I thought.", + "Perhaps we should verify each step carefully.", + ] + text = " ".join(sentences) + max_chars = 50 + result = extract_marker_sentences(text, max_chars=max_chars) + + # Should contain at most max_chars worth of content + assert len(result) <= max_chars + + def test_respects_max_chars_default(self) -> None: + """Default max_chars is 200 -- single long sentence is truncated.""" + text = "Wait, " + "x" * 300 + result = extract_marker_sentences(text) + # Single sentence exceeds default 200 chars -> truncated to exactly 200 + assert len(result) == 200 + + def test_handles_newlines_as_sentence_boundaries(self) -> None: + """Newlines are treated as sentence boundaries.""" + text = "Normal statement.\nWait, something important here.\nMore text." 
+ result = extract_marker_sentences(text) + assert "Wait, something important here" in result + assert "Normal statement" not in result + + def test_handles_multiple_punctuation_marks(self) -> None: + """Handles sentences ending with ?, !, or periods.""" + text = "Really? Actually, wait! Let me check. Hmm, this needs verification?" + result = extract_marker_sentences(text) + assert "Actually, wait" in result or "Actually" in result + + def test_strips_whitespace_from_sentences(self) -> None: + """Extracted sentences have whitespace stripped.""" + text = " Normal. Wait, important. More. " + result = extract_marker_sentences(text) + # Should not have extra whitespace + assert "Wait, important" in result + assert not result.startswith(" ") + assert not result.endswith(" ") + assert " " not in result diff --git a/tests/unit/engine/compaction/test_models.py b/tests/unit/engine/compaction/test_models.py index 90416f48c2..8787813bf7 100644 --- a/tests/unit/engine/compaction/test_models.py +++ b/tests/unit/engine/compaction/test_models.py @@ -42,6 +42,48 @@ def test_frozen(self) -> None: with pytest.raises(ValidationError): config.fill_threshold_percent = 90.0 # type: ignore[misc] + def test_agent_controlled_safety_below_fill_rejected(self) -> None: + """safety_threshold must exceed fill_threshold when agent_controlled.""" + with pytest.raises(ValueError, match="safety_threshold_percent"): + CompactionConfig( + agent_controlled=True, + fill_threshold_percent=80.0, + safety_threshold_percent=70.0, + ) + + def test_agent_controlled_safety_equal_fill_rejected(self) -> None: + """Equal thresholds rejected when agent_controlled.""" + with pytest.raises(ValueError, match="safety_threshold_percent"): + CompactionConfig( + agent_controlled=True, + fill_threshold_percent=80.0, + safety_threshold_percent=80.0, + ) + + def test_agent_controlled_valid_thresholds(self) -> None: + """Valid agent-controlled config accepted.""" + config = CompactionConfig( + agent_controlled=True, + 
fill_threshold_percent=80.0, + safety_threshold_percent=95.0, + ) + assert config.agent_controlled is True + assert config.safety_threshold_percent == 95.0 + + def test_not_agent_controlled_ignores_threshold_ordering(self) -> None: + """When not agent_controlled, threshold ordering is not checked.""" + config = CompactionConfig( + agent_controlled=False, + fill_threshold_percent=80.0, + safety_threshold_percent=70.0, + ) + assert config.agent_controlled is False + + def test_preserve_epistemic_markers_default(self) -> None: + """preserve_epistemic_markers defaults to True.""" + config = CompactionConfig() + assert config.preserve_epistemic_markers is True + @pytest.mark.unit class TestCompressionMetadata: diff --git a/tests/unit/engine/compaction/test_summarizer_markers.py b/tests/unit/engine/compaction/test_summarizer_markers.py new file mode 100644 index 0000000000..2f47eac47e --- /dev/null +++ b/tests/unit/engine/compaction/test_summarizer_markers.py @@ -0,0 +1,345 @@ +"""Tests for epistemic marker preservation in _build_summary.""" + +from datetime import date + +import pytest + +from synthorg.core.agent import AgentIdentity, ModelConfig, PersonalityConfig +from synthorg.core.enums import Complexity, SeniorityLevel +from synthorg.engine.compaction.models import CompactionConfig +from synthorg.engine.compaction.summarizer import _build_summary, force_compaction +from synthorg.engine.context import AgentContext +from synthorg.engine.token_estimation import DefaultTokenEstimator +from synthorg.providers.enums import MessageRole +from synthorg.providers.models import ChatMessage + + +def _msg(role: MessageRole, content: str) -> ChatMessage: + """Create a chat message.""" + return ChatMessage(role=role, content=content) + + +@pytest.mark.unit +class TestBuildSummaryMarkers: + """Tests for _build_summary with epistemic marker preservation.""" + + def test_no_markers_standard_format(self) -> None: + """Messages without markers produce standard format.""" + messages = 
( + _msg(MessageRole.ASSISTANT, "The answer is 42."), + _msg(MessageRole.USER, "Thanks"), + _msg( + MessageRole.ASSISTANT, + "This is a straightforward solution with no reasoning.", + ), + ) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.COMPLEX, + ) + + assert "[Archived 3 messages. Summary of prior work:" in summary + + def test_with_markers_complex_preserved(self) -> None: + """Message with marker + COMPLEX complexity -> preserved.""" + messages = (_msg(MessageRole.ASSISTANT, "Wait, I need to reconsider this."),) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.COMPLEX, + ) + + # Should have epistemic markers preserved format + assert "Epistemic markers preserved" in summary + assert "Wait" in summary or "reconsider" in summary + + def test_with_markers_epic_preserved(self) -> None: + """Message with marker + EPIC complexity -> preserved.""" + messages = (_msg(MessageRole.ASSISTANT, "Hmm, let me verify this."),) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.EPIC, + ) + + assert "Epistemic markers preserved" in summary + + def test_with_one_marker_simple_not_preserved(self) -> None: + """Message with 1 marker + SIMPLE complexity -> NOT preserved.""" + messages = (_msg(MessageRole.ASSISTANT, "Wait, I see the issue."),) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.SIMPLE, + ) + + # Should use standard format (threshold is 3 for SIMPLE) + assert "Summary of prior work:" in summary + assert "Epistemic markers preserved" not in summary + + def test_with_three_markers_simple_preserved(self) -> None: + """Message with 3 markers + SIMPLE complexity -> preserved.""" + messages = ( + _msg( + MessageRole.ASSISTANT, + "Wait, actually I was wrong. 
Let me verify this.", + ), + ) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.SIMPLE, + ) + + assert "Epistemic markers preserved" in summary + + def test_with_markers_medium_below_threshold(self) -> None: + """Message with 2 markers + MEDIUM complexity -> NOT preserved.""" + # "hmm" (hedging) + "perhaps" (uncertainty) = 2 groups < 3 threshold + messages = (_msg(MessageRole.ASSISTANT, "Hmm, perhaps we should try again."),) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.MEDIUM, + ) + + assert "Summary of prior work:" in summary + assert "Epistemic markers preserved" not in summary + + def test_markers_disabled_standard_format(self) -> None: + """preserve_markers=False -> standard format even with markers.""" + messages = ( + _msg( + MessageRole.ASSISTANT, + "Wait, actually let me verify. This is important.", + ), + ) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=False, + task_complexity=Complexity.COMPLEX, + ) + + # Even with COMPLEX, if preserve_markers is False, use standard format + assert "Summary of prior work:" in summary + assert "Epistemic markers preserved" not in summary + + def test_empty_messages_fallback(self) -> None: + """No assistant messages -> fallback format.""" + messages = ( + _msg(MessageRole.USER, "What's the answer?"), + _msg(MessageRole.SYSTEM, "You are helpful."), + ) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.COMPLEX, + ) + + assert "[Archived 2 messages from earlier" in summary + + def test_preserved_count_in_summary(self) -> None: + """Summary mentions count of preserved messages.""" + messages = ( + _msg( + MessageRole.ASSISTANT, + "Wait, I need to reconsider this approach.", + ), + _msg(MessageRole.USER, "Ok"), + _msg( + MessageRole.ASSISTANT, + "Actually, let me verify 
the calculations.", + ), + ) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.COMPLEX, + ) + + # Should mention 2 preserved messages + assert "Epistemic markers preserved from 2 messages" in summary + + def test_mixed_preserved_and_standard(self) -> None: + """Mix of preserved markers and standard snippets.""" + messages = ( + _msg(MessageRole.ASSISTANT, "Wait, I need to think about this."), + _msg( + MessageRole.ASSISTANT, + "This is just a straightforward statement.", + ), + ) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.COMPLEX, + ) + + # Should have preserved markers format + assert "Epistemic markers preserved" in summary + # Should have summary content + assert len(summary) > 50 + + def test_marker_sentences_joined_correctly(self) -> None: + """Extracted marker sentences are joined in summary.""" + messages = ( + _msg( + MessageRole.ASSISTANT, + "This is normal. Wait, let me reconsider. 
More normal text.", + ), + ) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.COMPLEX, + ) + + # Should contain the extracted marker sentence + assert "Wait" in summary or "reconsider" in summary + + def test_system_messages_ignored(self) -> None: + """SYSTEM messages don't contribute to summary.""" + messages = ( + _msg(MessageRole.SYSTEM, "System instruction here"), + _msg(MessageRole.ASSISTANT, "Response content"), + ) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.COMPLEX, + ) + + # Should archive both but only extract from ASSISTANT + assert "[Archived 2 messages" in summary + + def test_user_messages_ignored(self) -> None: + """USER messages don't contribute to summary.""" + messages = ( + _msg(MessageRole.USER, "User query here"), + _msg(MessageRole.ASSISTANT, "Wait, let me think about this."), + ) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.COMPLEX, + ) + + # USER message is archived but not summarized + assert "Epistemic markers preserved" in summary + + def test_empty_assistant_content_ignored(self) -> None: + """Empty assistant messages are skipped.""" + messages = ( + _msg(MessageRole.ASSISTANT, ""), + _msg(MessageRole.ASSISTANT, "Wait, something important."), + ) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.COMPLEX, + ) + + # First message is empty and skipped, second has marker + assert "Epistemic markers preserved from 1 message." 
in summary + + def test_summary_respects_max_length(self) -> None: + """Summary is truncated at MAX_SUMMARY_CHARS.""" + # Create very long content + long_content = "Wait, " + "x" * 600 + messages = (_msg(MessageRole.ASSISTANT, long_content),) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.COMPLEX, + ) + + # Summary should be reasonable length (max 500 chars for content) + assert len(summary) < 1000 + + +def _make_identity(name: str = "test-agent") -> AgentIdentity: + """Create a test agent identity.""" + return AgentIdentity( + name=name, + role="engineer", + department="engineering", + level=SeniorityLevel.MID, + hiring_date=date(2026, 1, 15), + personality=PersonalityConfig(traits=("analytical",)), + model=ModelConfig( + provider="test-provider", + model_id="test-small-001", + ), + ) + + +@pytest.mark.unit +class TestForceCompaction: + """Tests for force_compaction function.""" + + def test_force_compaction_too_few_messages(self) -> None: + """force_compaction returns None with too few messages.""" + identity = _make_identity() + ctx = AgentContext.from_identity(identity) + # Add only 2 messages (below default min_messages_to_compact=4) + ctx = ctx.with_message( + ChatMessage(role=MessageRole.USER, content="Hello"), + ) + ctx = ctx.with_message( + ChatMessage(role=MessageRole.ASSISTANT, content="Hi"), + ) + + config = CompactionConfig(min_messages_to_compact=4) + estimator = DefaultTokenEstimator() + + result = force_compaction(ctx, config, estimator) + + assert result is None + + def test_force_compaction_bypasses_threshold(self) -> None: + """force_compaction runs even when fill is below threshold.""" + identity = _make_identity() + ctx = AgentContext.from_identity(identity) + # Add 8 messages to ensure we have enough for compaction + for i in range(8): + if i % 2 == 0: + msg = ChatMessage( + role=MessageRole.USER, + content=f"User message {i}", + ) + else: + msg = ChatMessage( + 
role=MessageRole.ASSISTANT, + content=f"Response {i}", + ) + ctx = ctx.with_message(msg) + + config = CompactionConfig( + min_messages_to_compact=4, + fill_threshold_percent=95.0, # Very high threshold + ) + estimator = DefaultTokenEstimator() + + # Should succeed despite low fill percentage + result = force_compaction(ctx, config, estimator) + + # Forced compaction should produce a compacted context + assert result is not None + assert isinstance(result, AgentContext) + assert len(result.conversation) < len(ctx.conversation) diff --git a/tests/unit/engine/coordination/test_attribution.py b/tests/unit/engine/coordination/test_attribution.py new file mode 100644 index 0000000000..30009eafbe --- /dev/null +++ b/tests/unit/engine/coordination/test_attribution.py @@ -0,0 +1,242 @@ +"""Unit tests for coordination attribution models. + +Tests AgentContribution, CoordinationResultWithAttribution, and +FailureAttribution validation. +""" + +import pytest +from pydantic import ValidationError + +from synthorg.core.enums import CoordinationTopology +from synthorg.core.types import NotBlankStr +from synthorg.engine.coordination.attribution import ( + AgentContribution, + CoordinationResultWithAttribution, + FailureAttribution, +) +from synthorg.engine.coordination.models import ( + CoordinationPhaseResult, + CoordinationResult, +) + + +def _make_coord_result( + *, + parent_task_id: str = "task-1", + topology: CoordinationTopology = CoordinationTopology.SAS, + success: bool = True, +) -> CoordinationResult: + """Build a minimal CoordinationResult for testing.""" + return CoordinationResult( + parent_task_id=NotBlankStr(parent_task_id), + topology=topology, + phases=( + CoordinationPhaseResult( + phase=NotBlankStr("dispatch"), + success=success, + duration_seconds=1.0, + error=None if success else "phase error", + ), + ), + total_duration_seconds=1.0, + total_cost_usd=0.5, + ) + + +class TestAgentContribution: + """Tests for the AgentContribution model.""" + + @pytest.mark.unit 
+ def test_successful_contribution(self) -> None: + """Score 1.0 requires no failure attribution.""" + contrib = AgentContribution( + agent_id=NotBlankStr("agent-1"), + subtask_id=NotBlankStr("subtask-1"), + contribution_score=1.0, + ) + assert contrib.contribution_score == 1.0 + assert contrib.failure_attribution is None + assert contrib.evidence is None + + @pytest.mark.unit + def test_failed_contribution_direct(self) -> None: + """Score < 1.0 requires failure attribution.""" + contrib = AgentContribution( + agent_id=NotBlankStr("agent-1"), + subtask_id=NotBlankStr("subtask-1"), + contribution_score=0.0, + failure_attribution="direct", + evidence="Tool invocation failed: timeout", + ) + assert contrib.contribution_score == 0.0 + assert contrib.failure_attribution == "direct" + assert contrib.evidence is not None + + @pytest.mark.unit + @pytest.mark.parametrize( + "attribution", + [ + "direct", + "upstream_contamination", + "coordination_overhead", + "quality_gate", + ], + ) + def test_all_failure_attribution_values( + self, + attribution: FailureAttribution, + ) -> None: + """All failure attribution literals are accepted.""" + contrib = AgentContribution( + agent_id=NotBlankStr("agent-1"), + subtask_id=NotBlankStr("subtask-1"), + contribution_score=0.5, + failure_attribution=attribution, + ) + assert contrib.failure_attribution == attribution + + @pytest.mark.unit + def test_score_below_one_requires_attribution(self) -> None: + """Score < 1.0 without failure_attribution is rejected.""" + with pytest.raises(ValueError, match="failure_attribution"): + AgentContribution( + agent_id=NotBlankStr("agent-1"), + subtask_id=NotBlankStr("subtask-1"), + contribution_score=0.5, + ) + + @pytest.mark.unit + def test_score_one_with_attribution_rejected(self) -> None: + """Score == 1.0 with failure_attribution set is rejected.""" + with pytest.raises(ValueError, match="failure_attribution"): + AgentContribution( + agent_id=NotBlankStr("agent-1"), + 
subtask_id=NotBlankStr("subtask-1"), + contribution_score=1.0, + failure_attribution="direct", + ) + + @pytest.mark.unit + def test_score_bounds_low(self) -> None: + """Score below 0.0 is rejected.""" + with pytest.raises(ValueError, match="greater than or equal to 0"): + AgentContribution( + agent_id=NotBlankStr("agent-1"), + subtask_id=NotBlankStr("subtask-1"), + contribution_score=-0.1, + failure_attribution="direct", + ) + + @pytest.mark.unit + def test_score_bounds_high(self) -> None: + """Score above 1.0 is rejected.""" + with pytest.raises(ValueError, match="less than or equal to 1"): + AgentContribution( + agent_id=NotBlankStr("agent-1"), + subtask_id=NotBlankStr("subtask-1"), + contribution_score=1.1, + ) + + @pytest.mark.unit + def test_frozen(self) -> None: + """AgentContribution is immutable.""" + contrib = AgentContribution( + agent_id=NotBlankStr("agent-1"), + subtask_id=NotBlankStr("subtask-1"), + contribution_score=1.0, + ) + with pytest.raises(ValidationError, match="frozen"): + contrib.contribution_score = 0.5 # type: ignore[misc] + + @pytest.mark.unit + def test_evidence_max_length(self) -> None: + """Evidence exceeding 500 chars is rejected.""" + with pytest.raises(ValueError, match="at most 500"): + AgentContribution( + agent_id=NotBlankStr("agent-1"), + subtask_id=NotBlankStr("subtask-1"), + contribution_score=0.0, + failure_attribution="direct", + evidence="x" * 501, + ) + + @pytest.mark.unit + def test_evidence_at_max_length(self) -> None: + """Evidence at exactly 500 chars is accepted.""" + contrib = AgentContribution( + agent_id=NotBlankStr("agent-1"), + subtask_id=NotBlankStr("subtask-1"), + contribution_score=0.0, + failure_attribution="direct", + evidence="x" * 500, + ) + assert len(contrib.evidence) == 500 # type: ignore[arg-type] + + +class TestCoordinationResultWithAttribution: + """Tests for the CoordinationResultWithAttribution wrapper.""" + + @pytest.mark.unit + def test_wraps_result(self) -> None: + """Wrapper preserves the 
original CoordinationResult.""" + result = _make_coord_result() + wrapper = CoordinationResultWithAttribution( + result=result, + agent_contributions=(), + ) + assert wrapper.result is result + + @pytest.mark.unit + def test_is_success_delegates(self) -> None: + """is_success delegates to the wrapped result.""" + success_wrapper = CoordinationResultWithAttribution( + result=_make_coord_result(success=True), + agent_contributions=(), + ) + assert success_wrapper.is_success is True + + failure_wrapper = CoordinationResultWithAttribution( + result=_make_coord_result(success=False), + agent_contributions=(), + ) + assert failure_wrapper.is_success is False + + @pytest.mark.unit + def test_avg_contribution_score_empty(self) -> None: + """Empty contributions yield 0.0 average.""" + wrapper = CoordinationResultWithAttribution( + result=_make_coord_result(), + agent_contributions=(), + ) + assert wrapper.avg_contribution_score == 0.0 + + @pytest.mark.unit + def test_avg_contribution_score_computed(self) -> None: + """Average of contribution scores is computed correctly.""" + wrapper = CoordinationResultWithAttribution( + result=_make_coord_result(), + agent_contributions=( + AgentContribution( + agent_id=NotBlankStr("a1"), + subtask_id=NotBlankStr("s1"), + contribution_score=1.0, + ), + AgentContribution( + agent_id=NotBlankStr("a2"), + subtask_id=NotBlankStr("s2"), + contribution_score=0.0, + failure_attribution="direct", + ), + ), + ) + assert wrapper.avg_contribution_score == pytest.approx(0.5) + + @pytest.mark.unit + def test_frozen(self) -> None: + """CoordinationResultWithAttribution is immutable.""" + wrapper = CoordinationResultWithAttribution( + result=_make_coord_result(), + agent_contributions=(), + ) + with pytest.raises(ValidationError, match="frozen"): + wrapper.agent_contributions = () # type: ignore[misc] diff --git a/tests/unit/engine/coordination/test_attribution_factory.py b/tests/unit/engine/coordination/test_attribution_factory.py new file mode 100644 
index 0000000000..e92f82b581 --- /dev/null +++ b/tests/unit/engine/coordination/test_attribution_factory.py @@ -0,0 +1,397 @@ +"""Unit tests for build_agent_contributions factory function.""" + +from datetime import date + +import pytest + +from synthorg.core.agent import AgentIdentity, ModelConfig, PersonalityConfig +from synthorg.core.enums import ( + Complexity, + CoordinationTopology, + Priority, + SeniorityLevel, + TaskStatus, + TaskType, +) +from synthorg.core.task import Task +from synthorg.core.types import NotBlankStr +from synthorg.engine.context import AgentContext +from synthorg.engine.coordination.attribution import build_agent_contributions +from synthorg.engine.coordination.models import CoordinationWave +from synthorg.engine.loop_protocol import ExecutionResult, TerminationReason +from synthorg.engine.parallel_models import ( + AgentOutcome, + ParallelExecutionResult, +) +from synthorg.engine.prompt import SystemPrompt +from synthorg.engine.routing.models import ( + RoutingCandidate, + RoutingDecision, + RoutingResult, +) +from synthorg.engine.run_result import AgentRunResult + + +def _make_identity(name: str = "test-agent", **kwargs: object) -> AgentIdentity: + defaults: dict[str, object] = { + "role": "engineer", + "department": "engineering", + "level": SeniorityLevel.MID, + "hiring_date": date(2026, 1, 15), + "personality": PersonalityConfig(traits=("analytical",)), + "model": ModelConfig( + provider="test-provider", + model_id="test-small-001", + ), + } + defaults.update(kwargs) + return AgentIdentity(name=name, **defaults) # type: ignore[arg-type] + + +def _make_task( + title: str = "test-task", + task_id: str | None = None, + **kwargs: object, +) -> Task: + defaults: dict[str, object] = { + "id": task_id or f"task-{title}", + "description": "A test task", + "type": TaskType.DEVELOPMENT, + "priority": Priority.MEDIUM, + "project": "test-project", + "created_by": "tester", + "assigned_to": "test-agent", + "status": TaskStatus.ASSIGNED, + 
"estimated_complexity": Complexity.SIMPLE, + } + defaults.update(kwargs) + return Task(title=title, **defaults) # type: ignore[arg-type] + + +def _make_run_result( + agent_id: str, + task_id: str, + *, + reason: TerminationReason = TerminationReason.COMPLETED, +) -> AgentRunResult: + identity = _make_identity(agent_id) + task = _make_task(agent_id, task_id=task_id, assigned_to=agent_id) + ctx = AgentContext.from_identity(identity, task=task) + error_msg = "test error" if reason == TerminationReason.ERROR else None + return AgentRunResult( + execution_result=ExecutionResult( + context=ctx, + termination_reason=reason, + error_message=error_msg, + ), + system_prompt=SystemPrompt( + content="test", + template_version="1.0", + estimated_tokens=1, + sections=("identity",), + metadata={"agent_id": agent_id}, + ), + duration_seconds=1.0, + agent_id=agent_id, + task_id=task_id, + ) + + +def _make_routing_result( + agent_subtask_pairs: list[tuple[str, str]], + *, + parent_task_id: str = "parent-1", +) -> RoutingResult: + decisions = tuple( + RoutingDecision( + subtask_id=NotBlankStr(subtask_id), + selected_candidate=RoutingCandidate( + agent_identity=_make_identity(agent_id), + score=0.9, + matched_skills=(), + reason=NotBlankStr("test routing"), + ), + topology=CoordinationTopology.SAS, + ) + for agent_id, subtask_id in agent_subtask_pairs + ) + return RoutingResult( + parent_task_id=NotBlankStr(parent_task_id), + decisions=decisions, + ) + + +def _make_successful_outcome( + agent_id: str, + task_id: str, +) -> AgentOutcome: + return AgentOutcome( + task_id=NotBlankStr(task_id), + agent_id=NotBlankStr(agent_id), + result=_make_run_result(agent_id, task_id), + ) + + +def _make_failed_outcome( + agent_id: str, + task_id: str, + *, + error: str = "tool invocation failed: timeout", +) -> AgentOutcome: + return AgentOutcome( + task_id=NotBlankStr(task_id), + agent_id=NotBlankStr(agent_id), + error=error, + ) + + +def _make_terminated_outcome( + agent_id: str, + task_id: str, + 
*, + reason: TerminationReason = TerminationReason.STAGNATION, +) -> AgentOutcome: + return AgentOutcome( + task_id=NotBlankStr(task_id), + agent_id=NotBlankStr(agent_id), + result=_make_run_result(agent_id, task_id, reason=reason), + ) + + +def _make_waves( + outcomes: list[AgentOutcome], + *, + group_id: str = "group-1", +) -> tuple[CoordinationWave, ...]: + return ( + CoordinationWave( + wave_index=0, + subtask_ids=tuple(NotBlankStr(o.task_id) for o in outcomes), + execution_result=ParallelExecutionResult( + group_id=NotBlankStr(group_id), + outcomes=tuple(outcomes), + total_duration_seconds=2.0, + ), + ), + ) + + +class TestBuildAgentContributions: + """Tests for the build_agent_contributions factory.""" + + @pytest.mark.unit + def test_all_success(self) -> None: + """All agents succeed: score 1.0, no failure attribution.""" + routing = _make_routing_result( + [ + ("agent-1", "sub-1"), + ("agent-2", "sub-2"), + ] + ) + waves = _make_waves( + [ + _make_successful_outcome("agent-1", "sub-1"), + _make_successful_outcome("agent-2", "sub-2"), + ] + ) + + contribs = build_agent_contributions(routing, waves) + + assert len(contribs) == 2 + for c in contribs: + assert c.contribution_score == 1.0 + assert c.failure_attribution is None + + @pytest.mark.unit + def test_all_failed_with_errors(self) -> None: + """All agents fail with errors: score 0.0, direct attribution.""" + routing = _make_routing_result([("agent-1", "sub-1")]) + waves = _make_waves( + [ + _make_failed_outcome("agent-1", "sub-1", error="tool execution failed"), + ] + ) + + contribs = build_agent_contributions(routing, waves) + + assert len(contribs) == 1 + assert contribs[0].contribution_score == 0.0 + assert contribs[0].failure_attribution == "direct" + assert contribs[0].evidence is not None + + @pytest.mark.unit + def test_stagnation_termination(self) -> None: + """Agent terminated by stagnation: score 0.0, direct.""" + routing = _make_routing_result([("agent-1", "sub-1")]) + waves = _make_waves( + [ 
+ _make_terminated_outcome( + "agent-1", + "sub-1", + reason=TerminationReason.STAGNATION, + ), + ] + ) + + contribs = build_agent_contributions(routing, waves) + + assert len(contribs) == 1 + assert contribs[0].contribution_score == 0.0 + assert contribs[0].failure_attribution == "direct" + + @pytest.mark.unit + def test_budget_exhausted_termination(self) -> None: + """Agent ran out of budget: coordination_overhead.""" + routing = _make_routing_result([("agent-1", "sub-1")]) + waves = _make_waves( + [ + _make_terminated_outcome( + "agent-1", + "sub-1", + reason=TerminationReason.BUDGET_EXHAUSTED, + ), + ] + ) + + contribs = build_agent_contributions(routing, waves) + + assert len(contribs) == 1 + assert contribs[0].contribution_score == 0.0 + assert contribs[0].failure_attribution == "coordination_overhead" + + @pytest.mark.unit + def test_mixed_outcomes(self) -> None: + """Mix of success and failure: different scores and attributions.""" + routing = _make_routing_result( + [ + ("agent-1", "sub-1"), + ("agent-2", "sub-2"), + ] + ) + waves = _make_waves( + [ + _make_successful_outcome("agent-1", "sub-1"), + _make_failed_outcome("agent-2", "sub-2", error="budget exceeded"), + ] + ) + + contribs = build_agent_contributions(routing, waves) + + by_agent = {c.agent_id: c for c in contribs} + assert by_agent["agent-1"].contribution_score == 1.0 + assert by_agent["agent-2"].contribution_score == 0.0 + + @pytest.mark.unit + def test_empty_waves(self) -> None: + """No waves: empty contributions.""" + routing = _make_routing_result([("agent-1", "sub-1")]) + contribs = build_agent_contributions(routing, ()) + assert contribs == () + + @pytest.mark.unit + def test_wave_without_execution_result(self) -> None: + """Wave with no execution result: skipped.""" + routing = _make_routing_result([("agent-1", "sub-1")]) + waves = ( + CoordinationWave( + wave_index=0, + subtask_ids=(NotBlankStr("sub-1"),), + execution_result=None, + ), + ) + contribs = build_agent_contributions(routing, 
waves) + assert contribs == () + + @pytest.mark.unit + def test_evidence_truncated(self) -> None: + """Long error messages are truncated to 500 chars.""" + long_error = "x" * 1000 + routing = _make_routing_result([("agent-1", "sub-1")]) + waves = _make_waves( + [ + _make_failed_outcome("agent-1", "sub-1", error=long_error), + ] + ) + + contribs = build_agent_contributions(routing, waves) + + assert contribs[0].evidence is not None + assert len(contribs[0].evidence) <= 500 + + @pytest.mark.unit + def test_returns_tuple(self) -> None: + """Return type is a tuple (immutable).""" + routing = _make_routing_result([("agent-1", "sub-1")]) + waves = _make_waves([_make_successful_outcome("agent-1", "sub-1")]) + contribs = build_agent_contributions(routing, waves) + assert isinstance(contribs, tuple) + + @pytest.mark.unit + def test_quality_gate_error_classification(self) -> None: + """Error containing 'quality' is classified as quality_gate.""" + routing = _make_routing_result([("agent-1", "sub-1")]) + waves = _make_waves( + [ + _make_failed_outcome( + "agent-1", + "sub-1", + error="quality criteria not met", + ), + ] + ) + + contribs = build_agent_contributions(routing, waves) + + assert contribs[0].failure_attribution == "quality_gate" + + @pytest.mark.unit + def test_multiple_waves(self) -> None: + """Contributions collected from multiple waves.""" + routing = _make_routing_result( + [ + ("agent-1", "sub-1"), + ("agent-2", "sub-2"), + ] + ) + wave1 = CoordinationWave( + wave_index=0, + subtask_ids=(NotBlankStr("sub-1"),), + execution_result=ParallelExecutionResult( + group_id=NotBlankStr("g1"), + outcomes=(_make_successful_outcome("agent-1", "sub-1"),), + total_duration_seconds=1.0, + ), + ) + wave2 = CoordinationWave( + wave_index=1, + subtask_ids=(NotBlankStr("sub-2"),), + execution_result=ParallelExecutionResult( + group_id=NotBlankStr("g2"), + outcomes=(_make_failed_outcome("agent-2", "sub-2"),), + total_duration_seconds=1.0, + ), + ) + + contribs = 
build_agent_contributions(routing, (wave1, wave2)) + + assert len(contribs) == 2 + by_agent = {c.agent_id: c for c in contribs} + assert by_agent["agent-1"].contribution_score == 1.0 + assert by_agent["agent-2"].contribution_score == 0.0 + + @pytest.mark.unit + def test_unmapped_agent_uses_task_id_as_subtask(self) -> None: + """Agent not in routing decisions falls back to task_id.""" + # Route only agent-1, but include agent-2 in outcomes + routing = _make_routing_result([("agent-1", "sub-1")]) + waves = _make_waves( + [ + _make_successful_outcome("agent-1", "sub-1"), + _make_successful_outcome("agent-2", "sub-2"), # not in routing + ] + ) + contribs = build_agent_contributions(routing, waves) + assert len(contribs) == 2 + by_agent = {c.agent_id: c for c in contribs} + # Unmapped agent falls back to using task_id as subtask_id + assert by_agent["agent-2"].subtask_id == "sub-2" diff --git a/tests/unit/engine/test_coordination_service.py b/tests/unit/engine/test_coordination_service.py index af982f712a..65973e4677 100644 --- a/tests/unit/engine/test_coordination_service.py +++ b/tests/unit/engine/test_coordination_service.py @@ -132,9 +132,10 @@ async def test_happy_path_two_parallel_subtasks(self) -> None: ), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result - assert result.is_success + assert attributed.is_success assert result.topology == CoordinationTopology.CENTRALIZED assert result.decomposition_result is not None assert result.routing_result is not None @@ -142,6 +143,7 @@ async def test_happy_path_two_parallel_subtasks(self) -> None: assert result.status_rollup is not None assert result.status_rollup.completed == 2 assert result.total_duration_seconds > 0 + assert isinstance(attributed.agent_contributions, tuple) @pytest.mark.unit async def test_sas_topology_single_agent(self) -> None: @@ -174,9 +176,10 @@ async def test_sas_topology_single_agent(self) -> None: 
available_agents=(make_assignment_agent("alice"),), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result - assert result.is_success + assert attributed.is_success assert result.topology == CoordinationTopology.SAS assert len(result.waves) == 2 @@ -285,10 +288,11 @@ async def test_partial_execution_fail_fast_off(self) -> None: config=CoordinationConfig(fail_fast=False), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result # Not fully successful (wave 0 failed) - assert not result.is_success + assert not attributed.is_success # Both waves still executed assert len(result.waves) == 2 assert result.status_rollup is not None @@ -329,9 +333,9 @@ async def test_task_engine_parent_update(self) -> None: available_agents=(make_assignment_agent("alice"),), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) - assert result.is_success + assert attributed.is_success task_engine.submit.assert_called_once() @pytest.mark.unit @@ -355,9 +359,10 @@ async def test_no_task_engine_skips_update(self) -> None: available_agents=(make_assignment_agent("alice"),), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result - assert result.is_success + assert attributed.is_success # No update_parent phase in results update_phases = [p for p in result.phases if p.phase == "update_parent"] assert len(update_phases) == 0 @@ -411,7 +416,8 @@ async def test_status_rollup_correctness(self) -> None: ), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result assert result.status_rollup is not None assert result.status_rollup.completed == 1 @@ -501,9 +507,10 @@ async def test_workspace_lifecycle(self) -> None: ), ) - result = await coordinator.coordinate(ctx) + attributed = await 
coordinator.coordinate(ctx) + result = attributed.result - assert result.is_success + assert attributed.is_success ws_service.setup_group.assert_called_once() ws_service.merge_group.assert_called_once() ws_service.teardown_group.assert_called_once() @@ -549,7 +556,8 @@ async def test_auto_topology_resolves_to_centralized(self) -> None: available_agents=(make_assignment_agent("alice"),), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result assert result.topology == CoordinationTopology.CENTRALIZED @pytest.mark.unit @@ -583,7 +591,8 @@ async def test_update_parent_submit_fails(self) -> None: available_agents=(make_assignment_agent("alice"),), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result update_phases = [p for p in result.phases if p.phase == "update_parent"] assert len(update_phases) == 1 assert not update_phases[0].success @@ -613,7 +622,8 @@ async def test_update_parent_exception_captured(self) -> None: available_agents=(make_assignment_agent("alice"),), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result update_phases = [p for p in result.phases if p.phase == "update_parent"] assert len(update_phases) == 1 assert not update_phases[0].success @@ -653,7 +663,8 @@ async def test_rollup_error_captured(self) -> None: available_agents=(make_assignment_agent("alice"),), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result rollup_phases = [p for p in result.phases if p.phase == "rollup"] assert len(rollup_phases) == 1 assert not rollup_phases[0].success @@ -750,7 +761,8 @@ async def test_total_cost_aggregated(self) -> None: available_agents=(make_assignment_agent("alice"),), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = 
attributed.result assert result.total_cost_usd == pytest.approx(0.08) @pytest.mark.unit @@ -782,7 +794,8 @@ async def test_fail_fast_stops_after_failed_wave(self) -> None: config=CoordinationConfig(fail_fast=True), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result # Only one wave executed (fail_fast stopped before wave 1) assert len(result.waves) == 1 @@ -817,7 +830,8 @@ async def test_rollup_includes_blocked_subtasks(self) -> None: available_agents=(make_assignment_agent("alice"),), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result assert result.status_rollup is not None # 1 completed + 1 blocked = 2 total diff --git a/tests/unit/tools/context/__init__.py b/tests/unit/tools/context/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/unit/tools/context/test_compact_context.py b/tests/unit/tools/context/test_compact_context.py new file mode 100644 index 0000000000..90bce6b7ef --- /dev/null +++ b/tests/unit/tools/context/test_compact_context.py @@ -0,0 +1,178 @@ +"""Tests for CompactContextTool.""" + +import pytest + +from synthorg.core.enums import ToolCategory +from synthorg.providers.models import ToolDefinition +from synthorg.tools.context.compact_context import CompactContextTool + + +@pytest.mark.unit +class TestCompactContextTool: + """Tests for CompactContextTool class.""" + + def test_tool_name(self) -> None: + """Tool name is 'compact_context'.""" + tool = CompactContextTool() + assert tool.name == "compact_context" + + def test_tool_category(self) -> None: + """Tool category is MEMORY.""" + tool = CompactContextTool() + assert tool.category == ToolCategory.MEMORY + + def test_tool_description_not_empty(self) -> None: + """Tool has a non-empty description.""" + tool = CompactContextTool() + assert tool.description + assert isinstance(tool.description, str) + assert 
len(tool.description) > 0 + + def test_parameters_schema_has_required_fields(self) -> None: + """Parameters schema includes strategy and reason.""" + tool = CompactContextTool() + schema = tool.parameters_schema + assert schema is not None + assert "properties" in schema + assert "strategy" in schema["properties"] + assert "reason" in schema["properties"] + assert "required" in schema + assert "strategy" in schema["required"] + assert "reason" in schema["required"] + + def test_preserve_markers_optional_in_schema(self) -> None: + """preserve_markers is optional with default True.""" + tool = CompactContextTool() + schema = tool.parameters_schema + assert schema is not None + assert "preserve_markers" in schema["properties"] + # Should NOT be in required list since it has a default + assert "preserve_markers" not in schema["required"] + # Schema should indicate default is True + assert schema["properties"]["preserve_markers"]["default"] is True + + async def test_execute_valid_args(self) -> None: + """Execute with valid arguments returns ToolExecutionResult.""" + tool = CompactContextTool() + result = await tool.execute( + arguments={ + "strategy": "summarize", + "reason": "context at 90 percent fill", + }, + ) + + assert result.content + assert isinstance(result.content, str) + assert "Compaction directive accepted" in result.content + + async def test_execute_has_correct_metadata_keys(self) -> None: + """Execute result metadata contains strategy, preserve_markers, reason.""" + tool = CompactContextTool() + result = await tool.execute( + arguments={ + "strategy": "summarize", + "reason": "context at 90 percent fill", + }, + ) + + metadata = result.metadata + assert "strategy" in metadata + assert "preserve_markers" in metadata + assert "reason" in metadata + assert "compaction_directive" in metadata + + async def test_execute_preserve_markers_default_true(self) -> None: + """When preserve_markers not provided, defaults to True.""" + tool = CompactContextTool() + 
result = await tool.execute( + arguments={ + "strategy": "summarize", + "reason": "context at 90 percent fill", + }, + ) + + assert result.metadata["preserve_markers"] is True + + async def test_execute_preserve_markers_explicit_false(self) -> None: + """When preserve_markers is False, it's preserved in metadata.""" + tool = CompactContextTool() + result = await tool.execute( + arguments={ + "strategy": "summarize", + "reason": "context at 90 percent fill", + "preserve_markers": False, + }, + ) + + assert result.metadata["preserve_markers"] is False + + async def test_execute_strategy_in_metadata(self) -> None: + """Strategy from arguments is in metadata.""" + tool = CompactContextTool() + result = await tool.execute( + arguments={ + "strategy": "summarize", + "reason": "context at 90 percent fill", + }, + ) + + assert result.metadata["strategy"] == "summarize" + + async def test_execute_reason_in_metadata(self) -> None: + """Reason from arguments is in metadata.""" + tool = CompactContextTool() + reason_text = "context at 90 percent fill" + result = await tool.execute( + arguments={ + "strategy": "summarize", + "reason": reason_text, + }, + ) + + assert result.metadata["reason"] == reason_text + + async def test_execute_compaction_directive_true(self) -> None: + """Metadata includes compaction_directive set to True.""" + tool = CompactContextTool() + result = await tool.execute( + arguments={ + "strategy": "summarize", + "reason": "context at 90 percent fill", + }, + ) + + assert result.metadata["compaction_directive"] is True + + def test_to_definition_returns_valid_tool_definition(self) -> None: + """to_definition() returns a valid ToolDefinition.""" + tool = CompactContextTool() + definition = tool.to_definition() + + assert isinstance(definition, ToolDefinition) + assert definition.name == "compact_context" + assert definition.description + assert definition.parameters_schema + + def test_to_definition_schema_is_correct(self) -> None: + """ToolDefinition 
contains correct schema.""" + tool = CompactContextTool() + definition = tool.to_definition() + + schema = definition.parameters_schema + assert "properties" in schema + assert "strategy" in schema["properties"] + assert "reason" in schema["properties"] + assert "strategy" in schema["required"] + assert "reason" in schema["required"] + + async def test_execute_is_not_error(self) -> None: + """Execute result is_error is False.""" + tool = CompactContextTool() + result = await tool.execute( + arguments={ + "strategy": "summarize", + "reason": "context at 90 percent fill", + }, + ) + + assert result.is_error is False diff --git a/tests/unit/tools/test_factory.py b/tests/unit/tools/test_factory.py index 9d58baf1d9..9d306f43cc 100644 --- a/tests/unit/tools/test_factory.py +++ b/tests/unit/tools/test_factory.py @@ -17,6 +17,7 @@ from synthorg.tools.git_url_validator import GitCloneNetworkPolicy _EXPECTED_TOOL_NAMES: tuple[str, ...] = ( + "compact_context", "delete_file", "edit_file", "git_branch", @@ -42,7 +43,7 @@ def test_returns_all_expected_tools( self, tmp_path: Path, ) -> None: - """Factory returns all 14 built-in tools sorted by name.""" + """Factory returns all 15 built-in tools sorted by name.""" tools = build_default_tools(workspace=tmp_path) names = tuple(t.name for t in tools) assert names == _EXPECTED_TOOL_NAMES diff --git a/tests/unit/tools/test_factory_new_categories.py b/tests/unit/tools/test_factory_new_categories.py index 69089d606e..f5bb503adc 100644 --- a/tests/unit/tools/test_factory_new_categories.py +++ b/tests/unit/tools/test_factory_new_categories.py @@ -134,10 +134,10 @@ class TestFactoryToolCount: @pytest.mark.unit def test_default_tool_count(self, workspace: Path) -> None: - """Default: 5 fs + 6 git + 2 web + 1 terminal = 14 tools.""" + """Default: 5 fs + 6 git + 2 web + 1 terminal + 1 context = 15 tools.""" tools = build_default_tools(workspace=workspace) - assert len(tools) == 14 + assert len(tools) == 15 @pytest.mark.unit def 
test_tools_sorted_by_name(self, workspace: Path) -> None: diff --git a/tests/unit/tools/test_factory_sandbox_wiring.py b/tests/unit/tools/test_factory_sandbox_wiring.py index f70e20edce..76f430392a 100644 --- a/tests/unit/tools/test_factory_sandbox_wiring.py +++ b/tests/unit/tools/test_factory_sandbox_wiring.py @@ -42,6 +42,7 @@ + len(_FS_TOOL_NAMES) + len(_WEB_TOOL_NAMES) + len(_TERMINAL_TOOL_NAMES) + + 1 # compact_context (context management) ) diff --git a/web/src/api/types.ts b/web/src/api/types.ts index b5bbd51ed2..38cf348ed8 100644 --- a/web/src/api/types.ts +++ b/web/src/api/types.ts @@ -1312,6 +1312,7 @@ export interface CoordinationResultResponse { topology: CoordinationTopology total_duration_seconds: number total_cost_usd: number + currency: string readonly phases: readonly CoordinationPhaseResponse[] wave_count: number is_success: boolean