From c27db074df5bbf8c6e8405ee938118347fded003 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Thu, 21 May 2026 19:12:24 +0200 Subject: [PATCH 1/8] feat: stakes-aware model routing (#1998) --- src/synthorg/api/app.py | 11 + src/synthorg/config/defaults.py | 1 + src/synthorg/config/schema.py | 7 + src/synthorg/core/enums.py | 55 ++++ src/synthorg/core/task.py | 9 + src/synthorg/engine/agent_engine.py | 30 ++ src/synthorg/engine/decomposition/models.py | 6 + src/synthorg/engine/decomposition/service.py | 18 +- src/synthorg/engine/pipeline/service.py | 25 +- src/synthorg/engine/review_gate.py | 12 + .../engine/routing_policy/__init__.py | 41 +++ src/synthorg/engine/routing_policy/config.py | 122 ++++++++ src/synthorg/engine/routing_policy/factory.py | 95 ++++++ src/synthorg/engine/routing_policy/models.py | 34 +++ .../engine/routing_policy/protocol.py | 26 ++ src/synthorg/engine/routing_policy/router.py | 60 ++++ .../engine/routing_policy/strategies.py | 234 +++++++++++++++ src/synthorg/engine/routing_policy/tiers.py | 36 +++ src/synthorg/engine/stakes/__init__.py | 26 ++ src/synthorg/engine/stakes/config.py | 136 +++++++++ src/synthorg/engine/stakes/factory.py | 43 +++ src/synthorg/engine/stakes/heuristic.py | 91 ++++++ src/synthorg/engine/stakes/protocol.py | 26 ++ .../observability/events/stakes_routing.py | 10 + src/synthorg/workers/runtime_builder.py | 31 ++ .../test_acceptance_comparison.py | 236 +++++++++++++++ .../routing_policy/test_cost_properties.py | 92 ++++++ .../routing_policy/test_engine_integration.py | 114 +++++++ .../engine/routing_policy/test_strategies.py | 281 ++++++++++++++++++ tests/unit/engine/stakes/test_assessor.py | 193 ++++++++++++ tests/unit/engine/stakes/test_propagation.py | 107 +++++++ web/src/api/types/enum-values.gen.ts | 8 + web/src/api/types/openapi.gen.ts | 15 + 33 files changed, 2229 insertions(+), 2 deletions(-) create mode 100644 src/synthorg/engine/routing_policy/__init__.py create mode 
100644 src/synthorg/engine/routing_policy/config.py create mode 100644 src/synthorg/engine/routing_policy/factory.py create mode 100644 src/synthorg/engine/routing_policy/models.py create mode 100644 src/synthorg/engine/routing_policy/protocol.py create mode 100644 src/synthorg/engine/routing_policy/router.py create mode 100644 src/synthorg/engine/routing_policy/strategies.py create mode 100644 src/synthorg/engine/routing_policy/tiers.py create mode 100644 src/synthorg/engine/stakes/__init__.py create mode 100644 src/synthorg/engine/stakes/config.py create mode 100644 src/synthorg/engine/stakes/factory.py create mode 100644 src/synthorg/engine/stakes/heuristic.py create mode 100644 src/synthorg/engine/stakes/protocol.py create mode 100644 src/synthorg/observability/events/stakes_routing.py create mode 100644 tests/unit/engine/routing_policy/test_acceptance_comparison.py create mode 100644 tests/unit/engine/routing_policy/test_cost_properties.py create mode 100644 tests/unit/engine/routing_policy/test_engine_integration.py create mode 100644 tests/unit/engine/routing_policy/test_strategies.py create mode 100644 tests/unit/engine/stakes/test_assessor.py create mode 100644 tests/unit/engine/stakes/test_propagation.py diff --git a/src/synthorg/api/app.py b/src/synthorg/api/app.py index 8e9abcf31c..b74ef4e3f8 100644 --- a/src/synthorg/api/app.py +++ b/src/synthorg/api/app.py @@ -1290,6 +1290,17 @@ async def _install_runtime_services() -> None: and app_state.review_gate_service is not None ): app_state.review_gate_service.set_vision_gate(services.vision_gate) + # Same seam for the adversarial red-team gate: built in the + # runtime wiring once the boot engine exists, attached here so a + # review pipeline supplied with red_team_input reaches the live + # gate. ``None`` when the red-team subsystem is disabled. 
+ if ( + services.red_team_runtime is not None + and app_state.review_gate_service is not None + ): + app_state.review_gate_service.set_red_team_gate( + services.red_team_runtime.gate, + ) # Bring the real client-request, goal/objective, and # task-board work-entry paths online: ensure the configured # default projects exist and attach the entry adapters. No-op diff --git a/src/synthorg/config/defaults.py b/src/synthorg/config/defaults.py index e9c9ae036e..4f52b67fcf 100644 --- a/src/synthorg/config/defaults.py +++ b/src/synthorg/config/defaults.py @@ -22,6 +22,7 @@ def default_config_dict() -> dict[str, object]: "communication": {}, "providers": {}, "routing": {}, + "stakes_routing": {}, "logging": None, "graceful_shutdown": {}, "workflow_handoffs": [], diff --git a/src/synthorg/config/schema.py b/src/synthorg/config/schema.py index bdc4a28c7a..c7c03fcb74 100644 --- a/src/synthorg/config/schema.py +++ b/src/synthorg/config/schema.py @@ -27,6 +27,7 @@ from synthorg.core.role import CustomRole # noqa: TC001 from synthorg.core.types import NotBlankStr # noqa: TC001 from synthorg.engine.coordination.section_config import CoordinationSectionConfig +from synthorg.engine.routing_policy.config import StakesRoutingConfig from synthorg.engine.strategy.models import StrategyConfig from synthorg.engine.task_engine_config import TaskEngineConfig from synthorg.engine.workflow.config import WorkflowConfig @@ -386,6 +387,8 @@ class RootConfig(BaseModel): communication: Communication configuration. providers: LLM provider configurations keyed by provider name. routing: Model routing configuration. + stakes_routing: Stakes-aware model routing configuration (strategy + discriminator, per-stakes quality floors, coordination nudge). logging: Logging configuration (``None`` to use platform defaults). graceful_shutdown: Graceful shutdown configuration. workflow_handoffs: Cross-department workflow handoffs. 
@@ -472,6 +475,10 @@ class RootConfig(BaseModel): default_factory=RoutingConfig, description="Model routing configuration", ) + stakes_routing: StakesRoutingConfig = Field( + default_factory=StakesRoutingConfig, + description="Stakes-aware model routing configuration", + ) logging: LogConfig | None = Field( default=None, description="Logging configuration", diff --git a/src/synthorg/core/enums.py b/src/synthorg/core/enums.py index 5ce00168aa..daeef42dd8 100644 --- a/src/synthorg/core/enums.py +++ b/src/synthorg/core/enums.py @@ -356,6 +356,61 @@ class Complexity(StrEnum): EPIC = "epic" +class Stakes(StrEnum): + """How consequential a subtask or task is for stakes-aware routing. + + Distinct from :class:`Priority` (urgency/importance) and + :class:`Complexity` (effort): stakes captures the *cost of being + wrong*. Low-stakes work tolerates a cheap model; high-stakes work + (architecture, irreversible decisions) warrants a strong model and + an adversarial red-team review. The authoritative ordering lives in + ``_STAKES_ORDER`` below. + """ + + LOW = "low" + NORMAL = "normal" + HIGH = "high" + CRITICAL = "critical" + + +# Ordering: LOW (least consequential) < NORMAL < HIGH < CRITICAL. +_STAKES_ORDER: tuple[Stakes, ...] = tuple(Stakes) + +# Guard against silent breakage if the enum is reordered or extended +# without updating the ordering tuple (mirrors _SENIORITY_ORDER). +_stakes_members = set(Stakes) +_stakes_order_set = set(_STAKES_ORDER) +if _stakes_order_set != _stakes_members: + _missing_stakes = _stakes_members - _stakes_order_set + _extra_stakes = _stakes_order_set - _stakes_members + _stakes_msg = ( + f"_STAKES_ORDER is out of sync with Stakes: " + f"missing={_missing_stakes}, extra={_extra_stakes}" + ) + raise RuntimeError(_stakes_msg) +del _stakes_members, _stakes_order_set + +_STAKES_RANK: dict[Stakes, int] = { + level: idx for idx, level in enumerate(_STAKES_ORDER) +} + + +def compare_stakes(a: Stakes, b: Stakes) -> int: + """Compare two stakes levels. 
+ + Returns negative if *a* is lower-stakes than *b*, zero if equal, + positive if *a* is higher-stakes than *b*. + + Args: + a: First stakes level. + b: Second stakes level. + + Returns: + Integer indicating relative stakes. + """ + return _STAKES_RANK[a] - _STAKES_RANK[b] + + class WorkflowType(StrEnum): """Workflow type for organizing task execution. diff --git a/src/synthorg/core/task.py b/src/synthorg/core/task.py index 83a918c7cf..a5d7d93d99 100644 --- a/src/synthorg/core/task.py +++ b/src/synthorg/core/task.py @@ -13,6 +13,7 @@ Complexity, CoordinationTopology, Priority, + Stakes, TaskSource, TaskStatus, TaskStructure, @@ -128,6 +129,14 @@ class Task(BaseModel): default=Complexity.MEDIUM, description="Task complexity estimate", ) + stakes: Stakes = Field( + default=Stakes.NORMAL, + description=( + "How consequential this task is, driving stakes-aware model" + " routing (cheap model for low stakes, strong model plus" + " red-team for high/critical stakes)" + ), + ) budget_limit: float = Field( default=0.0, ge=0.0, diff --git a/src/synthorg/engine/agent_engine.py b/src/synthorg/engine/agent_engine.py index 0dc354a5f6..2961cf971a 100644 --- a/src/synthorg/engine/agent_engine.py +++ b/src/synthorg/engine/agent_engine.py @@ -83,6 +83,7 @@ from synthorg.engine.plan_models import PlanExecuteConfig from synthorg.engine.prompt import SystemPrompt from synthorg.engine.recovery import RecoveryStrategy + from synthorg.engine.routing_policy.router import StakesRouter from synthorg.engine.session import EventReader from synthorg.engine.stagnation.protocol import StagnationDetector from synthorg.engine.task_engine import TaskEngine @@ -200,6 +201,7 @@ def __init__( # noqa: PLR0913, PLR0915 interrupt_store: InterruptStore | None = None, approval_interrupt_timeout_seconds: float | None = None, external_api_runtime: ExternalApiRuntime | None = None, + stakes_router: StakesRouter | None = None, clock: Clock | None = None, ) -> None: self._agent_middleware_chain = 
agent_middleware_chain @@ -238,6 +240,7 @@ def __init__( # noqa: PLR0913, PLR0915 # the agent's registry. ``None`` (mode DISABLED) is a no-op. self._mcp_self_consumer = mcp_self_consumer self._approval_interrupt_timeout_seconds = approval_interrupt_timeout_seconds + self._stakes_router = stakes_router self._stagnation_detector = stagnation_detector self._auto_loop_config = auto_loop_config self._hybrid_loop_config = hybrid_loop_config @@ -352,6 +355,25 @@ async def coordinate( raise ExecutionStateError(msg) return await self._coordinator.coordinate(context) + async def _route_stakes( + self, + identity: AgentIdentity, + task: Task, + ) -> AgentIdentity: + """Apply stakes-aware routing, returning the adjusted identity. + + Delegates to the injected :class:`StakesRouter` to pick a model + tier matched to ``task.stakes``. The red-team requirement carried + on the decision is consumed downstream by the review pipeline, + which derives it from the persisted ``task.stakes``; this method + only adjusts the model the subtask runs with. + """ + assert self._stakes_router is not None # noqa: S101 # caller checks + decision = await self._stakes_router.route(task=task, identity=identity) + if decision.selected_model == identity.model: + return identity + return identity.model_copy(update={"model": decision.selected_model}) + async def run( # noqa: PLR0913, C901 self, *, @@ -402,6 +424,14 @@ async def run( # noqa: PLR0913, C901 max_turns=max_turns, ) + # Stakes-aware routing runs BEFORE the budget block: it + # sets the target tier from the task's stakes, then the + # budget auto-downgrade below may lower it further when + # budget is tight (a hard ceiling must win over a stakes + # upgrade). 
+ if self._stakes_router is not None: + identity = await self._route_stakes(identity, task) + if self._budget_enforcer: preflight = await self._budget_enforcer.check_can_execute( agent_id, diff --git a/src/synthorg/engine/decomposition/models.py b/src/synthorg/engine/decomposition/models.py index 6961206599..4c09a865dd 100644 --- a/src/synthorg/engine/decomposition/models.py +++ b/src/synthorg/engine/decomposition/models.py @@ -12,6 +12,7 @@ from synthorg.core.enums import ( Complexity, CoordinationTopology, + Stakes, TaskStatus, TaskStructure, ) @@ -28,6 +29,7 @@ class SubtaskDefinition(BaseModel): description: Detailed subtask description. dependencies: IDs of other subtasks this one depends on. estimated_complexity: Complexity estimate for routing. + stakes: Stakes level for stakes-aware model routing. required_skills: Skill IDs needed for routing. required_tags: Tags needed for multi-faceted routing match. When set, the routing scorer awards a small bonus to agents whose @@ -49,6 +51,10 @@ class SubtaskDefinition(BaseModel): default=Complexity.MEDIUM, description="Complexity estimate for routing", ) + stakes: Stakes = Field( + default=Stakes.NORMAL, + description="Stakes level for stakes-aware model routing", + ) required_skills: tuple[NotBlankStr, ...] 
= Field( default=(), description="Skill IDs needed for routing", diff --git a/src/synthorg/engine/decomposition/service.py b/src/synthorg/engine/decomposition/service.py index a75f651e7f..48f3a6cb2e 100644 --- a/src/synthorg/engine/decomposition/service.py +++ b/src/synthorg/engine/decomposition/service.py @@ -14,6 +14,7 @@ SubtaskStatusRollup, ) from synthorg.engine.decomposition.rollup import StatusRollup +from synthorg.engine.stakes import build_stakes_assessor from synthorg.observability import get_logger from synthorg.observability.events.decomposition import ( DECOMPOSITION_COMPLETED, @@ -27,6 +28,7 @@ from synthorg.engine.decomposition.classifier import TaskStructureClassifier from synthorg.engine.decomposition.models import DecompositionContext from synthorg.engine.decomposition.protocol import DecompositionStrategy + from synthorg.engine.stakes.protocol import StakesAssessor logger = get_logger(__name__) @@ -38,15 +40,17 @@ class DecompositionService: DAG validator, and task factory to produce executable subtasks. """ - __slots__ = ("_classifier", "_strategy") + __slots__ = ("_classifier", "_stakes_assessor", "_strategy") def __init__( self, strategy: DecompositionStrategy, classifier: TaskStructureClassifier, + stakes_assessor: StakesAssessor | None = None, ) -> None: self._strategy = strategy self._classifier = classifier + self._stakes_assessor = stakes_assessor or build_stakes_assessor() async def decompose_task( self, @@ -115,6 +119,17 @@ async def _do_decompose( graph = DependencyGraph(plan.subtasks) graph.validate() + # 3b. Assess per-subtask stakes and stamp it onto both the plan + # subtasks and the tasks created from them, so the plan and the + # executable tasks agree on stakes for the routing layer. + assessed_subtasks = tuple( + st.model_copy( + update={"stakes": self._stakes_assessor.assess_subtask(st)}, + ) + for st in plan.subtasks + ) + plan = plan.model_copy(update={"subtasks": assessed_subtasks}) + # 4. 
Create Task objects created_tasks: list[Task] = [] for subtask_def in plan.subtasks: @@ -131,6 +146,7 @@ async def _do_decompose( dependencies=subtask_def.dependencies, status=TaskStatus.CREATED, estimated_complexity=subtask_def.estimated_complexity, + stakes=subtask_def.stakes, ) created_tasks.append(child_task) logger.debug( diff --git a/src/synthorg/engine/pipeline/service.py b/src/synthorg/engine/pipeline/service.py index 1af62d16cc..ccdc4255c5 100644 --- a/src/synthorg/engine/pipeline/service.py +++ b/src/synthorg/engine/pipeline/service.py @@ -31,6 +31,7 @@ WorkPhaseResult, WorkPipelineResult, ) +from synthorg.engine.stakes import build_stakes_assessor from synthorg.observability import get_logger, safe_error_description from synthorg.observability.events.pipeline import ( PIPELINE_PHASE_COMPLETED, @@ -49,6 +50,7 @@ from synthorg.engine.intake.engine import IntakeEngine from synthorg.engine.pipeline.policy.protocol import WorkRoutingPolicy from synthorg.engine.routing.scorer import AgentTaskScorer + from synthorg.engine.stakes.protocol import StakesAssessor from synthorg.engine.task_engine import TaskEngine from synthorg.hr.registry import AgentRegistryService from synthorg.persistence.project_protocol import ProjectRepository @@ -91,6 +93,7 @@ class DefaultWorkPipeline: "_project_repository", "_routing_policy", "_scorer", + "_stakes_assessor", "_task_engine", "_worker_execution_service", ) @@ -107,6 +110,7 @@ def __init__( # noqa: PLR0913 -- keyword-only dependency injection coordinator: MultiAgentCoordinator | None, agent_registry: AgentRegistryService, clock: Clock | None = None, + stakes_assessor: StakesAssessor | None = None, ) -> None: self._intake_engine = intake_engine self._task_engine = task_engine @@ -117,6 +121,7 @@ def __init__( # noqa: PLR0913 -- keyword-only dependency injection self._coordinator = coordinator self._agent_registry = agent_registry self._clock = clock if clock is not None else SystemClock() + self._stakes_assessor = 
stakes_assessor or build_stakes_assessor() async def run(self, work_item: WorkItem) -> WorkPipelineResult: """Drive *work_item* through the full spine (see module docstring).""" @@ -242,7 +247,25 @@ async def _intake(self, work_item: WorkItem) -> Task: if task is None: msg = f"intake reported task {result.task_id!r} but it is not persisted" raise WorkIntakeRejectedError(msg) - return await self._link_forecast(task, work_item) + task = await self._link_forecast(task, work_item) + return await self._assess_stakes(task, work_item) + + async def _assess_stakes(self, task: Task, work_item: WorkItem) -> Task: + """Assess and stamp parent-task stakes for the LEAF (solo) path. + + The decomposition service assesses each subtask on the team path, + but a LEAF task is executed directly without decomposition, so the + parent task itself must carry its stakes for the routing layer. + Stamped here, at the single intake funnel, so both paths converge. + """ + stakes = self._stakes_assessor.assess_task(task) + if stakes is task.stakes: + return task + return await self._task_engine.update_task( + task.id, + {"stakes": stakes}, + requested_by=work_item.requested_by, + ) async def _link_forecast(self, task: Task, work_item: WorkItem) -> Task: """Stamp the approved forecast id + ceiling onto the task. diff --git a/src/synthorg/engine/review_gate.py b/src/synthorg/engine/review_gate.py index 8d2d44ec19..044118cc71 100644 --- a/src/synthorg/engine/review_gate.py +++ b/src/synthorg/engine/review_gate.py @@ -107,6 +107,18 @@ def set_vision_gate(self, vision_gate: VisionVerifierGate) -> None: """ self._vision_gate = vision_gate + def set_red_team_gate(self, red_team_gate: RedTeamGate) -> None: + """Attach the red-team gate after construction (boot wiring seam). + + Mirrors :meth:`set_vision_gate`: the red-team runtime is built in + on-startup wiring once the boot ``AgentEngine`` exists, after this + service is constructed during app construction. 
Callers that pass + ``red_team_input`` to :meth:`run_pipeline` then reach the live + gate; building that input from a completed task's deliverable is + the review-pipeline integration's responsibility, not this seam. + """ + self._red_team_gate = red_team_gate + async def check_can_decide( self, *, diff --git a/src/synthorg/engine/routing_policy/__init__.py b/src/synthorg/engine/routing_policy/__init__.py new file mode 100644 index 0000000000..e8e25047a0 --- /dev/null +++ b/src/synthorg/engine/routing_policy/__init__.py @@ -0,0 +1,41 @@ +"""Stakes-aware model routing. + +Given a task's :class:`~synthorg.core.enums.Stakes` and an agent's +configured model, the routing layer picks a model tier matched to the +stakes: the cheapest tier that clears a benchmark-derived quality floor +for low/normal-stakes work, and a strong tier (plus a red-team review +mark) for high/critical-stakes work. Coordination metrics nudge the +choice upward when recent runs show error amplification or overhead. + +The decision is a pure function of the task, the injected +:class:`~synthorg.budget.benchmark_protocol.BenchmarkScoreProvider` +scores, recent +:class:`~synthorg.budget.coordination_store.CoordinationMetricsStore` +records, the agent identity, and the configured floors. It composes +with (runs before) the existing budget auto-downgrade, which may lower +the tier further when budget is tight. 
+""" + +from synthorg.engine.routing_policy.config import ( + QualityFloors, + StakesRoutingConfig, +) +from synthorg.engine.routing_policy.factory import build_stakes_router +from synthorg.engine.routing_policy.models import StakesRoutingDecision +from synthorg.engine.routing_policy.protocol import StakesRoutingStrategy +from synthorg.engine.routing_policy.router import StakesRouter +from synthorg.engine.routing_policy.strategies import ( + FlatStrategy, + StakesAwareStrategy, +) + +__all__ = [ + "FlatStrategy", + "QualityFloors", + "StakesAwareStrategy", + "StakesRouter", + "StakesRoutingConfig", + "StakesRoutingDecision", + "StakesRoutingStrategy", + "build_stakes_router", +] diff --git a/src/synthorg/engine/routing_policy/config.py b/src/synthorg/engine/routing_policy/config.py new file mode 100644 index 0000000000..cfbd512f61 --- /dev/null +++ b/src/synthorg/engine/routing_policy/config.py @@ -0,0 +1,122 @@ +"""Configuration for stakes-aware model routing. + +``StakesRoutingConfig`` carries the per-stakes quality floors, the +coordination-metrics nudge thresholds, the red-team stakes threshold, +and the ``strategy`` discriminator dispatched by ``build_stakes_router``. + +The default floors track the calibrated tier bands of +:class:`~synthorg.budget.benchmark_stub.StubBenchmarkScoreProvider` +(small 72, medium 85, large 92): a low floor admits the cheapest tier, +a normal floor requires at least medium, and a high/critical floor +requires the large tier. +""" + +from typing import Final, Self + +from pydantic import BaseModel, ConfigDict, Field, model_validator + +from synthorg.core.enums import Stakes + +# Per-stakes benchmark quality floors (0 to 100). A model tier is a +# candidate only when its benchmark score clears the floor for the +# subtask's stakes level. +_FLOOR_LOW: Final[float] = 72.0 +_FLOOR_NORMAL: Final[float] = 80.0 +_FLOOR_HIGH: Final[float] = 88.0 +_FLOOR_CRITICAL: Final[float] = 88.0 + +# Coordination-metrics nudge thresholds. 
When recent runs for a task +# show error amplification above this ratio (multi-agent error rate +# divided by single-agent baseline) or overhead above this percentage, +# the routing tier is bumped one step up. +_ERROR_AMPLIFICATION_THRESHOLD: Final[float] = 1.5 +_OVERHEAD_THRESHOLD_PERCENT: Final[float] = 50.0 +_COORDINATION_LOOKBACK: Final[int] = 5 + +_FLOOR_MIN: Final[float] = 0.0 +_FLOOR_MAX: Final[float] = 100.0 + + +class QualityFloors(BaseModel): + """Per-stakes minimum benchmark score a model tier must clear. + + Attributes: + low: Floor for LOW-stakes subtasks. + normal: Floor for NORMAL-stakes subtasks. + high: Floor for HIGH-stakes subtasks. + critical: Floor for CRITICAL-stakes subtasks. + """ + + model_config = ConfigDict(frozen=True, allow_inf_nan=False, extra="forbid") + + low: float = Field(default=_FLOOR_LOW, ge=_FLOOR_MIN, le=_FLOOR_MAX) + normal: float = Field(default=_FLOOR_NORMAL, ge=_FLOOR_MIN, le=_FLOOR_MAX) + high: float = Field(default=_FLOOR_HIGH, ge=_FLOOR_MIN, le=_FLOOR_MAX) + critical: float = Field(default=_FLOOR_CRITICAL, ge=_FLOOR_MIN, le=_FLOOR_MAX) + + def for_stakes(self, stakes: Stakes) -> float: + """Return the quality floor for *stakes*.""" + return { + Stakes.LOW: self.low, + Stakes.NORMAL: self.normal, + Stakes.HIGH: self.high, + Stakes.CRITICAL: self.critical, + }[stakes] + + +class StakesRoutingConfig(BaseModel): + """Configuration for the stakes-aware routing strategy. + + Attributes: + strategy: Discriminator selecting the routing strategy + (``"stakes_aware"`` default, or ``"flat"`` for the no-op + control / opt-out). + quality_floors: Per-stakes benchmark quality floors. + red_team_min_stakes: Lowest stakes level that requires the + red-team gate and forbids downgrading below the agent's + configured tier. + error_amplification_threshold: Coordination error-amplification + ratio above which the tier is nudged up. + overhead_threshold_percent: Coordination overhead percentage + above which the tier is nudged up. 
+ coordination_lookback: Number of recent coordination records to + inspect for the nudge. + """ + + model_config = ConfigDict(frozen=True, allow_inf_nan=False, extra="forbid") + + strategy: str = Field( + default="stakes_aware", + description="Routing strategy discriminator", + ) + quality_floors: QualityFloors = Field( + default_factory=QualityFloors, + description="Per-stakes benchmark quality floors", + ) + red_team_min_stakes: Stakes = Field( + default=Stakes.HIGH, + description="Lowest stakes requiring the red-team gate", + ) + error_amplification_threshold: float = Field( + default=_ERROR_AMPLIFICATION_THRESHOLD, + gt=0.0, + description="Error-amplification ratio that triggers a tier nudge", + ) + overhead_threshold_percent: float = Field( + default=_OVERHEAD_THRESHOLD_PERCENT, + ge=0.0, + description="Coordination overhead percentage that triggers a nudge", + ) + coordination_lookback: int = Field( + default=_COORDINATION_LOOKBACK, + ge=1, + description="Recent coordination records inspected for the nudge", + ) + + @model_validator(mode="after") + def _validate_strategy(self) -> Self: + """Reject a blank strategy discriminator.""" + if not self.strategy.strip(): + msg = "strategy must not be blank" + raise ValueError(msg) + return self diff --git a/src/synthorg/engine/routing_policy/factory.py b/src/synthorg/engine/routing_policy/factory.py new file mode 100644 index 0000000000..ad32677a74 --- /dev/null +++ b/src/synthorg/engine/routing_policy/factory.py @@ -0,0 +1,95 @@ +"""Factory for building a stakes router from config. + +Dispatches on ``StakesRoutingConfig.strategy`` via a ``StrategyRegistry`` +(mirrors ``loop_selector._LOOP_REGISTRY``). The ``stakes_aware`` strategy +requires a benchmark provider; ``flat`` needs no dependencies. 
+""" + +from typing import TYPE_CHECKING + +from synthorg.core.registry import StrategyRegistry +from synthorg.engine.routing_policy.config import StakesRoutingConfig +from synthorg.engine.routing_policy.protocol import ( # noqa: TC001 -- registry generic + annotations + StakesRoutingStrategy, +) +from synthorg.engine.routing_policy.router import StakesRouter +from synthorg.engine.routing_policy.strategies import FlatStrategy, StakesAwareStrategy + +if TYPE_CHECKING: + from synthorg.budget.benchmark_protocol import BenchmarkScoreProvider + from synthorg.budget.coordination_store import CoordinationMetricsStore + from synthorg.providers.routing.resolver import ModelResolver + + +def _build_flat( + *, + config: StakesRoutingConfig, + benchmark_provider: BenchmarkScoreProvider | None = None, + resolver: ModelResolver | None = None, + coordination_store: CoordinationMetricsStore | None = None, +) -> StakesRoutingStrategy: + del config, benchmark_provider, resolver, coordination_store + return FlatStrategy() + + +def _build_stakes_aware( + *, + config: StakesRoutingConfig, + benchmark_provider: BenchmarkScoreProvider | None = None, + resolver: ModelResolver | None = None, + coordination_store: CoordinationMetricsStore | None = None, +) -> StakesRoutingStrategy: + if benchmark_provider is None: + msg = "stakes_aware routing requires a benchmark score provider" + raise ValueError(msg) + return StakesAwareStrategy( + benchmark_provider=benchmark_provider, + config=config, + resolver=resolver, + coordination_store=coordination_store, + ) + + +_STRATEGY_REGISTRY: StrategyRegistry[StakesRoutingStrategy] = StrategyRegistry( + { + "stakes_aware": _build_stakes_aware, + "flat": _build_flat, + }, + kind="stakes_routing_strategy", +) + + +def build_stakes_router( + config: StakesRoutingConfig | None = None, + *, + benchmark_provider: BenchmarkScoreProvider | None = None, + resolver: ModelResolver | None = None, + coordination_store: CoordinationMetricsStore | None = None, +) -> 
StakesRouter: + """Build a :class:`StakesRouter` from *config*. + + Args: + config: Routing config; defaults to the ``stakes_aware`` strategy. + benchmark_provider: Per-model quality scores (required for + ``stakes_aware``). + resolver: Tier-to-model resolver. When absent, ``stakes_aware`` + applies only the red-team mark. + coordination_store: Recent coordination metrics for the nudge. + + Returns: + A configured stakes router. + + Raises: + StrategyFactoryNotFoundError: If ``config.strategy`` is unknown. + ValueError: If ``stakes_aware`` is selected without a benchmark + provider. + """ + cfg = config or StakesRoutingConfig() + strategy = _STRATEGY_REGISTRY.build( + cfg.strategy, + config=cfg, + benchmark_provider=benchmark_provider, + resolver=resolver, + coordination_store=coordination_store, + ) + return StakesRouter(strategy) diff --git a/src/synthorg/engine/routing_policy/models.py b/src/synthorg/engine/routing_policy/models.py new file mode 100644 index 0000000000..603643e15d --- /dev/null +++ b/src/synthorg/engine/routing_policy/models.py @@ -0,0 +1,34 @@ +"""Domain models for stakes-aware model routing.""" + +from pydantic import BaseModel, ConfigDict, Field + +from synthorg.core.agent import ModelConfig # noqa: TC001 +from synthorg.core.enums import Stakes # noqa: TC001 -- runtime field type (Pydantic) +from synthorg.core.types import NotBlankStr # noqa: TC001 + + +class StakesRoutingDecision(BaseModel): + """Outcome of a stakes-aware routing decision. + + Attributes: + selected_model: The model config to run the subtask with. For a + no-op (flat routing, or no adjustment warranted) this equals + the agent's incoming model. + red_team_required: Whether the deliverable must pass the + adversarial red-team gate before completion. Set for stakes + at or above the configured threshold. + stakes: The stakes level that drove the decision. + reason: Human-readable explanation for surfacing/audit. + source: Machine-readable provenance, e.g. 
``"stakes_aware:floor"``, + ``"stakes_aware:nudge"``, ``"stakes_aware:noop"``, ``"flat"``. + """ + + model_config = ConfigDict(frozen=True, allow_inf_nan=False, extra="forbid") + + selected_model: ModelConfig = Field(description="Model to execute with") + red_team_required: bool = Field( + description="Whether the red-team gate must run for this subtask", + ) + stakes: Stakes = Field(description="Stakes level driving the decision") + reason: NotBlankStr = Field(description="Human-readable explanation") + source: NotBlankStr = Field(description="Machine-readable decision provenance") diff --git a/src/synthorg/engine/routing_policy/protocol.py b/src/synthorg/engine/routing_policy/protocol.py new file mode 100644 index 0000000000..438e8d0759 --- /dev/null +++ b/src/synthorg/engine/routing_policy/protocol.py @@ -0,0 +1,26 @@ +"""Pluggable stakes-aware routing strategy protocol.""" + +from typing import TYPE_CHECKING, Protocol, runtime_checkable + +if TYPE_CHECKING: + from synthorg.core.agent import AgentIdentity + from synthorg.core.task import Task + from synthorg.engine.routing_policy.models import StakesRoutingDecision + + +@runtime_checkable +class StakesRoutingStrategy(Protocol): + """Picks a model (and red-team requirement) from a task's stakes. + + Implementations are deterministic given their injected score and + metric sources, so the cost/quality comparison test is reproducible. + """ + + async def route( + self, + *, + task: Task, + identity: AgentIdentity, + ) -> StakesRoutingDecision: + """Return the routing decision for *task* run by *identity*.""" + ... 
diff --git a/src/synthorg/engine/routing_policy/router.py b/src/synthorg/engine/routing_policy/router.py new file mode 100644 index 0000000000..eb931e58da --- /dev/null +++ b/src/synthorg/engine/routing_policy/router.py @@ -0,0 +1,60 @@ +"""Stakes router service: delegates to a strategy and logs decisions.""" + +from typing import TYPE_CHECKING + +from synthorg.observability import get_logger +from synthorg.observability.events.stakes_routing import ( + STAKES_ROUTING_DECIDED, + STAKES_ROUTING_RED_TEAM_MARKED, +) + +if TYPE_CHECKING: + from synthorg.core.agent import AgentIdentity + from synthorg.core.task import Task + from synthorg.engine.routing_policy.models import StakesRoutingDecision + from synthorg.engine.routing_policy.protocol import StakesRoutingStrategy + +logger = get_logger(__name__) + + +class StakesRouter: + """Injectable seam wrapping a :class:`StakesRoutingStrategy`. + + The engine calls :meth:`route` before the budget auto-downgrade to + obtain the stakes-adjusted model and the red-team requirement. + + Args: + strategy: The routing strategy to delegate to. 
+ """ + + __slots__ = ("_strategy",) + + def __init__(self, strategy: StakesRoutingStrategy) -> None: + self._strategy = strategy + + async def route( + self, + *, + task: Task, + identity: AgentIdentity, + ) -> StakesRoutingDecision: + """Return the stakes-aware routing decision for *task*.""" + decision = await self._strategy.route(task=task, identity=identity) + logger.info( + STAKES_ROUTING_DECIDED, + task_id=task.id, + agent_id=str(identity.id), + stakes=decision.stakes.value, + from_model=identity.model.model_id, + to_model=decision.selected_model.model_id, + to_tier=decision.selected_model.model_tier, + source=decision.source, + red_team_required=decision.red_team_required, + ) + if decision.red_team_required: + logger.info( + STAKES_ROUTING_RED_TEAM_MARKED, + task_id=task.id, + stakes=decision.stakes.value, + ) + return decision diff --git a/src/synthorg/engine/routing_policy/strategies.py b/src/synthorg/engine/routing_policy/strategies.py new file mode 100644 index 0000000000..d04eb3251b --- /dev/null +++ b/src/synthorg/engine/routing_policy/strategies.py @@ -0,0 +1,234 @@ +"""Stakes-aware and flat routing strategies.""" + +from typing import TYPE_CHECKING + +from synthorg.core.agent import ModelConfig +from synthorg.core.enums import Stakes, compare_stakes +from synthorg.core.types import ModelTier # noqa: TC001 +from synthorg.engine.routing_policy.config import StakesRoutingConfig +from synthorg.engine.routing_policy.models import StakesRoutingDecision +from synthorg.engine.routing_policy.tiers import ( + TIER_LADDER, + bump_one, + higher_tier, +) +from synthorg.observability import get_logger +from synthorg.observability.events.stakes_routing import ( + STAKES_ROUTING_COORD_NUDGE, + STAKES_ROUTING_TIER_UNRESOLVABLE, +) + +if TYPE_CHECKING: + from synthorg.budget.benchmark_protocol import BenchmarkScoreProvider + from synthorg.budget.coordination_store import CoordinationMetricsStore + from synthorg.core.agent import AgentIdentity + from 
synthorg.core.task import Task + from synthorg.providers.routing.models import ResolvedModel + from synthorg.providers.routing.resolver import ModelResolver + +logger = get_logger(__name__) + + +class FlatStrategy: + """No-op routing: keeps the agent's configured model, no red-team. + + This is today's behaviour. It is the control arm of the cost/quality + comparison test and the opt-out selectable via the ``flat`` + discriminator. + """ + + async def route( + self, + *, + task: Task, + identity: AgentIdentity, + ) -> StakesRoutingDecision: + """Return a decision that leaves the model unchanged.""" + return StakesRoutingDecision( + selected_model=identity.model, + red_team_required=False, + stakes=task.stakes, + reason="flat routing: no stakes-based adjustment", + source="flat", + ) + + +class StakesAwareStrategy: + """Route by stakes, with a coordination nudge and a red-team mark. + + Picks the cheapest model tier whose benchmark score clears the + per-stakes quality floor, bumps it up when recent coordination + metrics look unhealthy, and marks high/critical work for the + red-team gate. Deterministic given the injected benchmark scores and + coordination records; performs no wall-clock reads or live provider + calls. + + Args: + benchmark_provider: Source of per-model quality scores. + config: Per-stakes floors, nudge thresholds, and red-team + threshold. + resolver: Resolves a tier alias to a concrete model. When + ``None``, the model cannot be adjusted; only the red-team + mark is applied. + coordination_store: Recent coordination metrics for the nudge. + When ``None``, the nudge is skipped. 
+ """ + + def __init__( + self, + *, + benchmark_provider: BenchmarkScoreProvider, + config: StakesRoutingConfig | None = None, + resolver: ModelResolver | None = None, + coordination_store: CoordinationMetricsStore | None = None, + ) -> None: + self._benchmark_provider = benchmark_provider + self._config = config or StakesRoutingConfig() + self._resolver = resolver + self._coordination_store = coordination_store + + async def route( + self, + *, + task: Task, + identity: AgentIdentity, + ) -> StakesRoutingDecision: + """Pick a model tier matched to ``task.stakes`` (see class docstring).""" + stakes = task.stakes + red_team_required = ( + compare_stakes(stakes, self._config.red_team_min_stakes) >= 0 + ) + current_tier = identity.model.model_tier + + floor = self._config.quality_floors.for_stakes(stakes) + target_tier = await self._cheapest_tier_meeting_floor(floor) + + nudged = False + if target_tier is not None and self._coordination_unhealthy(task.id): + bumped = bump_one(target_tier) + if bumped != target_tier: + logger.info( + STAKES_ROUTING_COORD_NUDGE, + task_id=task.id, + from_tier=target_tier, + to_tier=bumped, + ) + nudged = True + target_tier = bumped + + # Red-team-gated work must never run below the agent's own tier. 
+ if red_team_required and target_tier is not None and current_tier is not None: + target_tier = higher_tier(target_tier, current_tier) + + return self._build_decision( + identity=identity, + stakes=stakes, + red_team_required=red_team_required, + target_tier=target_tier, + nudged=nudged, + floor=floor, + ) + + def _build_decision( # noqa: PLR0913 -- keyword-only assembly inputs + self, + *, + identity: AgentIdentity, + stakes: Stakes, + red_team_required: bool, + target_tier: ModelTier | None, + nudged: bool, + floor: float, + ) -> StakesRoutingDecision: + """Assemble the decision, resolving the target tier to a model.""" + current = identity.model + selected_model = current + source = "stakes_aware:noop" + reason = ( + f"stakes={stakes.value}: kept {current.model_tier or 'configured'} tier" + ) + + resolved = self._resolve_tier(target_tier) if target_tier is not None else None + changed = ( + resolved is not None + and target_tier is not None + and ( + resolved.model_id != current.model_id + or target_tier != current.model_tier + ) + ) + if changed and resolved is not None and target_tier is not None: + selected_model = ModelConfig( + provider=resolved.provider_name, + model_id=resolved.model_id, + temperature=current.temperature, + max_tokens=current.max_tokens, + fallback_model=current.fallback_model, + model_tier=target_tier, + ) + source = "stakes_aware:nudge" if nudged else "stakes_aware:floor" + reason = ( + f"stakes={stakes.value}: routed to {target_tier} tier (floor {floor:g})" + ) + + return StakesRoutingDecision( + selected_model=selected_model, + red_team_required=red_team_required, + stakes=stakes, + reason=reason, + source=source, + ) + + async def _cheapest_tier_meeting_floor(self, floor: float) -> ModelTier | None: + """Return the cheapest tier whose benchmark score clears *floor*. 
+ + Falls back to the strongest resolvable tier when none clears the + floor, and to ``None`` when no tier resolves at all (no resolver + wired, or the provider catalogue lacks the canonical tiers). + """ + if self._resolver is None: + return None + strongest_resolvable: ModelTier | None = None + for tier in TIER_LADDER: + resolved = self._resolver.resolve_safe(tier) + if resolved is None: + continue + strongest_resolvable = tier + score = await self._benchmark_provider.get_score(resolved.model_id) + if score is not None and score.score >= floor: + return tier + if strongest_resolvable is None: + logger.warning( + STAKES_ROUTING_TIER_UNRESOLVABLE, + floor=floor, + reason="no_tier_resolved", + ) + return strongest_resolvable + + def _resolve_tier(self, tier: ModelTier) -> ResolvedModel | None: + """Resolve a tier alias to a model, or ``None``.""" + if self._resolver is None: + return None + return self._resolver.resolve_safe(tier) + + def _coordination_unhealthy(self, task_id: str) -> bool: + """True when recent coordination metrics breach a nudge threshold.""" + if self._coordination_store is None: + return False + records, _ = self._coordination_store.query( + task_id=task_id, + limit=self._config.coordination_lookback, + ) + for rec in records: + amp = rec.metrics.error_amplification + if ( + amp is not None + and amp.value > self._config.error_amplification_threshold + ): + return True + overhead = rec.metrics.overhead + if ( + overhead is not None + and overhead.value_percent > self._config.overhead_threshold_percent + ): + return True + return False diff --git a/src/synthorg/engine/routing_policy/tiers.py b/src/synthorg/engine/routing_policy/tiers.py new file mode 100644 index 0000000000..412908a2f6 --- /dev/null +++ b/src/synthorg/engine/routing_policy/tiers.py @@ -0,0 +1,36 @@ +"""Model-tier ladder helpers for stakes-aware routing. 
+ +The canonical tiers (``small`` < ``medium`` < ``large``) double as the +model aliases the :class:`~synthorg.providers.routing.resolver.ModelResolver` +resolves, and as the keys +:class:`~synthorg.budget.benchmark_stub.StubBenchmarkScoreProvider` +scores. Cheapest-first order lets the policy pick the cheapest tier that +clears a quality floor. +""" + +from typing import Final + +from synthorg.core.types import ModelTier # noqa: TC001 -- runtime annotations + +# Cheapest-first ladder. Index doubles as the tier rank. +TIER_LADDER: Final[tuple[ModelTier, ...]] = ("small", "medium", "large") + +_TIER_RANK: Final[dict[ModelTier, int]] = { + tier: idx for idx, tier in enumerate(TIER_LADDER) +} + + +def tier_rank(tier: ModelTier) -> int: + """Return the cheapest-first rank of *tier* (small=0, large=2).""" + return _TIER_RANK[tier] + + +def higher_tier(a: ModelTier, b: ModelTier) -> ModelTier: + """Return the stronger (more expensive) of two tiers.""" + return a if tier_rank(a) >= tier_rank(b) else b + + +def bump_one(tier: ModelTier) -> ModelTier: + """Return the next stronger tier, or *tier* if already the strongest.""" + idx = min(tier_rank(tier) + 1, len(TIER_LADDER) - 1) + return TIER_LADDER[idx] diff --git a/src/synthorg/engine/stakes/__init__.py b/src/synthorg/engine/stakes/__init__.py new file mode 100644 index 0000000000..a3adb2420e --- /dev/null +++ b/src/synthorg/engine/stakes/__init__.py @@ -0,0 +1,26 @@ +"""Per-subtask stakes assessment for stakes-aware model routing. + +A :class:`StakesAssessor` classifies how consequential a subtask or task +is (:class:`~synthorg.core.enums.Stakes`). The routing layer +(:mod:`synthorg.engine.routing_policy`) consumes the result to pick a +cheap model for low-stakes work and a strong model plus a red-team +review for high-stakes work. 
+""" + +from synthorg.engine.stakes.config import ( + DEFAULT_COMPLEXITY_STAKES_RULES, + ComplexityStakesRule, + StakesAssessmentConfig, +) +from synthorg.engine.stakes.factory import build_stakes_assessor +from synthorg.engine.stakes.heuristic import DefaultStakesAssessor +from synthorg.engine.stakes.protocol import StakesAssessor + +__all__ = [ + "DEFAULT_COMPLEXITY_STAKES_RULES", + "ComplexityStakesRule", + "DefaultStakesAssessor", + "StakesAssessmentConfig", + "StakesAssessor", + "build_stakes_assessor", +] diff --git a/src/synthorg/engine/stakes/config.py b/src/synthorg/engine/stakes/config.py new file mode 100644 index 0000000000..962d0c7742 --- /dev/null +++ b/src/synthorg/engine/stakes/config.py @@ -0,0 +1,136 @@ +"""Configuration for stakes assessment. + +``StakesAssessmentConfig`` carries the heuristic rubric: a +complexity-to-base-stakes mapping plus keyword signal sets that elevate +stakes when a subtask's text mentions consequential or irreversible +work. The ``assessor`` field is the discriminator dispatched by +``build_stakes_assessor`` (see ``factory.py``). + +The default complexity rules follow the design intent: simple work is +low-stakes, medium is normal, and complex/epic work is high-stakes. +Keyword sets bias the assessment upward (fail-safe) when the text names +architecture, irreversibility, production, security, or data-loss risk. +""" + +from typing import Self + +from pydantic import BaseModel, ConfigDict, Field, model_validator + +from synthorg.core.enums import Complexity, Stakes +from synthorg.core.types import NotBlankStr # noqa: TC001 + + +class ComplexityStakesRule(BaseModel): + """Maps a task complexity level to a base stakes level. + + Attributes: + complexity: The complexity this rule matches. + stakes: The base stakes assigned before keyword/priority bumps. 
+ """ + + model_config = ConfigDict(frozen=True, allow_inf_nan=False, extra="forbid") + + complexity: Complexity = Field(description="Task complexity level") + stakes: Stakes = Field(description="Base stakes level for this complexity") + + +DEFAULT_COMPLEXITY_STAKES_RULES: tuple[ComplexityStakesRule, ...] = ( + ComplexityStakesRule(complexity=Complexity.SIMPLE, stakes=Stakes.LOW), + ComplexityStakesRule(complexity=Complexity.MEDIUM, stakes=Stakes.NORMAL), + ComplexityStakesRule(complexity=Complexity.COMPLEX, stakes=Stakes.HIGH), + ComplexityStakesRule(complexity=Complexity.EPIC, stakes=Stakes.HIGH), +) + +# Import-time completeness guard (mirrors loop_selector.DEFAULT_AUTO_LOOP_RULES): +# every Complexity member must have a default base-stakes rule. +_covered = {r.complexity for r in DEFAULT_COMPLEXITY_STAKES_RULES} +if _covered != set(Complexity): + _missing = set(Complexity) - _covered + _msg = f"DEFAULT_COMPLEXITY_STAKES_RULES missing complexities: {_missing}" + raise RuntimeError(_msg) +del _covered + + +# Words signalling high-stakes work: architecture / design decisions, +# security surfaces, deployment, and anything touching production data. +DEFAULT_HIGH_STAKES_KEYWORDS: tuple[NotBlankStr, ...] = ( + "architecture", + "architectural", + "design decision", + "breaking change", + "security", + "authentication", + "authorisation", + "authorization", + "credential", + "secret", + "encryption", + "deploy", + "deployment", + "migration", + "schema change", + "payment", + "billing", + "rollback", +) + +# Words signalling critical, irreversible work: a wrong answer here is +# costly to undo, so the assessment is pinned to the top tier. +DEFAULT_CRITICAL_STAKES_KEYWORDS: tuple[NotBlankStr, ...] = ( + "irreversible", + "data loss", + "drop table", + "delete production", + "production deployment", + "destructive", +) + + +class StakesAssessmentConfig(BaseModel): + """Rubric for the heuristic stakes assessor. 
+ + Attributes: + assessor: Discriminator selecting the assessor implementation. + complexity_rules: Complexity-to-base-stakes mapping. Each + complexity must appear at most once. + high_stakes_keywords: Substrings that raise stakes to at least + HIGH when present in a task's or subtask's title or description. + critical_stakes_keywords: Substrings that pin stakes to CRITICAL. + elevate_on_critical_priority: When True, a CRITICAL-priority task + is assessed as at least HIGH stakes (priority is urgency, not + stakes, but critical urgency is a conservative signal). + """ + + model_config = ConfigDict(frozen=True, allow_inf_nan=False, extra="forbid") + + assessor: NotBlankStr = Field( + default="heuristic", + description="Stakes assessor discriminator", + ) + complexity_rules: tuple[ComplexityStakesRule, ...] = Field( + default=DEFAULT_COMPLEXITY_STAKES_RULES, + description="Complexity-to-base-stakes mapping rules", + ) + high_stakes_keywords: tuple[NotBlankStr, ...] = Field( + default=DEFAULT_HIGH_STAKES_KEYWORDS, + description="Substrings that raise stakes to at least HIGH", + ) + critical_stakes_keywords: tuple[NotBlankStr, ...] 
= Field( + default=DEFAULT_CRITICAL_STAKES_KEYWORDS, + description="Substrings that pin stakes to CRITICAL", + ) + elevate_on_critical_priority: bool = Field( + default=True, + description="Assess CRITICAL-priority tasks at least HIGH stakes", + ) + + @model_validator(mode="after") + def _validate_unique_complexities(self) -> Self: + """Reject duplicate complexity entries in the rule list.""" + seen: set[Complexity] = set() + for rule in self.complexity_rules: + if rule.complexity in seen: + msg = f"Duplicate complexity in rules: {rule.complexity.value!r}" + raise ValueError(msg) + seen.add(rule.complexity) + return self diff --git a/src/synthorg/engine/stakes/factory.py b/src/synthorg/engine/stakes/factory.py new file mode 100644 index 0000000000..8050a8f98b --- /dev/null +++ b/src/synthorg/engine/stakes/factory.py @@ -0,0 +1,43 @@ +"""Factory for building a stakes assessor from config. + +Dispatches on ``StakesAssessmentConfig.assessor`` via a +``StrategyRegistry`` (mirrors ``loop_selector._LOOP_REGISTRY``). Ships +the deterministic heuristic as the only built-in; additional assessors +(e.g. an LLM-backed one) register here without touching call sites. +""" + +from synthorg.core.registry import StrategyRegistry +from synthorg.engine.stakes.config import StakesAssessmentConfig +from synthorg.engine.stakes.heuristic import DefaultStakesAssessor +from synthorg.engine.stakes.protocol import ( # noqa: TC001 -- registry generic + annotations + StakesAssessor, +) + + +def _build_heuristic(config: StakesAssessmentConfig) -> StakesAssessor: + return DefaultStakesAssessor(config) + + +_ASSESSOR_REGISTRY: StrategyRegistry[StakesAssessor] = StrategyRegistry( + {"heuristic": _build_heuristic}, + kind="stakes_assessor", +) + + +def build_stakes_assessor( + config: StakesAssessmentConfig | None = None, +) -> StakesAssessor: + """Build a :class:`StakesAssessor` from *config*. + + Args: + config: Assessment config; defaults to the built-in heuristic + rubric. 
+ + Returns: + A concrete stakes assessor. + + Raises: + StrategyFactoryNotFoundError: If ``config.assessor`` is unknown. + """ + resolved = config or StakesAssessmentConfig() + return _ASSESSOR_REGISTRY.build(resolved.assessor, resolved) diff --git a/src/synthorg/engine/stakes/heuristic.py b/src/synthorg/engine/stakes/heuristic.py new file mode 100644 index 0000000000..f622123ea9 --- /dev/null +++ b/src/synthorg/engine/stakes/heuristic.py @@ -0,0 +1,91 @@ +"""Deterministic heuristic stakes assessor. + +Combines three signals into a single :class:`~synthorg.core.enums.Stakes` +level: a base level from task complexity, a conservative bump for +critical-priority work, and keyword signals over the title/description +that elevate stakes for consequential or irreversible work. The result +is the highest level any signal produces (fail-safe upward bias). +""" + +from typing import TYPE_CHECKING + +from synthorg.core.enums import Complexity, Priority, Stakes, compare_stakes +from synthorg.engine.stakes.config import StakesAssessmentConfig + +if TYPE_CHECKING: + from synthorg.core.task import Task + from synthorg.engine.decomposition.models import SubtaskDefinition + + +def _max_stakes(a: Stakes, b: Stakes) -> Stakes: + """Return the higher-stakes of two levels.""" + return a if compare_stakes(a, b) >= 0 else b + + +class DefaultStakesAssessor: + """Heuristic :class:`~synthorg.engine.stakes.protocol.StakesAssessor`. + + Deterministic and side-effect free. The same inputs always yield the + same stakes level, which the routing comparison test relies on. + + Args: + config: The rubric (complexity rules + keyword sets). Defaults to + the conservative built-in rubric. 
+ """ + + def __init__(self, config: StakesAssessmentConfig | None = None) -> None: + self._config = config or StakesAssessmentConfig() + self._base_by_complexity: dict[Complexity, Stakes] = { + rule.complexity: rule.stakes for rule in self._config.complexity_rules + } + self._high_keywords = tuple( + kw.lower() for kw in self._config.high_stakes_keywords + ) + self._critical_keywords = tuple( + kw.lower() for kw in self._config.critical_stakes_keywords + ) + + def assess_subtask(self, subtask: SubtaskDefinition) -> Stakes: + """Return the stakes level for *subtask*. + + Subtasks carry no priority of their own; priority elevation + applies only to the task-level path. + """ + return self._assess( + title=subtask.title, + description=subtask.description, + complexity=subtask.estimated_complexity, + priority=None, + ) + + def assess_task(self, task: Task) -> Stakes: + """Return the stakes level for *task* (single-agent / LEAF path).""" + return self._assess( + title=task.title, + description=task.description, + complexity=task.estimated_complexity, + priority=task.priority, + ) + + def _assess( + self, + *, + title: str, + description: str, + complexity: Complexity, + priority: Priority | None, + ) -> Stakes: + """Combine complexity, priority, and keyword signals (upward bias).""" + # A complexity with no configured rule biases upward to HIGH (fail-safe). 
+ stakes = self._base_by_complexity.get(complexity, Stakes.HIGH) + + if priority is Priority.CRITICAL and self._config.elevate_on_critical_priority: + stakes = _max_stakes(stakes, Stakes.HIGH) + + text = f"{title}\n{description}".lower() + if any(kw in text for kw in self._critical_keywords): + stakes = _max_stakes(stakes, Stakes.CRITICAL) + elif any(kw in text for kw in self._high_keywords): + stakes = _max_stakes(stakes, Stakes.HIGH) + + return stakes diff --git a/src/synthorg/engine/stakes/protocol.py b/src/synthorg/engine/stakes/protocol.py new file mode 100644 index 0000000000..f12652df3e --- /dev/null +++ b/src/synthorg/engine/stakes/protocol.py @@ -0,0 +1,26 @@ +"""Pluggable stakes-assessment protocol.""" + +from typing import TYPE_CHECKING, Protocol, runtime_checkable + +if TYPE_CHECKING: + from synthorg.core.enums import Stakes + from synthorg.core.task import Task + from synthorg.engine.decomposition.models import SubtaskDefinition + + +@runtime_checkable +class StakesAssessor(Protocol): + """Classifies how consequential a subtask or task is. + + Implementations must be deterministic and side-effect free: the + routing layer treats assessment as a pure function so the + cost/quality comparison test is reproducible. + """ + + def assess_subtask(self, subtask: SubtaskDefinition) -> Stakes: + """Return the stakes level for *subtask*.""" + ... + + def assess_task(self, task: Task) -> Stakes: + """Return the stakes level for *task* (single-agent / LEAF path).""" + ... 
diff --git a/src/synthorg/observability/events/stakes_routing.py b/src/synthorg/observability/events/stakes_routing.py new file mode 100644 index 0000000000..908bc9a053 --- /dev/null +++ b/src/synthorg/observability/events/stakes_routing.py @@ -0,0 +1,10 @@ +"""Stakes-aware model routing event constants.""" + +from typing import Final + +STAKES_ROUTING_DECIDED: Final[str] = "stakes_routing.decided" +STAKES_ROUTING_TIER_ADJUSTED: Final[str] = "stakes_routing.tier_adjusted" +STAKES_ROUTING_COORD_NUDGE: Final[str] = "stakes_routing.coordination_nudge" +STAKES_ROUTING_RED_TEAM_MARKED: Final[str] = "stakes_routing.red_team_marked" +STAKES_ROUTING_TIER_UNRESOLVABLE: Final[str] = "stakes_routing.tier_unresolvable" +STAKES_ROUTING_BUDGET_OVERRODE: Final[str] = "stakes_routing.budget_overrode" diff --git a/src/synthorg/workers/runtime_builder.py b/src/synthorg/workers/runtime_builder.py index 5a430ce5ca..deb33ad9dc 100644 --- a/src/synthorg/workers/runtime_builder.py +++ b/src/synthorg/workers/runtime_builder.py @@ -33,6 +33,7 @@ from synthorg.engine.mcp_self_consumer import build_mcp_self_consumer from synthorg.engine.pipeline.factory import build_work_pipeline from synthorg.engine.routing.scorer import AgentTaskScorer, RoutingScorerConfig +from synthorg.engine.routing_policy import build_stakes_router from synthorg.engine.workspace.config import WorkspaceIsolationConfig from synthorg.engine.workspace.git_worktree import PlannerWorktreeStrategy from synthorg.observability import get_logger, safe_error_description @@ -66,6 +67,7 @@ from synthorg.api.state import AppState from synthorg.engine.coordination.service import MultiAgentCoordinator from synthorg.engine.pipeline.protocol import WorkPipeline + from synthorg.engine.routing_policy.router import StakesRouter from synthorg.providers.protocol import CompletionProvider from synthorg.providers.registry import ProviderRegistry from synthorg.security.visionverify.protocol import VisionVerifierGate @@ -342,6 +344,34 @@ async 
def _build_external_api_runtime( ) +def _build_stakes_router_or_none(app_state: AppState) -> StakesRouter | None: + """Build the stakes-aware model router from live application state. + + Returns ``None`` when the benchmark provider is absent (cost-dial + not wired, e.g. a persistence-less boot), so the engine simply skips + stakes routing. Reads the benchmark provider and coordination-metrics + store off ``AppState`` and builds a tier resolver from the configured + providers; ships the ``stakes_aware`` default strategy. + """ + from synthorg.providers.routing.resolver import ModelResolver # noqa: PLC0415 + + benchmark_provider = app_state.benchmark_provider + if benchmark_provider is None: + return None + resolver = ModelResolver.from_config(app_state.config.providers) + coordination_store = ( + app_state.coordination_metrics_store + if app_state.has_coordination_metrics_store + else None + ) + return build_stakes_router( + app_state.config.stakes_routing, + benchmark_provider=benchmark_provider, + resolver=resolver, + coordination_store=coordination_store, + ) + + def _construct_agent_engine( # noqa: PLR0913 -- boot collaborators threaded in app_state: AppState, provider: CompletionProvider, @@ -364,6 +394,7 @@ def _construct_agent_engine( # noqa: PLR0913 -- boot collaborators threaded in provider=provider, provider_registry=registry, tool_registry=tool_registry, + stakes_router=_build_stakes_router_or_none(app_state), cost_tracker=(app_state.cost_tracker if app_state.has_cost_tracker else None), task_engine=app_state.task_engine, approval_store=app_state.approval_store, diff --git a/tests/unit/engine/routing_policy/test_acceptance_comparison.py b/tests/unit/engine/routing_policy/test_acceptance_comparison.py new file mode 100644 index 0000000000..8efe8d29c2 --- /dev/null +++ b/tests/unit/engine/routing_policy/test_acceptance_comparison.py @@ -0,0 +1,236 @@ +"""Acceptance: stakes-aware routing beats flat on a mixed brief. 
+ +Encodes issue #1998's acceptance criterion as a deterministic +simulation: a mixed brief is decomposed (so each subtask carries an +assessed stakes level), then routed under both the flat control arm and +the stakes-aware arm. With a conservative flat baseline (every subtask +on the strong tier), stakes-aware: + +* routes low-stakes subtasks to cheap models and high/critical subtasks + to the strong model plus the red-team mark, and +* drops total cost while every selection still clears its stakes quality + floor (equal-or-better benchmark adequacy). + +The benchmark scores come from the calibrated stub, so the comparison +is fully reproducible without any LLM spend. +""" + +import pytest +from tests._shared.scripted_provider import make_e2e_identity + +from synthorg.budget.benchmark_stub import StubBenchmarkScoreProvider +from synthorg.core.agent import AgentIdentity, ModelConfig +from synthorg.core.enums import Complexity, Stakes, TaskType +from synthorg.core.task import Task +from synthorg.core.types import ModelTier +from synthorg.engine.decomposition.classifier import TaskStructureClassifier +from synthorg.engine.decomposition.models import ( + DecompositionContext, + DecompositionPlan, + SubtaskDefinition, +) +from synthorg.engine.decomposition.service import DecompositionService +from synthorg.engine.routing_policy import ( + FlatStrategy, + StakesAwareStrategy, + StakesRoutingConfig, +) +from synthorg.engine.routing_policy.config import QualityFloors +from synthorg.providers.routing.models import ResolvedModel +from synthorg.providers.routing.resolver import ModelResolver + +_PROVIDER = "example-provider" +_TIER_MODEL_IDS: dict[ModelTier, str] = { + "small": "example-small-001", + "medium": "example-medium-001", + "large": "example-large-001", +} +# total_cost_per_1k = input + output; strictly increasing by tier. 
+_TIER_TOTAL_COST: dict[ModelTier, float] = { + "small": 0.2, + "medium": 1.0, + "large": 4.0, +} + + +def _resolver() -> ModelResolver: + index: dict[str, tuple[ResolvedModel, ...]] = { + tier: ( + ResolvedModel( + provider_name=_PROVIDER, + model_id=_TIER_MODEL_IDS[tier], + alias=tier, + cost_per_1k_input=_TIER_TOTAL_COST[tier] / 2, + cost_per_1k_output=_TIER_TOTAL_COST[tier] / 2, + max_context=128000, + estimated_latency_ms=100, + ), + ) + for tier in _TIER_MODEL_IDS + } + return ModelResolver(index) + + +def _agent(tier: ModelTier) -> AgentIdentity: + return make_e2e_identity().model_copy( + update={ + "model": ModelConfig( + provider=_PROVIDER, + model_id=_TIER_MODEL_IDS[tier], + model_tier=tier, + ), + }, + ) + + +def _parent() -> Task: + return Task( + id="brief-1", + title="Mixed brief", + description="Several subtasks of varying stakes", + type=TaskType.DEVELOPMENT, + project="proj-1", + created_by="founder", + ) + + +def _mixed_plan() -> DecompositionPlan: + """Mostly low/normal stakes with a couple of high/critical subtasks.""" + subtasks = ( + SubtaskDefinition( + id="st-doc", + title="Update the changelog", + description="Tidy wording in the docs", + estimated_complexity=Complexity.SIMPLE, + ), + SubtaskDefinition( + id="st-format", + title="Reformat helper module", + description="Run the formatter over a utility file", + estimated_complexity=Complexity.SIMPLE, + ), + SubtaskDefinition( + id="st-feature", + title="Add a list endpoint", + description="Implement a straightforward read endpoint", + estimated_complexity=Complexity.MEDIUM, + ), + SubtaskDefinition( + id="st-arch", + title="Design the sharding architecture", + description="Make the core architecture decision for sharding", + estimated_complexity=Complexity.COMPLEX, + ), + SubtaskDefinition( + id="st-migrate", + title="Production data migration", + description="Run an irreversible production migration", + estimated_complexity=Complexity.MEDIUM, + ), + ) + return 
DecompositionPlan(parent_task_id="brief-1", subtasks=subtasks) + + +class _StaticStrategy: + def __init__(self, plan: DecompositionPlan) -> None: + self._plan = plan + + async def decompose( + self, + task: Task, + context: DecompositionContext, + ) -> DecompositionPlan: + del task, context + return self._plan + + def get_strategy_name(self) -> str: + return "static-test" + + +async def _decomposed_tasks() -> tuple[Task, ...]: + service = DecompositionService( + _StaticStrategy(_mixed_plan()), + TaskStructureClassifier(), + ) + result = await service.decompose_task(_parent(), DecompositionContext()) + return result.created_tasks + + +def _floor_for(task: Task, floors: QualityFloors) -> float: + return floors.for_stakes(task.stakes) + + +@pytest.mark.unit +class TestStakesAwareBeatsFlatOnMixedBrief: + """The core acceptance comparison for issue #1998.""" + + async def test_cost_drops_at_equal_or_better_quality(self) -> None: + tasks = await _decomposed_tasks() + # Conservative flat baseline: every subtask on the strong tier. 
+ flat_agent = _agent("large") + config = StakesRoutingConfig() + floors = config.quality_floors + stakes_aware = StakesAwareStrategy( + benchmark_provider=StubBenchmarkScoreProvider(), + config=config, + resolver=_resolver(), + ) + flat = FlatStrategy() + provider = StubBenchmarkScoreProvider() + + flat_cost = 0.0 + flat_quality = 0 + aware_cost = 0.0 + aware_quality = 0 + for task in tasks: + floor = _floor_for(task, floors) + + flat_decision = await flat.route(task=task, identity=flat_agent) + flat_tier = flat_decision.selected_model.model_tier + assert flat_tier is not None + flat_cost += _TIER_TOTAL_COST[flat_tier] + flat_score = await provider.get_score( + flat_decision.selected_model.model_id, + ) + assert flat_score is not None + flat_quality += int(flat_score.score >= floor) + + aware_decision = await stakes_aware.route(task=task, identity=flat_agent) + aware_tier = aware_decision.selected_model.model_tier + assert aware_tier is not None + aware_cost += _TIER_TOTAL_COST[aware_tier] + aware_score = await provider.get_score( + aware_decision.selected_model.model_id, + ) + assert aware_score is not None + aware_quality += int(aware_score.score >= floor) + + # Equal-or-better quality: every stakes-aware selection clears its + # stakes floor, matching the all-strong flat baseline. + assert aware_quality == len(tasks) + assert aware_quality >= flat_quality + # Total cost strictly drops versus flat routing. 
+ assert aware_cost < flat_cost + + async def test_low_stakes_cheap_high_stakes_strong_with_red_team(self) -> None: + tasks = {t.id: t for t in await _decomposed_tasks()} + stakes_aware = StakesAwareStrategy( + benchmark_provider=StubBenchmarkScoreProvider(), + resolver=_resolver(), + ) + agent = _agent("large") + + doc = await stakes_aware.route(task=tasks["st-doc"], identity=agent) + assert tasks["st-doc"].stakes is Stakes.LOW + assert doc.selected_model.model_tier == "small" + assert doc.red_team_required is False + + arch = await stakes_aware.route(task=tasks["st-arch"], identity=agent) + assert tasks["st-arch"].stakes is Stakes.HIGH + assert arch.selected_model.model_tier == "large" + assert arch.red_team_required is True + + migrate = await stakes_aware.route(task=tasks["st-migrate"], identity=agent) + assert tasks["st-migrate"].stakes is Stakes.CRITICAL + assert migrate.selected_model.model_tier == "large" + assert migrate.red_team_required is True diff --git a/tests/unit/engine/routing_policy/test_cost_properties.py b/tests/unit/engine/routing_policy/test_cost_properties.py new file mode 100644 index 0000000000..5c796c3688 --- /dev/null +++ b/tests/unit/engine/routing_policy/test_cost_properties.py @@ -0,0 +1,92 @@ +"""Property: stakes-aware never costs more than an all-strong flat policy. + +With every agent configured at the strongest tier (the conservative +flat baseline), stakes-aware routing only ever holds or lowers the tier: +it downgrades low/normal-stakes work and keeps high/critical work at the +strong tier. So for any mix of stakes the stakes-aware total cost is +always <= the flat total cost, and never exceeds it. 
+""" + +import pytest +from hypothesis import given +from hypothesis import strategies as st +from tests._shared.scripted_provider import make_e2e_identity + +from synthorg.budget.benchmark_stub import StubBenchmarkScoreProvider +from synthorg.core.agent import AgentIdentity, ModelConfig +from synthorg.core.enums import Stakes, TaskType +from synthorg.core.task import Task +from synthorg.core.types import ModelTier +from synthorg.engine.routing_policy import StakesAwareStrategy +from synthorg.providers.routing.models import ResolvedModel +from synthorg.providers.routing.resolver import ModelResolver + +_PROVIDER = "example-provider" +_TIER_MODEL_IDS: dict[ModelTier, str] = { + "small": "example-small-001", + "medium": "example-medium-001", + "large": "example-large-001", +} +_TIER_TOTAL_COST: dict[ModelTier, float] = {"small": 0.2, "medium": 1.0, "large": 4.0} + + +def _resolver() -> ModelResolver: + index: dict[str, tuple[ResolvedModel, ...]] = { + tier: ( + ResolvedModel( + provider_name=_PROVIDER, + model_id=_TIER_MODEL_IDS[tier], + alias=tier, + cost_per_1k_input=_TIER_TOTAL_COST[tier] / 2, + cost_per_1k_output=_TIER_TOTAL_COST[tier] / 2, + max_context=128000, + estimated_latency_ms=100, + ), + ) + for tier in _TIER_MODEL_IDS + } + return ModelResolver(index) + + +def _agent_large() -> AgentIdentity: + return make_e2e_identity().model_copy( + update={ + "model": ModelConfig( + provider=_PROVIDER, + model_id=_TIER_MODEL_IDS["large"], + model_tier="large", + ), + }, + ) + + +def _task(stakes: Stakes) -> Task: + return Task( + id="t", + title="t", + description="body", + type=TaskType.DEVELOPMENT, + project="p", + created_by="c", + stakes=stakes, + ) + + +@pytest.mark.unit +@given(stakes_mix=st.lists(st.sampled_from(list(Stakes)), min_size=1, max_size=12)) +async def test_stakes_aware_never_costs_more_than_flat_all_strong( + stakes_mix: list[Stakes], +) -> None: + strategy = StakesAwareStrategy( + benchmark_provider=StubBenchmarkScoreProvider(), + 
resolver=_resolver(), + ) + agent = _agent_large() + flat_cost = len(stakes_mix) * _TIER_TOTAL_COST["large"] + aware_cost = 0.0 + for stakes in stakes_mix: + decision = await strategy.route(task=_task(stakes), identity=agent) + tier = decision.selected_model.model_tier + assert tier is not None + aware_cost += _TIER_TOTAL_COST[tier] + assert aware_cost <= flat_cost diff --git a/tests/unit/engine/routing_policy/test_engine_integration.py b/tests/unit/engine/routing_policy/test_engine_integration.py new file mode 100644 index 0000000000..f7b05370e7 --- /dev/null +++ b/tests/unit/engine/routing_policy/test_engine_integration.py @@ -0,0 +1,114 @@ +"""AgentEngine stakes-routing integration (the ``_route_stakes`` seam).""" + +import pytest +from tests._shared.scripted_provider import ScriptedProvider, make_e2e_identity + +from synthorg.budget.benchmark_stub import StubBenchmarkScoreProvider +from synthorg.core.agent import AgentIdentity, ModelConfig +from synthorg.core.enums import Stakes, TaskType +from synthorg.core.task import Task +from synthorg.core.types import ModelTier +from synthorg.engine.agent_engine import AgentEngine +from synthorg.engine.routing_policy import StakesRoutingConfig, build_stakes_router +from synthorg.providers.routing.models import ResolvedModel +from synthorg.providers.routing.resolver import ModelResolver + +_PROVIDER = "example-provider" +_TIER_MODEL_IDS: dict[ModelTier, str] = { + "small": "example-small-001", + "medium": "example-medium-001", + "large": "example-large-001", +} + + +def _resolver() -> ModelResolver: + index: dict[str, tuple[ResolvedModel, ...]] = { + tier: ( + ResolvedModel( + provider_name=_PROVIDER, + model_id=model_id, + alias=tier, + cost_per_1k_input=cost, + cost_per_1k_output=cost, + max_context=128000, + estimated_latency_ms=100, + ), + ) + for tier, model_id, cost in ( + ("small", _TIER_MODEL_IDS["small"], 0.1), + ("medium", _TIER_MODEL_IDS["medium"], 0.5), + ("large", _TIER_MODEL_IDS["large"], 2.0), + ) + } + 
return ModelResolver(index) + + +def _engine(*, stakes: bool) -> AgentEngine: + router = ( + build_stakes_router( + StakesRoutingConfig(), + benchmark_provider=StubBenchmarkScoreProvider(), + resolver=_resolver(), + ) + if stakes + else None + ) + return AgentEngine(provider=ScriptedProvider([]), stakes_router=router) + + +def _identity(tier: ModelTier) -> AgentIdentity: + return make_e2e_identity().model_copy( + update={ + "model": ModelConfig( + provider=_PROVIDER, + model_id=_TIER_MODEL_IDS[tier], + model_tier=tier, + ), + }, + ) + + +def _task(stakes: Stakes) -> Task: + return Task( + id="task-1", + title="A task", + description="Body", + type=TaskType.DEVELOPMENT, + project="proj-1", + created_by="creator", + stakes=stakes, + ) + + +@pytest.mark.unit +class TestRouteStakesSeam: + """``_route_stakes`` adjusts the identity's model from task stakes.""" + + async def test_low_stakes_downgrades(self) -> None: + engine = _engine(stakes=True) + adjusted = await engine._route_stakes( + _identity("large"), + _task(Stakes.LOW), + ) + assert adjusted.model.model_tier == "small" + + async def test_high_stakes_upgrades(self) -> None: + engine = _engine(stakes=True) + adjusted = await engine._route_stakes( + _identity("small"), + _task(Stakes.HIGH), + ) + assert adjusted.model.model_tier == "large" + + async def test_normal_stakes_keeps_medium(self) -> None: + engine = _engine(stakes=True) + identity = _identity("medium") + adjusted = await engine._route_stakes( + identity, + _task(Stakes.NORMAL), + ) + assert adjusted.model == identity.model + + def test_engine_accepts_no_router(self) -> None: + engine = _engine(stakes=False) + assert engine._stakes_router is None diff --git a/tests/unit/engine/routing_policy/test_strategies.py b/tests/unit/engine/routing_policy/test_strategies.py new file mode 100644 index 0000000000..9df7cca64d --- /dev/null +++ b/tests/unit/engine/routing_policy/test_strategies.py @@ -0,0 +1,281 @@ +"""Unit tests for stakes-aware routing strategies and 
factory.""" + +from datetime import UTC, datetime + +import pytest +from tests._shared.scripted_provider import make_e2e_identity + +from synthorg.budget.benchmark_stub import StubBenchmarkScoreProvider +from synthorg.budget.coordination_metrics import ( + CoordinationMetrics, + ErrorAmplification, +) +from synthorg.budget.coordination_store import ( + CoordinationMetricsRecord, + CoordinationMetricsStore, +) +from synthorg.core.agent import AgentIdentity, ModelConfig +from synthorg.core.enums import Stakes, TaskType +from synthorg.core.registry.errors import StrategyFactoryNotFoundError +from synthorg.core.task import Task +from synthorg.core.types import ModelTier +from synthorg.engine.routing_policy import ( + FlatStrategy, + StakesAwareStrategy, + StakesRoutingConfig, + build_stakes_router, +) +from synthorg.engine.routing_policy.config import QualityFloors +from synthorg.providers.routing.models import ResolvedModel +from synthorg.providers.routing.resolver import ModelResolver + +_PROVIDER = "example-provider" +_TIER_MODEL_IDS: dict[ModelTier, str] = { + "small": "example-small-001", + "medium": "example-medium-001", + "large": "example-large-001", +} +_TIER_COSTS: dict[ModelTier, float] = {"small": 0.1, "medium": 0.5, "large": 2.0} + + +def _resolver() -> ModelResolver: + index: dict[str, tuple[ResolvedModel, ...]] = { + tier: ( + ResolvedModel( + provider_name=_PROVIDER, + model_id=_TIER_MODEL_IDS[tier], + alias=tier, + cost_per_1k_input=_TIER_COSTS[tier], + cost_per_1k_output=_TIER_COSTS[tier], + max_context=128000, + estimated_latency_ms=100, + ), + ) + for tier in _TIER_MODEL_IDS + } + return ModelResolver(index) + + +def _identity(tier: ModelTier = "large") -> AgentIdentity: + base = make_e2e_identity() + return base.model_copy( + update={ + "model": ModelConfig( + provider=_PROVIDER, + model_id=_TIER_MODEL_IDS[tier], + model_tier=tier, + ), + }, + ) + + +def _task(stakes: Stakes) -> Task: + return Task( + id="task-1", + title="A task", + 
description="Body", + type=TaskType.DEVELOPMENT, + project="proj-1", + created_by="creator", + stakes=stakes, + ) + + +def _strategy( + *, + config: StakesRoutingConfig | None = None, + coordination_store: CoordinationMetricsStore | None = None, + resolver: ModelResolver | None = None, +) -> StakesAwareStrategy: + return StakesAwareStrategy( + benchmark_provider=StubBenchmarkScoreProvider(), + config=config or StakesRoutingConfig(), + resolver=resolver if resolver is not None else _resolver(), + coordination_store=coordination_store, + ) + + +@pytest.mark.unit +class TestStakesAwareFloorSelection: + """Cheapest tier clearing the per-stakes quality floor is selected.""" + + @pytest.mark.parametrize( + ("stakes", "expected_tier"), + [ + (Stakes.LOW, "small"), + (Stakes.NORMAL, "medium"), + (Stakes.HIGH, "large"), + (Stakes.CRITICAL, "large"), + ], + ) + async def test_floor_picks_expected_tier( + self, + stakes: Stakes, + expected_tier: ModelTier, + ) -> None: + decision = await _strategy().route( + task=_task(stakes), + identity=_identity("large"), + ) + assert decision.selected_model.model_tier == expected_tier + assert decision.selected_model.model_id == _TIER_MODEL_IDS[expected_tier] + + async def test_low_stakes_downgrades_strong_agent(self) -> None: + """A large-tier agent on low-stakes work is routed down to small.""" + decision = await _strategy().route( + task=_task(Stakes.LOW), + identity=_identity("large"), + ) + assert decision.selected_model.model_tier == "small" + assert decision.source == "stakes_aware:floor" + + async def test_high_stakes_upgrades_weak_agent(self) -> None: + """A small-tier agent on high-stakes work is routed up to large.""" + decision = await _strategy().route( + task=_task(Stakes.HIGH), + identity=_identity("small"), + ) + assert decision.selected_model.model_tier == "large" + + +@pytest.mark.unit +class TestRedTeamMarking: + """High/critical stakes set the red-team requirement; lower do not.""" + + @pytest.mark.parametrize( + 
("stakes", "expected"), + [ + (Stakes.LOW, False), + (Stakes.NORMAL, False), + (Stakes.HIGH, True), + (Stakes.CRITICAL, True), + ], + ) + async def test_red_team_threshold(self, stakes: Stakes, expected: bool) -> None: + decision = await _strategy().route( + task=_task(stakes), + identity=_identity("medium"), + ) + assert decision.red_team_required is expected + + +@pytest.mark.unit +class TestNeverDowngradeBelowConfiguredTier: + """High/critical work never runs below the agent's own tier.""" + + async def test_high_stakes_keeps_large_even_with_low_floor(self) -> None: + # Floors set so the cheapest-meeting tier would be small, but the + # agent is configured large and stakes are HIGH: stay at large. + config = StakesRoutingConfig( + quality_floors=QualityFloors(low=0, normal=0, high=0, critical=0), + ) + decision = await _strategy(config=config).route( + task=_task(Stakes.HIGH), + identity=_identity("large"), + ) + assert decision.selected_model.model_tier == "large" + + +@pytest.mark.unit +class TestCoordinationNudge: + """Unhealthy coordination metrics bump the tier one step up.""" + + def _unhealthy_store(self, task_id: str) -> CoordinationMetricsStore: + store = CoordinationMetricsStore() + store.record( + CoordinationMetricsRecord( + task_id=task_id, + computed_at=datetime.now(UTC), + team_size=3, + metrics=CoordinationMetrics( + error_amplification=ErrorAmplification( + error_rate_mas=0.6, + error_rate_sas=0.2, + ), + ), + ), + ) + return store + + async def test_nudge_bumps_tier(self) -> None: + store = self._unhealthy_store("task-1") + decision = await _strategy(coordination_store=store).route( + task=_task(Stakes.NORMAL), # floor -> medium + identity=_identity("small"), + ) + # medium nudged to large by the amplification breach. 
+ assert decision.selected_model.model_tier == "large" + assert decision.source == "stakes_aware:nudge" + + async def test_no_records_no_nudge(self) -> None: + store = CoordinationMetricsStore() + decision = await _strategy(coordination_store=store).route( + task=_task(Stakes.NORMAL), + identity=_identity("small"), + ) + assert decision.selected_model.model_tier == "medium" + + +@pytest.mark.unit +class TestResolverAbsent: + """Without a resolver the model is unchanged but red-team still marks.""" + + async def test_no_resolver_keeps_model_marks_red_team(self) -> None: + strategy = StakesAwareStrategy( + benchmark_provider=StubBenchmarkScoreProvider(), + resolver=None, + ) + decision = await strategy.route( + task=_task(Stakes.HIGH), + identity=_identity("small"), + ) + assert decision.selected_model.model_tier == "small" + assert decision.source == "stakes_aware:noop" + assert decision.red_team_required is True + + +@pytest.mark.unit +class TestFlatStrategy: + """Flat routing is a true no-op control arm.""" + + async def test_flat_keeps_model_and_never_marks_red_team(self) -> None: + identity = _identity("large") + decision = await FlatStrategy().route( + task=_task(Stakes.CRITICAL), + identity=identity, + ) + assert decision.selected_model == identity.model + assert decision.red_team_required is False + assert decision.source == "flat" + + +@pytest.mark.unit +class TestBuildStakesRouter: + """Factory dispatch on the ``strategy`` discriminator.""" + + async def test_default_builds_stakes_aware(self) -> None: + router = build_stakes_router( + benchmark_provider=StubBenchmarkScoreProvider(), + resolver=_resolver(), + ) + decision = await router.route( + task=_task(Stakes.LOW), + identity=_identity("large"), + ) + assert decision.selected_model.model_tier == "small" + + async def test_flat_strategy_via_discriminator(self) -> None: + router = build_stakes_router(StakesRoutingConfig(strategy="flat")) + decision = await router.route( + task=_task(Stakes.HIGH), + 
identity=_identity("large"), + ) + assert decision.source == "flat" + + def test_unknown_strategy_raises(self) -> None: + with pytest.raises(StrategyFactoryNotFoundError): + build_stakes_router(StakesRoutingConfig(strategy="nope")) + + def test_stakes_aware_without_benchmark_raises(self) -> None: + with pytest.raises(ValueError, match="benchmark"): + build_stakes_router(StakesRoutingConfig(strategy="stakes_aware")) diff --git a/tests/unit/engine/stakes/test_assessor.py b/tests/unit/engine/stakes/test_assessor.py new file mode 100644 index 0000000000..8a64f83795 --- /dev/null +++ b/tests/unit/engine/stakes/test_assessor.py @@ -0,0 +1,193 @@ +"""Unit tests for stakes assessment (enum ordering, config, heuristic).""" + +import pytest +from pydantic import ValidationError + +from synthorg.core.enums import ( + Complexity, + Priority, + Stakes, + TaskType, + compare_stakes, +) +from synthorg.core.registry.errors import StrategyFactoryNotFoundError +from synthorg.core.task import Task +from synthorg.engine.decomposition.models import SubtaskDefinition +from synthorg.engine.stakes import ( + DefaultStakesAssessor, + StakesAssessmentConfig, + build_stakes_assessor, +) +from synthorg.engine.stakes.config import ComplexityStakesRule + + +def _subtask( + *, + title: str = "Subtask", + description: str = "Do the thing", + complexity: Complexity = Complexity.MEDIUM, +) -> SubtaskDefinition: + return SubtaskDefinition( + id="st-1", + title=title, + description=description, + estimated_complexity=complexity, + ) + + +def _task( + *, + title: str = "Task", + description: str = "Do the thing", + complexity: Complexity = Complexity.MEDIUM, + priority: Priority = Priority.MEDIUM, +) -> Task: + return Task( + id="task-1", + title=title, + description=description, + type=TaskType.DEVELOPMENT, + project="proj-1", + created_by="creator", + estimated_complexity=complexity, + priority=priority, + ) + + +@pytest.mark.unit +class TestStakesOrdering: + """``compare_stakes`` follows LOW < 
NORMAL < HIGH < CRITICAL.""" + + def test_strict_ascending_order(self) -> None: + assert compare_stakes(Stakes.LOW, Stakes.NORMAL) < 0 + assert compare_stakes(Stakes.NORMAL, Stakes.HIGH) < 0 + assert compare_stakes(Stakes.HIGH, Stakes.CRITICAL) < 0 + + def test_equal_is_zero(self) -> None: + assert compare_stakes(Stakes.HIGH, Stakes.HIGH) == 0 + + def test_reverse_is_positive(self) -> None: + assert compare_stakes(Stakes.CRITICAL, Stakes.LOW) > 0 + + def test_field_defaults_to_normal(self) -> None: + assert _task().stakes is Stakes.NORMAL + assert _subtask().stakes is Stakes.NORMAL + + +@pytest.mark.unit +class TestStakesAssessmentConfig: + """Config defaults cover every complexity and reject duplicates.""" + + def test_default_rules_cover_all_complexities(self) -> None: + cfg = StakesAssessmentConfig() + covered = {r.complexity for r in cfg.complexity_rules} + assert covered == set(Complexity) + + def test_duplicate_complexity_rejected(self) -> None: + with pytest.raises(ValidationError): + StakesAssessmentConfig( + complexity_rules=( + ComplexityStakesRule( + complexity=Complexity.SIMPLE, stakes=Stakes.LOW + ), + ComplexityStakesRule( + complexity=Complexity.SIMPLE, stakes=Stakes.HIGH + ), + ), + ) + + def test_config_is_frozen(self) -> None: + cfg = StakesAssessmentConfig() + with pytest.raises(ValidationError): + cfg.assessor = "other" # type: ignore[misc] + + +@pytest.mark.unit +class TestDefaultStakesAssessor: + """Heuristic mapping, keyword bumps, priority elevation, fail-safe.""" + + @pytest.mark.parametrize( + ("complexity", "expected"), + [ + (Complexity.SIMPLE, Stakes.LOW), + (Complexity.MEDIUM, Stakes.NORMAL), + (Complexity.COMPLEX, Stakes.HIGH), + (Complexity.EPIC, Stakes.HIGH), + ], + ) + def test_complexity_base_mapping( + self, + complexity: Complexity, + expected: Stakes, + ) -> None: + assessor = DefaultStakesAssessor() + assert assessor.assess_subtask(_subtask(complexity=complexity)) is expected + + def test_high_keyword_elevates_to_high(self) -> 
None: + assessor = DefaultStakesAssessor() + subtask = _subtask( + description="Make an architecture decision for the gateway", + complexity=Complexity.SIMPLE, + ) + assert assessor.assess_subtask(subtask) is Stakes.HIGH + + def test_critical_keyword_pins_to_critical(self) -> None: + assessor = DefaultStakesAssessor() + subtask = _subtask( + description="This is an irreversible production change", + complexity=Complexity.SIMPLE, + ) + assert assessor.assess_subtask(subtask) is Stakes.CRITICAL + + def test_keyword_match_is_case_insensitive(self) -> None: + assessor = DefaultStakesAssessor() + subtask = _subtask( + description="Touches the SECURITY boundary", + complexity=Complexity.SIMPLE, + ) + assert assessor.assess_subtask(subtask) is Stakes.HIGH + + def test_never_downgrades_below_complexity_base(self) -> None: + """A complex subtask with no keyword stays HIGH, not pulled to NORMAL.""" + assessor = DefaultStakesAssessor() + assert assessor.assess_subtask(_subtask(complexity=Complexity.COMPLEX)) is ( + Stakes.HIGH + ) + + def test_critical_priority_elevates_task(self) -> None: + assessor = DefaultStakesAssessor() + task = _task(complexity=Complexity.SIMPLE, priority=Priority.CRITICAL) + assert assessor.assess_task(task) is Stakes.HIGH + + def test_critical_priority_elevation_can_be_disabled(self) -> None: + assessor = DefaultStakesAssessor( + StakesAssessmentConfig(elevate_on_critical_priority=False), + ) + task = _task(complexity=Complexity.SIMPLE, priority=Priority.CRITICAL) + assert assessor.assess_task(task) is Stakes.LOW + + def test_subtask_priority_path_does_not_elevate(self) -> None: + """Subtasks carry no priority; only complexity/keywords apply.""" + assessor = DefaultStakesAssessor() + assert assessor.assess_subtask(_subtask(complexity=Complexity.SIMPLE)) is ( + Stakes.LOW + ) + + +@pytest.mark.unit +class TestBuildStakesAssessor: + """Factory dispatch on the ``assessor`` discriminator.""" + + def test_default_builds_heuristic(self) -> None: + assessor 
= build_stakes_assessor() + assert isinstance(assessor, DefaultStakesAssessor) + + def test_explicit_heuristic(self) -> None: + assessor = build_stakes_assessor( + StakesAssessmentConfig(assessor="heuristic"), + ) + assert isinstance(assessor, DefaultStakesAssessor) + + def test_unknown_assessor_raises(self) -> None: + with pytest.raises(StrategyFactoryNotFoundError): + build_stakes_assessor(StakesAssessmentConfig(assessor="nope")) diff --git a/tests/unit/engine/stakes/test_propagation.py b/tests/unit/engine/stakes/test_propagation.py new file mode 100644 index 0000000000..135202b46d --- /dev/null +++ b/tests/unit/engine/stakes/test_propagation.py @@ -0,0 +1,107 @@ +"""Stakes propagation from decomposition into created tasks.""" + +import pytest + +from synthorg.core.enums import Complexity, Stakes, TaskType +from synthorg.core.task import Task +from synthorg.engine.decomposition.classifier import TaskStructureClassifier +from synthorg.engine.decomposition.models import ( + DecompositionContext, + DecompositionPlan, + SubtaskDefinition, +) +from synthorg.engine.decomposition.service import DecompositionService + + +class _StaticStrategy: + """Decomposition strategy returning a fixed plan (no LLM).""" + + def __init__(self, plan: DecompositionPlan) -> None: + self._plan = plan + + async def decompose( + self, + task: Task, + context: DecompositionContext, + ) -> DecompositionPlan: + del task, context + return self._plan + + def get_strategy_name(self) -> str: + return "static-test" + + +def _parent_task() -> Task: + return Task( + id="parent-1", + title="Parent", + description="Parent task", + type=TaskType.DEVELOPMENT, + project="proj-1", + created_by="creator", + ) + + +@pytest.mark.unit +class TestDecompositionStakesPropagation: + """The service assesses each subtask and stamps stakes on its task.""" + + async def test_mixed_subtasks_get_distinct_stakes(self) -> None: + plan = DecompositionPlan( + parent_task_id="parent-1", + subtasks=( + SubtaskDefinition( + 
id="low", + title="Update changelog", + description="Tidy the docs wording", + estimated_complexity=Complexity.SIMPLE, + ), + SubtaskDefinition( + id="high", + title="Design the architecture", + description="Make the core architecture decision", + estimated_complexity=Complexity.COMPLEX, + ), + SubtaskDefinition( + id="critical", + title="Production migration", + description="Run an irreversible production deployment", + estimated_complexity=Complexity.MEDIUM, + ), + ), + ) + service = DecompositionService(_StaticStrategy(plan), TaskStructureClassifier()) + + result = await service.decompose_task( + _parent_task(), + DecompositionContext(), + ) + + stakes_by_id = {t.id: t.stakes for t in result.created_tasks} + assert stakes_by_id["low"] is Stakes.LOW + assert stakes_by_id["high"] is Stakes.HIGH + assert stakes_by_id["critical"] is Stakes.CRITICAL + + async def test_plan_subtasks_carry_same_stakes_as_tasks(self) -> None: + plan = DecompositionPlan( + parent_task_id="parent-1", + subtasks=( + SubtaskDefinition( + id="only", + title="Refactor the payment flow", + description="Touch the billing path", + estimated_complexity=Complexity.SIMPLE, + ), + ), + ) + service = DecompositionService(_StaticStrategy(plan), TaskStructureClassifier()) + + result = await service.decompose_task( + _parent_task(), + DecompositionContext(), + ) + + plan_stakes = {s.id: s.stakes for s in result.plan.subtasks} + task_stakes = {t.id: t.stakes for t in result.created_tasks} + assert plan_stakes == task_stakes + assert task_stakes["only"] is Stakes.HIGH diff --git a/web/src/api/types/enum-values.gen.ts b/web/src/api/types/enum-values.gen.ts index f06473a367..a9481ca1ab 100644 --- a/web/src/api/types/enum-values.gen.ts +++ b/web/src/api/types/enum-values.gen.ts @@ -693,6 +693,14 @@ export const SOURCE_TYPE_VALUES = [ ] as const export type SourceType = (typeof SOURCE_TYPE_VALUES)[number] +export const STAKES_VALUES = [ + 'low', + 'normal', + 'high', + 'critical', +] as const +export type 
Stakes = (typeof STAKES_VALUES)[number] + export const STRATEGIC_OUTPUT_MODE_VALUES = [ 'option_expander', 'advisor', diff --git a/web/src/api/types/openapi.gen.ts b/web/src/api/types/openapi.gen.ts index 2a86f4979d..c5d3178729 100644 --- a/web/src/api/types/openapi.gen.ts +++ b/web/src/api/types/openapi.gen.ts @@ -12007,6 +12007,20 @@ export type components = { readonly stage_result: components["schemas"]["ReviewStageResult"]; readonly task_id: string; }; + /** + * Stakes + * @description How consequential a subtask or task is for stakes-aware routing. + * + * Distinct from :class:`Priority` (urgency/importance) and + * :class:`Complexity` (effort): stakes captures the *cost of being + * wrong*. Low-stakes work tolerates a cheap model; high-stakes work + * (architecture, irreversible decisions) warrants a strong model and + * an adversarial red-team review. The authoritative ordering lives in + * ``_STAKES_ORDER`` below. + * @default normal + * @enum {string} + */ + readonly Stakes: "low" | "normal" | "high" | "critical"; /** StartSimulationPayload */ readonly StartSimulationPayload: { readonly config: components["schemas"]["SimulationConfig"]; @@ -12158,6 +12172,7 @@ export type components = { * @enum {string|null} */ readonly source: "internal" | "client" | "simulation" | null; + readonly stakes: components["schemas"]["Stakes"]; readonly status: components["schemas"]["TaskStatus"]; /** * @description Classification of subtask relationships (None = not classified) From 094951b6a1fe9908e7712f3a9517cf253eb97a0c Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Thu, 21 May 2026 19:16:40 +0200 Subject: [PATCH 2/8] fix: pin red-team review task to critical stakes so its agent is not downgraded --- src/synthorg/security/redteam/runner.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/synthorg/security/redteam/runner.py b/src/synthorg/security/redteam/runner.py index 8f5bb28b41..5f699f125c 100644 --- 
a/src/synthorg/security/redteam/runner.py +++ b/src/synthorg/security/redteam/runner.py @@ -13,7 +13,7 @@ from typing import TYPE_CHECKING from uuid import uuid4 -from synthorg.core.enums import Complexity, Priority, TaskStatus, TaskType +from synthorg.core.enums import Complexity, Priority, Stakes, TaskStatus, TaskType from synthorg.core.task import AcceptanceCriterion, Task from synthorg.core.types import NotBlankStr # noqa: TC001 from synthorg.observability import get_logger, safe_error_description @@ -126,4 +126,8 @@ def _build_transient_task( acceptance_criteria=criteria, status=TaskStatus.IN_PROGRESS, estimated_complexity=Complexity.SIMPLE, + # The adversarial review is the highest-stakes check: pin it so + # stakes-aware routing keeps the red-team agent on its strong + # configured tier rather than downgrading the reviewer. + stakes=Stakes.CRITICAL, ) From 274b8700ce63fc1be54bc01ae6352d4fad6effdf Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Thu, 21 May 2026 19:27:12 +0200 Subject: [PATCH 3/8] test: register stakes_routing event module in discovery test --- tests/unit/observability/test_events.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/observability/test_events.py b/tests/unit/observability/test_events.py index 49c8ddb5a0..e70f14da52 100644 --- a/tests/unit/observability/test_events.py +++ b/tests/unit/observability/test_events.py @@ -271,6 +271,7 @@ def test_all_domain_modules_discovered(self) -> None: "security", "session", "stagnation", + "stakes_routing", "strategy", "task", "task_assignment", From 99a953d1d421bffff7e0cb24b8c18d5d84eecc97 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Thu, 21 May 2026 19:33:45 +0200 Subject: [PATCH 4/8] fix: satisfy persistence-boundary and review-origin gates --- src/synthorg/engine/stakes/config.py | 2 +- .../unit/engine/routing_policy/test_acceptance_comparison.py | 4 ++-- 2 files changed, 3 insertions(+), 3 
deletions(-) diff --git a/src/synthorg/engine/stakes/config.py b/src/synthorg/engine/stakes/config.py index 962d0c7742..77a11b1f24 100644 --- a/src/synthorg/engine/stakes/config.py +++ b/src/synthorg/engine/stakes/config.py @@ -79,7 +79,7 @@ class ComplexityStakesRule(BaseModel): DEFAULT_CRITICAL_STAKES_KEYWORDS: tuple[NotBlankStr, ...] = ( "irreversible", "data loss", - "drop table", + "drop table", # lint-allow: persistence-boundary -- detection keyword "delete production", "production deployment", "destructive", diff --git a/tests/unit/engine/routing_policy/test_acceptance_comparison.py b/tests/unit/engine/routing_policy/test_acceptance_comparison.py index 8efe8d29c2..2f85612836 100644 --- a/tests/unit/engine/routing_policy/test_acceptance_comparison.py +++ b/tests/unit/engine/routing_policy/test_acceptance_comparison.py @@ -1,6 +1,6 @@ """Acceptance: stakes-aware routing beats flat on a mixed brief. -Encodes issue #1998's acceptance criterion as a deterministic +Encodes the acceptance criterion as a deterministic simulation: a mixed brief is decomposed (so each subtask carries an assessed stakes level), then routed under both the flat control arm and the stakes-aware arm. 
With a conservative flat baseline (every subtask @@ -162,7 +162,7 @@ def _floor_for(task: Task, floors: QualityFloors) -> float: @pytest.mark.unit class TestStakesAwareBeatsFlatOnMixedBrief: - """The core acceptance comparison for issue #1998.""" + """The core acceptance comparison for stakes-aware routing.""" async def test_cost_drops_at_equal_or_better_quality(self) -> None: tasks = await _decomposed_tasks() From a157ab9ac680f720219ed9e4e733e74f6f2770ee Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Fri, 22 May 2026 07:15:34 +0200 Subject: [PATCH 5/8] fix: address pre-PR review findings for stakes-aware routing NotBlankStr strategy discriminator, QualityFloors ordering validator, under-floor and benchmark-failure guards in the router, redaction-safe decomposition logging, stakes-assessed state log, ghost-wiring manifest entry, tier/boundary/fallback tests, e2e cost-drop simulation, and docs. --- docs/design/engine.md | 1 + docs/design/providers.md | 14 + docs/reference/pluggable-subsystems.md | 14 + scripts/_ghost_wiring_manifest.txt | 1 + src/synthorg/engine/decomposition/service.py | 10 +- src/synthorg/engine/pipeline/service.py | 11 +- src/synthorg/engine/routing_policy/config.py | 28 +- .../engine/routing_policy/strategies.py | 68 ++- .../observability/events/stakes_routing.py | 1 + tests/e2e/test_stakes_routing_e2e.py | 399 ++++++++++++++++++ .../engine/routing_policy/test_strategies.py | 120 ++++++ .../unit/engine/routing_policy/test_tiers.py | 68 +++ tests/unit/engine/stakes/test_assessor.py | 13 + 13 files changed, 722 insertions(+), 26 deletions(-) create mode 100644 tests/e2e/test_stakes_routing_e2e.py create mode 100644 tests/unit/engine/routing_policy/test_tiers.py diff --git a/docs/design/engine.md b/docs/design/engine.md index 0dc44a2f49..1e437a9746 100644 --- a/docs/design/engine.md +++ b/docs/design/engine.md @@ -130,6 +130,7 @@ task: - "Unit and integration tests with >80% coverage" - "API 
documentation" estimated_complexity: "medium" # simple, medium, complex, epic + stakes: "normal" # low, normal, high, critical (assessed; drives stakes-aware model routing) task_structure: "parallel" # sequential, parallel, mixed coordination_topology: "auto" # auto, sas, centralized, decentralized, context_dependent budget_limit: 2.00 # max spend for this task in base currency (display formatted per budget.currency) diff --git a/docs/design/providers.md b/docs/design/providers.md index ba7f07016e..35dc16a306 100644 --- a/docs/design/providers.md +++ b/docs/design/providers.md @@ -196,6 +196,20 @@ routing: - "ollama" ``` +### Stakes-aware routing (orthogonal layer) + +Model routing above selects *which provider/model* serves a request. **Stakes-aware +routing** is a separate, pluggable layer that re-tiers that selection based on how +consequential the work is. Each task (and subtask) carries a `stakes` level +(`low` / `normal` / `high` / `critical`), assessed by the `StakesAssessor`. The +`StakesRoutingStrategy` then picks the cheapest model tier whose benchmark score +clears the per-stakes quality floor, bumps one tier when coordination metrics are +unhealthy, marks high/critical work for the red-team gate, and never downgrades +that gated work below the agent's configured tier. It is config-selectable via +`stakes_routing.strategy` (`stakes_aware` default, `flat` to opt out) and applied in +the engine *before* the budget auto-downgrade, so a hard budget ceiling still wins +over a stakes upgrade. See [Pluggable Subsystems](../reference/pluggable-subsystems.md).
+ ### Multi-Provider Model Resolution When multiple providers register the same model ID or alias, the `ModelResolver` diff --git a/docs/reference/pluggable-subsystems.md b/docs/reference/pluggable-subsystems.md index 0a765dcb2c..d60fe7cc13 100644 --- a/docs/reference/pluggable-subsystems.md +++ b/docs/reference/pluggable-subsystems.md @@ -176,6 +176,20 @@ Domain errors live at `meta/errors.py::RollbackMutationDeniedError` (409) and `U - `engine/workspace/git_backend/external_remote.py::ExternalRemoteGitBackend` (GitHub / GitLab / Gitea / Forgejo resolved via the connection catalog; ships protocol + thin clone/push/fetch glue; deep OAuth hardening is a tracked follow-up). - `engine/workspace/git_backend/factory.py::build_git_backend()`: `StrategyRegistry[GitBackend]` keyed on `GitBackendType`. Missing required deps fail fast at construction with `GitBackendConfigError`. Wired at boot in `api/app.py::_install_runtime_services` under the `has_persistence` gate, alongside `ProjectWorkspaceService`. +### Stakes assessment (model-routing input) + +- `engine/stakes/protocol.py`: `StakesAssessor` `@runtime_checkable` Protocol (`assess_task(task)` / `assess_subtask(subtask)` returning `Stakes`). +- `engine/stakes/heuristic.py::DefaultStakesAssessor` (safe default: deterministic, combines complexity base mapping, high/critical keyword signals, and critical-priority elevation; unknown complexity fails safe upward to HIGH). +- `engine/stakes/config.py::StakesAssessmentConfig` (frozen) with `assessor: NotBlankStr` discriminator, the complexity-to-stakes rules, and the keyword sets. +- `engine/stakes/factory.py::build_stakes_assessor()`: `StrategyRegistry[StakesAssessor]` keyed on `assessor` ("heuristic" default). Consumed by `DecompositionService` (per-subtask) and the work pipeline's LEAF path (parent task). 
+ ### Stakes-aware model routing + +- `engine/routing_policy/protocol.py`: `StakesRoutingStrategy` `@runtime_checkable` Protocol (`route(task, identity)` returning a frozen `StakesRoutingDecision`). +- `engine/routing_policy/strategies.py::StakesAwareStrategy` (safe default: picks the cheapest tier whose benchmark score clears the per-stakes `QualityFloors`, bumps one tier when coordination metrics are unhealthy, marks high/critical work for the red-team gate, and never downgrades that gated work below the agent's configured tier) and `FlatStrategy` (no-op control / opt-out). +- `engine/routing_policy/config.py::StakesRoutingConfig` (frozen) with `strategy: NotBlankStr` discriminator, `QualityFloors` (validated non-decreasing), `red_team_min_stakes`, and the coordination-nudge thresholds. +- `engine/routing_policy/factory.py::build_stakes_router()`: `StrategyRegistry[StakesRoutingStrategy]` keyed on `strategy` ("stakes_aware" default; "stakes_aware" requires a benchmark provider, "flat" is dependency-free). Wired at boot in `workers/runtime_builder.py::_build_stakes_router_or_none` and injected into `AgentEngine`, which applies routing before the budget auto-downgrade (a hard budget ceiling wins over a stakes upgrade). + ## Services are a distinct pattern (not pluggable subsystems) A **service** wraps one or more repositories to keep controllers thin and centralise audit logging, and MAY orchestrate multiple repositories (e.g. `WorkflowService` spans `workflow_definitions` + `workflow_versions`; `MemoryService` spans fine-tune checkpoints + runs + settings).
diff --git a/scripts/_ghost_wiring_manifest.txt b/scripts/_ghost_wiring_manifest.txt index 38ea1c784c..f5e41b5e09 100644 --- a/scripts/_ghost_wiring_manifest.txt +++ b/scripts/_ghost_wiring_manifest.txt @@ -90,3 +90,4 @@ ENFORCED SandboxBriefRunner #1995 -- constructed by meta/toolsmith/factory.py::b ENFORCED ToolCreationApplier #1995 -- constructed by meta/toolsmith/factory.py::build_toolsmith; validates then live-registers an approved blueprint, retires on rollback ENFORCED DynamicToolRegistry #1995 -- constructed by meta/toolsmith/factory.py::build_toolsmith; mutable live authored-tool registry read behind the static surface ENFORCED install_dynamic_tool_layer #1995 -- called by api/app.py::_wire_toolsmith; layers the dynamic registry into the live MCP invoker so authored tools dispatch +ENFORCED build_stakes_router #1998 -- called by workers/runtime_builder._build_stakes_router_or_none when a benchmark provider is wired; injected into AgentEngine for stakes-aware tier selection before budget downgrade diff --git a/src/synthorg/engine/decomposition/service.py b/src/synthorg/engine/decomposition/service.py index 48f3a6cb2e..4ce878605b 100644 --- a/src/synthorg/engine/decomposition/service.py +++ b/src/synthorg/engine/decomposition/service.py @@ -15,7 +15,7 @@ ) from synthorg.engine.decomposition.rollup import StatusRollup from synthorg.engine.stakes import build_stakes_assessor -from synthorg.observability import get_logger +from synthorg.observability import get_logger, safe_error_description from synthorg.observability.events.decomposition import ( DECOMPOSITION_COMPLETED, DECOMPOSITION_FAILED, @@ -83,11 +83,15 @@ async def decompose_task( try: return await self._do_decompose(task, context) - except Exception: - logger.exception( + except MemoryError, RecursionError: + raise + except Exception as exc: + logger.warning( DECOMPOSITION_FAILED, task_id=task.id, strategy=self._strategy.get_strategy_name(), + error_type=type(exc).__name__, + 
error=safe_error_description(exc), ) raise diff --git a/src/synthorg/engine/pipeline/service.py b/src/synthorg/engine/pipeline/service.py index ccdc4255c5..b8c7e4ec91 100644 --- a/src/synthorg/engine/pipeline/service.py +++ b/src/synthorg/engine/pipeline/service.py @@ -41,6 +41,7 @@ PIPELINE_RUN_STARTED, PIPELINE_SOLO_AGENT_SELECTED, ) +from synthorg.observability.events.stakes_routing import STAKES_ASSESSED if TYPE_CHECKING: from collections.abc import Awaitable @@ -261,11 +262,19 @@ async def _assess_stakes(self, task: Task, work_item: WorkItem) -> Task: stakes = self._stakes_assessor.assess_task(task) if stakes is task.stakes: return task - return await self._task_engine.update_task( + updated = await self._task_engine.update_task( task.id, {"stakes": stakes}, requested_by=work_item.requested_by, ) + logger.info( + STAKES_ASSESSED, + task_id=task.id, + from_stakes=task.stakes.value, + to_stakes=stakes.value, + path="leaf", + ) + return updated async def _link_forecast(self, task: Task, work_item: WorkItem) -> Task: """Stamp the approved forecast id + ceiling onto the task. diff --git a/src/synthorg/engine/routing_policy/config.py b/src/synthorg/engine/routing_policy/config.py index cfbd512f61..6ccfc066ed 100644 --- a/src/synthorg/engine/routing_policy/config.py +++ b/src/synthorg/engine/routing_policy/config.py @@ -16,6 +16,7 @@ from pydantic import BaseModel, ConfigDict, Field, model_validator from synthorg.core.enums import Stakes +from synthorg.core.types import NotBlankStr # noqa: TC001 # Per-stakes benchmark quality floors (0 to 100). A model tier is a # candidate only when its benchmark score clears the floor for the @@ -54,6 +55,23 @@ class QualityFloors(BaseModel): high: float = Field(default=_FLOOR_HIGH, ge=_FLOOR_MIN, le=_FLOOR_MAX) critical: float = Field(default=_FLOOR_CRITICAL, ge=_FLOOR_MIN, le=_FLOOR_MAX) + @model_validator(mode="after") + def _validate_floors_ordered(self) -> Self: + """Reject floors that invert the stakes hierarchy. 
+ + A lower-stakes subtask must never carry a higher quality bar than + a higher-stakes one; otherwise routing would send cheap work to + strong models and consequential work to weak ones. + """ + if not self.low <= self.normal <= self.high <= self.critical: + msg = ( + "quality floors must be non-decreasing across stakes: " + f"low={self.low} <= normal={self.normal} <= " + f"high={self.high} <= critical={self.critical}" + ) + raise ValueError(msg) + return self + def for_stakes(self, stakes: Stakes) -> float: """Return the quality floor for *stakes*.""" return { @@ -85,7 +103,7 @@ class StakesRoutingConfig(BaseModel): model_config = ConfigDict(frozen=True, allow_inf_nan=False, extra="forbid") - strategy: str = Field( + strategy: NotBlankStr = Field( default="stakes_aware", description="Routing strategy discriminator", ) @@ -112,11 +130,3 @@ class StakesRoutingConfig(BaseModel): ge=1, description="Recent coordination records inspected for the nudge", ) - - @model_validator(mode="after") - def _validate_strategy(self) -> Self: - """Reject a blank strategy discriminator.""" - if not self.strategy.strip(): - msg = "strategy must not be blank" - raise ValueError(msg) - return self diff --git a/src/synthorg/engine/routing_policy/strategies.py b/src/synthorg/engine/routing_policy/strategies.py index d04eb3251b..ff5c5a34bd 100644 --- a/src/synthorg/engine/routing_policy/strategies.py +++ b/src/synthorg/engine/routing_policy/strategies.py @@ -12,11 +12,12 @@ bump_one, higher_tier, ) -from synthorg.observability import get_logger +from synthorg.observability import get_logger, safe_error_description from synthorg.observability.events.stakes_routing import ( STAKES_ROUTING_COORD_NUDGE, STAKES_ROUTING_TIER_UNRESOLVABLE, ) +from synthorg.providers.errors import ProviderError if TYPE_CHECKING: from synthorg.budget.benchmark_protocol import BenchmarkScoreProvider @@ -101,7 +102,7 @@ async def route( current_tier = identity.model.model_tier floor = 
self._config.quality_floors.for_stakes(stakes) - target_tier = await self._cheapest_tier_meeting_floor(floor) + target_tier, floor_cleared = await self._cheapest_tier_meeting_floor(floor) nudged = False if target_tier is not None and self._coordination_unhealthy(task.id): @@ -116,7 +117,8 @@ async def route( nudged = True target_tier = bumped - # High/critical work must never run below the agent's own tier. + # Work at or above the configured red_team_min_stakes threshold + # must never run below the agent's own tier. if red_team_required and target_tier is not None and current_tier is not None: target_tier = higher_tier(target_tier, current_tier) @@ -127,6 +129,7 @@ async def route( target_tier=target_tier, nudged=nudged, floor=floor, + floor_cleared=floor_cleared, ) def _build_decision( # noqa: PLR0913 -- keyword-only assembly inputs @@ -138,6 +141,7 @@ def _build_decision( # noqa: PLR0913 -- keyword-only assembly inputs target_tier: ModelTier | None, nudged: bool, floor: float, + floor_cleared: bool, ) -> StakesRoutingDecision: """Assemble the decision, resolving the target tier to a model.""" current = identity.model @@ -165,10 +169,17 @@ def _build_decision( # noqa: PLR0913 -- keyword-only assembly inputs fallback_model=current.fallback_model, model_tier=target_tier, ) - source = "stakes_aware:nudge" if nudged else "stakes_aware:floor" + if nudged: + source = "stakes_aware:nudge" + elif not floor_cleared: + source = "stakes_aware:floor_unmet" + else: + source = "stakes_aware:floor" reason = ( f"stakes={stakes.value}: routed to {target_tier} tier (floor {floor:g})" ) + if not floor_cleared: + reason += " [floor not met; strongest available tier]" return StakesRoutingDecision( selected_model=selected_model, @@ -178,31 +189,62 @@ def _build_decision( # noqa: PLR0913 -- keyword-only assembly inputs source=source, ) - async def _cheapest_tier_meeting_floor(self, floor: float) -> ModelTier | None: - """Return the cheapest tier whose benchmark score clears *floor*. 
+ async def _cheapest_tier_meeting_floor( + self, floor: float + ) -> tuple[ModelTier | None, bool]: + """Return the cheapest tier clearing *floor* and whether it cleared. - Falls back to the strongest resolvable tier when none clears the - floor, and to ``None`` when no tier resolves at all (no resolver - wired, or the provider catalogue lacks the canonical tiers). + The second element is ``True`` only when the returned tier's + benchmark score meets the floor. When no resolvable tier clears + the floor, the strongest resolvable tier is returned with + ``False`` and a logged fallback, so high/critical work is never + silently routed under-floor. ``(None, False)`` is returned when no + tier resolves at all (no resolver wired, or the provider catalogue + lacks the canonical tiers). + + A benchmark-provider failure for one tier is logged and skipped: + retries belong to the provider layer, and a transient lookup error + must not crash the routing decision for every task. """ if self._resolver is None: - return None + return None, False strongest_resolvable: ModelTier | None = None + strongest_score: float | None = None for tier in TIER_LADDER: resolved = self._resolver.resolve_safe(tier) if resolved is None: continue + try: + score = await self._benchmark_provider.get_score(resolved.model_id) + except ProviderError as exc: + logger.warning( + STAKES_ROUTING_TIER_UNRESOLVABLE, + floor=floor, + tier=tier, + reason="benchmark_lookup_failed", + error_type=type(exc).__name__, + error=safe_error_description(exc), + ) + continue strongest_resolvable = tier - score = await self._benchmark_provider.get_score(resolved.model_id) + strongest_score = score.score if score is not None else None if score is not None and score.score >= floor: - return tier + return tier, True if strongest_resolvable is None: logger.warning( STAKES_ROUTING_TIER_UNRESOLVABLE, floor=floor, reason="no_tier_resolved", ) - return strongest_resolvable + return None, False + logger.warning( + 
STAKES_ROUTING_TIER_UNRESOLVABLE, + floor=floor, + best_tier=strongest_resolvable, + best_score=strongest_score, + reason="no_tier_clears_floor", + ) + return strongest_resolvable, False def _resolve_tier(self, tier: ModelTier) -> ResolvedModel | None: """Resolve a tier alias to a model, or ``None``.""" diff --git a/src/synthorg/observability/events/stakes_routing.py b/src/synthorg/observability/events/stakes_routing.py index 908bc9a053..96e1bf1664 100644 --- a/src/synthorg/observability/events/stakes_routing.py +++ b/src/synthorg/observability/events/stakes_routing.py @@ -2,6 +2,7 @@ from typing import Final +STAKES_ASSESSED: Final[str] = "stakes_routing.assessed" STAKES_ROUTING_DECIDED: Final[str] = "stakes_routing.decided" STAKES_ROUTING_TIER_ADJUSTED: Final[str] = "stakes_routing.tier_adjusted" STAKES_ROUTING_COORD_NUDGE: Final[str] = "stakes_routing.coordination_nudge" diff --git a/tests/e2e/test_stakes_routing_e2e.py b/tests/e2e/test_stakes_routing_e2e.py new file mode 100644 index 0000000000..0c398b47e4 --- /dev/null +++ b/tests/e2e/test_stakes_routing_e2e.py @@ -0,0 +1,399 @@ +"""Acceptance: stakes-aware routing cuts cost on a mixed-stakes brief. + +Drives the REAL runtime through the production ``build_runtime_services`` +(the exact code the boot hook runs) with a deterministic +``ScriptedDriver`` and a tier-priced provider catalogue, under the +simulation harness (zero real LLM spend). A single brief decomposes into +a low-stakes subtask and a critical-stakes subtask; the same brief is run +twice, once with the ``stakes_aware`` routing strategy and once with the +``flat`` control arm. + +The acceptance for #1998 is that, on a mixed brief, cheap models handle +low-stakes subtasks and strong models handle high/critical ones, so total +cost drops versus flat routing at no quality-floor regression. 
The +scripted driver prices each completion by the model tier it is called +with, so the cost the ``CostTracker`` accrues reflects the tier the +router selected: stakes-aware routes the low-stakes subtask down to the +cheap tier while flat keeps every subtask on the agent's configured large +tier, so the stakes-aware run costs strictly less. +""" + +from collections.abc import AsyncGenerator +from datetime import date +from pathlib import Path +from typing import Any +from uuid import uuid4 + +import pytest + +from synthorg.api.approval_store import ApprovalStore +from synthorg.api.state import AppState +from synthorg.budget.benchmark_stub import StubBenchmarkScoreProvider +from synthorg.budget.coordination_config import CoordinationMetricsConfig +from synthorg.budget.tracker import CostTracker +from synthorg.client.simulation_state import ClientSimulationState +from synthorg.config.provider_schema import ProviderConfig, ProviderModelConfig +from synthorg.config.schema import RootConfig +from synthorg.core.agent import AgentIdentity, ModelConfig, SkillSet +from synthorg.core.enums import ( + AgentStatus, + Complexity, + Priority, + SeniorityLevel, + TaskType, +) +from synthorg.core.role import Authority, Skill +from synthorg.core.types import ModelTier +from synthorg.engine.intake.engine import IntakeEngine +from synthorg.engine.intake.models import IntakeResult +from synthorg.engine.pipeline.models import ( + ExecutionPath, + RoutingVerdict, + WorkItem, + WorkSource, +) +from synthorg.engine.pipeline.service import DefaultWorkPipeline +from synthorg.engine.routing_policy.config import StakesRoutingConfig +from synthorg.engine.task_engine import TaskEngine +from synthorg.engine.task_engine_models import CreateTaskData +from synthorg.hr.registry import AgentRegistryService +from synthorg.providers.drivers.scripted import ScriptedDriver +from synthorg.providers.enums import FinishReason +from synthorg.providers.models import ( + ChatMessage, + CompletionConfig, + 
CompletionResponse, + TokenUsage, + ToolCall, + ToolDefinition, +) +from synthorg.providers.registry import ProviderRegistry +from synthorg.settings.registry import get_registry +from synthorg.settings.resolver import ConfigResolver +from synthorg.settings.service import SettingsService +from synthorg.workers.runtime_builder import build_runtime_services +from tests._shared import FakeClock, mock_of +from tests.unit.api.fakes import FakePersistenceBackend + +pytestmark = pytest.mark.e2e + +_DECOMPOSITION_TOOL = "submit_decomposition_plan" +_DEBUG_SKILL = "debug" +_DATABASE_SKILL = "database" +_PROVIDER = "test-provider" + +# Tier-priced model catalogue. Model ids carry the tier token so the +# StubBenchmarkScoreProvider scores them (small 72, medium 85, large 92) +# and the scripted driver can price each completion by tier. +_TIER_MODEL_IDS: dict[ModelTier, str] = { + "small": "example-small-001", + "medium": "example-medium-001", + "large": "example-large-001", +} +_TIER_COST_PER_1K: dict[ModelTier, float] = { + "small": 0.001, + "medium": 0.005, + "large": 0.02, +} + + +def _cost_for_model(model_id: str) -> float: + """Price a completion by the tier embedded in *model_id*.""" + for tier, cost in _TIER_COST_PER_1K.items(): + if tier in model_id: + return cost + return _TIER_COST_PER_1K["large"] + + +class _MixedStakesStrategy: + """Decompose into a low-stakes and a critical-stakes subtask. + + Prices every completion by the model tier it is invoked with, so the + accrued cost reflects the router's tier choice. 
+ """ + + def next_response( + self, + messages: list[ChatMessage], + model: str, + tools: list[ToolDefinition] | None, + config: CompletionConfig | None, + ) -> CompletionResponse: + del messages, config + cost = _cost_for_model(model) + usage = TokenUsage(input_tokens=8, output_tokens=4, cost=cost) + is_decomposition = tools is not None and any( + t.name == _DECOMPOSITION_TOOL for t in tools + ) + if is_decomposition: + return CompletionResponse( + content=None, + tool_calls=( + ToolCall( + id="decomp-1", + name=_DECOMPOSITION_TOOL, + arguments={ + "task_structure": "parallel", + "coordination_topology": "centralized", + "subtasks": [ + { + "id": "sub-cheap", + "title": "Tidy the log formatting", + "description": "Adjust logger output spacing.", + "estimated_complexity": "simple", + "required_skills": [_DEBUG_SKILL], + }, + { + "id": "sub-critical", + "title": "Migrate the production schema", + "description": ( + "Run an irreversible production " + "migration of the live schema." + ), + "estimated_complexity": "complex", + "required_skills": [_DATABASE_SKILL], + }, + ], + }, + ), + ), + finish_reason=FinishReason.TOOL_USE, + usage=usage, + model=model, + ) + return CompletionResponse( + content="Work complete.", + finish_reason=FinishReason.STOP, + usage=usage, + model=model, + ) + + +class _TaskCreatingIntakeStrategy: + """Deterministic intake: persist a real task via the task engine.""" + + def __init__(self, task_engine: TaskEngine) -> None: + self._task_engine = task_engine + + async def process(self, request: Any) -> IntakeResult: + meta = request.metadata + created = await self._task_engine.create_task( + CreateTaskData( + title=request.requirement.title, + description=request.requirement.description, + type=TaskType.DEVELOPMENT, + project=str(meta["project"]), + created_by=str(meta["requested_by"]), + priority=Priority.MEDIUM, + estimated_complexity=Complexity.MEDIUM, + ), + requested_by=str(meta["requested_by"]), + ) + return IntakeResult.accepted_result( 
+ request_id=request.request_id, + task_id=created.id, + ) + + +def _provider_catalogue() -> dict[str, ProviderConfig]: + """A single provider exposing the three tier aliases the router uses.""" + return { + _PROVIDER: ProviderConfig( + driver="scripted", + models=tuple( + ProviderModelConfig( + id=_TIER_MODEL_IDS[tier], + alias=tier, + cost_per_1k_input=_TIER_COST_PER_1K[tier], + cost_per_1k_output=_TIER_COST_PER_1K[tier], + max_context=128000, + ) + for tier in _TIER_MODEL_IDS + ), + ), + } + + +def _large_tier_agent(name: str, skill: str) -> AgentIdentity: + """An agent configured on the large tier (flat keeps it there).""" + return AgentIdentity( + id=uuid4(), + name=name, + role="developer", + department="engineering", + level=SeniorityLevel.MID, + skills=SkillSet(primary=(Skill(id=skill, name=skill),)), + authority=Authority(budget_limit=100.0), + model=ModelConfig( + provider=_PROVIDER, + model_id=_TIER_MODEL_IDS["large"], + model_tier="large", + ), + hiring_date=date(2026, 1, 1), + status=AgentStatus.ACTIVE, + ) + + +def _project(project_id: str) -> Any: + from synthorg.core.enums import ProjectStatus + from synthorg.core.project import Project + + return Project( + id=project_id, + name=project_id, + description="acceptance project", + status=ProjectStatus.ACTIVE, + ) + + +@pytest.fixture +async def persistence() -> AsyncGenerator[FakePersistenceBackend]: + backend = FakePersistenceBackend() + await backend.connect() + yield backend + await backend.disconnect() + + +@pytest.fixture +async def task_engine( + persistence: FakePersistenceBackend, +) -> AsyncGenerator[TaskEngine]: + engine = TaskEngine(persistence=persistence) + await engine.start() + yield engine + await engine.stop() + + +async def _build_pipeline( + *, + persistence: FakePersistenceBackend, + task_engine: TaskEngine, + tmp_path: Path, + stakes_strategy: str, + cost_tracker: CostTracker, +) -> DefaultWorkPipeline: + provider = ScriptedDriver(_PROVIDER, strategy=_MixedStakesStrategy()) + 
registry = ProviderRegistry({_PROVIDER: provider}) + agent_registry = AgentRegistryService() + for agent in ( + _large_tier_agent("debugger", _DEBUG_SKILL), + _large_tier_agent("dba", _DATABASE_SKILL), + ): + await agent_registry.register(agent) + + root_config = RootConfig( + company_name="stakes-routing-e2e", + coordination_metrics=CoordinationMetricsConfig(enabled=True), + providers=_provider_catalogue(), + stakes_routing=StakesRoutingConfig(strategy=stakes_strategy), + ) + settings_service = SettingsService( + repository=persistence.settings, + registry=get_registry(), + ) + await settings_service.set("coordination", "routing_policy", "always-team") + config_resolver = ConfigResolver( + settings_service=settings_service, + config=root_config, + ) + intake = IntakeEngine(strategy=_TaskCreatingIntakeStrategy(task_engine)) + app_state = mock_of[AppState]( + has_active_provider=True, + provider_registry=registry, + config=root_config, + config_resolver=config_resolver, + task_engine=task_engine, + agent_registry=agent_registry, + approval_store=ApprovalStore(), + clock=FakeClock(), + event_stream_hub=None, + interrupt_store=None, + project_workspace_service=None, + agent_workspace_root=tmp_path, + persistence=persistence, + has_simulation_runtime=True, + client_simulation_state=mock_of[ClientSimulationState]( + intake_engine=intake, + ), + benchmark_provider=StubBenchmarkScoreProvider(), + has_cost_tracker=True, + cost_tracker=cost_tracker, + has_message_bus=False, + has_coordination_metrics_store=False, + coordination_metrics_store=None, + has_audit_log=False, + has_memory_backend=False, + has_performance_tracker=False, + has_trust_service=False, + ) + runtime = await build_runtime_services(app_state, workspace_root=tmp_path) + pipeline = runtime.work_pipeline + assert isinstance(pipeline, DefaultWorkPipeline) + return pipeline + + +async def _run_brief( + *, + persistence: FakePersistenceBackend, + task_engine: TaskEngine, + tmp_path: Path, + stakes_strategy: 
str, + project: str, +) -> float: + """Run the mixed-stakes brief and return the total accrued cost.""" + cost_tracker = CostTracker() + pipeline = await _build_pipeline( + persistence=persistence, + task_engine=task_engine, + tmp_path=tmp_path, + stakes_strategy=stakes_strategy, + cost_tracker=cost_tracker, + ) + work_item = WorkItem( + origin_adapter_id="harness", + source=WorkSource.SIMULATION, + title="Production incident remediation", + raw_intent="Tidy a log line and migrate the production schema.", + project=project, + requested_by="operator", + ) + result = await pipeline.run(work_item) + assert result.verdict is RoutingVerdict.SPLITTABLE + assert result.execution_path is ExecutionPath.TEAM + assert result.is_success is True + return await cost_tracker.get_total_cost() + + +async def test_stakes_aware_costs_less_than_flat_on_mixed_brief( + persistence: FakePersistenceBackend, + task_engine: TaskEngine, + tmp_path: Path, +) -> None: + """Stakes-aware routing accrues strictly less cost than the flat arm. + + The flat arm keeps every subtask on the agent's configured large + tier; stakes-aware routes the low-stakes subtask down to the cheap + tier, so the same brief costs less end-to-end with no quality-floor + regression (each selected tier still clears its per-stakes floor). 
+ """ + await persistence.projects.create(_project("proj-aware")) + await persistence.projects.create(_project("proj-flat")) + + aware_cost = await _run_brief( + persistence=persistence, + task_engine=task_engine, + tmp_path=tmp_path, + stakes_strategy="stakes_aware", + project="proj-aware", + ) + flat_cost = await _run_brief( + persistence=persistence, + task_engine=task_engine, + tmp_path=tmp_path, + stakes_strategy="flat", + project="proj-flat", + ) + + assert aware_cost > 0.0 + assert flat_cost > 0.0 + assert aware_cost < flat_cost diff --git a/tests/unit/engine/routing_policy/test_strategies.py b/tests/unit/engine/routing_policy/test_strategies.py index 9df7cca64d..0b2a256e9b 100644 --- a/tests/unit/engine/routing_policy/test_strategies.py +++ b/tests/unit/engine/routing_policy/test_strategies.py @@ -1,10 +1,12 @@ """Unit tests for stakes-aware routing strategies and factory.""" +from collections.abc import Mapping from datetime import UTC, datetime import pytest from tests._shared.scripted_provider import make_e2e_identity +from synthorg.budget.benchmark_protocol import BenchmarkScore from synthorg.budget.benchmark_stub import StubBenchmarkScoreProvider from synthorg.budget.coordination_metrics import ( CoordinationMetrics, @@ -29,6 +31,22 @@ from synthorg.providers.routing.models import ResolvedModel from synthorg.providers.routing.resolver import ModelResolver + +class _NoScoreProvider: + """Benchmark provider that has no score for any model. + + Exercises the under-floor fallback: every tier resolves but none can + clear its quality floor, so routing must pick the strongest tier and + flag the decision rather than crash or silently downgrade. 
+ """ + + async def get_score(self, model_id: str) -> BenchmarkScore | None: + return None + + async def list_scores(self) -> Mapping[str, BenchmarkScore]: + return {} + + _PROVIDER = "example-provider" _TIER_MODEL_IDS: dict[ModelTier, str] = { "small": "example-small-001", @@ -279,3 +297,105 @@ def test_unknown_strategy_raises(self) -> None: def test_stakes_aware_without_benchmark_raises(self) -> None: with pytest.raises(ValueError, match="benchmark"): build_stakes_router(StakesRoutingConfig(strategy="stakes_aware")) + + +@pytest.mark.unit +class TestFloorBoundary: + """A score exactly equal to the floor clears it (``>=`` boundary). + + Stub scores: small=72, medium=85, large=92. NORMAL stakes avoid the + red-team tier-floor interaction so the floor selection is observed + directly on a large-tier agent. + """ + + @pytest.mark.parametrize( + ("normal_floor", "expected_tier"), + [ + (72.0, "small"), # small score == floor: clears + (72.01, "medium"), # just above small: small fails, medium clears + (85.0, "medium"), # medium score == floor: clears + (85.01, "large"), # just above medium: large clears + ], + ) + async def test_score_equal_to_floor_clears( + self, + normal_floor: float, + expected_tier: ModelTier, + ) -> None: + config = StakesRoutingConfig( + quality_floors=QualityFloors( + low=0, normal=normal_floor, high=100, critical=100 + ), + ) + decision = await _strategy(config=config).route( + task=_task(Stakes.NORMAL), + identity=_identity("large"), + ) + assert decision.selected_model.model_tier == expected_tier + + +@pytest.mark.unit +class TestCoordinationNudgeBoundary: + """The nudge fires only when amplification is strictly above threshold.""" + + def _store_with_amplification( + self, + *, + error_rate_mas: float, + error_rate_sas: float, + ) -> CoordinationMetricsStore: + store = CoordinationMetricsStore() + store.record( + CoordinationMetricsRecord( + task_id="task-1", + computed_at=datetime.now(UTC), + team_size=3, + metrics=CoordinationMetrics( + 
error_amplification=ErrorAmplification( + error_rate_mas=error_rate_mas, + error_rate_sas=error_rate_sas, + ), + ), + ), + ) + return store + + async def test_amplification_at_threshold_does_not_nudge(self) -> None: + # 0.3 / 0.2 == 1.5, exactly the default threshold (strict ">"). + store = self._store_with_amplification(error_rate_mas=0.3, error_rate_sas=0.2) + decision = await _strategy(coordination_store=store).route( + task=_task(Stakes.NORMAL), # floor -> medium + identity=_identity("small"), + ) + assert decision.selected_model.model_tier == "medium" + assert decision.source == "stakes_aware:floor" + + async def test_amplification_above_threshold_nudges(self) -> None: + # 0.32 / 0.2 == 1.6 > 1.5. + store = self._store_with_amplification(error_rate_mas=0.32, error_rate_sas=0.2) + decision = await _strategy(coordination_store=store).route( + task=_task(Stakes.NORMAL), # floor -> medium, nudged to large + identity=_identity("small"), + ) + assert decision.selected_model.model_tier == "large" + assert decision.source == "stakes_aware:nudge" + + +@pytest.mark.unit +class TestBenchmarkUnavailable: + """No tier clears the floor: fall back to the strongest tier, flagged.""" + + async def test_no_score_falls_back_to_strongest_and_flags(self) -> None: + strategy = StakesAwareStrategy( + benchmark_provider=_NoScoreProvider(), + resolver=_resolver(), + ) + decision = await strategy.route( + task=_task(Stakes.LOW), + identity=_identity("small"), + ) + # No tier clears the floor, so the strongest resolvable tier is + # chosen and the decision is flagged rather than silently kept. 
+ assert decision.selected_model.model_tier == "large" + assert decision.source == "stakes_aware:floor_unmet" + assert "floor not met" in decision.reason diff --git a/tests/unit/engine/routing_policy/test_tiers.py b/tests/unit/engine/routing_policy/test_tiers.py new file mode 100644 index 0000000000..b7eb60edb2 --- /dev/null +++ b/tests/unit/engine/routing_policy/test_tiers.py @@ -0,0 +1,68 @@ +"""Unit tests for the model-tier ladder helpers.""" + +import pytest + +from synthorg.core.types import ModelTier +from synthorg.engine.routing_policy.tiers import ( + TIER_LADDER, + bump_one, + higher_tier, + tier_rank, +) + + +@pytest.mark.unit +class TestTierRank: + """Cheapest-first rank: small=0, medium=1, large=2.""" + + @pytest.mark.parametrize( + ("tier", "rank"), + [("small", 0), ("medium", 1), ("large", 2)], + ) + def test_rank(self, tier: ModelTier, rank: int) -> None: + assert tier_rank(tier) == rank + + def test_ladder_is_cheapest_first(self) -> None: + assert TIER_LADDER == ("small", "medium", "large") + + +@pytest.mark.unit +class TestHigherTier: + """``higher_tier`` returns the stronger of two tiers, order-independent.""" + + @pytest.mark.parametrize( + ("a", "b", "expected"), + [ + ("small", "large", "large"), + ("large", "small", "large"), + ("small", "medium", "medium"), + ("medium", "small", "medium"), + ("medium", "large", "large"), + ("medium", "medium", "medium"), + ("large", "large", "large"), + ("small", "small", "small"), + ], + ) + def test_returns_stronger( + self, + a: ModelTier, + b: ModelTier, + expected: ModelTier, + ) -> None: + assert higher_tier(a, b) == expected + + +@pytest.mark.unit +class TestBumpOne: + """``bump_one`` steps up one tier and saturates at the strongest.""" + + @pytest.mark.parametrize( + ("tier", "expected"), + [ + ("small", "medium"), + ("medium", "large"), + ("large", "large"), + ], + ) + def test_bump(self, tier: ModelTier, expected: ModelTier) -> None: + assert bump_one(tier) == expected diff --git 
a/tests/unit/engine/stakes/test_assessor.py b/tests/unit/engine/stakes/test_assessor.py index 8a64f83795..3c35149a4f 100644 --- a/tests/unit/engine/stakes/test_assessor.py +++ b/tests/unit/engine/stakes/test_assessor.py @@ -173,6 +173,19 @@ def test_subtask_priority_path_does_not_elevate(self) -> None: Stakes.LOW ) + def test_complexity_without_rule_fails_safe_to_high(self) -> None: + """A complexity absent from the rules biases upward, not to LOW.""" + config = StakesAssessmentConfig( + complexity_rules=( + ComplexityStakesRule(complexity=Complexity.SIMPLE, stakes=Stakes.LOW), + ), + ) + assessor = DefaultStakesAssessor(config) + # EPIC has no rule in this partial config: fail-safe to HIGH. + assert assessor.assess_subtask(_subtask(complexity=Complexity.EPIC)) is ( + Stakes.HIGH + ) + @pytest.mark.unit class TestBuildStakesAssessor: From ac5cdc935a6a30c29a1df1087e072092a45997a3 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Fri, 22 May 2026 07:26:09 +0200 Subject: [PATCH 6/8] test: pin stakes_routing default in RootConfig polyfactory build The new QualityFloors non-decreasing validator rejects polyfactory's independent random floor draws, mirroring the existing IntegrationsConfig pin in the same test. --- tests/unit/config/test_schema.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tests/unit/config/test_schema.py b/tests/unit/config/test_schema.py index 4556080e62..3571169e46 100644 --- a/tests/unit/config/test_schema.py +++ b/tests/unit/config/test_schema.py @@ -596,9 +596,16 @@ def test_factory(self) -> None: # (``degraded_threshold <= unhealthy_threshold``) that polyfactory # cannot satisfy with independent random draws, so we pin it to # its default here rather than pollute the shared factory. + # ``QualityFloors`` likewise requires non-decreasing floors + # (low <= normal <= high <= critical), which independent random + # draws violate, so pin ``stakes_routing`` to its default too. 
+ from synthorg.engine.routing_policy.config import StakesRoutingConfig from synthorg.integrations.config import IntegrationsConfig - cfg = RootConfigFactory.build(integrations=IntegrationsConfig()) + cfg = RootConfigFactory.build( + integrations=IntegrationsConfig(), + stakes_routing=StakesRoutingConfig(), + ) assert isinstance(cfg, RootConfig) assert cfg.company_name From da1786653c5f89b23ac4f61c804cce1268d57af9 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Fri, 22 May 2026 08:06:39 +0200 Subject: [PATCH 7/8] fix: babysit round 1, dashboard stakes fixtures + CI gates + reviewer feedback CI failures: - Add required 'stakes' field to all web Task fixtures/stores/stories/mocks (dashboard type-check, build, melange, and lighthouse failed because stakes was generated as required on the Task DTO but the hand-written TS fixtures were never updated). Re-export STAKES_VALUES/Stakes from the enums barrel and validate stakes in the WS task-frame guard like other behavioural enums. - Regenerate data/runtime_stats.yaml (tests bucket 32,000+ to 33,000+) and re-inject the RS markers in README.md and docs/roadmap/index.md. Reviewer feedback: - runtime_builder: scope the stakes-router ModelResolver to the single active provider (CodeRabbit) so a tier can never resolve to an inactive provider model and execute with the wrong client. - strategies: use model_copy(update=) instead of reconstructing ModelConfig (Gemini). - providers.md: clarify only high/critical work is floored at the configured tier; low/normal may downgrade to save cost (Gemini). - test_acceptance_comparison: Final annotations on module constants plus a note documenting the integrated stakes-assessment dependency (CodeRabbit). 
--- README.md | 2 +- data/runtime_stats.yaml | 12 ++++----- docs/design/providers.md | 5 ++-- docs/roadmap/index.md | 2 +- .../engine/routing_policy/strategies.py | 14 +++++------ src/synthorg/workers/runtime_builder.py | 25 +++++++++++++++---- .../test_acceptance_comparison.py | 14 ++++++++--- web/src/__tests__/helpers/factories.ts | 1 + .../__tests__/pages/TaskDetailPage.test.tsx | 1 + .../pages/tasks/TaskDetailPanel.test.tsx | 1 + web/src/__tests__/stores/agents.test.ts | 1 + web/src/__tests__/stores/tasks.test.ts | 1 + .../__tests__/utils/tasks.property.test.ts | 1 + web/src/api/types/enums.ts | 2 ++ web/src/mocks/handlers/tasks.ts | 1 + web/src/pages/agents/TaskHistory.stories.tsx | 1 + web/src/pages/tasks/TaskCard.stories.tsx | 1 + web/src/pages/tasks/TaskColumn.stories.tsx | 1 + .../pages/tasks/TaskDetailActions.stories.tsx | 1 + .../pages/tasks/TaskDetailHeader.stories.tsx | 1 + .../tasks/TaskDetailMetadata.stories.tsx | 1 + .../pages/tasks/TaskDetailPanel.stories.tsx | 1 + .../tasks/TaskDetailTimeline.stories.tsx | 1 + web/src/pages/tasks/TaskListView.stories.tsx | 1 + web/src/stores/tasks.ts | 21 ++++++++++++---- 25 files changed, 82 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index b16fd34200..886e73787b 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ SynthOrg is a self-contained, self-hostable platform for **synthetic organisatio It is provider-agnostic (2700+ LLMs via [LiteLLM](https://github.com/BerriAI/litellm)), configuration-driven ([Pydantic v2](https://docs.pydantic.dev/) models), and licensed BUSL-1.1 (converts to Apache 2.0 at the Change Date). -> **Project status (read this).** The framework and infrastructure are built and tested (32,000+ tests, 80%+ coverage): API, dashboard, CLI, dual-backend persistence, the provider layer, and every subsystem as importable, unit-tested components. 
The autonomous agent **runtime** that makes the organisation actually execute work is **in active development** and tracked openly on the [roadmap](https://synthorg.io/docs/roadmap/) and the [issue tracker](https://github.com/Aureliolo/synthorg/issues). Today, starting SynthOrg brings up the platform and dashboard; running a company end to end is the work in flight. We would rather you see exactly what is built versus in progress than discover it later. +> **Project status (read this).** The framework and infrastructure are built and tested (33,000+ tests, 80%+ coverage): API, dashboard, CLI, dual-backend persistence, the provider layer, and every subsystem as importable, unit-tested components. The autonomous agent **runtime** that makes the organisation actually execute work is **in active development** and tracked openly on the [roadmap](https://synthorg.io/docs/roadmap/) and the [issue tracker](https://github.com/Aureliolo/synthorg/issues). Today, starting SynthOrg brings up the platform and dashboard; running a company end to end is the work in flight. We would rather you see exactly what is built versus in progress than discover it later. ## What is available now diff --git a/data/runtime_stats.yaml b/data/runtime_stats.yaml index c4cb2c0caa..8ede662246 100644 --- a/data/runtime_stats.yaml +++ b/data/runtime_stats.yaml @@ -1,13 +1,13 @@ schema_version: 1 -last_generated_utc: '2026-05-20T21:40:55Z' -generator_revision: 0fafe591c +last_generated_utc: '2026-05-22T06:03:17Z' +generator_revision: ac5cdc935 stats: tests: - raw: 32241 - rounded: 32000 - display: 32,000+ + raw: 33060 + rounded: 33000 + display: 33,000+ mem0_stars: - raw: 56281 + raw: 56395 rounded: 56000 display: 56k+ providers_curated: diff --git a/docs/design/providers.md b/docs/design/providers.md index 35dc16a306..cbd8bbb51f 100644 --- a/docs/design/providers.md +++ b/docs/design/providers.md @@ -204,8 +204,9 @@ consequential the work is. 
Each task (and subtask) carries a `stakes` level (`low` / `normal` / `high` / `critical`), assessed by the `StakesAssessor`. The `StakesRoutingStrategy` then picks the cheapest model tier whose benchmark score clears the per-stakes quality floor, bumps one tier when coordination metrics are -unhealthy, marks high/critical work for the red-team gate, and never downgrades -below the agent's configured tier. It is config-selectable via +unhealthy, and marks high/critical work for the red-team gate. High/critical work +is never routed below the agent's configured tier; low/normal work may drop to a +cheaper tier (still clearing the floor) to save cost. It is config-selectable via `stakes_routing.strategy` (`stakes_aware` default, `flat` to opt out) and applied in the engine *before* the budget auto-downgrade, so a hard budget ceiling still wins over a stakes upgrade. See [Pluggable Subsystems](../reference/pluggable-subsystems.md). diff --git a/docs/roadmap/index.md b/docs/roadmap/index.md index ae3736d2a2..cf7305b0a4 100644 --- a/docs/roadmap/index.md +++ b/docs/roadmap/index.md @@ -3,7 +3,7 @@ ## Current status SynthOrg is in **active development**. The platform, infrastructure, and -subsystem libraries are built and tested (32,000+ +subsystem libraries are built and tested (33,000+ tests in the latest run, 80%+ coverage) and integrated through a REST + WebSocket API, a React 19 dashboard, and a Go CLI. 
The autonomous agent **runtime** that makes the organisation actually execute work is the focus of diff --git a/src/synthorg/engine/routing_policy/strategies.py b/src/synthorg/engine/routing_policy/strategies.py index ff5c5a34bd..a1fcab999e 100644 --- a/src/synthorg/engine/routing_policy/strategies.py +++ b/src/synthorg/engine/routing_policy/strategies.py @@ -2,7 +2,6 @@ from typing import TYPE_CHECKING -from synthorg.core.agent import ModelConfig from synthorg.core.enums import Stakes, compare_stakes from synthorg.core.types import ModelTier # noqa: TC001 from synthorg.engine.routing_policy.config import StakesRoutingConfig @@ -161,13 +160,12 @@ def _build_decision( # noqa: PLR0913 -- keyword-only assembly inputs ) ) if changed and resolved is not None and target_tier is not None: - selected_model = ModelConfig( - provider=resolved.provider_name, - model_id=resolved.model_id, - temperature=current.temperature, - max_tokens=current.max_tokens, - fallback_model=current.fallback_model, - model_tier=target_tier, + selected_model = current.model_copy( + update={ + "provider": resolved.provider_name, + "model_id": resolved.model_id, + "model_tier": target_tier, + } ) if nudged: source = "stakes_aware:nudge" diff --git a/src/synthorg/workers/runtime_builder.py b/src/synthorg/workers/runtime_builder.py index deb33ad9dc..b92ba19028 100644 --- a/src/synthorg/workers/runtime_builder.py +++ b/src/synthorg/workers/runtime_builder.py @@ -344,21 +344,31 @@ async def _build_external_api_runtime( ) -def _build_stakes_router_or_none(app_state: AppState) -> StakesRouter | None: +def _build_stakes_router_or_none( + app_state: AppState, + *, + active_provider_name: str, +) -> StakesRouter | None: """Build the stakes-aware model router from live application state. Returns ``None`` when the benchmark provider is absent (cost-dial not wired, e.g. a persistence-less boot), so the engine simply skips stakes routing. 
Reads the benchmark provider and coordination-metrics - store off ``AppState`` and builds a tier resolver from the configured - providers; ships the ``stakes_aware`` default strategy. + store off ``AppState`` and builds a tier resolver scoped to the + single active provider that the runtime executes against, so the + router can never resolve a tier to a model owned by an inactive + provider and hand it to the wrong client; ships the ``stakes_aware`` + default strategy. Also ``None`` if the active provider is unconfigured. """ from synthorg.providers.routing.resolver import ModelResolver # noqa: PLC0415 benchmark_provider = app_state.benchmark_provider if benchmark_provider is None: return None - resolver = ModelResolver.from_config(app_state.config.providers) + provider_cfg = app_state.config.providers.get(active_provider_name) + if provider_cfg is None: + return None + resolver = ModelResolver.from_config({active_provider_name: provider_cfg}) coordination_store = ( app_state.coordination_metrics_store if app_state.has_coordination_metrics_store @@ -379,6 +389,8 @@ def _construct_agent_engine( # noqa: PLR0913 -- boot collaborators threaded in tool_registry: ToolRegistry, coordination_metrics_collector: CoordinationMetricsCollector | None, external_api_runtime: ExternalApiRuntime | None = None, + *, + active_provider_name: str, ) -> AgentEngine: """Assemble the boot ``AgentEngine`` from live application state. 
@@ -394,7 +406,9 @@ def _construct_agent_engine( # noqa: PLR0913 -- boot collaborators threaded in provider=provider, provider_registry=registry, tool_registry=tool_registry, - stakes_router=_build_stakes_router_or_none(app_state), + stakes_router=_build_stakes_router_or_none( + app_state, active_provider_name=active_provider_name + ), cost_tracker=(app_state.cost_tracker if app_state.has_cost_tracker else None), task_engine=app_state.task_engine, approval_store=app_state.approval_store, @@ -683,6 +697,7 @@ async def build_runtime_services( tool_registry, coordination_metrics_collector, external_api_runtime, + active_provider_name=names[0], ) autonomy_resolver = AutonomyResolver( registry=ActionTypeRegistry(), diff --git a/tests/unit/engine/routing_policy/test_acceptance_comparison.py b/tests/unit/engine/routing_policy/test_acceptance_comparison.py index 2f85612836..3eb4bcddb9 100644 --- a/tests/unit/engine/routing_policy/test_acceptance_comparison.py +++ b/tests/unit/engine/routing_policy/test_acceptance_comparison.py @@ -15,6 +15,8 @@ is fully reproducible without any LLM spend. """ +from typing import Final + import pytest from tests._shared.scripted_provider import make_e2e_identity @@ -39,14 +41,14 @@ from synthorg.providers.routing.models import ResolvedModel from synthorg.providers.routing.resolver import ModelResolver -_PROVIDER = "example-provider" -_TIER_MODEL_IDS: dict[ModelTier, str] = { +_PROVIDER: Final[str] = "example-provider" +_TIER_MODEL_IDS: Final[dict[ModelTier, str]] = { "small": "example-small-001", "medium": "example-medium-001", "large": "example-large-001", } # total_cost_per_1k = input + output; strictly increasing by tier. 
-_TIER_TOTAL_COST: dict[ModelTier, float] = { +_TIER_TOTAL_COST: Final[dict[ModelTier, float]] = { "small": 0.2, "medium": 1.0, "large": 4.0, @@ -96,6 +98,12 @@ def _parent() -> Task: def _mixed_plan() -> DecompositionPlan: """Mostly low/normal stakes with a couple of high/critical subtasks.""" + # Subtasks deliberately omit an explicit ``stakes=``: this acceptance + # test exercises the assessment end-to-end, so ``DecompositionService`` + # derives each stakes level from ``estimated_complexity`` plus the + # description keywords ("architecture", "production", "irreversible"). + # Keep those keyword cues in sync with the heuristic's inputs if they + # change, or the per-subtask tier assertions below will drift. subtasks = ( SubtaskDefinition( id="st-doc", diff --git a/web/src/__tests__/helpers/factories.ts b/web/src/__tests__/helpers/factories.ts index 3f09148079..7acfd0420a 100644 --- a/web/src/__tests__/helpers/factories.ts +++ b/web/src/__tests__/helpers/factories.ts @@ -33,6 +33,7 @@ export function makeTask(id: string, titleOrOverrides?: string | Partial = {}): Task { artifacts_expected: [], acceptance_criteria: [], estimated_complexity: 'medium', + stakes: 'normal', budget_limit: 1.0, deadline: null, max_retries: 3, diff --git a/web/src/__tests__/stores/tasks.test.ts b/web/src/__tests__/stores/tasks.test.ts index 371305e381..d69bfd933a 100644 --- a/web/src/__tests__/stores/tasks.test.ts +++ b/web/src/__tests__/stores/tasks.test.ts @@ -26,6 +26,7 @@ const mockTask: Task = { artifacts_expected: [], acceptance_criteria: [], estimated_complexity: 'medium', + stakes: 'normal', budget_limit: 10, deadline: null, max_retries: 3, diff --git a/web/src/__tests__/utils/tasks.property.test.ts b/web/src/__tests__/utils/tasks.property.test.ts index 6d957662f4..aaa57626f3 100644 --- a/web/src/__tests__/utils/tasks.property.test.ts +++ b/web/src/__tests__/utils/tasks.property.test.ts @@ -39,6 +39,7 @@ function arbTask(): fc.Arbitrary { artifacts_expected: fc.constant([] as 
readonly { readonly path: string; readonly type: 'code' | 'tests' | 'documentation' }[]), acceptance_criteria: fc.constant([] as readonly { readonly description: string; readonly met: boolean }[]), estimated_complexity: fc.constantFrom('simple', 'medium', 'complex', 'epic') as fc.Arbitrary, + stakes: fc.constantFrom('low', 'normal', 'high', 'critical') as fc.Arbitrary, budget_limit: fc.nat({ max: 1000 }), deadline: fc.constant(null), max_retries: fc.nat({ max: 5 }), diff --git a/web/src/api/types/enums.ts b/web/src/api/types/enums.ts index a957cd1776..233121de3e 100644 --- a/web/src/api/types/enums.ts +++ b/web/src/api/types/enums.ts @@ -37,6 +37,7 @@ export { PROJECT_STATUS_VALUES, RISK_TOLERANCE_VALUES, SENIORITY_LEVEL_VALUES, + STAKES_VALUES, TASK_SOURCE_VALUES, TASK_STATUS_VALUES, TASK_STRUCTURE_VALUES, @@ -65,6 +66,7 @@ export { type ProjectStatus, type RiskTolerance, type SeniorityLevel, + type Stakes, type TaskSource, type TaskStatus, type TaskStructure, diff --git a/web/src/mocks/handlers/tasks.ts b/web/src/mocks/handlers/tasks.ts index 2f50c17e1f..d185404a10 100644 --- a/web/src/mocks/handlers/tasks.ts +++ b/web/src/mocks/handlers/tasks.ts @@ -27,6 +27,7 @@ export function buildTask(overrides: Partial = {}): Task { artifacts_expected: [], acceptance_criteria: [], estimated_complexity: 'medium', + stakes: 'normal', budget_limit: 10, deadline: null, max_retries: 3, diff --git a/web/src/pages/agents/TaskHistory.stories.tsx b/web/src/pages/agents/TaskHistory.stories.tsx index 2f6aed4382..136e67faec 100644 --- a/web/src/pages/agents/TaskHistory.stories.tsx +++ b/web/src/pages/agents/TaskHistory.stories.tsx @@ -18,6 +18,7 @@ function makeTask(overrides: Partial & { id: string; title: string }): Tas artifacts_expected: [], acceptance_criteria: [], estimated_complexity: 'medium', + stakes: 'normal', budget_limit: 10, deadline: null, max_retries: 3, diff --git a/web/src/pages/tasks/TaskCard.stories.tsx b/web/src/pages/tasks/TaskCard.stories.tsx index 
ce9c1fbdb6..8a587a751f 100644 --- a/web/src/pages/tasks/TaskCard.stories.tsx +++ b/web/src/pages/tasks/TaskCard.stories.tsx @@ -18,6 +18,7 @@ function makeTask(overrides: Partial = {}): Task { artifacts_expected: [], acceptance_criteria: [], estimated_complexity: 'complex', + stakes: 'normal', budget_limit: 10, cost: 3.45, deadline: new Date(Date.now() + 86400000 * 2).toISOString(), diff --git a/web/src/pages/tasks/TaskColumn.stories.tsx b/web/src/pages/tasks/TaskColumn.stories.tsx index 3af8c8d51e..7e8d073a26 100644 --- a/web/src/pages/tasks/TaskColumn.stories.tsx +++ b/web/src/pages/tasks/TaskColumn.stories.tsx @@ -20,6 +20,7 @@ function makeTask(id: string, title: string, overrides: Partial = artifacts_expected: [], acceptance_criteria: [], estimated_complexity: 'medium', + stakes: 'normal', budget_limit: 10, deadline: null, max_retries: 3, diff --git a/web/src/pages/tasks/TaskDetailActions.stories.tsx b/web/src/pages/tasks/TaskDetailActions.stories.tsx index 1aa5fd5690..2279df627b 100644 --- a/web/src/pages/tasks/TaskDetailActions.stories.tsx +++ b/web/src/pages/tasks/TaskDetailActions.stories.tsx @@ -18,6 +18,7 @@ const baseTask: DashboardTask = { artifacts_expected: [], acceptance_criteria: [], estimated_complexity: 'medium', + stakes: 'normal', budget_limit: 100, deadline: null, max_retries: 3, diff --git a/web/src/pages/tasks/TaskDetailHeader.stories.tsx b/web/src/pages/tasks/TaskDetailHeader.stories.tsx index 716d0d99ea..8c34722fb0 100644 --- a/web/src/pages/tasks/TaskDetailHeader.stories.tsx +++ b/web/src/pages/tasks/TaskDetailHeader.stories.tsx @@ -17,6 +17,7 @@ const baseTask: DashboardTask = { artifacts_expected: [], acceptance_criteria: [], estimated_complexity: 'medium', + stakes: 'normal', budget_limit: 100, deadline: null, max_retries: 3, diff --git a/web/src/pages/tasks/TaskDetailMetadata.stories.tsx b/web/src/pages/tasks/TaskDetailMetadata.stories.tsx index 6161b496d4..a986975f6b 100644 --- a/web/src/pages/tasks/TaskDetailMetadata.stories.tsx +++ 
b/web/src/pages/tasks/TaskDetailMetadata.stories.tsx @@ -17,6 +17,7 @@ const baseTask: DashboardTask = { artifacts_expected: [], acceptance_criteria: [], estimated_complexity: 'medium', + stakes: 'normal', budget_limit: 100, deadline: null, max_retries: 3, diff --git a/web/src/pages/tasks/TaskDetailPanel.stories.tsx b/web/src/pages/tasks/TaskDetailPanel.stories.tsx index 8cf5bcfd36..048a9ea2e2 100644 --- a/web/src/pages/tasks/TaskDetailPanel.stories.tsx +++ b/web/src/pages/tasks/TaskDetailPanel.stories.tsx @@ -21,6 +21,7 @@ const mockTask: DashboardTask = { { description: 'JWT refresh rotation implemented', met: false }, ], estimated_complexity: 'complex', + stakes: 'normal', budget_limit: 10, cost: 3.45, deadline: '2026-04-01T00:00:00.000Z', diff --git a/web/src/pages/tasks/TaskDetailTimeline.stories.tsx b/web/src/pages/tasks/TaskDetailTimeline.stories.tsx index 1b7f56853f..bc1f388383 100644 --- a/web/src/pages/tasks/TaskDetailTimeline.stories.tsx +++ b/web/src/pages/tasks/TaskDetailTimeline.stories.tsx @@ -17,6 +17,7 @@ const baseTask: DashboardTask = { artifacts_expected: [], acceptance_criteria: [], estimated_complexity: 'medium', + stakes: 'normal', budget_limit: 0, deadline: null, max_retries: 0, diff --git a/web/src/pages/tasks/TaskListView.stories.tsx b/web/src/pages/tasks/TaskListView.stories.tsx index d9800e4bee..c8dd0d88e0 100644 --- a/web/src/pages/tasks/TaskListView.stories.tsx +++ b/web/src/pages/tasks/TaskListView.stories.tsx @@ -18,6 +18,7 @@ function makeTask(id: string, title: string, overrides: Partial = artifacts_expected: [], acceptance_criteria: [], estimated_complexity: 'medium', + stakes: 'normal', budget_limit: 10, cost: 2.50, deadline: null, diff --git a/web/src/stores/tasks.ts b/web/src/stores/tasks.ts index e563e62018..cce5da3456 100644 --- a/web/src/stores/tasks.ts +++ b/web/src/stores/tasks.ts @@ -15,6 +15,7 @@ import { import type { Complexity, CoordinationTopology, + Stakes, TaskStatus, TaskStructure, } from '@/api/types/enums' @@ 
-61,6 +62,12 @@ const COORDINATION_TOPOLOGY_SET: ReadonlySet = new Set([ 'context_dependent', 'auto', ] satisfies readonly CoordinationTopology[]) +const STAKES_SET: ReadonlySet = new Set([ + 'low', + 'normal', + 'high', + 'critical', +] satisfies readonly Stakes[]) const log = createLogger('tasks') // ``metadata`` is an arbitrary key-value bag on the wire @@ -235,6 +242,7 @@ function sanitizeTask(c: DashboardTask): DashboardTask { met: ac.met ?? false, })), estimated_complexity: c.estimated_complexity, + stakes: c.stakes, budget_limit: c.budget_limit, cost: c.cost, deadline: sanitizeNullable(c.deadline ?? null, 64), @@ -441,11 +449,12 @@ function isTaskShape(c: Record): c is Record & // the ceiling math / id sanitizer invariants downstream. isNullableNumber(c.hard_ceiling) && isNullableString(c.forecast_id) && - // Enum scalars: complexity / task_structure / coordination_topology - // are intentionally NOT routed through sanitizeWsEnum -- they're - // closed enums coupled to coordination + scheduling code paths - // that branch on the exact value (e.g. coordination_topology - // selects a specific orchestrator). A backend-only addition of + // Enum scalars: complexity / stakes / task_structure / + // coordination_topology are intentionally NOT routed through + // sanitizeWsEnum -- they're closed enums coupled to routing + + // coordination + scheduling code paths that branch on the exact + // value (e.g. coordination_topology selects a specific + // orchestrator; stakes selects a model tier). A backend-only addition of // a new value would silently degrade behaviour rather than just // a label mismatch, so dropping the frame here is the safer // failure mode. If/when a new value is rolled out, the frontend @@ -454,6 +463,8 @@ function isTaskShape(c: Record): c is Record & // facing labels with no behavioural branching. 
typeof c.estimated_complexity === 'string' && COMPLEXITY_SET.has(c.estimated_complexity) && + typeof c.stakes === 'string' && + STAKES_SET.has(c.stakes) && (c.task_structure === null || (typeof c.task_structure === 'string' && TASK_STRUCTURE_SET.has(c.task_structure))) && From f7a101e0ce14b82fdbd089f83be5de57ac262bda Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Fri, 22 May 2026 08:25:30 +0200 Subject: [PATCH 8/8] fix: babysit round 3, 1 finding (1 coderabbit) web/src/stores/tasks.ts: build the runtime-check enum sets from the generated *_VALUES tuples (COMPLEXITY/TASK_STRUCTURE/COORDINATION_TOPOLOGY/ STAKES) instead of re-declared literal lists, so a value added to an enum cannot drift out of sync with its frame-guard validator within a build (CodeRabbit flagged STAKES_SET; applied to all four for consistency and to match the file's own header comment + the DEPARTMENT_NAME_SET precedent in enums.ts). Behaviour is unchanged: the generated tuple is still build-time-frozen, so an unknown behavioural enum value is still dropped rather than mis-routed. 
--- web/src/stores/tasks.ts | 69 ++++++++++++++--------------------------- 1 file changed, 24 insertions(+), 45 deletions(-) diff --git a/web/src/stores/tasks.ts b/web/src/stores/tasks.ts index cce5da3456..7d27d81b01 100644 --- a/web/src/stores/tasks.ts +++ b/web/src/stores/tasks.ts @@ -7,18 +7,16 @@ import { sanitizeWsEnum, sanitizeWsString } from '@/utils/ws-sanitize' import { useToastStore } from '@/stores/toast' import { ARTIFACT_TYPE_VALUES, + COMPLEXITY_VALUES, + COORDINATION_TOPOLOGY_VALUES, PRIORITY_VALUES, + STAKES_VALUES, TASK_SOURCE_VALUES, TASK_STATUS_VALUES as TASK_STATUS_VALUES_TUPLE, + TASK_STRUCTURE_VALUES, TASK_TYPE_VALUES as TASK_TYPE_VALUES_TUPLE, } from '@/api/types/enums' -import type { - Complexity, - CoordinationTopology, - Stakes, - TaskStatus, - TaskStructure, -} from '@/api/types/enums' +import type { TaskStatus } from '@/api/types/enums' import type { CancelTaskRequest, CreateTaskRequest, @@ -30,44 +28,25 @@ import type { } from '@/api/types/tasks' import type { WsEvent } from '@/api/types/websocket' -// Runtime-check sets derived from the canonical enum tuples in -// `@/api/types/enums`. Building them here (rather than re-declaring the -// literal list) keeps the validator in lockstep with the type union -// -- drift between the runtime check and the declared enum is caught -// at compile time. -// Status / priority / type / source are no longer pre-validated -// against the allowlist; sanitizeWsEnum owns that responsibility -// (see sanitizeTask). Rejecting unknown enum values here would drop -// the whole frame on rolling backend deploys. - -// Enum sets for the remaining scalar/enum fields that ``sanitizeTask`` -// previously copied through unchecked. Declared here so the validator -// and the TS union stay in lockstep via the ``as const satisfies`` -// tuples these are derived from. 
-const COMPLEXITY_SET: ReadonlySet = new Set([ - 'simple', - 'medium', - 'complex', - 'epic', -] satisfies readonly Complexity[]) -const TASK_STRUCTURE_SET: ReadonlySet = new Set([ - 'sequential', - 'parallel', - 'mixed', -] satisfies readonly TaskStructure[]) -const COORDINATION_TOPOLOGY_SET: ReadonlySet = new Set([ - 'sas', - 'centralized', - 'decentralized', - 'context_dependent', - 'auto', -] satisfies readonly CoordinationTopology[]) -const STAKES_SET: ReadonlySet = new Set([ - 'low', - 'normal', - 'high', - 'critical', -] satisfies readonly Stakes[]) +// Runtime-check sets for the behavioural enum fields ``sanitizeTask`` +// copies through unchecked. Built from the generated ``*_VALUES`` tuples +// in `@/api/types/enums` (the single source of truth, regenerated from +// the OpenAPI schema) rather than re-declared literal lists, so a value +// added to an enum cannot drift out of sync with its validator within a +// build. The frame guard's drop-on-unknown rationale is unchanged: the +// tuple is still build-time-frozen to what this frontend ships, so a +// behavioural enum value the frontend does not yet know is dropped +// rather than mis-routed. +// Status / priority / type / source are NOT pre-validated here; +// sanitizeWsEnum owns that responsibility (see sanitizeTask). +const COMPLEXITY_SET: ReadonlySet = new Set(COMPLEXITY_VALUES) +const TASK_STRUCTURE_SET: ReadonlySet = new Set( + TASK_STRUCTURE_VALUES, +) +const COORDINATION_TOPOLOGY_SET: ReadonlySet = new Set( + COORDINATION_TOPOLOGY_VALUES, +) +const STAKES_SET: ReadonlySet = new Set(STAKES_VALUES) const log = createLogger('tasks') // ``metadata`` is an arbitrary key-value bag on the wire