Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ See `web/CLAUDE.md` for the full component inventory, design token rules, and po
- **Every module** with business logic MUST have: `from synthorg.observability import get_logger` then `logger = get_logger(__name__)`
- **Never** use `import logging` / `logging.getLogger()` / `print()` in application code (exception: `observability/setup.py`, `observability/sinks.py`, `observability/syslog_handler.py`, and `observability/http_handler.py` may use stdlib `logging` and `print(..., file=sys.stderr)` for handler construction, bootstrap, and error reporting code that runs before or during logging system configuration)
- **Variable name**: always `logger` (not `_logger`, not `log`)
- **Event names**: always use constants from the domain-specific module under `synthorg.observability.events` (e.g., `API_REQUEST_STARTED` from `events.api`, `TOOL_INVOKE_START` from `events.tool`, `GIT_COMMAND_START` from `events.git`, `CONTEXT_BUDGET_FILL_UPDATED` from `events.context_budget`, `BACKUP_STARTED` from `events.backup`, `SETUP_COMPLETED` from `events.setup`, `ROUTING_CANDIDATE_SELECTED` from `events.routing`, `SHIPPING_HTTP_BATCH_SENT` from `events.shipping`, `EVAL_REPORT_COMPUTED` from `events.evaluation`, `PROMPT_PROFILE_SELECTED` from `events.prompt`, `PROCEDURAL_MEMORY_START` from `events.procedural_memory`, `PERF_LLM_JUDGE_STARTED` from `events.performance`, `TASK_ENGINE_OBSERVER_FAILED` from `events.task_engine`, `WORKFLOW_EXEC_COMPLETED` from `events.workflow_execution`, `BLUEPRINT_INSTANTIATE_START` from `events.blueprint`, `WORKFLOW_DEF_ROLLED_BACK` from `events.workflow_definition`, `WORKFLOW_VERSION_SAVED` from `events.workflow_version`, `MEMORY_FINE_TUNE_STARTED`, `MEMORY_SELF_EDIT_TOOL_EXECUTE`, `MEMORY_SELF_EDIT_CORE_READ`, `MEMORY_SELF_EDIT_CORE_WRITE`, `MEMORY_SELF_EDIT_CORE_WRITE_REJECTED`, `MEMORY_SELF_EDIT_ARCHIVAL_SEARCH`, `MEMORY_SELF_EDIT_ARCHIVAL_WRITE`, `MEMORY_SELF_EDIT_RECALL_READ`, `MEMORY_SELF_EDIT_RECALL_WRITE`, `MEMORY_SELF_EDIT_WRITE_FAILED` from `events.memory`, `REPORTING_GENERATION_STARTED` from `events.reporting`, `RISK_BUDGET_SCORE_COMPUTED` from `events.risk_budget`, `LLM_STRATEGY_SYNTHESIZED` and `DISTILLATION_CAPTURED` from `events.consolidation`, `MEMORY_DIVERSITY_RERANKED`, `MEMORY_DIVERSITY_RERANK_FAILED`, and `MEMORY_REFORMULATION_ROUND` from `events.memory`, `NOTIFICATION_DISPATCHED` and `NOTIFICATION_DISPATCH_FAILED` from `events.notification`, `QUALITY_STEP_CLASSIFIED` from `events.quality`, `HEALTH_TICKET_EMITTED` from `events.health`, `TRAJECTORY_SCORING_START` from `events.trajectory`, `COORD_METRICS_AMDAHL_COMPUTED` from `events.coordination_metrics`, `WEB_REQUEST_START` and `WEB_SSRF_BLOCKED` from `events.web`, `DB_QUERY_START` and `DB_WRITE_BLOCKED` from `events.database`, `TERMINAL_COMMAND_START` and `TERMINAL_COMMAND_BLOCKED` from `events.terminal`, `SUB_CONSTRAINT_RESOLVED` and `SUB_CONSTRAINT_DENIED` from `events.sub_constraint`, `VERSION_SAVED` and `VERSION_SNAPSHOT_FAILED` from `events.versioning`, `ANALYTICS_AGGREGATION_COMPUTED` and `ANALYTICS_RETRY_RATE_ALERT` from `events.analytics`, `CALL_CLASSIFICATION_COMPUTED` from `events.call_classification`, `QUOTA_THRESHOLD_ALERT` and `QUOTA_POLL_FAILED` from `events.quota`, `CONFLICT_DEBATE_EVALUATOR_FAILED` from `events.conflict`, `DELEGATION_LOOP_CIRCUIT_BACKOFF` and `DELEGATION_LOOP_CIRCUIT_PERSIST_FAILED` from `events.delegation`, `MEETING_EVENT_COOLDOWN_SKIPPED` and `MEETING_TASKS_CAPPED` from `events.meeting`, `PERSISTENCE_CIRCUIT_BREAKER_SAVED`, `PERSISTENCE_CIRCUIT_BREAKER_SAVE_FAILED`, `PERSISTENCE_CIRCUIT_BREAKER_LOADED`, `PERSISTENCE_CIRCUIT_BREAKER_LOAD_FAILED`, `PERSISTENCE_CIRCUIT_BREAKER_DELETED`, and `PERSISTENCE_CIRCUIT_BREAKER_DELETE_FAILED` from `events.persistence`). Each domain has its own module -- see `src/synthorg/observability/events/` for the full inventory of constants. Import directly: `from synthorg.observability.events.<domain> import EVENT_CONSTANT`
- **Event names**: always use constants from the domain-specific module under `synthorg.observability.events` (e.g., `API_REQUEST_STARTED` from `events.api`, `TOOL_INVOKE_START` from `events.tool`, `GIT_COMMAND_START` from `events.git`, `CONTEXT_BUDGET_FILL_UPDATED`, `CONTEXT_BUDGET_COMPACTION_STARTED`, `CONTEXT_BUDGET_COMPACTION_COMPLETED`, `CONTEXT_BUDGET_COMPACTION_FAILED`, `CONTEXT_BUDGET_COMPACTION_SKIPPED`, `CONTEXT_BUDGET_COMPACTION_FALLBACK`, `CONTEXT_BUDGET_INDICATOR_INJECTED`, `CONTEXT_BUDGET_AGENT_COMPACTION_REQUESTED`, `CONTEXT_BUDGET_EPISTEMIC_MARKERS_PRESERVED` from `events.context_budget`, `BACKUP_STARTED` from `events.backup`, `SETUP_COMPLETED` from `events.setup`, `ROUTING_CANDIDATE_SELECTED` from `events.routing`, `SHIPPING_HTTP_BATCH_SENT` from `events.shipping`, `EVAL_REPORT_COMPUTED` from `events.evaluation`, `PROMPT_PROFILE_SELECTED` from `events.prompt`, `PROCEDURAL_MEMORY_START` from `events.procedural_memory`, `PERF_LLM_JUDGE_STARTED` from `events.performance`, `TASK_ENGINE_OBSERVER_FAILED` from `events.task_engine`, `WORKFLOW_EXEC_COMPLETED` from `events.workflow_execution`, `BLUEPRINT_INSTANTIATE_START` from `events.blueprint`, `WORKFLOW_DEF_ROLLED_BACK` from `events.workflow_definition`, `WORKFLOW_VERSION_SAVED` from `events.workflow_version`, `MEMORY_FINE_TUNE_STARTED`, `MEMORY_SELF_EDIT_TOOL_EXECUTE`, `MEMORY_SELF_EDIT_CORE_READ`, `MEMORY_SELF_EDIT_CORE_WRITE`, `MEMORY_SELF_EDIT_CORE_WRITE_REJECTED`, `MEMORY_SELF_EDIT_ARCHIVAL_SEARCH`, `MEMORY_SELF_EDIT_ARCHIVAL_WRITE`, `MEMORY_SELF_EDIT_RECALL_READ`, `MEMORY_SELF_EDIT_RECALL_WRITE`, `MEMORY_SELF_EDIT_WRITE_FAILED` from `events.memory`, `REPORTING_GENERATION_STARTED` from `events.reporting`, `RISK_BUDGET_SCORE_COMPUTED` from `events.risk_budget`, `LLM_STRATEGY_SYNTHESIZED` and `DISTILLATION_CAPTURED` from `events.consolidation`, `MEMORY_DIVERSITY_RERANKED`, `MEMORY_DIVERSITY_RERANK_FAILED`, and `MEMORY_REFORMULATION_ROUND` from `events.memory`, `NOTIFICATION_DISPATCHED` and `NOTIFICATION_DISPATCH_FAILED` from `events.notification`, `QUALITY_STEP_CLASSIFIED` from `events.quality`, `HEALTH_TICKET_EMITTED` from `events.health`, `TRAJECTORY_SCORING_START` from `events.trajectory`, `COORD_METRICS_AMDAHL_COMPUTED` from `events.coordination_metrics`, `COORDINATION_STARTED`, `COORDINATION_COMPLETED`, `COORDINATION_FAILED`, `COORDINATION_PHASE_STARTED`, `COORDINATION_PHASE_COMPLETED`, `COORDINATION_PHASE_FAILED`, `COORDINATION_WAVE_STARTED`, `COORDINATION_WAVE_COMPLETED`, `COORDINATION_TOPOLOGY_RESOLVED`, `COORDINATION_CLEANUP_STARTED`, `COORDINATION_CLEANUP_COMPLETED`, `COORDINATION_CLEANUP_FAILED`, `COORDINATION_WAVE_BUILT`, `COORDINATION_FACTORY_BUILT`, and `COORDINATION_ATTRIBUTION_BUILT` from `events.coordination`, `WEB_REQUEST_START` and `WEB_SSRF_BLOCKED` from `events.web`, `DB_QUERY_START` and `DB_WRITE_BLOCKED` from `events.database`, `TERMINAL_COMMAND_START` and `TERMINAL_COMMAND_BLOCKED` from `events.terminal`, `SUB_CONSTRAINT_RESOLVED` and `SUB_CONSTRAINT_DENIED` from `events.sub_constraint`, `VERSION_SAVED` and `VERSION_SNAPSHOT_FAILED` from `events.versioning`, `ANALYTICS_AGGREGATION_COMPUTED` and `ANALYTICS_RETRY_RATE_ALERT` from `events.analytics`, `CALL_CLASSIFICATION_COMPUTED` from `events.call_classification`, `QUOTA_THRESHOLD_ALERT` and `QUOTA_POLL_FAILED` from `events.quota`, `CONFLICT_DEBATE_EVALUATOR_FAILED` from `events.conflict`, `DELEGATION_LOOP_CIRCUIT_BACKOFF` and `DELEGATION_LOOP_CIRCUIT_PERSIST_FAILED` from `events.delegation`, `MEETING_EVENT_COOLDOWN_SKIPPED` and `MEETING_TASKS_CAPPED` from `events.meeting`, `PERSISTENCE_CIRCUIT_BREAKER_SAVED`, `PERSISTENCE_CIRCUIT_BREAKER_SAVE_FAILED`, `PERSISTENCE_CIRCUIT_BREAKER_LOADED`, `PERSISTENCE_CIRCUIT_BREAKER_LOAD_FAILED`, `PERSISTENCE_CIRCUIT_BREAKER_DELETED`, and `PERSISTENCE_CIRCUIT_BREAKER_DELETE_FAILED` from `events.persistence`). Each domain has its own module -- see `src/synthorg/observability/events/` for the full inventory of constants. Import directly: `from synthorg.observability.events.<domain> import EVENT_CONSTANT`
- **Structured kwargs**: always `logger.info(EVENT, key=value)` -- never `logger.info("msg %s", val)`
- **All error paths** must log at WARNING or ERROR with context before raising
- **All state transitions** must log at INFO
Expand Down
24 changes: 23 additions & 1 deletion docs/design/engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -1446,6 +1446,28 @@ groups and routing decisions into `ParallelExecutionGroup` instances. Subtasks
without routing decisions are skipped. Empty waves (all subtasks unroutable) are
dropped.

