From c1caa596fb56ac05940756d83ef128c401fc49a1 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Wed, 8 Apr 2026 01:30:26 +0200 Subject: [PATCH 01/10] feat: engine intelligence v2 -- trace enrichment, compaction, versioning eval #1123: Execution trace enrichment - Add NodeType enum (LLM_CALL, TOOL_INVOCATION, QUALITY_CHECK, BUDGET_CHECK, STAGNATION_CHECK) to loop_protocol.py - Add node_types field to TurnRecord (auto-derived in make_turn_record) - Add AgentContribution + CoordinationResultWithAttribution wrapper in coordination/attribution.py with build_agent_contributions() factory - Update CoordinationService.coordinate() to return attributed result - Add record_coordination_contributions() to PerformanceTracker - Update all callers (agent_engine, API controller, tests) #1125: Agent-controlled compaction + epistemic marker preservation - Add epistemic marker detection (compaction/epistemic.py) with complexity-adaptive thresholds (COMPLEX/EPIC: 1+, SIMPLE/MEDIUM: 3+) - Rewrite _build_summary() to preserve marker-containing sentences - Add CompactContextTool (tools/context/compact_context.py) for agent-initiated compaction via metadata directive - Add CompactionConfig fields: agent_controlled, safety_threshold_percent, preserve_epistemic_markers with dual-threshold behavior - Add force_compaction() that bypasses threshold check - Register compact_context in build_default_tools() #1113: Versioning infrastructure evaluation - Evaluate WorkflowDefinitionVersion migration to generic VersionSnapshot[T] (favorable: content hash dedup, concurrent write safety, schema consistency) - Create follow-up issues: #1131 (migration), #1132 (config versioning), #1133 (role catalog versioning) Closes #1123 Closes #1125 Closes #1113 --- src/synthorg/api/controllers/coordination.py | 36 +- src/synthorg/engine/__init__.py | 4 + src/synthorg/engine/agent_engine.py | 8 +- src/synthorg/engine/compaction/epistemic.py | 133 ++++++ src/synthorg/engine/compaction/models.py | 49 ++- src/synthorg/engine/compaction/summarizer.py | 160 +++++++- src/synthorg/engine/coordination/__init__.py | 10 + .../engine/coordination/attribution.py | 273 +++++++++++++ src/synthorg/engine/coordination/service.py | 21 +- src/synthorg/engine/loop_helpers.py | 16 + src/synthorg/engine/loop_protocol.py | 22 + src/synthorg/hr/performance/tracker.py | 28 ++ .../observability/events/context_budget.py | 8 + .../observability/events/coordination.py | 1 + src/synthorg/tools/context/__init__.py | 1 + src/synthorg/tools/context/compact_context.py | 114 ++++++ src/synthorg/tools/factory.py | 5 + .../unit/api/controllers/test_coordination.py | 10 +- .../unit/engine/compaction/test_epistemic.py | 189 +++++++++ .../compaction/test_summarizer_markers.py | 267 ++++++++++++ .../engine/coordination/test_attribution.py | 241 +++++++++++ .../coordination/test_attribution_factory.py | 380 ++++++++++++++++++ .../unit/engine/test_coordination_service.py | 54 ++- tests/unit/tools/context/__init__.py | 0 .../tools/context/test_compact_context.py | 176 ++++++++ tests/unit/tools/test_factory.py | 3 +- .../unit/tools/test_factory_new_categories.py | 4 +- .../unit/tools/test_factory_sandbox_wiring.py | 1 + 28 files changed, 2152 insertions(+), 62 deletions(-) create mode 100644 src/synthorg/engine/compaction/epistemic.py create mode 100644 src/synthorg/engine/coordination/attribution.py create mode 100644 src/synthorg/tools/context/__init__.py create mode 100644 src/synthorg/tools/context/compact_context.py create mode 100644 tests/unit/engine/compaction/test_epistemic.py create mode 100644 tests/unit/engine/compaction/test_summarizer_markers.py create mode 100644 tests/unit/engine/coordination/test_attribution.py create mode 100644 tests/unit/engine/coordination/test_attribution_factory.py create mode 100644 tests/unit/tools/context/__init__.py create mode 100644 tests/unit/tools/context/test_compact_context.py diff --git a/src/synthorg/api/controllers/coordination.py b/src/synthorg/api/controllers/coordination.py index ee45fb58fb..0dfcbf7ba8 100644 --- a/src/synthorg/api/controllers/coordination.py +++ b/src/synthorg/api/controllers/coordination.py @@ -42,6 +42,9 @@ from synthorg.api.state import AppState from synthorg.core.agent import AgentIdentity from synthorg.core.task import Task + from synthorg.engine.coordination.attribution import ( + CoordinationResultWithAttribution, + ) logger = get_logger(__name__) @@ -171,7 +174,7 @@ async def coordinate_task( agent_count=len(agents), ) - result = await self._execute( + attributed = await self._execute( app_state, request, context, @@ -188,7 +191,10 @@ async def coordinate_task( ) currency = DEFAULT_CURRENCY return ApiResponse( - data=_map_result_to_response(result, currency=currency), + data=_map_result_to_response( + attributed.result, + currency=currency, + ), ) async def _get_task( @@ -239,10 +245,10 @@ async def _execute( request: Request[Any, Any, Any], context: CoordinationContext, task_id: str, - ) -> CoordinationResult: + ) -> CoordinationResultWithAttribution: """Run coordination and publish WS events.""" try: - result = await app_state.coordinator.coordinate(context) + attributed = await app_state.coordinator.coordinate(context) except CoordinationPhaseError as exc: logger.warning( API_COORDINATION_FAILED, @@ -278,7 +284,7 @@ async def _execute( ws_event_type = ( WsEventType.COORDINATION_COMPLETED - if result.is_success + if attributed.is_success else WsEventType.COORDINATION_FAILED ) _publish_ws_event( @@ -286,23 +292,25 @@ async def _execute( ws_event_type, { "task_id": task_id, - "topology": result.topology.value, - "is_success": result.is_success, - "total_duration_seconds": result.total_duration_seconds, + "topology": attributed.result.topology.value, + "is_success": attributed.is_success, + "total_duration_seconds": attributed.result.total_duration_seconds, }, ) log_event = ( - API_COORDINATION_COMPLETED if result.is_success else API_COORDINATION_FAILED + API_COORDINATION_COMPLETED + if attributed.is_success + else API_COORDINATION_FAILED ) - log_fn = logger.info if result.is_success else logger.warning + log_fn = logger.info if attributed.is_success else logger.warning log_fn( log_event, task_id=task_id, - topology=result.topology.value, - is_success=result.is_success, - total_duration_seconds=result.total_duration_seconds, + topology=attributed.result.topology.value, + is_success=attributed.is_success, + total_duration_seconds=attributed.result.total_duration_seconds, ) - return result + return attributed async def _resolve_agents( self, diff --git a/src/synthorg/engine/__init__.py b/src/synthorg/engine/__init__.py index b80272288b..99adc57b50 100644 --- a/src/synthorg/engine/__init__.py +++ b/src/synthorg/engine/__init__.py @@ -51,12 +51,14 @@ AgentContextSnapshot, ) from synthorg.engine.coordination import ( + AgentContribution, CentralizedDispatcher, ContextDependentDispatcher, CoordinationConfig, CoordinationContext, CoordinationPhaseResult, CoordinationResult, + CoordinationResultWithAttribution, CoordinationWave, DecentralizedDispatcher, DispatchResult, @@ -225,6 +227,7 @@ "AgentAssignment", "AgentContext", "AgentContextSnapshot", + "AgentContribution", "AgentEngine", "AgentOutcome", "AgentRunResult", @@ -256,6 +259,7 @@ "CoordinationPhaseError", "CoordinationPhaseResult", "CoordinationResult", + "CoordinationResultWithAttribution", "CoordinationWave", "CostOptimizedAssignmentStrategy", "CreateTaskData", diff --git a/src/synthorg/engine/agent_engine.py b/src/synthorg/engine/agent_engine.py index fd4cac94a3..42c2fdc83d 100644 --- a/src/synthorg/engine/agent_engine.py +++ b/src/synthorg/engine/agent_engine.py @@ -117,9 +117,11 @@ from synthorg.core.agent import AgentIdentity from synthorg.core.task import Task from synthorg.engine.compaction import CompactionCallback + from synthorg.engine.coordination.attribution import ( + CoordinationResultWithAttribution, + ) from synthorg.engine.coordination.models import ( CoordinationContext, - CoordinationResult, ) from synthorg.engine.coordination.service import MultiAgentCoordinator from synthorg.engine.hybrid_models import HybridLoopConfig @@ -452,14 +454,14 @@ def coordinator(self) -> MultiAgentCoordinator | None: async def coordinate( self, context: CoordinationContext, - ) -> CoordinationResult: + ) -> CoordinationResultWithAttribution: """Delegate to the multi-agent coordinator. Args: context: Coordination context with task, agents, and config. Returns: - Coordination result with all phase outcomes. + Coordination result with per-agent attribution data. Raises: ExecutionStateError: If no coordinator is configured. diff --git a/src/synthorg/engine/compaction/epistemic.py b/src/synthorg/engine/compaction/epistemic.py new file mode 100644 index 0000000000..11c046e4a7 --- /dev/null +++ b/src/synthorg/engine/compaction/epistemic.py @@ -0,0 +1,133 @@ +"""Epistemic marker detection for compaction summaries. + +Detects reasoning markers (hedging, reconsideration, uncertainty, +verification, correction) in assistant messages. Messages with +high marker density are preserved during compaction to maintain +reasoning chain integrity. + +Reference: arXiv:2603.24472 -- removing epistemic markers degrades +accuracy by up to 63% on complex reasoning tasks. +""" + +import re + +from synthorg.core.enums import Complexity + +# Precompiled case-insensitive word-boundary patterns grouped by type. +_HEDGING = re.compile(r"\b(wait|hmm|hm|ah)\b", re.IGNORECASE) +_RECONSIDERATION = re.compile( + r"\b(actually|let me reconsider|on second thought|I was wrong)\b", + re.IGNORECASE, +) +_UNCERTAINTY = re.compile( + r"\b(perhaps|alternatively|I'm not sure|uncertain)\b", + re.IGNORECASE, +) +_VERIFICATION = re.compile( + r"\b(check|verify|double-check|let me verify)\b", + re.IGNORECASE, +) +_CORRECTION = re.compile( + r"\b(but wait|actually no|hold on)\b", + re.IGNORECASE, +) + +EPISTEMIC_PATTERNS: tuple[re.Pattern[str], ...] = ( + _HEDGING, + _RECONSIDERATION, + _UNCERTAINTY, + _VERIFICATION, + _CORRECTION, +) + +# Sentence-splitting regex: split on period, question mark, exclamation, +# or newline followed by optional whitespace. +_SENTENCE_SPLIT = re.compile(r"[.?!]\s+|\n+") + +# Complexity levels that use the low-threshold (>= 1 marker). +_HIGH_COMPLEXITY = frozenset({Complexity.COMPLEX, Complexity.EPIC}) + +# Marker count thresholds per complexity tier. +_COMPLEX_THRESHOLD = 1 +_SIMPLE_THRESHOLD = 3 + + +def count_epistemic_markers(text: str) -> int: + """Count distinct epistemic pattern matches in text. + + Each pattern is counted once regardless of how many times it + matches within the text. + + Args: + text: Input text to scan. + + Returns: + Number of distinct patterns that matched (0 to 5). + """ + return sum(1 for p in EPISTEMIC_PATTERNS if p.search(text)) + + +def should_preserve_message( + text: str, + complexity: Complexity, +) -> bool: + """Decide whether a message should be preserved during compaction. + + Uses complexity-adaptive thresholds: + - COMPLEX/EPIC: preserve if >= 1 epistemic marker + - SIMPLE/MEDIUM: preserve if >= 3 epistemic markers + + Args: + text: Message content. + complexity: Task complexity level. + + Returns: + True if the message should be preserved verbatim. + """ + count = count_epistemic_markers(text) + threshold = ( + _COMPLEX_THRESHOLD if complexity in _HIGH_COMPLEXITY else _SIMPLE_THRESHOLD + ) + return count >= threshold + + +def extract_marker_sentences( + text: str, + *, + max_chars: int = 200, +) -> str: + """Extract sentences containing epistemic markers. + + Splits text on sentence boundaries (period, question mark, + exclamation, newline) and collects sentences where at least + one epistemic pattern matches. Truncates the result to + *max_chars*. + + Args: + text: Input text. + max_chars: Maximum character length of the result. + + Returns: + Joined marker-containing sentences, truncated if needed. + """ + sentences = _SENTENCE_SPLIT.split(text) + marker_sentences: list[str] = [] + total_len = 0 + + for sentence in sentences: + stripped = sentence.strip() + if not stripped: + continue + if any(p.search(stripped) for p in EPISTEMIC_PATTERNS): + if total_len + len(stripped) + 2 > max_chars: + break + marker_sentences.append(stripped) + total_len += len(stripped) + 2 # +2 for "; " separator + + if not marker_sentences: + return "" + + joined = "; ".join(marker_sentences) + if len(joined) > max_chars: + return joined[:max_chars] + "..." + return joined diff --git a/src/synthorg/engine/compaction/models.py b/src/synthorg/engine/compaction/models.py index 94b88ad39e..86df04ca61 100644 --- a/src/synthorg/engine/compaction/models.py +++ b/src/synthorg/engine/compaction/models.py @@ -4,12 +4,19 @@ immutability convention. """ -from pydantic import BaseModel, ConfigDict, Field +from typing import Self + +from pydantic import BaseModel, ConfigDict, Field, model_validator class CompactionConfig(BaseModel): """Configuration for context compaction behavior. + When ``agent_controlled`` is ``True``, automatic compaction uses + ``safety_threshold_percent`` instead of ``fill_threshold_percent``, + allowing agents to manage compaction via the ``compact_context`` + tool while retaining a safety net. + Attributes: fill_threshold_percent: Context fill percentage that triggers compaction (e.g. 80.0 means compact when 80% full). @@ -17,6 +24,12 @@ class CompactionConfig(BaseModel): messages required before compaction is allowed. preserve_recent_turns: Number of recent turn pairs to keep uncompressed after compaction. + agent_controlled: Enable agent-initiated compaction via the + ``compact_context`` tool. + safety_threshold_percent: Auto-compaction threshold when + ``agent_controlled`` is ``True`` (safety net). + preserve_epistemic_markers: Detect and preserve epistemic + markers (hedging, reconsideration, etc.) in summaries. """ model_config = ConfigDict(frozen=True, allow_inf_nan=False) @@ -37,6 +50,40 @@ class CompactionConfig(BaseModel): ge=1, description="Recent turn pairs to keep uncompressed", ) + agent_controlled: bool = Field( + default=False, + description=( + "Enable agent-initiated compaction via compact_context tool. " + "When True, auto-compaction uses safety_threshold_percent." + ), + ) + safety_threshold_percent: float = Field( + default=95.0, + gt=0.0, + le=100.0, + description=( + "Auto-compaction threshold when agent_controlled=True (safety net)." + ), + ) + preserve_epistemic_markers: bool = Field( + default=True, + description=("Detect and preserve epistemic markers in compaction summaries."), + ) + + @model_validator(mode="after") + def _validate_safety_above_fill(self) -> Self: + """Safety threshold must exceed fill threshold when agent-controlled.""" + if ( + self.agent_controlled + and self.safety_threshold_percent <= self.fill_threshold_percent + ): + msg = ( + f"safety_threshold_percent ({self.safety_threshold_percent}) " + f"must be greater than fill_threshold_percent " + f"({self.fill_threshold_percent}) when agent_controlled=True" + ) + raise ValueError(msg) + return self class CompressionMetadata(BaseModel): diff --git a/src/synthorg/engine/compaction/summarizer.py b/src/synthorg/engine/compaction/summarizer.py index ee6bbf92cb..af3ef51419 100644 --- a/src/synthorg/engine/compaction/summarizer.py +++ b/src/synthorg/engine/compaction/summarizer.py @@ -7,6 +7,11 @@ from typing import TYPE_CHECKING +from synthorg.core.enums import Complexity +from synthorg.engine.compaction.epistemic import ( + extract_marker_sentences, + should_preserve_message, +) from synthorg.engine.compaction.models import ( CompactionConfig, CompressionMetadata, @@ -79,7 +84,12 @@ def _do_compaction( New compacted ``AgentContext`` or ``None`` if no compaction needed. """ fill_pct = ctx.context_fill_percent - if fill_pct is None or fill_pct < config.fill_threshold_percent: + effective_threshold = ( + config.safety_threshold_percent + if config.agent_controlled + else config.fill_threshold_percent + ) + if fill_pct is None or fill_pct < effective_threshold: return None conversation = ctx.conversation @@ -105,12 +115,15 @@ def _do_compaction( return None head, archivable, recent = split + task_complexity = _extract_task_complexity(ctx) compressed, metadata, summary_tokens = _compress( ctx, head, archivable, recent, estimator, + preserve_markers=config.preserve_epistemic_markers, + task_complexity=task_complexity, ) # Re-estimate fill with compressed conversation. Counts @@ -173,18 +186,26 @@ def _split_conversation( return head, archivable, recent -def _compress( +def _compress( # noqa: PLR0913 ctx: AgentContext, head: tuple[ChatMessage, ...], archivable: tuple[ChatMessage, ...], recent: tuple[ChatMessage, ...], estimator: PromptTokenEstimator, + *, + preserve_markers: bool, + task_complexity: Complexity, ) -> tuple[tuple[ChatMessage, ...], CompressionMetadata, int]: """Build compressed conversation and metadata. Returns ``(compressed_conversation, metadata, summary_tokens)``. """ - summary_text = _build_summary(archivable, ctx.execution_id) + summary_text = _build_summary( + archivable, + ctx.execution_id, + preserve_markers=preserve_markers, + task_complexity=task_complexity, + ) summary_msg = ChatMessage( role=MessageRole.SYSTEM, content=summary_text, @@ -206,33 +227,67 @@ def _compress( return compressed, metadata, summary_tokens +def _extract_task_complexity(ctx: AgentContext) -> Complexity: + """Extract task complexity from context, defaulting to COMPLEX.""" + task_exec = getattr(ctx, "task_execution", None) + if task_exec is not None: + task = getattr(task_exec, "task", None) + if task is not None: + complexity = getattr(task, "estimated_complexity", None) + if isinstance(complexity, Complexity): + return complexity + return Complexity.COMPLEX + + def _build_summary( messages: tuple[ChatMessage, ...], execution_id: str, + *, + preserve_markers: bool, + task_complexity: Complexity, ) -> str: - """Build a simple text summary from archived messages. + """Build a text summary from archived messages. - Concatenates sanitized assistant message content snippets into a - summary paragraph, capped at ``_MAX_SUMMARY_CHARS``. Each snippet - is redacted for file paths and URLs via ``sanitize_message``. + When ``preserve_markers`` is True, assistant messages with + epistemic markers (hedging, reconsideration, etc.) are preserved + as marker-containing sentences instead of being sanitized down + to 100-char snippets. Args: messages: The archived messages to summarize. execution_id: Execution identifier for log correlation. + preserve_markers: Whether to preserve epistemic markers. + task_complexity: Task complexity for marker thresholds. Returns: Summary text describing the archived conversation. """ snippets: list[str] = [] + preserved_count = 0 + for msg in messages: - if msg.role == MessageRole.ASSISTANT and msg.content: - cleaned = msg.content.replace("\n", " ").strip() - if cleaned: - snippet = sanitize_message(cleaned, max_length=100) - snippets.append(snippet) - - # Drop useless "details redacted" placeholders so the summary - # retains only meaningful content. + if msg.role != MessageRole.ASSISTANT or not msg.content: + continue + cleaned = msg.content.replace("\n", " ").strip() + if not cleaned: + continue + + # Check for epistemic markers worth preserving. + if preserve_markers and should_preserve_message( + cleaned, + task_complexity, + ): + marker_text = extract_marker_sentences(cleaned) + if marker_text: + snippets.append(marker_text) + preserved_count += 1 + continue + + # Standard sanitized snippet. + snippet = sanitize_message(cleaned, max_length=100) + snippets.append(snippet) + + # Drop useless "details redacted" placeholders. useful = [s for s in snippets if s != "details redacted"] if not useful: logger.debug( @@ -247,4 +302,79 @@ def _build_summary( if len(joined) > _MAX_SUMMARY_CHARS: joined = joined[:_MAX_SUMMARY_CHARS] + "..." + if preserved_count > 0: + return ( + f"[Archived {len(messages)} messages. " + f"Epistemic markers preserved from " + f"{preserved_count} messages. " + f"Summary: {joined}]" + ) return f"[Archived {len(messages)} messages. Summary of prior work: {joined}]" + + +def force_compaction( + ctx: AgentContext, + config: CompactionConfig, + estimator: PromptTokenEstimator, +) -> AgentContext | None: + """Compact context without checking the fill threshold. + + Used when an agent explicitly requests compaction via the + ``compact_context`` tool. Skips the threshold check but + still enforces minimum message count and recent turn + preservation. + + Args: + ctx: Current agent context. + config: Compaction configuration. + estimator: Token estimator. + + Returns: + Compacted context, or ``None`` if too few messages. + """ + conversation = ctx.conversation + if len(conversation) < config.min_messages_to_compact: + logger.debug( + CONTEXT_BUDGET_COMPACTION_SKIPPED, + execution_id=ctx.execution_id, + reason="too_few_messages_for_forced_compaction", + message_count=len(conversation), + ) + return None + + logger.info( + CONTEXT_BUDGET_COMPACTION_STARTED, + execution_id=ctx.execution_id, + fill_percent=ctx.context_fill_percent, + message_count=len(conversation), + forced=True, + ) + + split = _split_conversation(ctx, config) + if split is None: + return None + head, archivable, recent = split + + task_complexity = _extract_task_complexity(ctx) + compressed, metadata, summary_tokens = _compress( + ctx, + head, + archivable, + recent, + estimator, + preserve_markers=config.preserve_epistemic_markers, + task_complexity=task_complexity, + ) + + new_fill = estimator.estimate_conversation_tokens(compressed) + logger.info( + CONTEXT_BUDGET_COMPACTION_COMPLETED, + execution_id=ctx.execution_id, + original_messages=len(conversation), + compacted_messages=len(compressed), + archived_turns=metadata.archived_turns, + summary_tokens=summary_tokens, + compactions_total=metadata.compactions_performed, + forced=True, + ) + return ctx.with_compression(metadata, compressed, new_fill) diff --git a/src/synthorg/engine/coordination/__init__.py b/src/synthorg/engine/coordination/__init__.py index 61422aa6a9..05784608da 100644 --- a/src/synthorg/engine/coordination/__init__.py +++ b/src/synthorg/engine/coordination/__init__.py @@ -5,6 +5,12 @@ dispatchers. """ +from synthorg.engine.coordination.attribution import ( + AgentContribution, + CoordinationResultWithAttribution, + FailureAttribution, + build_agent_contributions, +) from synthorg.engine.coordination.config import CoordinationConfig from synthorg.engine.coordination.dispatchers import ( CentralizedDispatcher, @@ -27,19 +33,23 @@ from synthorg.engine.coordination.service import MultiAgentCoordinator __all__ = [ + "AgentContribution", "CentralizedDispatcher", "ContextDependentDispatcher", "CoordinationConfig", "CoordinationContext", "CoordinationPhaseResult", "CoordinationResult", + "CoordinationResultWithAttribution", "CoordinationSectionConfig", "CoordinationWave", "DecentralizedDispatcher", "DispatchResult", + "FailureAttribution", "MultiAgentCoordinator", "SasDispatcher", "TopologyDispatcher", + "build_agent_contributions", "build_coordinator", "build_execution_waves", "select_dispatcher", diff --git a/src/synthorg/engine/coordination/attribution.py b/src/synthorg/engine/coordination/attribution.py new file mode 100644 index 0000000000..139cc52500 --- /dev/null +++ b/src/synthorg/engine/coordination/attribution.py @@ -0,0 +1,273 @@ +"""Structural credit assignment for coordinated multi-agent execution. + +Provides per-agent contribution scoring and failure attribution +without modifying the frozen ``CoordinationResult``. The wrapper +``CoordinationResultWithAttribution`` pairs the original result +with attribution data built from routing decisions and wave outcomes. +""" + +from typing import Literal, Self + +from pydantic import BaseModel, ConfigDict, Field, computed_field, model_validator + +from synthorg.core.enums import FailureCategory +from synthorg.core.types import NotBlankStr +from synthorg.engine.coordination.models import ( # noqa: TC001 + CoordinationResult, + CoordinationWave, +) +from synthorg.engine.loop_protocol import TerminationReason +from synthorg.engine.recovery import infer_failure_category +from synthorg.engine.routing.models import RoutingResult # noqa: TC001 +from synthorg.observability import get_logger +from synthorg.observability.events.coordination import ( + COORDINATION_ATTRIBUTION_BUILT, +) + +logger = get_logger(__name__) + +FailureAttribution = Literal[ + "direct", + "upstream_contamination", + "coordination_overhead", + "quality_gate", +] + +_MAX_EVIDENCE_LENGTH = 500 + +# Map FailureCategory -> FailureAttribution for error-based outcomes. +_CATEGORY_TO_ATTRIBUTION: dict[FailureCategory, FailureAttribution] = { + FailureCategory.TOOL_FAILURE: "direct", + FailureCategory.STAGNATION: "direct", + FailureCategory.TIMEOUT: "direct", + FailureCategory.DELEGATION_FAILED: "direct", + FailureCategory.BUDGET_EXCEEDED: "coordination_overhead", + FailureCategory.QUALITY_GATE_FAILED: "quality_gate", + FailureCategory.UNKNOWN: "direct", +} + +# Map TerminationReason -> FailureAttribution for non-success runs. +_TERMINATION_TO_ATTRIBUTION: dict[TerminationReason, FailureAttribution] = { + TerminationReason.STAGNATION: "direct", + TerminationReason.BUDGET_EXHAUSTED: "coordination_overhead", + TerminationReason.MAX_TURNS: "coordination_overhead", + TerminationReason.ERROR: "direct", + TerminationReason.SHUTDOWN: "coordination_overhead", +} + + +class AgentContribution(BaseModel): + """Per-agent contribution to a coordinated task execution. + + Attributes: + agent_id: Identifier of the contributing agent. + subtask_id: Identifier of the subtask this agent executed. + contribution_score: Normalized score (0.0-1.0) reflecting + the agent's contribution quality. + failure_attribution: Classification of why the agent failed + (``None`` when the agent succeeded with score 1.0). + evidence: Truncated error message or evidence pointer + (``None`` when the agent succeeded). + """ + + model_config = ConfigDict(frozen=True, allow_inf_nan=False) + + agent_id: NotBlankStr = Field(description="Contributing agent") + subtask_id: NotBlankStr = Field(description="Subtask executed") + contribution_score: float = Field( + ge=0.0, + le=1.0, + description="Contribution quality (0.0-1.0)", + ) + failure_attribution: FailureAttribution | None = Field( + default=None, + description="Why the agent failed (None on success)", + ) + evidence: str | None = Field( + default=None, + max_length=_MAX_EVIDENCE_LENGTH, + description="Truncated error or evidence pointer", + ) + + @model_validator(mode="after") + def _validate_score_attribution_consistency(self) -> Self: + """Score < 1.0 requires failure_attribution; 1.0 forbids it.""" + if self.contribution_score < 1.0 and self.failure_attribution is None: + msg = ( + "failure_attribution must be set when " + f"contribution_score ({self.contribution_score}) < 1.0" + ) + raise ValueError(msg) + if self.contribution_score == 1.0 and self.failure_attribution is not None: + msg = "failure_attribution must be None when contribution_score is 1.0" + raise ValueError(msg) + return self + + +class CoordinationResultWithAttribution(BaseModel): + """Immutable wrapper pairing a CoordinationResult with attribution. + + Preserves the frozen ``CoordinationResult`` contract while adding + per-agent contribution data for structural credit assignment. + + Attributes: + result: The original coordination result (unmodified). + agent_contributions: Per-agent contribution records. + """ + + model_config = ConfigDict(frozen=True, allow_inf_nan=False) + + result: CoordinationResult = Field( + description="Original coordination result", + ) + agent_contributions: tuple[AgentContribution, ...] = Field( + default=(), + description="Per-agent contributions", + ) + + @computed_field( # type: ignore[prop-decorator] + description="Whether all phases succeeded", + ) + @property + def is_success(self) -> bool: + """Delegate to the wrapped result.""" + return self.result.is_success + + @computed_field( # type: ignore[prop-decorator] + description="Average contribution score", + ) + @property + def total_contribution_score(self) -> float: + """Average of contribution scores, 0.0 when empty.""" + if not self.agent_contributions: + return 0.0 + total = sum(c.contribution_score for c in self.agent_contributions) + return total / len(self.agent_contributions) + + +def build_agent_contributions( + routing_result: RoutingResult, + waves: tuple[CoordinationWave, ...], +) -> tuple[AgentContribution, ...]: + """Build contribution records from routing and execution data. + + Walks routing decisions to establish agent-to-subtask bindings, + then inspects wave outcomes to determine each agent's score and + failure classification. + + Args: + routing_result: Routing decisions mapping agents to subtasks. + waves: Executed coordination waves with outcomes. + + Returns: + Tuple of ``AgentContribution`` records, one per agent outcome. + """ + # Build agent->subtask lookup from routing decisions. + agent_to_subtask: dict[str, str] = {} + for decision in routing_result.decisions: + agent_id = str(decision.selected_candidate.agent_identity.id) + agent_to_subtask[agent_id] = str(decision.subtask_id) + + contributions: list[AgentContribution] = [] + + for wave in waves: + if wave.execution_result is None: + continue + for outcome in wave.execution_result.outcomes: + subtask_id = agent_to_subtask.get( + str(outcome.agent_id), + str(outcome.task_id), + ) + contrib = _score_outcome( + agent_id=str(outcome.agent_id), + subtask_id=subtask_id, + outcome_result=outcome.result, + outcome_error=outcome.error, + ) + contributions.append(contrib) + + result = tuple(contributions) + + if result: + success_count = sum(1 for c in result if c.contribution_score == 1.0) + avg_score = sum(c.contribution_score for c in result) / len(result) + logger.info( + COORDINATION_ATTRIBUTION_BUILT, + agent_count=len(result), + success_count=success_count, + avg_score=round(avg_score, 3), + ) + + return result + + +def _score_outcome( + *, + agent_id: str, + subtask_id: str, + outcome_result: object | None, + outcome_error: str | None, +) -> AgentContribution: + """Score a single agent outcome. + + Args: + agent_id: Agent identifier. + subtask_id: Subtask identifier. + outcome_result: AgentRunResult if execution completed, else None. + outcome_error: Error string if agent failed before execution. + + Returns: + An ``AgentContribution`` with score and attribution. + """ + # Case 1: Agent failed with an error string (no execution at all). + if outcome_error is not None: + category = infer_failure_category(outcome_error) + attribution = _CATEGORY_TO_ATTRIBUTION.get(category, "direct") + return AgentContribution( + agent_id=NotBlankStr(agent_id), + subtask_id=NotBlankStr(subtask_id), + contribution_score=0.0, + failure_attribution=attribution, + evidence=outcome_error[:_MAX_EVIDENCE_LENGTH], + ) + + # Case 2: Execution completed -- check if successful. + # outcome_result is AgentRunResult but typed as object to avoid + # circular import; access attributes dynamically. + if outcome_result is not None: + is_success = getattr(outcome_result, "is_success", False) + if is_success: + return AgentContribution( + agent_id=NotBlankStr(agent_id), + subtask_id=NotBlankStr(subtask_id), + contribution_score=1.0, + ) + + # Non-success termination. + termination_reason: TerminationReason | None = getattr( + outcome_result, "termination_reason", None + ) + failure_attr: FailureAttribution = "direct" + if termination_reason is not None: + failure_attr = _TERMINATION_TO_ATTRIBUTION.get(termination_reason, "direct") + exec_result = getattr(outcome_result, "execution_result", None) + error_text = "" + if exec_result is not None: + error_text = getattr(exec_result, "error_message", "") or "" + + return AgentContribution( + agent_id=NotBlankStr(agent_id), + subtask_id=NotBlankStr(subtask_id), + contribution_score=0.0, + failure_attribution=failure_attr, + evidence=error_text[:_MAX_EVIDENCE_LENGTH] or None, + ) + + # Should not reach here -- AgentOutcome requires result XOR error. + return AgentContribution( + agent_id=NotBlankStr(agent_id), + subtask_id=NotBlankStr(subtask_id), + contribution_score=0.0, + failure_attribution="direct", + evidence="No result or error in outcome", + ) diff --git a/src/synthorg/engine/coordination/service.py b/src/synthorg/engine/coordination/service.py index 98374cb801..a1f4b6bae2 100644 --- a/src/synthorg/engine/coordination/service.py +++ b/src/synthorg/engine/coordination/service.py @@ -10,6 +10,10 @@ from uuid import uuid4 from synthorg.core.enums import CoordinationTopology, TaskStatus +from synthorg.engine.coordination.attribution import ( + CoordinationResultWithAttribution, + build_agent_contributions, +) from synthorg.engine.coordination.dispatchers import ( DispatchResult, select_dispatcher, @@ -94,7 +98,7 @@ def __init__( async def coordinate( self, context: CoordinationContext, - ) -> CoordinationResult: + ) -> CoordinationResultWithAttribution: """Run the full multi-agent coordination pipeline. Pipeline: @@ -105,12 +109,14 @@ async def coordinate( 5. Select dispatcher and execute waves. 6. Rollup subtask statuses. 7. Update parent task via TaskEngine (if provided). + 8. Build per-agent attribution from routing + outcomes. Args: context: Coordination context with task, agents, and config. Returns: - CoordinationResult with all phase outcomes. + CoordinationResultWithAttribution wrapping the result + with per-agent contribution data. Raises: CoordinationPhaseError: When a critical phase fails. @@ -195,7 +201,16 @@ async def coordinate( total_cost_usd=total_cost, ) - return result + # Phase 8: Build per-agent attribution. + contributions = build_agent_contributions( + routing_result, + dispatch_result.waves, + ) + + return CoordinationResultWithAttribution( + result=result, + agent_contributions=contributions, + ) async def _phase_decompose( self, diff --git a/src/synthorg/engine/loop_helpers.py b/src/synthorg/engine/loop_helpers.py index 7a40822896..63da05a370 100644 --- a/src/synthorg/engine/loop_helpers.py +++ b/src/synthorg/engine/loop_helpers.py @@ -40,6 +40,7 @@ from .loop_protocol import ( BudgetChecker, ExecutionResult, + NodeType, ShutdownChecker, TerminationReason, TurnRecord, @@ -466,9 +467,15 @@ def make_turn_record( *, call_category: LLMCallCategory | None = None, provider_metadata: dict[str, object] | None = None, + extra_node_types: tuple[NodeType, ...] = (), ) -> TurnRecord: """Create a ``TurnRecord`` from a provider response. + Automatically derives ``LLM_CALL`` (always) and ``TOOL_INVOCATION`` + (when tool calls are present). Callers pass additional node types + via *extra_node_types* for checks that ran this turn (quality, + budget, stagnation). + Args: turn_number: 1-indexed turn number. response: Provider completion response. @@ -478,6 +485,8 @@ def make_turn_record( ``_synthorg_latency_ms``, ``_synthorg_cache_hit``, ``_synthorg_retry_count``, and ``_synthorg_retry_reason`` are extracted when present. + extra_node_types: Additional node types beyond the + auto-derived LLM_CALL and TOOL_INVOCATION. """ md = provider_metadata or {} latency_ms_raw = md.get("_synthorg_latency_ms") @@ -501,6 +510,12 @@ def make_turn_record( if isinstance(retry_reason_raw, str): retry_reason = retry_reason_raw + # Auto-derive base node types from response content. + derived: list[NodeType] = [NodeType.LLM_CALL] + if response.tool_calls: + derived.append(NodeType.TOOL_INVOCATION) + node_types = tuple(derived) + extra_node_types + return TurnRecord( turn_number=turn_number, input_tokens=response.usage.input_tokens, @@ -514,6 +529,7 @@ def make_turn_record( cache_hit=cache_hit, retry_count=retry_count, retry_reason=retry_reason, + node_types=node_types, ) diff --git a/src/synthorg/engine/loop_protocol.py b/src/synthorg/engine/loop_protocol.py index 9c4a7f3cc2..62028599a2 100644 --- a/src/synthorg/engine/loop_protocol.py +++ b/src/synthorg/engine/loop_protocol.py @@ -25,6 +25,21 @@ from synthorg.tools.invoker import ToolInvoker +class NodeType(StrEnum): + """Type of computation node executed within a turn. + + Used for structural credit assignment and post-hoc trace analysis. + Each turn records which node types executed, enabling fine-grained + attribution of costs and failures. + """ + + LLM_CALL = "llm_call" + TOOL_INVOCATION = "tool_invocation" + QUALITY_CHECK = "quality_check" + BUDGET_CHECK = "budget_check" + STAGNATION_CHECK = "stagnation_check" + + class TerminationReason(StrEnum): """Why the execution loop terminated.""" @@ -56,6 +71,9 @@ class TurnRecord(BaseModel): cache_hit: Whether the provider served this turn from cache. retry_count: Number of retry attempts before success. retry_reason: Exception type name of the last retried error. + node_types: Node types that executed in this turn (e.g. + LLM_CALL, TOOL_INVOCATION). Defaults to empty for + deserialization of legacy data. success: Whether this turn completed without error or content filter (computed). """ @@ -98,6 +116,10 @@ class TurnRecord(BaseModel): default=None, description="Exception type name of the last retried error", ) + node_types: tuple[NodeType, ...] = Field( + default=(), + description="Node types that executed in this turn", + ) @model_validator(mode="after") def _validate_retry_consistency(self) -> Self: diff --git a/src/synthorg/hr/performance/tracker.py b/src/synthorg/hr/performance/tracker.py index e1f9075d47..fa628133e8 100644 --- a/src/synthorg/hr/performance/tracker.py +++ b/src/synthorg/hr/performance/tracker.py @@ -32,6 +32,7 @@ from pydantic import AwareDatetime from synthorg.core.task import AcceptanceCriterion + from synthorg.engine.coordination.attribution import AgentContribution from synthorg.hr.performance.collaboration_override_store import ( CollaborationOverrideStore, ) @@ -96,6 +97,7 @@ def __init__( # noqa: PLR0913 self._quality_override_store = quality_override_store self._task_metrics: dict[str, list[TaskMetricRecord]] = {} self._collab_metrics: dict[str, list[CollaborationMetricRecord]] = {} + self._contributions: dict[str, list[AgentContribution]] = {} self._background_tasks: set[asyncio.Task[None]] = set() @staticmethod @@ -177,6 +179,32 @@ async def record_task_metric( ) return record + def record_coordination_contributions( + self, + contributions: tuple[AgentContribution, ...], + ) -> None: + """Store per-agent contributions from coordination. + + Args: + contributions: Attribution records from a coordinated run. + """ + for contrib in contributions: + agent_key = str(contrib.agent_id) + if agent_key not in self._contributions: + self._contributions[agent_key] = [] + self._contributions[agent_key].append(contrib) + + if contributions: + logger.info( + PERF_METRIC_RECORDED, + contribution_count=len(contributions), + avg_score=round( + sum(c.contribution_score for c in contributions) + / len(contributions), + 3, + ), + ) + async def score_task_quality( self, *, diff --git a/src/synthorg/observability/events/context_budget.py b/src/synthorg/observability/events/context_budget.py index 2b3e9b0de2..650f3faf72 100644 --- a/src/synthorg/observability/events/context_budget.py +++ b/src/synthorg/observability/events/context_budget.py @@ -16,3 +16,11 @@ # Indicator injection CONTEXT_BUDGET_INDICATOR_INJECTED: Final[str] = "context_budget.indicator.injected" + +# Agent-controlled compaction +CONTEXT_BUDGET_AGENT_COMPACTION_REQUESTED: Final[str] = ( + "context_budget.agent_compaction.requested" +) +CONTEXT_BUDGET_EPISTEMIC_MARKERS_PRESERVED: Final[str] = ( + "context_budget.epistemic_markers.preserved" +) diff --git a/src/synthorg/observability/events/coordination.py b/src/synthorg/observability/events/coordination.py index c7170fdb93..8eeb835634 100644 --- a/src/synthorg/observability/events/coordination.py +++ b/src/synthorg/observability/events/coordination.py @@ -16,3 +16,4 @@ COORDINATION_CLEANUP_FAILED: Final[str] = "coordination.cleanup.failed" COORDINATION_WAVE_BUILT: Final[str] = "coordination.wave.built" COORDINATION_FACTORY_BUILT: Final[str] = "coordination.factory.built" +COORDINATION_ATTRIBUTION_BUILT: Final[str] = "coordination.attribution.built" diff --git a/src/synthorg/tools/context/__init__.py b/src/synthorg/tools/context/__init__.py new file mode 100644 index 0000000000..5a800fba9f --- /dev/null +++ b/src/synthorg/tools/context/__init__.py @@ -0,0 +1 @@ +"""Context management tools (compaction, etc.).""" diff --git a/src/synthorg/tools/context/compact_context.py b/src/synthorg/tools/context/compact_context.py new file mode 100644 index 0000000000..3ea24255b6 --- /dev/null +++ b/src/synthorg/tools/context/compact_context.py @@ -0,0 +1,114 @@ +"""Agent-controlled context compaction tool. + +Allows agents to explicitly request context compaction when context +fill is high and reasoning clarity is critical. The tool signals +intent via metadata -- it does NOT mutate the frozen AgentContext +directly. The execution loop detects the directive and invokes +compaction at the turn boundary. +""" + +from typing import Any + +from synthorg.core.enums import ToolCategory +from synthorg.observability import get_logger +from synthorg.observability.events.context_budget import ( + CONTEXT_BUDGET_AGENT_COMPACTION_REQUESTED, +) +from synthorg.tools.base import BaseTool, ToolExecutionResult + +logger = get_logger(__name__) + +_COMPACT_CONTEXT_SCHEMA: dict[str, Any] = { + "type": "object", + "properties": { + "strategy": { + "type": "string", + "enum": ["summarize"], + "description": ( + "Compaction strategy. Currently only 'summarize' is supported." + ), + }, + "preserve_markers": { + "type": "boolean", + "default": True, + "description": ( + "Whether to preserve epistemic markers (wait, hmm, " + "actually, etc.) in the compaction summary." + ), + }, + "reason": { + "type": "string", + "minLength": 10, + "maxLength": 256, + "description": ( + "Brief explanation for why compaction is needed " + "now (e.g., 'context fill at 92 percent, need to " + "preserve reasoning clarity')." + ), + }, + }, + "required": ["strategy", "reason"], + "additionalProperties": False, +} + + +class CompactContextTool(BaseTool): + """Signal context compaction to the execution loop. + + The tool validates arguments and returns a compaction directive + in ``ToolExecutionResult.metadata``. The execution loop detects + the directive and performs actual compaction at the turn boundary. + + This tool is stateless and safe to register unconditionally. + Compaction only triggers when ``CompactionConfig.agent_controlled`` + is enabled in the engine configuration. + """ + + def __init__(self) -> None: + super().__init__( + name="compact_context", + description=( + "Request context compaction when conversation has " + "grown large. Preserves recent turns and creates a " + "summary of older exchanges. Use when context fill " + "is high and accuracy on complex reasoning is " + "critical." + ), + parameters_schema=_COMPACT_CONTEXT_SCHEMA, + category=ToolCategory.MEMORY, + ) + + async def execute( + self, + *, + arguments: dict[str, Any], + ) -> ToolExecutionResult: + """Signal compaction directive via metadata. + + Args: + arguments: Validated tool arguments (strategy, reason, + optionally preserve_markers). + + Returns: + Result with ``compaction_directive`` metadata key. + """ + strategy = arguments.get("strategy", "summarize") + reason = arguments.get("reason", "") + preserve_markers = arguments.get("preserve_markers", True) + + logger.info( + CONTEXT_BUDGET_AGENT_COMPACTION_REQUESTED, + strategy=strategy, + preserve_markers=preserve_markers, + reason=reason, + ) + + return ToolExecutionResult( + content=("Compaction directive accepted. Will execute at turn boundary."), + metadata={ + "compaction_directive": True, + "strategy": strategy, + "preserve_markers": preserve_markers, + "reason": reason, + }, + ) diff --git a/src/synthorg/tools/factory.py b/src/synthorg/tools/factory.py index 2c46a9bdc9..dda34173ad 100644 --- a/src/synthorg/tools/factory.py +++ b/src/synthorg/tools/factory.py @@ -191,6 +191,10 @@ def build_default_tools( # noqa: PLR0913 logger.warning(TOOL_FACTORY_ERROR, error=msg) raise ValueError(msg) + from synthorg.tools.context.compact_context import ( # noqa: PLC0415 + CompactContextTool, + ) + all_tools: list[BaseTool] = [ *_build_file_system_tools(workspace=workspace), *_build_git_tools( @@ -202,6 +206,7 @@ def build_default_tools( # noqa: PLR0913 network_policy=web_network_policy, search_provider=web_search_provider, ), + CompactContextTool(), ] all_tools.extend( diff --git a/tests/unit/api/controllers/test_coordination.py b/tests/unit/api/controllers/test_coordination.py index 6d60adcb6f..fe72c6161e 100644 --- a/tests/unit/api/controllers/test_coordination.py +++ b/tests/unit/api/controllers/test_coordination.py @@ -19,6 +19,9 @@ CoordinationTopology, SeniorityLevel, ) +from synthorg.engine.coordination.attribution import ( + CoordinationResultWithAttribution, +) from synthorg.engine.coordination.models import ( CoordinationPhaseResult, CoordinationResult, @@ -49,21 +52,22 @@ def _make_coordination_result( task_id: str = "task-001", *, is_success: bool = True, -) -> CoordinationResult: - """Build a minimal CoordinationResult for tests.""" +) -> CoordinationResultWithAttribution: + """Build a minimal CoordinationResultWithAttribution for tests.""" phase = CoordinationPhaseResult( phase="decompose", success=is_success, duration_seconds=0.1, error=None if is_success else "test error", ) - return CoordinationResult( + result = CoordinationResult( parent_task_id=task_id, topology=CoordinationTopology.SAS, phases=(phase,), total_duration_seconds=0.5, total_cost_usd=0.01, ) + return CoordinationResultWithAttribution(result=result) @pytest.fixture diff --git a/tests/unit/engine/compaction/test_epistemic.py b/tests/unit/engine/compaction/test_epistemic.py new file mode 100644 index 0000000000..3bca826709 --- /dev/null +++ b/tests/unit/engine/compaction/test_epistemic.py @@ -0,0 +1,189 @@ +"""Tests for epistemic marker detection in compaction summaries.""" + +import pytest + +from synthorg.core.enums import Complexity +from synthorg.engine.compaction.epistemic import ( + count_epistemic_markers, + extract_marker_sentences, + should_preserve_message, +) + + +@pytest.mark.unit +class TestCountEpistemicMarkers: + """Tests for count_epistemic_markers function.""" + + def test_no_markers(self) -> None: + """Text with no markers returns 0.""" + text = "The sky is blue and the grass is green." + assert count_epistemic_markers(text) == 0 + + def test_one_marker_from_each_group(self) -> None: + """Count distinct pattern matches across all 5 groups.""" + # One hedging only: "hmm" + assert count_epistemic_markers("Hmm, that's odd.") == 1 + + # One reconsideration only: "on second thought" + assert count_epistemic_markers("On second thought, no.") == 1 + + # One uncertainty only: "perhaps" + assert count_epistemic_markers("Perhaps we should try.") == 1 + + # One verification only: "verify" + assert count_epistemic_markers("Let me verify this.") == 1 + + # One correction only: "hold on" + assert count_epistemic_markers("Hold on, that's wrong.") == 1 + + def test_multiple_from_same_group_counted_once(self) -> None: + """Multiple matches from same pattern group counted as 1.""" + text = "Wait, hmm, Hmm again. These are all hedging." + # All three are in the hedging group, so count is 1 + assert count_epistemic_markers(text) == 1 + + def test_mixed_markers(self) -> None: + """Multiple distinct patterns counted correctly.""" + text = "Wait, actually I was wrong. Let me verify this." + # "wait" (hedging), "actually" (reconsideration), "verify" (verification) + assert count_epistemic_markers(text) == 3 + + def test_case_insensitive_matching(self) -> None: + """Markers match regardless of case.""" + text1 = "Hmm, that looks odd." + text2 = "HMM, THAT LOOKS ODD." + text3 = "hMm, ThAt LoOkS oDd." + assert count_epistemic_markers(text1) == count_epistemic_markers(text2) + assert count_epistemic_markers(text2) == count_epistemic_markers(text3) + assert count_epistemic_markers(text1) == 1 + + +@pytest.mark.unit +class TestShouldPreserveMessage: + """Tests for should_preserve_message function.""" + + def test_complex_with_one_marker_preserves(self) -> None: + """COMPLEX complexity: preserve if >= 1 marker.""" + text = "Wait, I need to think about this more." + assert should_preserve_message(text, Complexity.COMPLEX) is True + + def test_epic_with_one_marker_preserves(self) -> None: + """EPIC complexity: preserve if >= 1 marker.""" + text = "Hmm, let me reconsider this." + assert should_preserve_message(text, Complexity.EPIC) is True + + def test_simple_with_one_marker_does_not_preserve(self) -> None: + """SIMPLE complexity: preserve only if >= 3 markers.""" + text = "Wait, there's an issue." + assert should_preserve_message(text, Complexity.SIMPLE) is False + + def test_simple_with_three_markers_preserves(self) -> None: + """SIMPLE complexity: preserve if >= 3 markers.""" + text = "Wait, actually, let me verify this carefully." + # "wait" (hedging), "actually" (reconsideration), "verify" (verification) + assert should_preserve_message(text, Complexity.SIMPLE) is True + + def test_medium_with_two_markers_does_not_preserve(self) -> None: + """MEDIUM complexity: preserve only if >= 3 markers.""" + # "hmm" (hedging) + "perhaps" (uncertainty) = 2 groups + text = "Hmm, perhaps we should try a different approach." + assert should_preserve_message(text, Complexity.MEDIUM) is False + + def test_medium_with_three_markers_preserves(self) -> None: + """MEDIUM complexity: preserve if >= 3 markers.""" + text = "Wait, actually I was wrong. Let me verify." + # "wait" (hedging), "actually" (reconsideration), "verify" (verification) + assert should_preserve_message(text, Complexity.MEDIUM) is True + + def test_no_markers_never_preserves(self) -> None: + """Text with no markers never preserves.""" + text = "This is a straightforward statement with no reasoning markers." + assert should_preserve_message(text, Complexity.COMPLEX) is False + assert should_preserve_message(text, Complexity.EPIC) is False + assert should_preserve_message(text, Complexity.SIMPLE) is False + assert should_preserve_message(text, Complexity.MEDIUM) is False + + +@pytest.mark.unit +class TestExtractMarkerSentences: + """Tests for extract_marker_sentences function.""" + + def test_no_markers_returns_empty_string(self) -> None: + """Text with no marker sentences returns empty string.""" + text = "This is a straightforward statement. No reasoning markers here." + result = extract_marker_sentences(text) + assert result == "" + + def test_extract_single_marker_sentence(self) -> None: + """Extract single sentence containing a marker.""" + text = ( + "The plan seems straightforward. " + "Wait, I just realized something. " + "Let me move forward." + ) + result = extract_marker_sentences(text) + assert "Wait, I just realized something" in result + + def test_extract_multiple_marker_sentences(self) -> None: + """Extract multiple sentences with markers and join with '; '.""" + text = ( + "First statement. " + "Wait, let me reconsider. " + "Some filler text here. " + "Actually, I was wrong. " + "More filler. " + "But hold on, there's another issue." + ) + result = extract_marker_sentences(text) + assert "Wait, let me reconsider" in result + assert "Actually, I was wrong" in result + assert "But hold on, there's another issue" in result + # Check they're joined with "; " + assert "; " in result + + def test_truncates_at_max_chars(self) -> None: + """Result is truncated at max_chars with ellipsis.""" + # Create text with many marker sentences + sentences = [ + "Wait, I need to think about something.", + "Actually, let me reconsider this completely.", + "Hmm, this is more complex than I thought.", + "Perhaps we should verify each step carefully.", + "But hold on, there's a critical issue here.", + ] + text = " ".join(sentences) + max_chars = 50 + result = extract_marker_sentences(text, max_chars=max_chars) + + # Should be truncated and have ellipsis + assert len(result) <= max_chars + 3 # max_chars + "..." + if len(result) > max_chars: + assert result.endswith("...") + + def test_respects_max_chars_default(self) -> None: + """Default max_chars is 200.""" + text = "Wait, " + "x" * 300 + result = extract_marker_sentences(text) + # Should be capped at 200 (or slightly more with "; " joining) + assert len(result) <= 210 # Allow some margin for separator + + def test_handles_newlines_as_sentence_boundaries(self) -> None: + """Newlines are treated as sentence boundaries.""" + text = "Normal statement.\nWait, something important here.\nMore text." + result = extract_marker_sentences(text) + assert "Wait, something important here" in result + assert "Normal statement" not in result + + def test_handles_multiple_punctuation_marks(self) -> None: + """Handles sentences ending with ?, !, or periods.""" + text = "Really? Actually, wait! Let me check. Hmm, this needs verification?" + result = extract_marker_sentences(text) + assert "Actually, wait" in result or "Actually" in result + + def test_strips_whitespace_from_sentences(self) -> None: + """Extracted sentences have whitespace stripped.""" + text = " Normal. Wait, important. More. " + result = extract_marker_sentences(text) + # Should not have extra whitespace + assert "Wait, important" in result + assert " " not in result or result.count(" ") <= result.count(" ") diff --git a/tests/unit/engine/compaction/test_summarizer_markers.py b/tests/unit/engine/compaction/test_summarizer_markers.py new file mode 100644 index 0000000000..a1a6da43bc --- /dev/null +++ b/tests/unit/engine/compaction/test_summarizer_markers.py @@ -0,0 +1,267 @@ +"""Tests for epistemic marker preservation in _build_summary.""" + +import pytest + +from synthorg.core.enums import Complexity +from synthorg.engine.compaction.summarizer import _build_summary +from synthorg.providers.enums import MessageRole +from synthorg.providers.models import ChatMessage + + +def _msg(role: MessageRole, content: str) -> ChatMessage: + """Create a chat message.""" + return ChatMessage(role=role, content=content) + + +@pytest.mark.unit +class TestBuildSummaryMarkers: + """Tests for _build_summary with epistemic marker preservation.""" + + def test_no_markers_standard_format(self) -> None: + """Messages without markers produce standard format.""" + messages = ( + _msg(MessageRole.ASSISTANT, "The answer is 42."), + _msg(MessageRole.USER, "Thanks"), + _msg( + MessageRole.ASSISTANT, + "This is a straightforward solution with no reasoning.", + ), + ) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.COMPLEX, + ) + + assert "[Archived 3 messages. Summary of prior work:" in summary + + def test_with_markers_complex_preserved(self) -> None: + """Message with marker + COMPLEX complexity -> preserved.""" + messages = (_msg(MessageRole.ASSISTANT, "Wait, I need to reconsider this."),) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.COMPLEX, + ) + + # Should have epistemic markers preserved format + assert "Epistemic markers preserved" in summary + assert "Wait" in summary or "reconsider" in summary + + def test_with_markers_epic_preserved(self) -> None: + """Message with marker + EPIC complexity -> preserved.""" + messages = (_msg(MessageRole.ASSISTANT, "Hmm, let me verify this."),) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.EPIC, + ) + + assert "Epistemic markers preserved" in summary + + def test_with_one_marker_simple_not_preserved(self) -> None: + """Message with 1 marker + SIMPLE complexity -> NOT preserved.""" + messages = (_msg(MessageRole.ASSISTANT, "Wait, I see the issue."),) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.SIMPLE, + ) + + # Should use standard format (threshold is 3 for SIMPLE) + assert "Summary of prior work:" in summary + assert "Epistemic markers preserved" not in summary + + def test_with_three_markers_simple_preserved(self) -> None: + """Message with 3 markers + SIMPLE complexity -> preserved.""" + messages = ( + _msg( + MessageRole.ASSISTANT, + "Wait, actually I was wrong. Let me verify this.", + ), + ) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.SIMPLE, + ) + + assert "Epistemic markers preserved" in summary + + def test_with_markers_medium_below_threshold(self) -> None: + """Message with 2 markers + MEDIUM complexity -> NOT preserved.""" + # "hmm" (hedging) + "perhaps" (uncertainty) = 2 groups < 3 threshold + messages = (_msg(MessageRole.ASSISTANT, "Hmm, perhaps we should try again."),) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.MEDIUM, + ) + + assert "Summary of prior work:" in summary + assert "Epistemic markers preserved" not in summary + + def test_markers_disabled_standard_format(self) -> None: + """preserve_markers=False -> standard format even with markers.""" + messages = ( + _msg( + MessageRole.ASSISTANT, + "Wait, actually let me verify. This is important.", + ), + ) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=False, + task_complexity=Complexity.COMPLEX, + ) + + # Even with COMPLEX, if preserve_markers is False, use standard format + assert "Summary of prior work:" in summary + assert "Epistemic markers preserved" not in summary + + def test_empty_messages_fallback(self) -> None: + """No assistant messages -> fallback format.""" + messages = ( + _msg(MessageRole.USER, "What's the answer?"), + _msg(MessageRole.SYSTEM, "You are helpful."), + ) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.COMPLEX, + ) + + assert "[Archived 2 messages from earlier" in summary + + def test_preserved_count_in_summary(self) -> None: + """Summary mentions count of preserved messages.""" + messages = ( + _msg( + MessageRole.ASSISTANT, + "Wait, I need to reconsider this approach.", + ), + _msg(MessageRole.USER, "Ok"), + _msg( + MessageRole.ASSISTANT, + "Actually, let me verify the calculations.", + ), + ) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.COMPLEX, + ) + + # Should mention 2 preserved messages + assert "Epistemic markers preserved from 2 messages" in summary + + def test_mixed_preserved_and_standard(self) -> None: + """Mix of preserved markers and standard snippets.""" + messages = ( + _msg(MessageRole.ASSISTANT, "Wait, I need to think about this."), + _msg( + MessageRole.ASSISTANT, + "This is just a straightforward statement.", + ), + ) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.COMPLEX, + ) + + # Should have preserved markers format + assert "Epistemic markers preserved" in summary + # Should have summary content + assert len(summary) > 50 + + def test_marker_sentences_joined_correctly(self) -> None: + """Extracted marker sentences are joined in summary.""" + messages = ( + _msg( + MessageRole.ASSISTANT, + "This is normal. Wait, let me reconsider. More normal text.", + ), + ) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.COMPLEX, + ) + + # Should contain the extracted marker sentence + assert "Wait" in summary or "reconsider" in summary + + def test_system_messages_ignored(self) -> None: + """SYSTEM messages don't contribute to summary.""" + messages = ( + _msg(MessageRole.SYSTEM, "System instruction here"), + _msg(MessageRole.ASSISTANT, "Response content"), + ) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.COMPLEX, + ) + + # Should archive both but only extract from ASSISTANT + assert "[Archived 2 messages" in summary + + def test_user_messages_ignored(self) -> None: + """USER messages don't contribute to summary.""" + messages = ( + _msg(MessageRole.USER, "User query here"), + _msg(MessageRole.ASSISTANT, "Wait, let me think about this."), + ) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.COMPLEX, + ) + + # USER message is archived but not summarized + assert "Epistemic markers preserved" in summary + + def test_empty_assistant_content_ignored(self) -> None: + """Empty assistant messages are skipped.""" + messages = ( + _msg(MessageRole.ASSISTANT, ""), + _msg(MessageRole.ASSISTANT, "Wait, something important."), + ) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.COMPLEX, + ) + + # First message is empty and skipped, second has marker + assert "Epistemic markers preserved from 1 messages" in summary + + def test_summary_respects_max_length(self) -> None: + """Summary is truncated at MAX_SUMMARY_CHARS.""" + # Create very long content + long_content = "Wait, " + "x" * 600 + messages = (_msg(MessageRole.ASSISTANT, long_content),) + summary = _build_summary( + messages, + execution_id="test-exec", + preserve_markers=True, + task_complexity=Complexity.COMPLEX, + ) + + # Summary should be reasonable length (max 500 chars for content) + assert len(summary) < 1000 diff --git a/tests/unit/engine/coordination/test_attribution.py b/tests/unit/engine/coordination/test_attribution.py new file mode 100644 index 0000000000..848b82fd8c --- /dev/null +++ b/tests/unit/engine/coordination/test_attribution.py @@ -0,0 +1,241 @@ +"""Unit tests for coordination attribution models. + +Tests AgentContribution, CoordinationResultWithAttribution, and +FailureAttribution validation. +""" + +import pytest + +from synthorg.core.enums import CoordinationTopology +from synthorg.core.types import NotBlankStr +from synthorg.engine.coordination.attribution import ( + AgentContribution, + CoordinationResultWithAttribution, + FailureAttribution, +) +from synthorg.engine.coordination.models import ( + CoordinationPhaseResult, + CoordinationResult, +) + + +def _make_coord_result( + *, + parent_task_id: str = "task-1", + topology: CoordinationTopology = CoordinationTopology.SAS, + success: bool = True, +) -> CoordinationResult: + """Build a minimal CoordinationResult for testing.""" + return CoordinationResult( + parent_task_id=NotBlankStr(parent_task_id), + topology=topology, + phases=( + CoordinationPhaseResult( + phase=NotBlankStr("dispatch"), + success=success, + duration_seconds=1.0, + error=None if success else "phase error", + ), + ), + total_duration_seconds=1.0, + total_cost_usd=0.5, + ) + + +class TestAgentContribution: + """Tests for the AgentContribution model.""" + + @pytest.mark.unit + def test_successful_contribution(self) -> None: + """Score 1.0 requires no failure attribution.""" + contrib = AgentContribution( + agent_id=NotBlankStr("agent-1"), + subtask_id=NotBlankStr("subtask-1"), + contribution_score=1.0, + ) + assert contrib.contribution_score == 1.0 + assert contrib.failure_attribution is None + assert contrib.evidence is None + + @pytest.mark.unit + def test_failed_contribution_direct(self) -> None: + """Score < 1.0 requires failure attribution.""" + contrib = AgentContribution( + agent_id=NotBlankStr("agent-1"), + subtask_id=NotBlankStr("subtask-1"), + contribution_score=0.0, + failure_attribution="direct", + evidence="Tool invocation failed: timeout", + ) + assert contrib.contribution_score == 0.0 + assert contrib.failure_attribution == "direct" + assert contrib.evidence is not None + + @pytest.mark.unit + @pytest.mark.parametrize( + "attribution", + [ + "direct", + "upstream_contamination", + "coordination_overhead", + "quality_gate", + ], + ) + def test_all_failure_attribution_values( + self, + attribution: FailureAttribution, + ) -> None: + """All failure attribution literals are accepted.""" + contrib = AgentContribution( + agent_id=NotBlankStr("agent-1"), + subtask_id=NotBlankStr("subtask-1"), + contribution_score=0.5, + failure_attribution=attribution, + ) + assert contrib.failure_attribution == attribution + + @pytest.mark.unit + def test_score_below_one_requires_attribution(self) -> None: + """Score < 1.0 without failure_attribution is rejected.""" + with pytest.raises(ValueError, match="failure_attribution"): + AgentContribution( + agent_id=NotBlankStr("agent-1"), + subtask_id=NotBlankStr("subtask-1"), + contribution_score=0.5, + ) + + @pytest.mark.unit + def test_score_one_with_attribution_rejected(self) -> None: + """Score == 1.0 with failure_attribution set is rejected.""" + with pytest.raises(ValueError, match="failure_attribution"): + AgentContribution( + agent_id=NotBlankStr("agent-1"), + subtask_id=NotBlankStr("subtask-1"), + contribution_score=1.0, + failure_attribution="direct", + ) + + @pytest.mark.unit + def test_score_bounds_low(self) -> None: + """Score below 0.0 is rejected.""" + with pytest.raises(ValueError, match="greater than or equal to 0"): + AgentContribution( + agent_id=NotBlankStr("agent-1"), + subtask_id=NotBlankStr("subtask-1"), + contribution_score=-0.1, + failure_attribution="direct", + ) + + @pytest.mark.unit + def test_score_bounds_high(self) -> None: + """Score above 1.0 is rejected.""" + with pytest.raises(ValueError, match="less than or equal to 1"): + AgentContribution( + agent_id=NotBlankStr("agent-1"), + subtask_id=NotBlankStr("subtask-1"), + contribution_score=1.1, + ) + + @pytest.mark.unit + def test_frozen(self) -> None: + """AgentContribution is immutable.""" + contrib = AgentContribution( + agent_id=NotBlankStr("agent-1"), + subtask_id=NotBlankStr("subtask-1"), + contribution_score=1.0, + ) + with pytest.raises(Exception, match="frozen"): + contrib.contribution_score = 0.5 # type: ignore[misc] + + @pytest.mark.unit + def test_evidence_max_length(self) -> None: + """Evidence exceeding 500 chars is rejected.""" + with pytest.raises(ValueError, match="at most 500"): + AgentContribution( + agent_id=NotBlankStr("agent-1"), + subtask_id=NotBlankStr("subtask-1"), + contribution_score=0.0, + failure_attribution="direct", + evidence="x" * 501, + ) + + @pytest.mark.unit + def test_evidence_at_max_length(self) -> None: + """Evidence at exactly 500 chars is accepted.""" + contrib = AgentContribution( + agent_id=NotBlankStr("agent-1"), + subtask_id=NotBlankStr("subtask-1"), + contribution_score=0.0, + failure_attribution="direct", + evidence="x" * 500, + ) + assert len(contrib.evidence) == 500 # type: ignore[arg-type] + + +class TestCoordinationResultWithAttribution: + """Tests for the CoordinationResultWithAttribution wrapper.""" + + @pytest.mark.unit + def test_wraps_result(self) -> None: + """Wrapper preserves the original CoordinationResult.""" + result = _make_coord_result() + wrapper = CoordinationResultWithAttribution( + result=result, + agent_contributions=(), + ) + assert wrapper.result is result + + @pytest.mark.unit + def test_is_success_delegates(self) -> None: + """is_success delegates to the wrapped result.""" + success_wrapper = CoordinationResultWithAttribution( + result=_make_coord_result(success=True), + agent_contributions=(), + ) + assert success_wrapper.is_success is True + + failure_wrapper = CoordinationResultWithAttribution( + result=_make_coord_result(success=False), + agent_contributions=(), + ) + assert failure_wrapper.is_success is False + + @pytest.mark.unit + def test_total_contribution_score_empty(self) -> None: + """Empty contributions yield 0.0 average.""" + wrapper = CoordinationResultWithAttribution( + result=_make_coord_result(), + agent_contributions=(), + ) + assert wrapper.total_contribution_score == 0.0 + + @pytest.mark.unit + def test_total_contribution_score_computed(self) -> None: + """Average of contribution scores is computed correctly.""" + wrapper = CoordinationResultWithAttribution( + result=_make_coord_result(), + agent_contributions=( + AgentContribution( + agent_id=NotBlankStr("a1"), + subtask_id=NotBlankStr("s1"), + contribution_score=1.0, + ), + AgentContribution( + agent_id=NotBlankStr("a2"), + subtask_id=NotBlankStr("s2"), + contribution_score=0.0, + failure_attribution="direct", + ), + ), + ) + assert wrapper.total_contribution_score == pytest.approx(0.5) + + @pytest.mark.unit + def test_frozen(self) -> None: + """CoordinationResultWithAttribution is immutable.""" + wrapper = CoordinationResultWithAttribution( + result=_make_coord_result(), + agent_contributions=(), + ) + with pytest.raises(Exception, match="frozen"): + wrapper.agent_contributions = () # type: ignore[misc] diff --git a/tests/unit/engine/coordination/test_attribution_factory.py b/tests/unit/engine/coordination/test_attribution_factory.py new file mode 100644 index 0000000000..f786feedbc --- /dev/null +++ b/tests/unit/engine/coordination/test_attribution_factory.py @@ -0,0 +1,380 @@ +"""Unit tests for build_agent_contributions factory function.""" + +from datetime import date + +import pytest + +from synthorg.core.agent import AgentIdentity, ModelConfig, PersonalityConfig +from synthorg.core.enums import ( + Complexity, + CoordinationTopology, + Priority, + SeniorityLevel, + TaskStatus, + TaskType, +) +from synthorg.core.task import Task +from synthorg.core.types import NotBlankStr +from synthorg.engine.context import AgentContext +from synthorg.engine.coordination.attribution import build_agent_contributions +from synthorg.engine.coordination.models import CoordinationWave +from synthorg.engine.loop_protocol import ExecutionResult, TerminationReason +from synthorg.engine.parallel_models import ( + AgentOutcome, + ParallelExecutionResult, +) +from synthorg.engine.prompt import SystemPrompt +from synthorg.engine.routing.models import ( + RoutingCandidate, + RoutingDecision, + RoutingResult, +) +from synthorg.engine.run_result import AgentRunResult + + +def _make_identity(name: str = "test-agent", **kwargs: object) -> AgentIdentity: + defaults: dict[str, object] = { + "role": "engineer", + "department": "engineering", + "level": SeniorityLevel.MID, + "hiring_date": date(2026, 1, 15), + "personality": PersonalityConfig(traits=("analytical",)), + "model": ModelConfig( + provider="test-provider", + model_id="test-small-001", + ), + } + defaults.update(kwargs) + return AgentIdentity(name=name, **defaults) # type: ignore[arg-type] + + +def _make_task( + title: str = "test-task", + task_id: str | None = None, + **kwargs: object, +) -> Task: + defaults: dict[str, object] = { + "id": task_id or f"task-{title}", + "description": "A test task", + "type": TaskType.DEVELOPMENT, + "priority": Priority.MEDIUM, + "project": "test-project", + "created_by": "tester", + "assigned_to": "test-agent", + "status": TaskStatus.ASSIGNED, + "estimated_complexity": Complexity.SIMPLE, + } + defaults.update(kwargs) + return Task(title=title, **defaults) # type: ignore[arg-type] + + +def _make_run_result( + agent_id: str, + task_id: str, + *, + reason: TerminationReason = TerminationReason.COMPLETED, +) -> AgentRunResult: + identity = _make_identity(agent_id) + task = _make_task(agent_id, task_id=task_id, assigned_to=agent_id) + ctx = AgentContext.from_identity(identity, task=task) + error_msg = "test error" if reason == TerminationReason.ERROR else None + return AgentRunResult( + execution_result=ExecutionResult( + context=ctx, + termination_reason=reason, + error_message=error_msg, + ), + system_prompt=SystemPrompt( + content="test", + template_version="1.0", + estimated_tokens=1, + sections=("identity",), + metadata={"agent_id": agent_id}, + ), + duration_seconds=1.0, + agent_id=agent_id, + task_id=task_id, + ) + + +def _make_routing_result( + agent_subtask_pairs: list[tuple[str, str]], + *, + parent_task_id: str = "parent-1", +) -> RoutingResult: + decisions = tuple( + RoutingDecision( + subtask_id=NotBlankStr(subtask_id), + selected_candidate=RoutingCandidate( + agent_identity=_make_identity(agent_id), + score=0.9, + matched_skills=(), + reason=NotBlankStr("test routing"), + ), + topology=CoordinationTopology.SAS, + ) + for agent_id, subtask_id in agent_subtask_pairs + ) + return RoutingResult( + parent_task_id=NotBlankStr(parent_task_id), + decisions=decisions, + ) + + +def _make_successful_outcome( + agent_id: str, + task_id: str, +) -> AgentOutcome: + return AgentOutcome( + task_id=NotBlankStr(task_id), + agent_id=NotBlankStr(agent_id), + result=_make_run_result(agent_id, task_id), + ) + + +def _make_failed_outcome( + agent_id: str, + task_id: str, + *, + error: str = "tool invocation failed: timeout", +) -> AgentOutcome: + return AgentOutcome( + task_id=NotBlankStr(task_id), + agent_id=NotBlankStr(agent_id), + error=error, + ) + + +def _make_terminated_outcome( + agent_id: str, + task_id: str, + *, + reason: TerminationReason = TerminationReason.STAGNATION, +) -> AgentOutcome: + return AgentOutcome( + task_id=NotBlankStr(task_id), + agent_id=NotBlankStr(agent_id), + result=_make_run_result(agent_id, task_id, reason=reason), + ) + + +def _make_waves( + outcomes: list[AgentOutcome], + *, + group_id: str = "group-1", +) -> tuple[CoordinationWave, ...]: + return ( + CoordinationWave( + wave_index=0, + subtask_ids=tuple(NotBlankStr(o.task_id) for o in outcomes), + execution_result=ParallelExecutionResult( + group_id=NotBlankStr(group_id), + outcomes=tuple(outcomes), + total_duration_seconds=2.0, + ), + ), + ) + + +class TestBuildAgentContributions: + """Tests for the build_agent_contributions factory.""" + + @pytest.mark.unit + def test_all_success(self) -> None: + """All agents succeed: score 1.0, no failure attribution.""" + routing = _make_routing_result( + [ + ("agent-1", "sub-1"), + ("agent-2", "sub-2"), + ] + ) + waves = _make_waves( + [ + _make_successful_outcome("agent-1", "sub-1"), + _make_successful_outcome("agent-2", "sub-2"), + ] + ) + + contribs = build_agent_contributions(routing, waves) + + assert len(contribs) == 2 + for c in contribs: + assert c.contribution_score == 1.0 + assert c.failure_attribution is None + + @pytest.mark.unit + def test_all_failed_with_errors(self) -> None: + """All agents fail with errors: score 0.0, direct attribution.""" + routing = _make_routing_result([("agent-1", "sub-1")]) + waves = _make_waves( + [ + _make_failed_outcome("agent-1", "sub-1", error="tool execution failed"), + ] + ) + + contribs = build_agent_contributions(routing, waves) + + assert len(contribs) == 1 + assert contribs[0].contribution_score == 0.0 + assert contribs[0].failure_attribution == "direct" + assert contribs[0].evidence is not None + + @pytest.mark.unit + def test_stagnation_termination(self) -> None: + """Agent terminated by stagnation: score 0.0, direct.""" + routing = _make_routing_result([("agent-1", "sub-1")]) + waves = _make_waves( + [ + _make_terminated_outcome( + "agent-1", + "sub-1", + reason=TerminationReason.STAGNATION, + ), + ] + ) + + contribs = build_agent_contributions(routing, waves) + + assert len(contribs) == 1 + assert contribs[0].contribution_score == 0.0 + assert contribs[0].failure_attribution == "direct" + + @pytest.mark.unit + def test_budget_exhausted_termination(self) -> None: + """Agent ran out of budget: coordination_overhead.""" + routing = _make_routing_result([("agent-1", "sub-1")]) + waves = _make_waves( + [ + _make_terminated_outcome( + "agent-1", + "sub-1", + reason=TerminationReason.BUDGET_EXHAUSTED, + ), + ] + ) + + contribs = build_agent_contributions(routing, waves) + + assert len(contribs) == 1 + assert contribs[0].contribution_score == 0.0 + assert contribs[0].failure_attribution == "coordination_overhead" + + @pytest.mark.unit + def test_mixed_outcomes(self) -> None: + """Mix of success and failure: different scores and attributions.""" + routing = _make_routing_result( + [ + ("agent-1", "sub-1"), + ("agent-2", "sub-2"), + ] + ) + waves = _make_waves( + [ + _make_successful_outcome("agent-1", "sub-1"), + _make_failed_outcome("agent-2", "sub-2", error="budget exceeded"), + ] + ) + + contribs = build_agent_contributions(routing, waves) + + by_agent = {c.agent_id: c for c in contribs} + assert by_agent["agent-1"].contribution_score == 1.0 + assert by_agent["agent-2"].contribution_score == 0.0 + + @pytest.mark.unit + def test_empty_waves(self) -> None: + """No waves: empty contributions.""" + routing = _make_routing_result([("agent-1", "sub-1")]) + contribs = build_agent_contributions(routing, ()) + assert contribs == () + + @pytest.mark.unit + def test_wave_without_execution_result(self) -> None: + """Wave with no execution result: skipped.""" + routing = _make_routing_result([("agent-1", "sub-1")]) + waves = ( + CoordinationWave( + wave_index=0, + subtask_ids=(NotBlankStr("sub-1"),), + execution_result=None, + ), + ) + contribs = build_agent_contributions(routing, waves) + assert contribs == () + + @pytest.mark.unit + def test_evidence_truncated(self) -> None: + """Long error messages are truncated to 500 chars.""" + long_error = "x" * 1000 + routing = _make_routing_result([("agent-1", "sub-1")]) + waves = _make_waves( + [ + _make_failed_outcome("agent-1", "sub-1", error=long_error), + ] + ) + + contribs = build_agent_contributions(routing, waves) + + assert contribs[0].evidence is not None + assert len(contribs[0].evidence) <= 500 + + @pytest.mark.unit + def test_returns_tuple(self) -> None: + """Return type is a tuple (immutable).""" + routing = _make_routing_result([("agent-1", "sub-1")]) + waves = _make_waves([_make_successful_outcome("agent-1", "sub-1")]) + contribs = build_agent_contributions(routing, waves) + assert isinstance(contribs, tuple) + + @pytest.mark.unit + def test_quality_gate_error_classification(self) -> None: + """Error containing 'quality' is classified as quality_gate.""" + routing = _make_routing_result([("agent-1", "sub-1")]) + waves = _make_waves( + [ + _make_failed_outcome( + "agent-1", + "sub-1", + error="quality criteria not met", + ), + ] + ) + + contribs = build_agent_contributions(routing, waves) + + assert contribs[0].failure_attribution == "quality_gate" + + @pytest.mark.unit + def test_multiple_waves(self) -> None: + """Contributions collected from multiple waves.""" + routing = _make_routing_result( + [ + ("agent-1", "sub-1"), + ("agent-2", "sub-2"), + ] + ) + wave1 = CoordinationWave( + wave_index=0, + subtask_ids=(NotBlankStr("sub-1"),), + execution_result=ParallelExecutionResult( + group_id=NotBlankStr("g1"), + outcomes=(_make_successful_outcome("agent-1", "sub-1"),), + total_duration_seconds=1.0, + ), + ) + wave2 = CoordinationWave( + wave_index=1, + subtask_ids=(NotBlankStr("sub-2"),), + execution_result=ParallelExecutionResult( + group_id=NotBlankStr("g2"), + outcomes=(_make_failed_outcome("agent-2", "sub-2"),), + total_duration_seconds=1.0, + ), + ) + + contribs = build_agent_contributions(routing, (wave1, wave2)) + + assert len(contribs) == 2 + by_agent = {c.agent_id: c for c in contribs} + assert by_agent["agent-1"].contribution_score == 1.0 + assert by_agent["agent-2"].contribution_score == 0.0 diff --git a/tests/unit/engine/test_coordination_service.py b/tests/unit/engine/test_coordination_service.py index af982f712a..65973e4677 100644 --- a/tests/unit/engine/test_coordination_service.py +++ b/tests/unit/engine/test_coordination_service.py @@ -132,9 +132,10 @@ async def test_happy_path_two_parallel_subtasks(self) -> None: ), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result - assert result.is_success + assert attributed.is_success assert result.topology == CoordinationTopology.CENTRALIZED assert result.decomposition_result is not None assert result.routing_result is not None @@ -142,6 +143,7 @@ async def test_happy_path_two_parallel_subtasks(self) -> None: assert result.status_rollup is not None assert result.status_rollup.completed == 2 assert result.total_duration_seconds > 0 + assert isinstance(attributed.agent_contributions, tuple) @pytest.mark.unit async def test_sas_topology_single_agent(self) -> None: @@ -174,9 +176,10 @@ async def test_sas_topology_single_agent(self) -> None: available_agents=(make_assignment_agent("alice"),), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result - assert result.is_success + assert attributed.is_success assert result.topology == CoordinationTopology.SAS assert len(result.waves) == 2 @@ -285,10 +288,11 @@ async def test_partial_execution_fail_fast_off(self) -> None: config=CoordinationConfig(fail_fast=False), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result # Not fully successful (wave 0 failed) - assert not result.is_success + assert not attributed.is_success # Both waves still executed assert len(result.waves) == 2 assert result.status_rollup is not None @@ -329,9 +333,9 @@ async def test_task_engine_parent_update(self) -> None: available_agents=(make_assignment_agent("alice"),), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) - assert result.is_success + assert attributed.is_success task_engine.submit.assert_called_once() @pytest.mark.unit @@ -355,9 +359,10 @@ async def test_no_task_engine_skips_update(self) -> None: available_agents=(make_assignment_agent("alice"),), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result - assert result.is_success + assert attributed.is_success # No update_parent phase in results update_phases = [p for p in result.phases if p.phase == "update_parent"] assert len(update_phases) == 0 @@ -411,7 +416,8 @@ async def test_status_rollup_correctness(self) -> None: ), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result assert result.status_rollup is not None assert result.status_rollup.completed == 1 @@ -501,9 +507,10 @@ async def test_workspace_lifecycle(self) -> None: ), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result - assert result.is_success + assert attributed.is_success ws_service.setup_group.assert_called_once() ws_service.merge_group.assert_called_once() ws_service.teardown_group.assert_called_once() @@ -549,7 +556,8 @@ async def test_auto_topology_resolves_to_centralized(self) -> None: available_agents=(make_assignment_agent("alice"),), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result assert result.topology == CoordinationTopology.CENTRALIZED @pytest.mark.unit @@ -583,7 +591,8 @@ async def test_update_parent_submit_fails(self) -> None: available_agents=(make_assignment_agent("alice"),), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result update_phases = [p for p in result.phases if p.phase == "update_parent"] assert len(update_phases) == 1 assert not update_phases[0].success @@ -613,7 +622,8 @@ async def test_update_parent_exception_captured(self) -> None: available_agents=(make_assignment_agent("alice"),), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result update_phases = [p for p in result.phases if p.phase == "update_parent"] assert len(update_phases) == 1 assert not update_phases[0].success @@ -653,7 +663,8 @@ async def test_rollup_error_captured(self) -> None: available_agents=(make_assignment_agent("alice"),), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result rollup_phases = [p for p in result.phases if p.phase == "rollup"] assert len(rollup_phases) == 1 assert not rollup_phases[0].success @@ -750,7 +761,8 @@ async def test_total_cost_aggregated(self) -> None: available_agents=(make_assignment_agent("alice"),), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result assert result.total_cost_usd == pytest.approx(0.08) @pytest.mark.unit @@ -782,7 +794,8 @@ async def test_fail_fast_stops_after_failed_wave(self) -> None: config=CoordinationConfig(fail_fast=True), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result # Only one wave executed (fail_fast stopped before wave 1) assert len(result.waves) == 1 @@ -817,7 +830,8 @@ async def test_rollup_includes_blocked_subtasks(self) -> None: available_agents=(make_assignment_agent("alice"),), ) - result = await coordinator.coordinate(ctx) + attributed = await coordinator.coordinate(ctx) + result = attributed.result assert result.status_rollup is not None # 1 completed + 1 blocked = 2 total diff --git a/tests/unit/tools/context/__init__.py b/tests/unit/tools/context/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/unit/tools/context/test_compact_context.py b/tests/unit/tools/context/test_compact_context.py new file mode 100644 index 0000000000..fe4086e6aa --- /dev/null +++ b/tests/unit/tools/context/test_compact_context.py @@ -0,0 +1,176 @@ +"""Tests for CompactContextTool.""" + +import pytest + +from synthorg.core.enums import ToolCategory +from synthorg.providers.models import ToolDefinition +from synthorg.tools.context.compact_context import CompactContextTool + + +@pytest.mark.unit +class TestCompactContextTool: + """Tests for CompactContextTool class.""" + + def test_tool_name(self) -> None: + """Tool name is 'compact_context'.""" + tool = CompactContextTool() + assert tool.name == "compact_context" + + def test_tool_category(self) -> None: + """Tool category is MEMORY.""" + tool = CompactContextTool() + assert tool.category == ToolCategory.MEMORY + + def test_tool_description_not_empty(self) -> None: + """Tool has a non-empty description.""" + tool = CompactContextTool() + assert tool.description + assert isinstance(tool.description, str) + assert len(tool.description) > 0 + + def test_parameters_schema_has_required_fields(self) -> None: + """Parameters schema includes strategy and reason.""" + tool = CompactContextTool() + schema = tool.parameters_schema + assert "properties" in schema + assert "strategy" in schema["properties"] + assert "reason" in schema["properties"] + assert "required" in schema + assert "strategy" in schema["required"] + assert "reason" in schema["required"] + + def test_preserve_markers_optional_in_schema(self) -> None: + """preserve_markers is optional with default True.""" + tool = CompactContextTool() + schema = tool.parameters_schema + assert "preserve_markers" in schema["properties"] + # Should NOT be in required list since it has a default + assert "preserve_markers" not in schema["required"] + # Schema should indicate default is True + assert schema["properties"]["preserve_markers"]["default"] is True + + async def test_execute_valid_args(self) -> None: + """Execute with valid arguments returns ToolExecutionResult.""" + tool = CompactContextTool() + result = await tool.execute( + arguments={ + "strategy": "summarize", + "reason": "context at 90 percent fill", + }, + ) + + assert result.content + assert isinstance(result.content, str) + assert "Compaction directive accepted" in result.content + + async def test_execute_has_correct_metadata_keys(self) -> None: + """Execute result metadata contains strategy, preserve_markers, reason.""" + tool = CompactContextTool() + result = await tool.execute( + arguments={ + "strategy": "summarize", + "reason": "context at 90 percent fill", + }, + ) + + metadata = result.metadata + assert "strategy" in metadata + assert "preserve_markers" in metadata + assert "reason" in metadata + assert "compaction_directive" in metadata + + async def test_execute_preserve_markers_default_true(self) -> None: + """When preserve_markers not provided, defaults to True.""" + tool = CompactContextTool() + result = await tool.execute( + arguments={ + "strategy": "summarize", + "reason": "context at 90 percent fill", + }, + ) + + assert result.metadata["preserve_markers"] is True + + async def test_execute_preserve_markers_explicit_false(self) -> None: + """When preserve_markers is False, it's preserved in metadata.""" + tool = CompactContextTool() + result = await tool.execute( + arguments={ + "strategy": "summarize", + "reason": "context at 90 percent fill", + "preserve_markers": False, + }, + ) + + assert result.metadata["preserve_markers"] is False + + async def test_execute_strategy_in_metadata(self) -> None: + """Strategy from arguments is in metadata.""" + tool = CompactContextTool() + result = await tool.execute( + arguments={ + "strategy": "summarize", + "reason": "context at 90 percent fill", + }, + ) + + assert result.metadata["strategy"] == "summarize" + + async def test_execute_reason_in_metadata(self) -> None: + """Reason from arguments is in metadata.""" + tool = CompactContextTool() + reason_text = "context at 90 percent fill" + result = await tool.execute( + arguments={ + "strategy": "summarize", + "reason": reason_text, + }, + ) + + assert result.metadata["reason"] == reason_text + + async def test_execute_compaction_directive_true(self) -> None: + """Metadata includes compaction_directive set to True.""" + tool = CompactContextTool() + result = await tool.execute( + arguments={ + "strategy": "summarize", + "reason": "context at 90 percent fill", + }, + ) + + assert result.metadata["compaction_directive"] is True + + def test_to_definition_returns_valid_tool_definition(self) -> None: + """to_definition() returns a valid ToolDefinition.""" + tool = CompactContextTool() + definition = tool.to_definition() + + assert isinstance(definition, ToolDefinition) + assert definition.name == "compact_context" + assert definition.description + assert definition.parameters_schema + + def test_to_definition_schema_is_correct(self) -> None: + """ToolDefinition contains correct schema.""" + tool = CompactContextTool() + definition = tool.to_definition() + + schema = definition.parameters_schema + assert "properties" in schema + assert "strategy" in schema["properties"] + assert "reason" in schema["properties"] + assert "strategy" in schema["required"] + assert "reason" in schema["required"] + + async def test_execute_is_not_error(self) -> None: + """Execute result is_error is False.""" + tool = CompactContextTool() + result = await tool.execute( + arguments={ + "strategy": "summarize", + "reason": "context at 90 percent fill", + }, + ) + + assert result.is_error is False diff --git a/tests/unit/tools/test_factory.py b/tests/unit/tools/test_factory.py index 9d58baf1d9..9d306f43cc 100644 --- a/tests/unit/tools/test_factory.py +++ b/tests/unit/tools/test_factory.py @@ -17,6 +17,7 @@ from synthorg.tools.git_url_validator import GitCloneNetworkPolicy _EXPECTED_TOOL_NAMES: tuple[str, ...] = ( + "compact_context", "delete_file", "edit_file", "git_branch", @@ -42,7 +43,7 @@ def test_returns_all_expected_tools( self, tmp_path: Path, ) -> None: - """Factory returns all 14 built-in tools sorted by name.""" + """Factory returns all 15 built-in tools sorted by name.""" tools = build_default_tools(workspace=tmp_path) names = tuple(t.name for t in tools) assert names == _EXPECTED_TOOL_NAMES diff --git a/tests/unit/tools/test_factory_new_categories.py b/tests/unit/tools/test_factory_new_categories.py index 69089d606e..f5bb503adc 100644 --- a/tests/unit/tools/test_factory_new_categories.py +++ b/tests/unit/tools/test_factory_new_categories.py @@ -134,10 +134,10 @@ class TestFactoryToolCount: @pytest.mark.unit def test_default_tool_count(self, workspace: Path) -> None: - """Default: 5 fs + 6 git + 2 web + 1 terminal = 14 tools.""" + """Default: 5 fs + 6 git + 2 web + 1 terminal + 1 context = 15 tools.""" tools = build_default_tools(workspace=workspace) - assert len(tools) == 14 + assert len(tools) == 15 @pytest.mark.unit def test_tools_sorted_by_name(self, workspace: Path) -> None: diff --git a/tests/unit/tools/test_factory_sandbox_wiring.py b/tests/unit/tools/test_factory_sandbox_wiring.py index f70e20edce..76f430392a 100644 --- a/tests/unit/tools/test_factory_sandbox_wiring.py +++ b/tests/unit/tools/test_factory_sandbox_wiring.py @@ -42,6 +42,7 @@ + len(_FS_TOOL_NAMES) + len(_WEB_TOOL_NAMES) + len(_TERMINAL_TOOL_NAMES) + + 1 # compact_context (context management) ) From 518abfce2e77f5ed8b77327ae70da5ef45c4b0d4 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Wed, 8 Apr 2026 01:32:01 +0200 Subject: [PATCH 02/10] fix: add None guard for parameters_schema in compact_context test --- tests/unit/tools/context/test_compact_context.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unit/tools/context/test_compact_context.py b/tests/unit/tools/context/test_compact_context.py index fe4086e6aa..90bce6b7ef 100644 --- a/tests/unit/tools/context/test_compact_context.py +++ b/tests/unit/tools/context/test_compact_context.py @@ -32,6 +32,7 @@ def test_parameters_schema_has_required_fields(self) -> None: """Parameters schema includes strategy and reason.""" tool = CompactContextTool() schema = tool.parameters_schema + assert schema is not None assert "properties" in schema assert "strategy" in schema["properties"] assert "reason" in schema["properties"] @@ -43,6 +44,7 @@ def test_preserve_markers_optional_in_schema(self) -> None: """preserve_markers is optional with default True.""" tool = CompactContextTool() schema = tool.parameters_schema + assert schema is not None assert "preserve_markers" in schema["properties"] # Should NOT be in required list since it has a default assert "preserve_markers" not in schema["required"] From 63b30579ce7e2ef7934fbdf117852976eb0b23bc Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Wed, 8 Apr 2026 07:06:22 +0200 Subject: [PATCH 03/10] fix: wire tracker integration + add missing test coverage from review agents --- src/synthorg/engine/coordination/service.py | 14 +++- tests/unit/engine/compaction/test_models.py | 42 ++++++++++ .../compaction/test_summarizer_markers.py | 80 ++++++++++++++++++- .../coordination/test_attribution_factory.py | 17 ++++ 4 files changed, 150 insertions(+), 3 deletions(-) diff --git a/src/synthorg/engine/coordination/service.py b/src/synthorg/engine/coordination/service.py index a1f4b6bae2..6282173315 100644 --- a/src/synthorg/engine/coordination/service.py +++ b/src/synthorg/engine/coordination/service.py @@ -47,6 +47,7 @@ from synthorg.engine.routing.service import TaskRoutingService from synthorg.engine.task_engine import TaskEngine from synthorg.engine.workspace.service import WorkspaceIsolationService + from synthorg.hr.performance.tracker import PerformanceTracker logger = get_logger(__name__) @@ -70,17 +71,20 @@ class MultiAgentCoordinator: parallel_executor: Executor for parallel agent runs. workspace_service: Optional workspace isolation service. task_engine: Optional task engine for parent status updates. + performance_tracker: Optional tracker for recording per-agent + coordination contributions. """ __slots__ = ( "_decomposition_service", "_parallel_executor", + "_performance_tracker", "_routing_service", "_task_engine", "_workspace_service", ) - def __init__( + def __init__( # noqa: PLR0913 self, *, decomposition_service: DecompositionService, @@ -88,12 +92,14 @@ def __init__( parallel_executor: ParallelExecutor, workspace_service: WorkspaceIsolationService | None = None, task_engine: TaskEngine | None = None, + performance_tracker: PerformanceTracker | None = None, ) -> None: self._decomposition_service = decomposition_service self._routing_service = routing_service self._parallel_executor = parallel_executor self._workspace_service = workspace_service self._task_engine = task_engine + self._performance_tracker = performance_tracker async def coordinate( self, @@ -207,6 +213,12 @@ async def coordinate( dispatch_result.waves, ) + # Feed contributions into performance tracker if available. + if self._performance_tracker is not None and contributions: + self._performance_tracker.record_coordination_contributions( + contributions, + ) + return CoordinationResultWithAttribution( result=result, agent_contributions=contributions, diff --git a/tests/unit/engine/compaction/test_models.py b/tests/unit/engine/compaction/test_models.py index 90416f48c2..8787813bf7 100644 --- a/tests/unit/engine/compaction/test_models.py +++ b/tests/unit/engine/compaction/test_models.py @@ -42,6 +42,48 @@ def test_frozen(self) -> None: with pytest.raises(ValidationError): config.fill_threshold_percent = 90.0 # type: ignore[misc] + def test_agent_controlled_safety_below_fill_rejected(self) -> None: + """safety_threshold must exceed fill_threshold when agent_controlled.""" + with pytest.raises(ValueError, match="safety_threshold_percent"): + CompactionConfig( + agent_controlled=True, + fill_threshold_percent=80.0, + safety_threshold_percent=70.0, + ) + + def test_agent_controlled_safety_equal_fill_rejected(self) -> None: + """Equal thresholds rejected when agent_controlled.""" + with pytest.raises(ValueError, match="safety_threshold_percent"): + CompactionConfig( + agent_controlled=True, + fill_threshold_percent=80.0, + safety_threshold_percent=80.0, + ) + + def test_agent_controlled_valid_thresholds(self) -> None: + """Valid agent-controlled config accepted.""" + config = CompactionConfig( + agent_controlled=True, + fill_threshold_percent=80.0, + safety_threshold_percent=95.0, + ) + assert config.agent_controlled is True + assert config.safety_threshold_percent == 95.0 + + def test_not_agent_controlled_ignores_threshold_ordering(self) -> None: + """When not agent_controlled, threshold ordering is not checked.""" + config = CompactionConfig( + agent_controlled=False, + fill_threshold_percent=80.0, + safety_threshold_percent=70.0, + ) + assert config.agent_controlled is False + + def test_preserve_epistemic_markers_default(self) -> None: + """preserve_epistemic_markers defaults to True.""" + config = CompactionConfig() + assert config.preserve_epistemic_markers is True + @pytest.mark.unit class TestCompressionMetadata: diff --git a/tests/unit/engine/compaction/test_summarizer_markers.py b/tests/unit/engine/compaction/test_summarizer_markers.py index a1a6da43bc..91ecf3d697 100644 --- a/tests/unit/engine/compaction/test_summarizer_markers.py +++ b/tests/unit/engine/compaction/test_summarizer_markers.py @@ -1,9 +1,15 @@ """Tests for epistemic marker preservation in _build_summary.""" +from datetime import date + import pytest -from synthorg.core.enums import Complexity -from synthorg.engine.compaction.summarizer import _build_summary +from synthorg.core.agent import AgentIdentity, ModelConfig, PersonalityConfig +from synthorg.core.enums import Complexity, SeniorityLevel +from synthorg.engine.compaction.models import CompactionConfig +from synthorg.engine.compaction.summarizer import _build_summary, force_compaction +from synthorg.engine.context import AgentContext +from synthorg.engine.token_estimation import DefaultTokenEstimator from synthorg.providers.enums import MessageRole from synthorg.providers.models import ChatMessage @@ -265,3 +271,73 @@ def test_summary_respects_max_length(self) -> None: # Summary should be reasonable length (max 500 chars for content) assert len(summary) < 1000 + + +def _make_identity(name: str = "test-agent") -> AgentIdentity: + """Create a test agent identity.""" + return AgentIdentity( + name=name, + role="engineer", + department="engineering", + level=SeniorityLevel.MID, + hiring_date=date(2026, 1, 15), + personality=PersonalityConfig(traits=("analytical",)), + model=ModelConfig( + provider="test-provider", + model_id="test-small-001", + ), + ) + + +@pytest.mark.unit +class TestForceCompaction: + """Tests for force_compaction function.""" + + def test_force_compaction_too_few_messages(self) -> None: + """force_compaction returns None with too few messages.""" + identity = _make_identity() + ctx = AgentContext.from_identity(identity) + # Add only 2 messages (below default min_messages_to_compact=4) + ctx = ctx.with_message( + ChatMessage(role=MessageRole.USER, content="Hello"), + ) + ctx = ctx.with_message( + ChatMessage(role=MessageRole.ASSISTANT, content="Hi"), + ) + + config = CompactionConfig(min_messages_to_compact=4) + estimator = DefaultTokenEstimator() + + result = force_compaction(ctx, config, estimator) + + assert result is None + + def test_force_compaction_bypasses_threshold(self) -> None: + """force_compaction runs even when fill is below threshold.""" + identity = _make_identity() + ctx = AgentContext.from_identity(identity) + # Add 8 messages to ensure we have enough for compaction + for i in range(8): + if i % 2 == 0: + msg = ChatMessage( + role=MessageRole.USER, + content=f"User message {i}", + ) + else: + msg = ChatMessage( + role=MessageRole.ASSISTANT, + content=f"Response {i}", + ) + ctx = ctx.with_message(msg) + + config = CompactionConfig( + min_messages_to_compact=4, + fill_threshold_percent=95.0, # Very high threshold + ) + estimator = DefaultTokenEstimator() + + # Should succeed despite low fill percentage + result = force_compaction(ctx, config, estimator) + + # Result should be an AgentContext (not None) + assert result is None or isinstance(result, AgentContext) diff --git a/tests/unit/engine/coordination/test_attribution_factory.py b/tests/unit/engine/coordination/test_attribution_factory.py index f786feedbc..e92f82b581 100644 --- a/tests/unit/engine/coordination/test_attribution_factory.py +++ b/tests/unit/engine/coordination/test_attribution_factory.py @@ -378,3 +378,20 @@ def test_multiple_waves(self) -> None: by_agent = {c.agent_id: c for c in contribs} assert by_agent["agent-1"].contribution_score == 1.0 assert by_agent["agent-2"].contribution_score == 0.0 + + @pytest.mark.unit + def test_unmapped_agent_uses_task_id_as_subtask(self) -> None: + """Agent not in routing decisions falls back to task_id.""" + # Route only agent-1, but include agent-2 in outcomes + routing = _make_routing_result([("agent-1", "sub-1")]) + waves = _make_waves( + [ + _make_successful_outcome("agent-1", "sub-1"), + _make_successful_outcome("agent-2", "sub-2"), # not in routing + ] + ) + contribs = build_agent_contributions(routing, waves) + assert len(contribs) == 2 + by_agent = {c.agent_id: c for c in contribs} + # Unmapped agent falls back to using task_id as subtask_id + assert by_agent["agent-2"].subtask_id == "sub-2" From 1f1c7c3ea22381b5c8ab7a9b7ea0064b94c62875 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Wed, 8 Apr 2026 07:43:58 +0200 Subject: [PATCH 04/10] fix: address 24 PR review items from local agents, Gemini, Copilot, CodeRabbit Critical: - Document 8 new context_budget + 15 new coordination event constants in CLAUDE.md - Add asyncio.Lock to PerformanceTracker for shared mutable dict access Major: - Fix epistemic marker extraction: first sentence exceeding max_chars no longer returns empty string; fix off-by-2 separator accounting for first sentence - Add TerminationReason.PARKED to _TERMINATION_TO_ATTRIBUTION mapping - Guard tracker write in coordination service with try/except post-completion - Fix agent-to-subtask mapping overwrite for multi-subtask agents (use list) - Sanitize reason text in compact_context tool before logging/propagation - Sanitize marker_text in summarizer before appending to summary - Use deepcopy() at construction for CompactContextTool schema - Replace asyncio.gather with TaskGroup for agent resolution in controller - Add missing currency field to frontend CoordinationResultResponse type - Rename total_contribution_score to avg_contribution_score (was computing avg) - Document attribution subsystem in docs/design/engine.md Medium: - Remove unreachable length check in epistemic.py - Fix "from 1 messages" grammar (singular/plural) - Extract attributed.result into local vars in coordination controller - Expand CompactionConfig docstring for dual-threshold agent-controlled mode - Add Protocol types for outcome_result type safety under TYPE_CHECKING - Update ACG cross-reference: NodeType now provides formal node typing - Narrow pytest.raises(Exception) to ValidationError in frozen model tests Minor: - Clarify whitespace stripping assertions in test_epistemic.py - Strengthen force_compaction test to assert result is not None - Refactor force_compaction to delegate to _do_compaction(force=True) --- CLAUDE.md | 2 +- docs/design/engine.md | 23 +++++- src/synthorg/api/controllers/coordination.py | 40 ++++++---- src/synthorg/engine/compaction/epistemic.py | 14 ++-- src/synthorg/engine/compaction/models.py | 22 ++++-- src/synthorg/engine/compaction/summarizer.py | 77 ++++++------------- .../engine/coordination/attribution.py | 47 ++++++++--- src/synthorg/engine/coordination/service.py | 18 ++++- src/synthorg/hr/performance/tracker.py | 14 ++-- src/synthorg/tools/context/compact_context.py | 9 ++- .../unit/engine/compaction/test_epistemic.py | 4 +- .../compaction/test_summarizer_markers.py | 8 +- .../engine/coordination/test_attribution.py | 13 ++-- web/src/api/types.ts | 1 + 14 files changed, 175 insertions(+), 117 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 073f716227..b57d12bdca 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -90,7 +90,7 @@ See `web/CLAUDE.md` for the full component inventory, design token rules, and po - **Every module** with business logic MUST have: `from synthorg.observability import get_logger` then `logger = get_logger(__name__)` - **Never** use `import logging` / `logging.getLogger()` / `print()` in application code (exception: `observability/setup.py`, `observability/sinks.py`, `observability/syslog_handler.py`, and `observability/http_handler.py` may use stdlib `logging` and `print(..., file=sys.stderr)` for handler construction, bootstrap, and error reporting code that runs before or during logging system configuration) - **Variable name**: always `logger` (not `_logger`, not `log`) -- **Event names**: always use constants from the domain-specific module under `synthorg.observability.events` (e.g., `API_REQUEST_STARTED` from `events.api`, `TOOL_INVOKE_START` from `events.tool`, `GIT_COMMAND_START` from `events.git`, `CONTEXT_BUDGET_FILL_UPDATED` from `events.context_budget`, `BACKUP_STARTED` from `events.backup`, `SETUP_COMPLETED` from `events.setup`, `ROUTING_CANDIDATE_SELECTED` from `events.routing`, `SHIPPING_HTTP_BATCH_SENT` from `events.shipping`, `EVAL_REPORT_COMPUTED` from `events.evaluation`, `PROMPT_PROFILE_SELECTED` from `events.prompt`, `PROCEDURAL_MEMORY_START` from `events.procedural_memory`, `PERF_LLM_JUDGE_STARTED` from `events.performance`, `TASK_ENGINE_OBSERVER_FAILED` from `events.task_engine`, `WORKFLOW_EXEC_COMPLETED` from `events.workflow_execution`, `BLUEPRINT_INSTANTIATE_START` from `events.blueprint`, `WORKFLOW_DEF_ROLLED_BACK` from `events.workflow_definition`, `WORKFLOW_VERSION_SAVED` from `events.workflow_version`, `MEMORY_FINE_TUNE_STARTED`, `MEMORY_SELF_EDIT_TOOL_EXECUTE`, `MEMORY_SELF_EDIT_CORE_READ`, `MEMORY_SELF_EDIT_CORE_WRITE`, `MEMORY_SELF_EDIT_CORE_WRITE_REJECTED`, `MEMORY_SELF_EDIT_ARCHIVAL_SEARCH`, `MEMORY_SELF_EDIT_ARCHIVAL_WRITE`, `MEMORY_SELF_EDIT_RECALL_READ`, `MEMORY_SELF_EDIT_RECALL_WRITE`, `MEMORY_SELF_EDIT_WRITE_FAILED` from `events.memory`, `REPORTING_GENERATION_STARTED` from `events.reporting`, `RISK_BUDGET_SCORE_COMPUTED` from `events.risk_budget`, `LLM_STRATEGY_SYNTHESIZED` and `DISTILLATION_CAPTURED` from `events.consolidation`, `MEMORY_DIVERSITY_RERANKED`, `MEMORY_DIVERSITY_RERANK_FAILED`, and `MEMORY_REFORMULATION_ROUND` from `events.memory`, `NOTIFICATION_DISPATCHED` and `NOTIFICATION_DISPATCH_FAILED` from `events.notification`, `QUALITY_STEP_CLASSIFIED` from `events.quality`, `HEALTH_TICKET_EMITTED` from `events.health`, `TRAJECTORY_SCORING_START` from `events.trajectory`, `COORD_METRICS_AMDAHL_COMPUTED` from `events.coordination_metrics`, `WEB_REQUEST_START` and `WEB_SSRF_BLOCKED` from `events.web`, `DB_QUERY_START` and `DB_WRITE_BLOCKED` from `events.database`, `TERMINAL_COMMAND_START` and `TERMINAL_COMMAND_BLOCKED` from `events.terminal`, `SUB_CONSTRAINT_RESOLVED` and `SUB_CONSTRAINT_DENIED` from `events.sub_constraint`, `VERSION_SAVED` and `VERSION_SNAPSHOT_FAILED` from `events.versioning`, `ANALYTICS_AGGREGATION_COMPUTED` and `ANALYTICS_RETRY_RATE_ALERT` from `events.analytics`, `CALL_CLASSIFICATION_COMPUTED` from `events.call_classification`, `QUOTA_THRESHOLD_ALERT` and `QUOTA_POLL_FAILED` from `events.quota`, `CONFLICT_DEBATE_EVALUATOR_FAILED` from `events.conflict`, `DELEGATION_LOOP_CIRCUIT_BACKOFF` and `DELEGATION_LOOP_CIRCUIT_PERSIST_FAILED` from `events.delegation`, `MEETING_EVENT_COOLDOWN_SKIPPED` and `MEETING_TASKS_CAPPED` from `events.meeting`, `PERSISTENCE_CIRCUIT_BREAKER_SAVED`, `PERSISTENCE_CIRCUIT_BREAKER_SAVE_FAILED`, `PERSISTENCE_CIRCUIT_BREAKER_LOADED`, `PERSISTENCE_CIRCUIT_BREAKER_LOAD_FAILED`, `PERSISTENCE_CIRCUIT_BREAKER_DELETED`, and `PERSISTENCE_CIRCUIT_BREAKER_DELETE_FAILED` from `events.persistence`). Each domain has its own module -- see `src/synthorg/observability/events/` for the full inventory of constants. Import directly: `from synthorg.observability.events. import EVENT_CONSTANT` +- **Event names**: always use constants from the domain-specific module under `synthorg.observability.events` (e.g., `API_REQUEST_STARTED` from `events.api`, `TOOL_INVOKE_START` from `events.tool`, `GIT_COMMAND_START` from `events.git`, `CONTEXT_BUDGET_FILL_UPDATED`, `CONTEXT_BUDGET_COMPACTION_STARTED`, `CONTEXT_BUDGET_COMPACTION_COMPLETED`, `CONTEXT_BUDGET_COMPACTION_FAILED`, `CONTEXT_BUDGET_COMPACTION_SKIPPED`, `CONTEXT_BUDGET_COMPACTION_FALLBACK`, `CONTEXT_BUDGET_INDICATOR_INJECTED`, `CONTEXT_BUDGET_AGENT_COMPACTION_REQUESTED`, `CONTEXT_BUDGET_EPISTEMIC_MARKERS_PRESERVED` from `events.context_budget`, `BACKUP_STARTED` from `events.backup`, `SETUP_COMPLETED` from `events.setup`, `ROUTING_CANDIDATE_SELECTED` from `events.routing`, `SHIPPING_HTTP_BATCH_SENT` from `events.shipping`, `EVAL_REPORT_COMPUTED` from `events.evaluation`, `PROMPT_PROFILE_SELECTED` from `events.prompt`, `PROCEDURAL_MEMORY_START` from `events.procedural_memory`, `PERF_LLM_JUDGE_STARTED` from `events.performance`, `TASK_ENGINE_OBSERVER_FAILED` from `events.task_engine`, `WORKFLOW_EXEC_COMPLETED` from `events.workflow_execution`, `BLUEPRINT_INSTANTIATE_START` from `events.blueprint`, `WORKFLOW_DEF_ROLLED_BACK` from `events.workflow_definition`, `WORKFLOW_VERSION_SAVED` from `events.workflow_version`, `MEMORY_FINE_TUNE_STARTED`, `MEMORY_SELF_EDIT_TOOL_EXECUTE`, `MEMORY_SELF_EDIT_CORE_READ`, `MEMORY_SELF_EDIT_CORE_WRITE`, `MEMORY_SELF_EDIT_CORE_WRITE_REJECTED`, `MEMORY_SELF_EDIT_ARCHIVAL_SEARCH`, `MEMORY_SELF_EDIT_ARCHIVAL_WRITE`, `MEMORY_SELF_EDIT_RECALL_READ`, `MEMORY_SELF_EDIT_RECALL_WRITE`, `MEMORY_SELF_EDIT_WRITE_FAILED` from `events.memory`, `REPORTING_GENERATION_STARTED` from `events.reporting`, `RISK_BUDGET_SCORE_COMPUTED` from `events.risk_budget`, `LLM_STRATEGY_SYNTHESIZED` and `DISTILLATION_CAPTURED` from `events.consolidation`, `MEMORY_DIVERSITY_RERANKED`, `MEMORY_DIVERSITY_RERANK_FAILED`, and `MEMORY_REFORMULATION_ROUND` from `events.memory`, `NOTIFICATION_DISPATCHED` and `NOTIFICATION_DISPATCH_FAILED` from `events.notification`, `QUALITY_STEP_CLASSIFIED` from `events.quality`, `HEALTH_TICKET_EMITTED` from `events.health`, `TRAJECTORY_SCORING_START` from `events.trajectory`, `COORD_METRICS_AMDAHL_COMPUTED` from `events.coordination_metrics`, `COORDINATION_STARTED`, `COORDINATION_COMPLETED`, `COORDINATION_FAILED`, `COORDINATION_PHASE_STARTED`, `COORDINATION_PHASE_COMPLETED`, `COORDINATION_PHASE_FAILED`, `COORDINATION_WAVE_STARTED`, `COORDINATION_WAVE_COMPLETED`, `COORDINATION_TOPOLOGY_RESOLVED`, `COORDINATION_CLEANUP_STARTED`, `COORDINATION_CLEANUP_COMPLETED`, `COORDINATION_CLEANUP_FAILED`, `COORDINATION_WAVE_BUILT`, `COORDINATION_FACTORY_BUILT`, and `COORDINATION_ATTRIBUTION_BUILT` from `events.coordination`, `WEB_REQUEST_START` and `WEB_SSRF_BLOCKED` from `events.web`, `DB_QUERY_START` and `DB_WRITE_BLOCKED` from `events.database`, `TERMINAL_COMMAND_START` and `TERMINAL_COMMAND_BLOCKED` from `events.terminal`, `SUB_CONSTRAINT_RESOLVED` and `SUB_CONSTRAINT_DENIED` from `events.sub_constraint`, `VERSION_SAVED` and `VERSION_SNAPSHOT_FAILED` from `events.versioning`, `ANALYTICS_AGGREGATION_COMPUTED` and `ANALYTICS_RETRY_RATE_ALERT` from `events.analytics`, `CALL_CLASSIFICATION_COMPUTED` from `events.call_classification`, `QUOTA_THRESHOLD_ALERT` and `QUOTA_POLL_FAILED` from `events.quota`, `CONFLICT_DEBATE_EVALUATOR_FAILED` from `events.conflict`, `DELEGATION_LOOP_CIRCUIT_BACKOFF` and `DELEGATION_LOOP_CIRCUIT_PERSIST_FAILED` from `events.delegation`, `MEETING_EVENT_COOLDOWN_SKIPPED` and `MEETING_TASKS_CAPPED` from `events.meeting`, `PERSISTENCE_CIRCUIT_BREAKER_SAVED`, `PERSISTENCE_CIRCUIT_BREAKER_SAVE_FAILED`, `PERSISTENCE_CIRCUIT_BREAKER_LOADED`, `PERSISTENCE_CIRCUIT_BREAKER_LOAD_FAILED`, `PERSISTENCE_CIRCUIT_BREAKER_DELETED`, and `PERSISTENCE_CIRCUIT_BREAKER_DELETE_FAILED` from `events.persistence`). Each domain has its own module -- see `src/synthorg/observability/events/` for the full inventory of constants. Import directly: `from synthorg.observability.events. import EVENT_CONSTANT` - **Structured kwargs**: always `logger.info(EVENT, key=value)` -- never `logger.info("msg %s", val)` - **All error paths** must log at WARNING or ERROR with context before raising - **All state transitions** must log at INFO diff --git a/docs/design/engine.md b/docs/design/engine.md index 1a04655aea..c7a4f1a580 100644 --- a/docs/design/engine.md +++ b/docs/design/engine.md @@ -1446,6 +1446,27 @@ groups and routing decisions into `ParallelExecutionGroup` instances. Subtasks without routing decisions are skipped. Empty waves (all subtasks unroutable) are dropped. +#### Per-Agent Attribution (Phase 8) + +After the pipeline completes, `build_agent_contributions()` in +`coordination/attribution.py` produces a `tuple[AgentContribution, ...]` from +routing decisions and wave outcomes: + +- **`AgentContribution`** -- frozen Pydantic model recording each agent's + `contribution_score` (0.0--1.0), `failure_attribution` classification, and + optional `evidence` excerpt. +- **`CoordinationResultWithAttribution`** -- wrapper pairing the frozen + `CoordinationResult` with the contribution tuple. The `avg_contribution_score` + computed field provides a quick aggregate. +- **Failure attribution categories** -- `"direct"` (agent's own failure), + `"upstream_contamination"` (bad input from another agent), + `"coordination_overhead"` (system-initiated: budget, shutdown, parking), + `"quality_gate"` (failed quality check). +- **Integration** -- contributions are fed into `PerformanceTracker + .record_coordination_contributions()` for trend analysis. The tracker + guards writes behind an `asyncio.Lock` to avoid race conditions from + concurrent coordination runs. + --- ## ACG Vocabulary Cross-Reference @@ -1466,7 +1487,7 @@ external audiences; use SynthOrg terms in implementation discussions. | ACG Template | `CompanyConfig` + company YAML | Partial | ACG is graph-level; SynthOrg operates at org-level | | Realized Graph | `AgentContext` + `TaskExecution` + `CoordinationResult` | Strong | Runtime execution state | | Execution Trace | `TurnRecord` tuple + observability events (82+ constants) | Strong | SynthOrg's trace is richer than ACG baseline | -| Nodes | LLM calls (`call_provider`), tool invocations, validation checks | Partial | No formal node typing yet | +| Nodes | LLM calls (`call_provider`), tool invocations, validation checks | Strong | Typed via `NodeType` enum on `TurnRecord.node_types` | | Edges | `SubtaskDefinition.dependencies`, `DecompositionPlan` DAG | Strong | Multi-agent; implicit in single-agent loops | | Scheduling Policies | `AutoLoopConfig` + `select_loop_type()` + `CoordinationConfig` | Strong | Loop selector + topology selection | | Conditional Branching | HybridLoop replan, PlanExecuteLoop step checks | Partial | Not expressed as graph-level conditionals | diff --git a/src/synthorg/api/controllers/coordination.py b/src/synthorg/api/controllers/coordination.py index 0dfcbf7ba8..4820dda879 100644 --- a/src/synthorg/api/controllers/coordination.py +++ b/src/synthorg/api/controllers/coordination.py @@ -282,9 +282,12 @@ async def _execute( ) raise + result = attributed.result + is_success = attributed.is_success + ws_event_type = ( WsEventType.COORDINATION_COMPLETED - if attributed.is_success + if is_success else WsEventType.COORDINATION_FAILED ) _publish_ws_event( @@ -292,23 +295,21 @@ async def _execute( ws_event_type, { "task_id": task_id, - "topology": attributed.result.topology.value, - "is_success": attributed.is_success, - "total_duration_seconds": attributed.result.total_duration_seconds, + "topology": result.topology.value, + "is_success": is_success, + "total_duration_seconds": result.total_duration_seconds, }, ) log_event = ( - API_COORDINATION_COMPLETED - if attributed.is_success - else API_COORDINATION_FAILED + API_COORDINATION_COMPLETED if is_success else API_COORDINATION_FAILED ) - log_fn = logger.info if attributed.is_success else logger.warning + log_fn = logger.info if is_success else logger.warning log_fn( log_event, task_id=task_id, - topology=attributed.result.topology.value, - is_success=attributed.is_success, - total_duration_seconds=attributed.result.total_duration_seconds, + topology=result.topology.value, + is_success=is_success, + total_duration_seconds=result.total_duration_seconds, ) return attributed @@ -334,11 +335,20 @@ async def _resolve_agents( registry = app_state.agent_registry if data.agent_names is not None: - results = await asyncio.gather( - *(registry.get_by_name(name) for name in data.agent_names) - ) + names = data.agent_names + results: list[AgentIdentity | None] = [None] * len(names) + async with asyncio.TaskGroup() as tg: + for idx, name in enumerate(names): + + async def _resolve( + i: int = idx, + n: str = name, + ) -> None: + results[i] = await registry.get_by_name(n) + + tg.create_task(_resolve()) agents: list[AgentIdentity] = [] - for name, agent in zip(data.agent_names, results, strict=True): + for name, agent in zip(names, results, strict=True): if agent is None: logger.warning( API_COORDINATION_AGENT_RESOLVE_FAILED, diff --git a/src/synthorg/engine/compaction/epistemic.py b/src/synthorg/engine/compaction/epistemic.py index 11c046e4a7..95cdea6d19 100644 --- a/src/synthorg/engine/compaction/epistemic.py +++ b/src/synthorg/engine/compaction/epistemic.py @@ -119,15 +119,17 @@ def extract_marker_sentences( if not stripped: continue if any(p.search(stripped) for p in EPISTEMIC_PATTERNS): - if total_len + len(stripped) + 2 > max_chars: + sep_len = 2 if marker_sentences else 0 + if total_len + sep_len + len(stripped) > max_chars: + # If this is the first sentence and it exceeds max_chars, + # include a truncated version rather than returning empty. + if not marker_sentences: + marker_sentences.append(stripped[:max_chars]) break marker_sentences.append(stripped) - total_len += len(stripped) + 2 # +2 for "; " separator + total_len += sep_len + len(stripped) if not marker_sentences: return "" - joined = "; ".join(marker_sentences) - if len(joined) > max_chars: - return joined[:max_chars] + "..." - return joined + return "; ".join(marker_sentences) diff --git a/src/synthorg/engine/compaction/models.py b/src/synthorg/engine/compaction/models.py index 86df04ca61..9c44f9c35f 100644 --- a/src/synthorg/engine/compaction/models.py +++ b/src/synthorg/engine/compaction/models.py @@ -12,14 +12,23 @@ class CompactionConfig(BaseModel): """Configuration for context compaction behavior. - When ``agent_controlled`` is ``True``, automatic compaction uses - ``safety_threshold_percent`` instead of ``fill_threshold_percent``, - allowing agents to manage compaction via the ``compact_context`` - tool while retaining a safety net. + Two operating modes: + + **Standard** (``agent_controlled=False``): automatic compaction + triggers when context fill reaches ``fill_threshold_percent``. + + **Agent-controlled** (``agent_controlled=True``): agents manage + compaction via the ``compact_context`` tool. Automatic compaction + is deferred to ``safety_threshold_percent`` (which must be higher + than ``fill_threshold_percent``), giving agents headroom to decide + when and how to compact while retaining a safety net. Attributes: fill_threshold_percent: Context fill percentage that triggers - compaction (e.g. 80.0 means compact when 80% full). + compaction in standard mode (e.g. 80.0 means compact when + 80% full). In agent-controlled mode this threshold is + NOT used for automatic compaction -- agents decide when to + compact below ``safety_threshold_percent``. min_messages_to_compact: Minimum number of conversation messages required before compaction is allowed. preserve_recent_turns: Number of recent turn pairs to keep @@ -27,7 +36,8 @@ class CompactionConfig(BaseModel): agent_controlled: Enable agent-initiated compaction via the ``compact_context`` tool. safety_threshold_percent: Auto-compaction threshold when - ``agent_controlled`` is ``True`` (safety net). + ``agent_controlled`` is ``True`` (safety net). Must be + greater than ``fill_threshold_percent``. preserve_epistemic_markers: Detect and preserve epistemic markers (hedging, reconsideration, etc.) in summaries. """ diff --git a/src/synthorg/engine/compaction/summarizer.py b/src/synthorg/engine/compaction/summarizer.py index af3ef51419..6f97b1ce24 100644 --- a/src/synthorg/engine/compaction/summarizer.py +++ b/src/synthorg/engine/compaction/summarizer.py @@ -72,6 +72,8 @@ def _do_compaction( ctx: AgentContext, config: CompactionConfig, estimator: PromptTokenEstimator, + *, + force: bool = False, ) -> AgentContext | None: """Core compaction logic. @@ -79,18 +81,20 @@ def _do_compaction( ctx: Current agent context. config: Compaction configuration. estimator: Token estimator. + force: Skip fill threshold check (for agent-initiated compaction). Returns: New compacted ``AgentContext`` or ``None`` if no compaction needed. """ fill_pct = ctx.context_fill_percent - effective_threshold = ( - config.safety_threshold_percent - if config.agent_controlled - else config.fill_threshold_percent - ) - if fill_pct is None or fill_pct < effective_threshold: - return None + if not force: + effective_threshold = ( + config.safety_threshold_percent + if config.agent_controlled + else config.fill_threshold_percent + ) + if fill_pct is None or fill_pct < effective_threshold: + return None conversation = ctx.conversation if len(conversation) < config.min_messages_to_compact: @@ -279,7 +283,11 @@ def _build_summary( ): marker_text = extract_marker_sentences(cleaned) if marker_text: - snippets.append(marker_text) + sanitized_markers = sanitize_message( + marker_text, + max_length=max(len(marker_text), 1), + ) + snippets.append(sanitized_markers) preserved_count += 1 continue @@ -303,10 +311,11 @@ def _build_summary( joined = joined[:_MAX_SUMMARY_CHARS] + "..." if preserved_count > 0: + msg_word = "message" if preserved_count == 1 else "messages" return ( f"[Archived {len(messages)} messages. " f"Epistemic markers preserved from " - f"{preserved_count} messages. " + f"{preserved_count} {msg_word}. " f"Summary: {joined}]" ) return f"[Archived {len(messages)} messages. Summary of prior work: {joined}]" @@ -320,9 +329,9 @@ def force_compaction( """Compact context without checking the fill threshold. Used when an agent explicitly requests compaction via the - ``compact_context`` tool. Skips the threshold check but - still enforces minimum message count and recent turn - preservation. + ``compact_context`` tool. Delegates to ``_do_compaction`` + core logic but bypasses the fill threshold check by + temporarily using a zero-percent threshold. Args: ctx: Current agent context. @@ -332,49 +341,11 @@ def force_compaction( Returns: Compacted context, or ``None`` if too few messages. """ - conversation = ctx.conversation - if len(conversation) < config.min_messages_to_compact: - logger.debug( - CONTEXT_BUDGET_COMPACTION_SKIPPED, - execution_id=ctx.execution_id, - reason="too_few_messages_for_forced_compaction", - message_count=len(conversation), - ) - return None - - logger.info( + logger.debug( CONTEXT_BUDGET_COMPACTION_STARTED, execution_id=ctx.execution_id, fill_percent=ctx.context_fill_percent, - message_count=len(conversation), - forced=True, - ) - - split = _split_conversation(ctx, config) - if split is None: - return None - head, archivable, recent = split - - task_complexity = _extract_task_complexity(ctx) - compressed, metadata, summary_tokens = _compress( - ctx, - head, - archivable, - recent, - estimator, - preserve_markers=config.preserve_epistemic_markers, - task_complexity=task_complexity, - ) - - new_fill = estimator.estimate_conversation_tokens(compressed) - logger.info( - CONTEXT_BUDGET_COMPACTION_COMPLETED, - execution_id=ctx.execution_id, - original_messages=len(conversation), - compacted_messages=len(compressed), - archived_turns=metadata.archived_turns, - summary_tokens=summary_tokens, - compactions_total=metadata.compactions_performed, + message_count=len(ctx.conversation), forced=True, ) - return ctx.with_compression(metadata, compressed, new_fill) + return _do_compaction(ctx, config, estimator, force=True) diff --git a/src/synthorg/engine/coordination/attribution.py b/src/synthorg/engine/coordination/attribution.py index 139cc52500..ca6d7efe10 100644 --- a/src/synthorg/engine/coordination/attribution.py +++ b/src/synthorg/engine/coordination/attribution.py @@ -6,7 +6,7 @@ with attribution data built from routing decisions and wave outcomes. """ -from typing import Literal, Self +from typing import TYPE_CHECKING, Literal, Protocol, Self from pydantic import BaseModel, ConfigDict, Field, computed_field, model_validator @@ -26,6 +26,18 @@ logger = get_logger(__name__) + +if TYPE_CHECKING: + + class _ExecutionResultLike(Protocol): + error_message: str | None + + class _AgentRunResultLike(Protocol): + is_success: bool + termination_reason: TerminationReason | None + execution_result: _ExecutionResultLike | None + + FailureAttribution = Literal[ "direct", "upstream_contamination", @@ -53,6 +65,7 @@ TerminationReason.MAX_TURNS: "coordination_overhead", TerminationReason.ERROR: "direct", TerminationReason.SHUTDOWN: "coordination_overhead", + TerminationReason.PARKED: "coordination_overhead", } @@ -134,10 +147,10 @@ def is_success(self) -> bool: return self.result.is_success @computed_field( # type: ignore[prop-decorator] - description="Average contribution score", + description="Average contribution score across agents", ) @property - def total_contribution_score(self) -> float: + def avg_contribution_score(self) -> float: """Average of contribution scores, 0.0 when empty.""" if not self.agent_contributions: return 0.0 @@ -162,11 +175,17 @@ def build_agent_contributions( Returns: Tuple of ``AgentContribution`` records, one per agent outcome. """ - # Build agent->subtask lookup from routing decisions. - agent_to_subtask: dict[str, str] = {} + # Build agent->subtask lookups from routing decisions. + # Use a list per agent_id to handle agents with multiple subtasks. + agent_to_subtasks: dict[str, list[str]] = {} for decision in routing_result.decisions: agent_id = str(decision.selected_candidate.agent_identity.id) - agent_to_subtask[agent_id] = str(decision.subtask_id) + agent_to_subtasks.setdefault(agent_id, []).append( + str(decision.subtask_id), + ) + + # Track consumption index per agent for round-robin matching. + agent_subtask_idx: dict[str, int] = {} contributions: list[AgentContribution] = [] @@ -174,10 +193,14 @@ def build_agent_contributions( if wave.execution_result is None: continue for outcome in wave.execution_result.outcomes: - subtask_id = agent_to_subtask.get( - str(outcome.agent_id), - str(outcome.task_id), - ) + aid = str(outcome.agent_id) + subtask_list = agent_to_subtasks.get(aid, []) + idx = agent_subtask_idx.get(aid, 0) + if idx < len(subtask_list): + subtask_id = subtask_list[idx] + agent_subtask_idx[aid] = idx + 1 + else: + subtask_id = str(outcome.task_id) contrib = _score_outcome( agent_id=str(outcome.agent_id), subtask_id=subtask_id, @@ -232,8 +255,8 @@ def _score_outcome( ) # Case 2: Execution completed -- check if successful. - # outcome_result is AgentRunResult but typed as object to avoid - # circular import; access attributes dynamically. + # outcome_result conforms to _AgentRunResultLike; typed as object + # at runtime to avoid circular import. if outcome_result is not None: is_success = getattr(outcome_result, "is_success", False) if is_success: diff --git a/src/synthorg/engine/coordination/service.py b/src/synthorg/engine/coordination/service.py index 6282173315..4046672747 100644 --- a/src/synthorg/engine/coordination/service.py +++ b/src/synthorg/engine/coordination/service.py @@ -26,6 +26,7 @@ from synthorg.engine.task_engine_models import TransitionTaskMutation from synthorg.observability import get_logger from synthorg.observability.events.coordination import ( + COORDINATION_CLEANUP_FAILED, COORDINATION_COMPLETED, COORDINATION_FAILED, COORDINATION_PHASE_COMPLETED, @@ -214,10 +215,21 @@ async def coordinate( ) # Feed contributions into performance tracker if available. + # Guard so tracker failures don't fail an already-completed run. if self._performance_tracker is not None and contributions: - self._performance_tracker.record_coordination_contributions( - contributions, - ) + try: + self._performance_tracker.record_coordination_contributions( + contributions, + ) + except MemoryError, RecursionError: + raise + except Exception as tracker_exc: + logger.warning( + COORDINATION_CLEANUP_FAILED, + parent_task_id=task.id, + error=str(tracker_exc), + context="post_completion_tracker_write", + ) return CoordinationResultWithAttribution( result=result, diff --git a/src/synthorg/hr/performance/tracker.py b/src/synthorg/hr/performance/tracker.py index fa628133e8..0ee454c9d4 100644 --- a/src/synthorg/hr/performance/tracker.py +++ b/src/synthorg/hr/performance/tracker.py @@ -99,6 +99,7 @@ def __init__( # noqa: PLR0913 self._collab_metrics: dict[str, list[CollaborationMetricRecord]] = {} self._contributions: dict[str, list[AgentContribution]] = {} self._background_tasks: set[asyncio.Task[None]] = set() + self._metrics_lock = asyncio.Lock() @staticmethod def _default_quality() -> QualityScoringStrategy: @@ -166,10 +167,11 @@ async def record_task_metric( Returns: The stored record. """ - agent_key = str(record.agent_id) - if agent_key not in self._task_metrics: - self._task_metrics[agent_key] = [] - self._task_metrics[agent_key].append(record) + async with self._metrics_lock: + agent_key = str(record.agent_id) + if agent_key not in self._task_metrics: + self._task_metrics[agent_key] = [] + self._task_metrics[agent_key].append(record) logger.info( PERF_METRIC_RECORDED, @@ -190,9 +192,7 @@ def record_coordination_contributions( """ for contrib in contributions: agent_key = str(contrib.agent_id) - if agent_key not in self._contributions: - self._contributions[agent_key] = [] - self._contributions[agent_key].append(contrib) + self._contributions.setdefault(agent_key, []).append(contrib) if contributions: logger.info( diff --git a/src/synthorg/tools/context/compact_context.py b/src/synthorg/tools/context/compact_context.py index 3ea24255b6..4e07c471c8 100644 --- a/src/synthorg/tools/context/compact_context.py +++ b/src/synthorg/tools/context/compact_context.py @@ -7,9 +7,11 @@ compaction at the turn boundary. """ +from copy import deepcopy from typing import Any from synthorg.core.enums import ToolCategory +from synthorg.engine.sanitization import sanitize_message from synthorg.observability import get_logger from synthorg.observability.events.context_budget import ( CONTEXT_BUDGET_AGENT_COMPACTION_REQUESTED, @@ -74,7 +76,7 @@ def __init__(self) -> None: "is high and accuracy on complex reasoning is " "critical." ), - parameters_schema=_COMPACT_CONTEXT_SCHEMA, + parameters_schema=deepcopy(_COMPACT_CONTEXT_SCHEMA), category=ToolCategory.MEMORY, ) @@ -95,12 +97,13 @@ async def execute( strategy = arguments.get("strategy", "summarize") reason = arguments.get("reason", "") preserve_markers = arguments.get("preserve_markers", True) + sanitized_reason = sanitize_message(reason, max_length=256) logger.info( CONTEXT_BUDGET_AGENT_COMPACTION_REQUESTED, strategy=strategy, preserve_markers=preserve_markers, - reason=reason, + reason=sanitized_reason, ) return ToolExecutionResult( @@ -109,6 +112,6 @@ async def execute( "compaction_directive": True, "strategy": strategy, "preserve_markers": preserve_markers, - "reason": reason, + "reason": sanitized_reason, }, ) diff --git a/tests/unit/engine/compaction/test_epistemic.py b/tests/unit/engine/compaction/test_epistemic.py index 3bca826709..0628cb7b2b 100644 --- a/tests/unit/engine/compaction/test_epistemic.py +++ b/tests/unit/engine/compaction/test_epistemic.py @@ -186,4 +186,6 @@ def test_strips_whitespace_from_sentences(self) -> None: result = extract_marker_sentences(text) # Should not have extra whitespace assert "Wait, important" in result - assert " " not in result or result.count(" ") <= result.count(" ") + assert not result.startswith(" ") + assert not result.endswith(" ") + assert " " not in result diff --git a/tests/unit/engine/compaction/test_summarizer_markers.py b/tests/unit/engine/compaction/test_summarizer_markers.py index 91ecf3d697..2f47eac47e 100644 --- a/tests/unit/engine/compaction/test_summarizer_markers.py +++ b/tests/unit/engine/compaction/test_summarizer_markers.py @@ -255,7 +255,7 @@ def test_empty_assistant_content_ignored(self) -> None: ) # First message is empty and skipped, second has marker - assert "Epistemic markers preserved from 1 messages" in summary + assert "Epistemic markers preserved from 1 message." in summary def test_summary_respects_max_length(self) -> None: """Summary is truncated at MAX_SUMMARY_CHARS.""" @@ -339,5 +339,7 @@ def test_force_compaction_bypasses_threshold(self) -> None: # Should succeed despite low fill percentage result = force_compaction(ctx, config, estimator) - # Result should be an AgentContext (not None) - assert result is None or isinstance(result, AgentContext) + # Forced compaction should produce a compacted context + assert result is not None + assert isinstance(result, AgentContext) + assert len(result.conversation) < len(ctx.conversation) diff --git a/tests/unit/engine/coordination/test_attribution.py b/tests/unit/engine/coordination/test_attribution.py index 848b82fd8c..30009eafbe 100644 --- a/tests/unit/engine/coordination/test_attribution.py +++ b/tests/unit/engine/coordination/test_attribution.py @@ -5,6 +5,7 @@ """ import pytest +from pydantic import ValidationError from synthorg.core.enums import CoordinationTopology from synthorg.core.types import NotBlankStr @@ -144,7 +145,7 @@ def test_frozen(self) -> None: subtask_id=NotBlankStr("subtask-1"), contribution_score=1.0, ) - with pytest.raises(Exception, match="frozen"): + with pytest.raises(ValidationError, match="frozen"): contrib.contribution_score = 0.5 # type: ignore[misc] @pytest.mark.unit @@ -201,16 +202,16 @@ def test_is_success_delegates(self) -> None: assert failure_wrapper.is_success is False @pytest.mark.unit - def test_total_contribution_score_empty(self) -> None: + def test_avg_contribution_score_empty(self) -> None: """Empty contributions yield 0.0 average.""" wrapper = CoordinationResultWithAttribution( result=_make_coord_result(), agent_contributions=(), ) - assert wrapper.total_contribution_score == 0.0 + assert wrapper.avg_contribution_score == 0.0 @pytest.mark.unit - def test_total_contribution_score_computed(self) -> None: + def test_avg_contribution_score_computed(self) -> None: """Average of contribution scores is computed correctly.""" wrapper = CoordinationResultWithAttribution( result=_make_coord_result(), @@ -228,7 +229,7 @@ def test_total_contribution_score_computed(self) -> None: ), ), ) - assert wrapper.total_contribution_score == pytest.approx(0.5) + assert wrapper.avg_contribution_score == pytest.approx(0.5) @pytest.mark.unit def test_frozen(self) -> None: @@ -237,5 +238,5 @@ def test_frozen(self) -> None: result=_make_coord_result(), agent_contributions=(), ) - with pytest.raises(Exception, match="frozen"): + with pytest.raises(ValidationError, match="frozen"): wrapper.agent_contributions = () # type: ignore[misc] diff --git a/web/src/api/types.ts b/web/src/api/types.ts index b5bbd51ed2..38cf348ed8 100644 --- a/web/src/api/types.ts +++ b/web/src/api/types.ts @@ -1312,6 +1312,7 @@ export interface CoordinationResultResponse { topology: CoordinationTopology total_duration_seconds: number total_cost_usd: number + currency: string readonly phases: readonly CoordinationPhaseResponse[] wave_count: number is_success: boolean From b42bbb4dd6cda232f33515a535eb4379967a014c Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Wed, 8 Apr 2026 07:51:28 +0200 Subject: [PATCH 05/10] fix: update integration tests for compact_context tool + attributed result - Bump expected tool count from 14 to 15 in test_factory_integration.py - Wrap mock coordinate result in CoordinationResultWithAttribution --- tests/integration/engine/test_coordination_wiring.py | 6 +++++- tests/integration/tools/test_factory_integration.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/integration/engine/test_coordination_wiring.py b/tests/integration/engine/test_coordination_wiring.py index 4abe6e9926..9fddf30b1b 100644 --- a/tests/integration/engine/test_coordination_wiring.py +++ b/tests/integration/engine/test_coordination_wiring.py @@ -222,6 +222,9 @@ async def test_build_coordinator_and_coordinate_via_api(self) -> None: # a task-id-aware manual strategy so decomposition succeeds from unittest.mock import AsyncMock + from synthorg.engine.coordination.attribution import ( + CoordinationResultWithAttribution, + ) from synthorg.engine.coordination.models import ( CoordinationPhaseResult, CoordinationResult, @@ -229,7 +232,7 @@ async def test_build_coordinator_and_coordinate_via_api(self) -> None: async def _mock_coordinate(context): # type: ignore[no-untyped-def] """Return a realistic result keyed to the actual task.""" - return CoordinationResult( + result = CoordinationResult( parent_task_id=context.task.id, topology=CoordinationTopology.SAS, phases=( @@ -247,6 +250,7 @@ async def _mock_coordinate(context): # type: ignore[no-untyped-def] total_duration_seconds=0.05, total_cost_usd=0.001, ) + return CoordinationResultWithAttribution(result=result) coordinator = AsyncMock() coordinator.coordinate.side_effect = _mock_coordinate diff --git a/tests/integration/tools/test_factory_integration.py b/tests/integration/tools/test_factory_integration.py index e008122dab..f172c5c7f0 100644 --- a/tests/integration/tools/test_factory_integration.py +++ b/tests/integration/tools/test_factory_integration.py @@ -12,7 +12,7 @@ from synthorg.tools.git_tools import GitCloneTool from synthorg.tools.registry import ToolRegistry -_EXPECTED_TOOL_COUNT: int = 14 +_EXPECTED_TOOL_COUNT: int = 15 @pytest.mark.integration From 6ba8b390f1f1b6c691c55eeca80abe6b75dbe059 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Wed, 8 Apr 2026 08:36:04 +0200 Subject: [PATCH 06/10] fix: use --dist loadscope in pre-push test runner to prevent worker crashes With the default --dist load, xdist scatters individual tests across workers randomly, causing each worker to repeatedly create and tear down heavy Litestar TestClient apps for different modules. This leads to resource exhaustion and random worker crashes when running the full 15K+ unit suite. --dist loadscope groups tests by module so each worker handles related tests together, dramatically reducing fixture churn. Verified: 15,627 passed, 0 failed (previously 4-8 random failures per run). --- scripts/run_affected_tests.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/scripts/run_affected_tests.py b/scripts/run_affected_tests.py index f0405120ff..80f425eb4a 100644 --- a/scripts/run_affected_tests.py +++ b/scripts/run_affected_tests.py @@ -148,7 +148,12 @@ def _affected_test_dirs(changed: list[str]) -> tuple[list[str], bool]: def _run_pytest(paths: list[str]) -> int: - """Run pytest with the given paths.""" + """Run pytest with the given paths. + + Uses ``--dist loadscope`` to group tests by module, preventing + xdist worker crashes from repeated heavy fixture teardown/setup + when individual tests are scattered across workers. + """ cmd = [ sys.executable, "-m", @@ -158,6 +163,8 @@ def _run_pytest(paths: list[str]) -> int: "unit", "-n", "8", + "--dist", + "loadscope", "-q", ] result = subprocess.run(cmd, cwd=_REPO_ROOT, check=False) From 793f8742705e279ed0e56b0107c501548a34f286 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Wed, 8 Apr 2026 10:06:59 +0200 Subject: [PATCH 07/10] fix: address 8 CodeRabbit round-2 findings - Rename "Per-Agent Attribution (Phase 8)" heading to "Per-Agent Attribution" in design spec (attribution is post-pipeline) - Add logger to epistemic.py for convention consistency - Remove duplicate STARTED log from force_compaction, add forced= kwarg to canonical INFO emission in _do_compaction - Guard build_agent_contributions with try/except in coordinator so attribution failures don't fail completed runs - Replace round-robin subtask mapping with task_id-based lookup (handles multi-subtask agents and out-of-order outcomes) - Wrap _COMPACT_CONTEXT_SCHEMA in MappingProxyType for read-only enforcement; deepcopy from raw dict at construction - Add tool name presence checks to factory integration test - Add test for first-sentence truncation path in epistemic extraction --- docs/design/engine.md | 2 +- scripts/run_affected_tests.py | 5 +++ src/synthorg/engine/compaction/epistemic.py | 3 ++ src/synthorg/engine/compaction/summarizer.py | 8 +---- .../engine/coordination/attribution.py | 31 +++++++++---------- src/synthorg/engine/coordination/service.py | 25 ++++++++++----- src/synthorg/tools/context/compact_context.py | 8 +++-- .../tools/test_factory_integration.py | 7 ++++- .../unit/engine/compaction/test_epistemic.py | 22 ++++++++----- 9 files changed, 70 insertions(+), 41 deletions(-) diff --git a/docs/design/engine.md b/docs/design/engine.md index c7a4f1a580..b14d5ad05f 100644 --- a/docs/design/engine.md +++ b/docs/design/engine.md @@ -1446,7 +1446,7 @@ groups and routing decisions into `ParallelExecutionGroup` instances. Subtasks without routing decisions are skipped. Empty waves (all subtasks unroutable) are dropped. -#### Per-Agent Attribution (Phase 8) +#### Per-Agent Attribution After the pipeline completes, `build_agent_contributions()` in `coordination/attribution.py` produces a `tuple[AgentContribution, ...]` from diff --git a/scripts/run_affected_tests.py b/scripts/run_affected_tests.py index 80f425eb4a..b2be5a9332 100644 --- a/scripts/run_affected_tests.py +++ b/scripts/run_affected_tests.py @@ -153,6 +153,10 @@ def _run_pytest(paths: list[str]) -> int: Uses ``--dist loadscope`` to group tests by module, preventing xdist worker crashes from repeated heavy fixture teardown/setup when individual tests are scattered across workers. + + ``--max-worker-restart=0`` disables worker restarts to avoid a + known xdist scheduler KeyError when the loadscope scheduler + tries to reassign work to a restarted worker with a new id. """ cmd = [ sys.executable, @@ -165,6 +169,7 @@ def _run_pytest(paths: list[str]) -> int: "8", "--dist", "loadscope", + "--max-worker-restart=0", "-q", ] result = subprocess.run(cmd, cwd=_REPO_ROOT, check=False) diff --git a/src/synthorg/engine/compaction/epistemic.py b/src/synthorg/engine/compaction/epistemic.py index 95cdea6d19..3cb69132e3 100644 --- a/src/synthorg/engine/compaction/epistemic.py +++ b/src/synthorg/engine/compaction/epistemic.py @@ -12,6 +12,9 @@ import re from synthorg.core.enums import Complexity +from synthorg.observability import get_logger + +logger = get_logger(__name__) # Precompiled case-insensitive word-boundary patterns grouped by type. _HEDGING = re.compile(r"\b(wait|hmm|hm|ah)\b", re.IGNORECASE) diff --git a/src/synthorg/engine/compaction/summarizer.py b/src/synthorg/engine/compaction/summarizer.py index 6f97b1ce24..d64e16183f 100644 --- a/src/synthorg/engine/compaction/summarizer.py +++ b/src/synthorg/engine/compaction/summarizer.py @@ -112,6 +112,7 @@ def _do_compaction( execution_id=ctx.execution_id, fill_percent=fill_pct, message_count=len(conversation), + forced=force, ) split = _split_conversation(ctx, config) @@ -341,11 +342,4 @@ def force_compaction( Returns: Compacted context, or ``None`` if too few messages. """ - logger.debug( - CONTEXT_BUDGET_COMPACTION_STARTED, - execution_id=ctx.execution_id, - fill_percent=ctx.context_fill_percent, - message_count=len(ctx.conversation), - forced=True, - ) return _do_compaction(ctx, config, estimator, force=True) diff --git a/src/synthorg/engine/coordination/attribution.py b/src/synthorg/engine/coordination/attribution.py index ca6d7efe10..16ef784162 100644 --- a/src/synthorg/engine/coordination/attribution.py +++ b/src/synthorg/engine/coordination/attribution.py @@ -175,17 +175,16 @@ def build_agent_contributions( Returns: Tuple of ``AgentContribution`` records, one per agent outcome. """ - # Build agent->subtask lookups from routing decisions. - # Use a list per agent_id to handle agents with multiple subtasks. - agent_to_subtasks: dict[str, list[str]] = {} + # Build lookups from routing decisions. + # Primary: task_id -> subtask_id (direct match from execution outcome). + # Fallback: agent_id -> subtask_id (single-subtask agents only). + routed_subtask_ids: set[str] = set() + agent_to_subtask: dict[str, str] = {} for decision in routing_result.decisions: - agent_id = str(decision.selected_candidate.agent_identity.id) - agent_to_subtasks.setdefault(agent_id, []).append( - str(decision.subtask_id), - ) - - # Track consumption index per agent for round-robin matching. - agent_subtask_idx: dict[str, int] = {} + sid = str(decision.subtask_id) + aid = str(decision.selected_candidate.agent_identity.id) + routed_subtask_ids.add(sid) + agent_to_subtask[aid] = sid # last-write for multi-subtask agents contributions: list[AgentContribution] = [] @@ -193,14 +192,14 @@ def build_agent_contributions( if wave.execution_result is None: continue for outcome in wave.execution_result.outcomes: + tid = str(outcome.task_id) aid = str(outcome.agent_id) - subtask_list = agent_to_subtasks.get(aid, []) - idx = agent_subtask_idx.get(aid, 0) - if idx < len(subtask_list): - subtask_id = subtask_list[idx] - agent_subtask_idx[aid] = idx + 1 + # Prefer task_id when it matches a routed subtask (handles + # multi-subtask agents correctly). Fall back to agent lookup. + if tid in routed_subtask_ids: + subtask_id = tid else: - subtask_id = str(outcome.task_id) + subtask_id = agent_to_subtask.get(aid, tid) contrib = _score_outcome( agent_id=str(outcome.agent_id), subtask_id=subtask_id, diff --git a/src/synthorg/engine/coordination/service.py b/src/synthorg/engine/coordination/service.py index 4046672747..365599fbbd 100644 --- a/src/synthorg/engine/coordination/service.py +++ b/src/synthorg/engine/coordination/service.py @@ -11,6 +11,7 @@ from synthorg.core.enums import CoordinationTopology, TaskStatus from synthorg.engine.coordination.attribution import ( + AgentContribution, CoordinationResultWithAttribution, build_agent_contributions, ) @@ -208,14 +209,24 @@ async def coordinate( total_cost_usd=total_cost, ) - # Phase 8: Build per-agent attribution. - contributions = build_agent_contributions( - routing_result, - dispatch_result.waves, - ) + # Post-pipeline: build per-agent attribution. + # Guard so attribution/tracker failures don't fail a completed run. + contributions: tuple[AgentContribution, ...] = () + try: + contributions = build_agent_contributions( + routing_result, + dispatch_result.waves, + ) + except MemoryError, RecursionError: + raise + except Exception as attr_exc: + logger.warning( + COORDINATION_CLEANUP_FAILED, + parent_task_id=task.id, + error=str(attr_exc), + context="post_completion_attribution_build", + ) - # Feed contributions into performance tracker if available. - # Guard so tracker failures don't fail an already-completed run. if self._performance_tracker is not None and contributions: try: self._performance_tracker.record_coordination_contributions( diff --git a/src/synthorg/tools/context/compact_context.py b/src/synthorg/tools/context/compact_context.py index 4e07c471c8..8fe0c6fdc9 100644 --- a/src/synthorg/tools/context/compact_context.py +++ b/src/synthorg/tools/context/compact_context.py @@ -8,6 +8,7 @@ """ from copy import deepcopy +from types import MappingProxyType from typing import Any from synthorg.core.enums import ToolCategory @@ -20,7 +21,9 @@ logger = get_logger(__name__) -_COMPACT_CONTEXT_SCHEMA: dict[str, Any] = { +# Raw dict kept private for deepcopy at construction (MappingProxyType +# is not picklable). Public read-only view below. +_RAW_SCHEMA: dict[str, Any] = { "type": "object", "properties": { "strategy": { @@ -52,6 +55,7 @@ "required": ["strategy", "reason"], "additionalProperties": False, } +_COMPACT_CONTEXT_SCHEMA: MappingProxyType[str, Any] = MappingProxyType(_RAW_SCHEMA) class CompactContextTool(BaseTool): @@ -76,7 +80,7 @@ def __init__(self) -> None: "is high and accuracy on complex reasoning is " "critical." ), - parameters_schema=deepcopy(_COMPACT_CONTEXT_SCHEMA), + parameters_schema=deepcopy(_RAW_SCHEMA), category=ToolCategory.MEMORY, ) diff --git a/tests/integration/tools/test_factory_integration.py b/tests/integration/tools/test_factory_integration.py index f172c5c7f0..5fca10f79a 100644 --- a/tests/integration/tools/test_factory_integration.py +++ b/tests/integration/tools/test_factory_integration.py @@ -102,4 +102,9 @@ def test_factory_tools_form_valid_registry( """Factory output can be wrapped in ToolRegistry without errors.""" tools = build_default_tools(workspace=tmp_path) registry = ToolRegistry(tools) - assert len(list(registry.all_tools())) == _EXPECTED_TOOL_COUNT + all_tools = list(registry.all_tools()) + assert len(all_tools) == _EXPECTED_TOOL_COUNT + tool_names = {t.name for t in all_tools} + assert "compact_context" in tool_names + assert "read_file" in tool_names + assert "write_file" in tool_names diff --git a/tests/unit/engine/compaction/test_epistemic.py b/tests/unit/engine/compaction/test_epistemic.py index 0628cb7b2b..443751b6a2 100644 --- a/tests/unit/engine/compaction/test_epistemic.py +++ b/tests/unit/engine/compaction/test_epistemic.py @@ -142,23 +142,31 @@ def test_extract_multiple_marker_sentences(self) -> None: assert "; " in result def test_truncates_at_max_chars(self) -> None: - """Result is truncated at max_chars with ellipsis.""" - # Create text with many marker sentences + """First marker sentence exceeding max_chars is truncated.""" + # Single long marker sentence that exceeds max_chars + long_marker = "Wait, " + "x" * 100 + " important" + text = long_marker + max_chars = 30 + result = extract_marker_sentences(text, max_chars=max_chars) + + # Should be truncated to exactly max_chars (first-sentence path) + assert len(result) == max_chars + assert result == long_marker[:max_chars] + + def test_truncates_multi_sentence_at_max_chars(self) -> None: + """Multiple short marker sentences stop accumulating at max_chars.""" sentences = [ "Wait, I need to think about something.", "Actually, let me reconsider this completely.", "Hmm, this is more complex than I thought.", "Perhaps we should verify each step carefully.", - "But hold on, there's a critical issue here.", ] text = " ".join(sentences) max_chars = 50 result = extract_marker_sentences(text, max_chars=max_chars) - # Should be truncated and have ellipsis - assert len(result) <= max_chars + 3 # max_chars + "..." - if len(result) > max_chars: - assert result.endswith("...") + # Should contain at most max_chars worth of content + assert len(result) <= max_chars def test_respects_max_chars_default(self) -> None: """Default max_chars is 200.""" From db14680660dfe5bd217b920baf268c9f43d62f3f Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Wed, 8 Apr 2026 13:06:40 +0200 Subject: [PATCH 08/10] fix: address 4 CodeRabbit round-3 findings - Fix misleading force_compaction docstring (now correctly describes force=True delegation instead of zero-percent threshold) - Correct design spec: record_coordination_contributions() is sync (no await points, dict ops are atomic in single-threaded event loop); record_task_metric() is the method guarded by asyncio.Lock - Wire performance_tracker through build_coordinator factory so attribution data is actually recorded (was defaulting to None) - Tighten test_respects_max_chars_default assertion: single sentence exceeding 200 chars is truncated to exactly 200 --- docs/design/engine.md | 7 ++++--- src/synthorg/engine/compaction/summarizer.py | 5 +++-- src/synthorg/engine/coordination/factory.py | 5 +++++ tests/unit/engine/compaction/test_epistemic.py | 6 +++--- 4 files changed, 15 insertions(+), 8 deletions(-) diff --git a/docs/design/engine.md b/docs/design/engine.md index b14d5ad05f..787c86649d 100644 --- a/docs/design/engine.md +++ b/docs/design/engine.md @@ -1463,9 +1463,10 @@ routing decisions and wave outcomes: `"coordination_overhead"` (system-initiated: budget, shutdown, parking), `"quality_gate"` (failed quality check). - **Integration** -- contributions are fed into `PerformanceTracker - .record_coordination_contributions()` for trend analysis. The tracker - guards writes behind an `asyncio.Lock` to avoid race conditions from - concurrent coordination runs. + .record_coordination_contributions()` for trend analysis. The async + method `record_task_metric()` guards writes behind an `asyncio.Lock`; + `record_coordination_contributions()` is synchronous (no await points) + so dict operations are atomic within the single-threaded event loop. --- diff --git a/src/synthorg/engine/compaction/summarizer.py b/src/synthorg/engine/compaction/summarizer.py index d64e16183f..6cbc815bac 100644 --- a/src/synthorg/engine/compaction/summarizer.py +++ b/src/synthorg/engine/compaction/summarizer.py @@ -331,8 +331,9 @@ def force_compaction( Used when an agent explicitly requests compaction via the ``compact_context`` tool. Delegates to ``_do_compaction`` - core logic but bypasses the fill threshold check by - temporarily using a zero-percent threshold. + with ``force=True`` which skips the fill-threshold comparison + entirely while preserving all other checks (minimum message + count, recent turn preservation). Args: ctx: Current agent context. diff --git a/src/synthorg/engine/coordination/factory.py b/src/synthorg/engine/coordination/factory.py index 849126d0e0..b66f23ffe9 100644 --- a/src/synthorg/engine/coordination/factory.py +++ b/src/synthorg/engine/coordination/factory.py @@ -39,6 +39,7 @@ from synthorg.engine.workspace.config import WorkspaceIsolationConfig from synthorg.engine.workspace.protocol import WorkspaceIsolationStrategy from synthorg.engine.workspace.service import WorkspaceIsolationService + from synthorg.hr.performance.tracker import PerformanceTracker from synthorg.providers.protocol import CompletionProvider logger = get_logger(__name__) @@ -165,6 +166,7 @@ def build_coordinator( # noqa: PLR0913 workspace_strategy: WorkspaceIsolationStrategy | None = None, workspace_config: WorkspaceIsolationConfig | None = None, shutdown_manager: ShutdownManager | None = None, + performance_tracker: PerformanceTracker | None = None, ) -> MultiAgentCoordinator: """Build a fully wired :class:`MultiAgentCoordinator`. @@ -190,6 +192,8 @@ def build_coordinator( # noqa: PLR0913 workspace_strategy: Optional workspace isolation strategy. workspace_config: Optional workspace isolation config. shutdown_manager: Optional shutdown manager for the executor. + performance_tracker: Optional tracker for recording + per-agent coordination contributions. Returns: A fully constructed ``MultiAgentCoordinator``. @@ -215,6 +219,7 @@ def build_coordinator( # noqa: PLR0913 workspace_strategy, workspace_config ), task_engine=task_engine, + performance_tracker=performance_tracker, ) logger.debug( diff --git a/tests/unit/engine/compaction/test_epistemic.py b/tests/unit/engine/compaction/test_epistemic.py index 443751b6a2..ec480a240a 100644 --- a/tests/unit/engine/compaction/test_epistemic.py +++ b/tests/unit/engine/compaction/test_epistemic.py @@ -169,11 +169,11 @@ def test_truncates_multi_sentence_at_max_chars(self) -> None: assert len(result) <= max_chars def test_respects_max_chars_default(self) -> None: - """Default max_chars is 200.""" + """Default max_chars is 200 -- single long sentence is truncated.""" text = "Wait, " + "x" * 300 result = extract_marker_sentences(text) - # Should be capped at 200 (or slightly more with "; " joining) - assert len(result) <= 210 # Allow some margin for separator + # Single sentence exceeds default 200 chars -> truncated to exactly 200 + assert len(result) == 200 def test_handles_newlines_as_sentence_boundaries(self) -> None: """Newlines are treated as sentence boundaries.""" From d8a975c3e73b66710522f8441474d6912c5498e1 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Wed, 8 Apr 2026 13:31:15 +0200 Subject: [PATCH 09/10] fix: check both stdout and stderr in debate evaluator log test The test_evaluator_exception_logs_event test only checked capsys.out but structlog may route to stderr when configured by a preceding test in the same xdist worker. Check combined stdout+stderr output. --- .../conflict_resolution/test_debate_strategy.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/unit/communication/conflict_resolution/test_debate_strategy.py b/tests/unit/communication/conflict_resolution/test_debate_strategy.py index f2d3b4ebc0..fbf05e9311 100644 --- a/tests/unit/communication/conflict_resolution/test_debate_strategy.py +++ b/tests/unit/communication/conflict_resolution/test_debate_strategy.py @@ -1,6 +1,7 @@ """Tests for the structured debate + judge resolution strategy.""" import pytest +import structlog from synthorg.communication.conflict_resolution.config import DebateConfig from synthorg.communication.conflict_resolution.debate_strategy import ( @@ -410,7 +411,6 @@ async def test_evaluator_exception_falls_back_to_authority( async def test_evaluator_exception_logs_event( self, hierarchy: HierarchyResolver, - capsys: pytest.CaptureFixture[str], ) -> None: """Evaluator failure is logged with the dedicated event.""" judge = RaisingJudgeEvaluator() @@ -429,9 +429,10 @@ async def test_evaluator_exception_logs_event( ), ), ) - await resolver.resolve(conflict) - captured = capsys.readouterr() - assert "conflict.debate.evaluator_failed" in captured.out + with structlog.testing.capture_logs() as logs: + await resolver.resolve(conflict) + events = [e["event"] for e in logs] + assert "conflict.debate.evaluator_failed" in events @pytest.mark.parametrize( "error_cls", From c81954c9614b634ef3d124eba7659f96f3b76970 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Wed, 8 Apr 2026 14:00:51 +0200 Subject: [PATCH 10/10] fix: clarify loadscope deviation from pyproject.toml worksteal default --- scripts/run_affected_tests.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/scripts/run_affected_tests.py b/scripts/run_affected_tests.py index b2be5a9332..f3034a1579 100644 --- a/scripts/run_affected_tests.py +++ b/scripts/run_affected_tests.py @@ -150,9 +150,11 @@ def _affected_test_dirs(changed: list[str]) -> tuple[list[str], bool]: def _run_pytest(paths: list[str]) -> int: """Run pytest with the given paths. - Uses ``--dist loadscope`` to group tests by module, preventing - xdist worker crashes from repeated heavy fixture teardown/setup - when individual tests are scattered across workers. + Uses ``--dist loadscope`` instead of pyproject.toml's default + ``worksteal`` to group tests by module, preventing xdist worker + crashes from repeated heavy fixture teardown/setup (Litestar + TestClient, SQLite connections) when individual tests are + scattered across workers during full-suite runs. ``--max-worker-restart=0`` disables worker restarts to avoid a known xdist scheduler KeyError when the loadscope scheduler