#### Per-Agent Attribution

After the pipeline completes, `build_agent_contributions()` in
`coordination/attribution.py` produces a `tuple[AgentContribution, ...]` from
routing decisions and wave outcomes:

- **`AgentContribution`** -- frozen Pydantic model recording each agent's
`contribution_score` (0.0--1.0), `failure_attribution` classification, and
optional `evidence` excerpt.
- **`CoordinationResultWithAttribution`** -- wrapper pairing the frozen
`CoordinationResult` with the contribution tuple. The `avg_contribution_score`
computed field provides a quick aggregate.
- **Failure attribution categories** -- `"direct"` (agent's own failure),
`"upstream_contamination"` (bad input from another agent),
`"coordination_overhead"` (system-initiated: budget, shutdown, parking),
`"quality_gate"` (failed quality check).
- **Integration** -- contributions are fed into `PerformanceTracker
.record_coordination_contributions()` for trend analysis. The async
method `record_task_metric()` guards writes behind an `asyncio.Lock`;
`record_coordination_contributions()` is synchronous (no await points)
so dict operations are atomic within the single-threaded event loop.

---

## ACG Vocabulary Cross-Reference
Expand All @@ -1466,7 +1488,7 @@ external audiences; use SynthOrg terms in implementation discussions.
| ACG Template | `CompanyConfig` + company YAML | Partial | ACG is graph-level; SynthOrg operates at org-level |
| Realized Graph | `AgentContext` + `TaskExecution` + `CoordinationResult` | Strong | Runtime execution state |
| Execution Trace | `TurnRecord` tuple + observability events (82+ constants) | Strong | SynthOrg's trace is richer than ACG baseline |
| Nodes | LLM calls (`call_provider`), tool invocations, validation checks | Partial | No formal node typing yet |
| Nodes | LLM calls (`call_provider`), tool invocations, validation checks | Strong | Typed via `NodeType` enum on `TurnRecord.node_types` |
| Edges | `SubtaskDefinition.dependencies`, `DecompositionPlan` DAG | Strong | Multi-agent; implicit in single-agent loops |
| Scheduling Policies | `AutoLoopConfig` + `select_loop_type()` + `CoordinationConfig` | Strong | Loop selector + topology selection |
| Conditional Branching | HybridLoop replan, PlanExecuteLoop step checks | Partial | Not expressed as graph-level conditionals |
Expand Down
16 changes: 15 additions & 1 deletion scripts/run_affected_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,18 @@ def _affected_test_dirs(changed: list[str]) -> tuple[list[str], bool]:


def _run_pytest(paths: list[str]) -> int:
"""Run pytest with the given paths."""
"""Run pytest with the given paths.

Uses ``--dist loadscope`` instead of pyproject.toml's default
``worksteal`` to group tests by module, preventing xdist worker
crashes from repeated heavy fixture teardown/setup (Litestar
TestClient, SQLite connections) when individual tests are
scattered across workers during full-suite runs.

``--max-worker-restart=0`` disables worker restarts to avoid a
known xdist scheduler KeyError when the loadscope scheduler
tries to reassign work to a restarted worker with a new id.
"""
cmd = [
sys.executable,
"-m",
Expand All @@ -158,6 +169,9 @@ def _run_pytest(paths: list[str]) -> int:
"unit",
"-n",
"8",
"--dist",
"loadscope",
"--max-worker-restart=0",
"-q",
]
result = subprocess.run(cmd, cwd=_REPO_ROOT, check=False)
Expand Down
46 changes: 32 additions & 14 deletions src/synthorg/api/controllers/coordination.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
from synthorg.api.state import AppState
from synthorg.core.agent import AgentIdentity
from synthorg.core.task import Task
from synthorg.engine.coordination.attribution import (
CoordinationResultWithAttribution,
)

logger = get_logger(__name__)

Expand Down Expand Up @@ -171,7 +174,7 @@ async def coordinate_task(
agent_count=len(agents),
)

result = await self._execute(
attributed = await self._execute(
app_state,
request,
context,
Expand All @@ -188,7 +191,10 @@ async def coordinate_task(
)
currency = DEFAULT_CURRENCY
return ApiResponse(
data=_map_result_to_response(result, currency=currency),
data=_map_result_to_response(
attributed.result,
currency=currency,
),
)

async def _get_task(
Expand Down Expand Up @@ -239,10 +245,10 @@ async def _execute(
request: Request[Any, Any, Any],
context: CoordinationContext,
task_id: str,
) -> CoordinationResult:
) -> CoordinationResultWithAttribution:
"""Run coordination and publish WS events."""
try:
result = await app_state.coordinator.coordinate(context)
attributed = await app_state.coordinator.coordinate(context)
except CoordinationPhaseError as exc:
logger.warning(
API_COORDINATION_FAILED,
Expand Down Expand Up @@ -276,9 +282,12 @@ async def _execute(
)
raise

result = attributed.result
is_success = attributed.is_success

ws_event_type = (
WsEventType.COORDINATION_COMPLETED
if result.is_success
if is_success
else WsEventType.COORDINATION_FAILED
)
_publish_ws_event(
Expand All @@ -287,22 +296,22 @@ async def _execute(
{
"task_id": task_id,
"topology": result.topology.value,
"is_success": result.is_success,
"is_success": is_success,
"total_duration_seconds": result.total_duration_seconds,
},
)
log_event = (
API_COORDINATION_COMPLETED if result.is_success else API_COORDINATION_FAILED
API_COORDINATION_COMPLETED if is_success else API_COORDINATION_FAILED
)
log_fn = logger.info if result.is_success else logger.warning
log_fn = logger.info if is_success else logger.warning
log_fn(
log_event,
task_id=task_id,
topology=result.topology.value,
is_success=result.is_success,
is_success=is_success,
total_duration_seconds=result.total_duration_seconds,
)
Comment on lines 288 to 313
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To improve readability and reduce repetition, you could extract attributed.result and attributed.is_success into local variables at the beginning of this block. This would make the subsequent logic for WebSocket events and logging cleaner and easier to follow.

        result = attributed.result
        is_success = attributed.is_success

        ws_event_type = (
            WsEventType.COORDINATION_COMPLETED
            if is_success
            else WsEventType.COORDINATION_FAILED
        )
        _publish_ws_event(
            request,
            ws_event_type,
            {
                "task_id": task_id,
                "topology": result.topology.value,
                "is_success": is_success,
                "total_duration_seconds": result.total_duration_seconds,
            },
        )
        log_event = (
            API_COORDINATION_COMPLETED
            if is_success
            else API_COORDINATION_FAILED
        )
        log_fn = logger.info if is_success else logger.warning
        log_fn(
            log_event,
            task_id=task_id,
            topology=result.topology.value,
            is_success=is_success,
            total_duration_seconds=result.total_duration_seconds,
        )

return result
return attributed

async def _resolve_agents(
self,
Expand All @@ -326,11 +335,20 @@ async def _resolve_agents(
registry = app_state.agent_registry

if data.agent_names is not None:
results = await asyncio.gather(
*(registry.get_by_name(name) for name in data.agent_names)
)
names = data.agent_names
results: list[AgentIdentity | None] = [None] * len(names)
async with asyncio.TaskGroup() as tg:
for idx, name in enumerate(names):

async def _resolve(
i: int = idx,
n: str = name,
) -> None:
results[i] = await registry.get_by_name(n)

tg.create_task(_resolve())
agents: list[AgentIdentity] = []
for name, agent in zip(data.agent_names, results, strict=True):
for name, agent in zip(names, results, strict=True):
if agent is None:
logger.warning(
API_COORDINATION_AGENT_RESOLVE_FAILED,
Expand Down
4 changes: 4 additions & 0 deletions src/synthorg/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@
AgentContextSnapshot,
)
from synthorg.engine.coordination import (
AgentContribution,
CentralizedDispatcher,
ContextDependentDispatcher,
CoordinationConfig,
CoordinationContext,
CoordinationPhaseResult,
CoordinationResult,
CoordinationResultWithAttribution,
CoordinationWave,
DecentralizedDispatcher,
DispatchResult,
Expand Down Expand Up @@ -225,6 +227,7 @@
"AgentAssignment",
"AgentContext",
"AgentContextSnapshot",
"AgentContribution",
"AgentEngine",
"AgentOutcome",
"AgentRunResult",
Expand Down Expand Up @@ -256,6 +259,7 @@
"CoordinationPhaseError",
"CoordinationPhaseResult",
"CoordinationResult",
"CoordinationResultWithAttribution",
"CoordinationWave",
"CostOptimizedAssignmentStrategy",
"CreateTaskData",
Expand Down
8 changes: 5 additions & 3 deletions src/synthorg/engine/agent_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,11 @@
from synthorg.core.agent import AgentIdentity
from synthorg.core.task import Task
from synthorg.engine.compaction import CompactionCallback
from synthorg.engine.coordination.attribution import (
CoordinationResultWithAttribution,
)
from synthorg.engine.coordination.models import (
CoordinationContext,
CoordinationResult,
)
from synthorg.engine.coordination.service import MultiAgentCoordinator
from synthorg.engine.hybrid_models import HybridLoopConfig
Expand Down Expand Up @@ -452,14 +454,14 @@ def coordinator(self) -> MultiAgentCoordinator | None:
async def coordinate(
self,
context: CoordinationContext,
) -> CoordinationResult:
) -> CoordinationResultWithAttribution:
"""Delegate to the multi-agent coordinator.

Args:
context: Coordination context with task, agents, and config.

Returns:
Coordination result with all phase outcomes.
Coordination result with per-agent attribution data.

Raises:
ExecutionStateError: If no coordinator is configured.
Expand Down
Loading
Loading