Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions scripts/_module_size_baseline.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
"src/synthorg/engine/workflow/execution_service.py": 587,
"src/synthorg/engine/workflow/strategies/milestone_driven.py": 541,
"src/synthorg/engine/workflow/strategies/throughput_adaptive.py": 567,
"src/synthorg/engine/workspace/git_backend/external_remote.py": 523,
"src/synthorg/engine/workspace/git_worktree.py": 712,
"src/synthorg/hr/activity_service.py": 636,
"src/synthorg/hr/performance/tracker.py": 873,
Expand Down Expand Up @@ -157,6 +156,6 @@
"src/synthorg/tools/sandbox/subprocess_sandbox.py": 630,
"src/synthorg/workers/claim.py": 612,
"src/synthorg/workers/execution_service.py": 772,
"src/synthorg/workers/runtime_builder.py": 809
"src/synthorg/workers/runtime_builder.py": 842
}
}
1 change: 1 addition & 0 deletions src/synthorg/workers/execution_service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# module-kind: service
"""Backend-side service called by the worker-callable execute endpoint.

When the worker pool fetches a JetStream claim, it posts to
Expand Down
58 changes: 46 additions & 12 deletions src/synthorg/workers/runtime_builder.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# module-kind: orchestrator
"""Provider-present switch: build the boot runtime services.

This is the construction site for the agent runtime. With a provider
Expand Down Expand Up @@ -181,12 +182,15 @@ def _select_active_provider(
Logs the empty-company path and the unsupported multi-provider
fan-in so the boot decision is observable.
"""
security = app_state.config.security
if not app_state.has_active_provider:
logger.info(
API_APP_STARTUP,
service="runtime_services",
mode="no_provider",
note="empty company -- task execution rejected at the seam",
security_enabled=security.enabled,
security_enforcement_mode=security.enforcement_mode.value,
)
return None

Expand All @@ -198,6 +202,8 @@ def _select_active_provider(
service="runtime_services",
mode="no_provider",
note="provider registry present but empty",
security_enabled=security.enabled,
security_enforcement_mode=security.enforcement_mode.value,
)
return None
if len(names) > 1:
Expand Down Expand Up @@ -567,15 +573,28 @@ async def _build_runtime_coordinator(
(one routing surface, no divergence). The resolved decomposition
model is returned so the ``llm-judged`` routing policy reuses it.
"""
async with asyncio.TaskGroup() as tg:
model_task = tg.create_task(
app_state.config_resolver.get_str(
_DECOMPOSITION_NS,
_DECOMPOSITION_KEY,
try:
async with asyncio.TaskGroup() as tg:
model_task = tg.create_task(
app_state.config_resolver.get_str(
_DECOMPOSITION_NS,
_DECOMPOSITION_KEY,
)
)
scorer_task = tg.create_task(_resolve_routing_scorer_config(app_state))
workspace_task = tg.create_task(_build_workspace_strategy(app_state))
except MemoryError, RecursionError:
raise
except Exception as exc:
log_exception_redacted(
logger,
API_APP_STARTUP,
exc,
service="coordinator",
context="resolve_failed",
note="decomposition / routing-scorer / workspace config resolve failed",
)
scorer_task = tg.create_task(_resolve_routing_scorer_config(app_state))
workspace_task = tg.create_task(_build_workspace_strategy(app_state))
raise
decomposition_model = model_task.result()
routing_scorer_config = scorer_task.result()
workspace_strategy, workspace_config = workspace_task.result()
Expand Down Expand Up @@ -752,12 +771,15 @@ async def build_runtime_services(
provider,
coordination_metrics_collector,
)
security = app_state.config.security
logger.info(
API_APP_STARTUP,
service="runtime_services",
mode="agent_engine",
provider=names[0],
tool_count=tool_count,
security_enabled=security.enabled,
security_enforcement_mode=security.enforcement_mode.value,
)
# The env runner provisions the declaration into the same backend the
# build/test tool categories resolve to (not necessarily Docker), so
Expand Down Expand Up @@ -796,16 +818,28 @@ async def build_runtime_services(
provider_name=names[0],
seed=red_team_seed,
)
vision_gate = _build_vision_gate_or_none(
app_state=app_state,
workspace_root=workspace_root,
provider=provider,
)
logger.info(
API_APP_STARTUP,
service="runtime_services",
mode="agent_engine_built",
coordinator_wired=coordinator is not None,
work_pipeline_wired=work_pipeline is not None,
red_team_wired=red_team_runtime is not None,
vision_gate_wired=vision_gate is not None,
security_enabled=security.enabled,
security_enforcement_mode=security.enforcement_mode.value,
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return RuntimeServices(
worker_execution_service=worker_execution_service,
coordinator=coordinator,
work_pipeline=work_pipeline,
red_team_runtime=red_team_runtime,
vision_gate=_build_vision_gate_or_none(
app_state=app_state,
workspace_root=workspace_root,
provider=provider,
),
vision_gate=vision_gate,
)


Expand Down
177 changes: 175 additions & 2 deletions tests/unit/workers/test_runtime_builder.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Unit tests for the provider-present runtime-services switch."""

from collections.abc import Mapping, Sequence
from pathlib import Path
from typing import cast
from typing import Any, cast
from unittest.mock import AsyncMock

import pytest
from structlog.testing import capture_logs

from synthorg.api.state import AppState
from synthorg.budget.coordination_collector import CoordinationMetricsCollector
Expand All @@ -19,6 +21,7 @@
from synthorg.engine.pipeline.service import DefaultWorkPipeline
from synthorg.engine.task_engine import TaskEngine
from synthorg.hr.registry import AgentRegistryService
from synthorg.observability.events.api import API_APP_STARTUP
from synthorg.persistence.project_protocol import ProjectRepository
from synthorg.persistence.protocol import PersistenceBackend
from synthorg.providers.registry import ProviderRegistry
Expand Down Expand Up @@ -64,6 +67,7 @@ def _provider_app_state( # noqa: PLR0913 -- test builder with keyword-only knob
workspace: Path,
*,
bridge_config_error: Exception | None = None,
decomposition_error: Exception | None = None,
cost_tracker: CostTracker | None = None,
coordination_metrics_store: CoordinationMetricsStore | None = None,
simulation_runtime: bool = False,
Expand All @@ -72,6 +76,9 @@ def _provider_app_state( # noqa: PLR0913 -- test builder with keyword-only knob

``bridge_config_error`` makes ``get_engine_bridge_config`` raise, to
exercise the fail-open routing-scorer-config resolve branch.
``decomposition_error`` makes ``get_str`` raise on the
``decomposition_model`` key, to exercise the
``_build_runtime_coordinator`` redacted-log + re-raise branch.
``cost_tracker`` (and the paired ``coordination_metrics_store``)
drive the coordination-metrics collector wiring: absent, the
collector is not constructed (mirrors the empty/degraded path).
Expand All @@ -80,6 +87,20 @@ def _provider_app_state( # noqa: PLR0913 -- test builder with keyword-only knob
bridge_mock = AsyncMock(return_value=EngineBridgeConfig())
else:
bridge_mock = AsyncMock(side_effect=bridge_config_error)
if decomposition_error is None:
get_str_mock = AsyncMock(side_effect=_get_str)
else:
# Narrow-raise on the decomposition key only so the upstream
# boot calls (browser settings, sandbox images, etc.) still
# resolve normally; we want the failure to surface from the
# `_build_runtime_coordinator` TaskGroup, not from
# `_build_tool_registry` upstream of it.
async def _get_str_failing(namespace: str, key: str) -> str:
if key == "decomposition_model":
raise decomposition_error
return await _get_str(namespace, key)

get_str_mock = AsyncMock(side_effect=_get_str_failing)
# ``mock_of[T](...)`` is ``Any`` by design; cast back to the spec so
# the helper keeps a precise signature for its callers.
return cast(
Expand All @@ -90,7 +111,7 @@ def _provider_app_state( # noqa: PLR0913 -- test builder with keyword-only knob
config=RootConfig(company_name="test-corp"),
config_resolver=mock_of[ConfigResolver](
get_float=AsyncMock(return_value=30.0),
get_str=AsyncMock(side_effect=_get_str),
get_str=get_str_mock,
get_int=AsyncMock(return_value=1),
get_engine_bridge_config=bridge_mock,
),
Expand Down Expand Up @@ -283,6 +304,158 @@ async def test_builds_nonexistent_deep_workspace_path(
assert deep.is_dir()


class TestBootLogSafetySpineState:
    """The boot log carries the safety-spine state on every branch.

    Operators reading ``synthorg.log`` must see whether the SecOps
    interceptor is ``active`` / ``shadow`` / ``disabled`` at startup
    without grepping config files; the agent runtime's go/no-go decision
    log is the single observable place for it.
    """

    @staticmethod
    def _runtime_services_logs(
        logs: Sequence[Mapping[str, Any]],
    ) -> list[Mapping[str, Any]]:
        """Keep only the ``runtime_services`` startup events from *logs*."""
        return [
            entry
            for entry in logs
            if entry.get("event") == API_APP_STARTUP
            and entry.get("service") == "runtime_services"
        ]

    @staticmethod
    def _entry_for_mode(
        runtime_logs: Sequence[Mapping[str, Any]],
        mode: str,
    ) -> Mapping[str, Any]:
        """Return the first runtime-services entry whose ``mode`` is *mode*.

        A bare ``next(generator)`` inside an ``async def`` test leaks
        ``StopIteration``, which the interpreter re-raises as an opaque
        ``RuntimeError``; asserting here keeps a missing boot log a
        readable assertion failure instead.
        """
        assert runtime_logs, "no runtime_services boot log captured"
        entry = next((e for e in runtime_logs if e.get("mode") == mode), None)
        assert entry is not None, (
            f"no runtime_services boot log with mode={mode!r} captured"
        )
        return entry

    @staticmethod
    def _assert_spine_state(entry: Mapping[str, Any]) -> None:
        """Assert *entry* reports security enabled in ``active`` mode.

        These are the values every test in this class expects from a
        ``RootConfig`` built without security overrides (presumably the
        config defaults -- confirm against ``RootConfig``).
        """
        assert entry["security_enabled"] is True
        assert entry["security_enforcement_mode"] == "active"

    async def test_no_provider_log_carries_safety_spine_state(
        self,
        tmp_path: Path,
    ) -> None:
        """Empty-company boot still emits the safety-spine fields."""
        app_state = mock_of[AppState](
            has_active_provider=False,
            config=RootConfig(company_name="empty-co"),
        )
        with capture_logs() as logs:
            await build_runtime_services(app_state, workspace_root=tmp_path)
        runtime_logs = self._runtime_services_logs(logs)
        self._assert_spine_state(self._entry_for_mode(runtime_logs, "no_provider"))

    async def test_empty_registry_log_carries_safety_spine_state(
        self,
        tmp_path: Path,
    ) -> None:
        """Provider-registry-empty boot still emits the safety-spine fields."""
        app_state = mock_of[AppState](
            has_active_provider=True,
            provider_registry=ProviderRegistry({}),
            config=RootConfig(company_name="registry-empty-co"),
        )
        with capture_logs() as logs:
            await build_runtime_services(app_state, workspace_root=tmp_path)
        runtime_logs = self._runtime_services_logs(logs)
        self._assert_spine_state(self._entry_for_mode(runtime_logs, "no_provider"))

    async def test_provider_present_log_carries_safety_spine_state(
        self,
        tmp_path: Path,
    ) -> None:
        """Provider-present boot emits the spine fields on both startup events.

        The ``agent_engine`` decision log AND the post-construction
        ``agent_engine_built`` summary log must both surface the spine
        state -- otherwise operators reading the boot trail see schema
        drift between the two ``runtime_services`` events.
        """
        registry = ProviderRegistry.from_config(
            {"test-provider": ProviderConfig(driver="scripted")}
        )
        app_state = _provider_app_state(registry, tmp_path)
        with capture_logs() as logs:
            await build_runtime_services(app_state, workspace_root=tmp_path)
        runtime_logs = self._runtime_services_logs(logs)
        # Check the decision event first, then the post-construction summary.
        for mode in ("agent_engine", "agent_engine_built"):
            self._assert_spine_state(self._entry_for_mode(runtime_logs, mode))


class TestRuntimeCoordinatorResolveFailure:
    """Coordinator config-resolve failures are logged redacted and re-raised.

    ``_build_runtime_coordinator`` resolves three independent pieces of
    config inside one TaskGroup (decomposition model, routing-scorer
    bridge, workspace strategy). If any of them fails, the TaskGroup
    surfaces the error wrapped in an exception group; the wrapper must
    log it through the SecOps-safe redactor and let the exception reach
    the boot caller, so the API never starts with a half-wired
    coordinator.
    """

    async def test_resolve_failure_logs_and_propagates(
        self,
        tmp_path: Path,
    ) -> None:
        """A failing decomposition resolve is logged redacted, then raised."""
        failure_message = "decomposition backend unreachable"
        provider_registry = ProviderRegistry.from_config(
            {"test-provider": ProviderConfig(driver="scripted")}
        )
        app_state = _provider_app_state(
            provider_registry,
            tmp_path,
            decomposition_error=RuntimeError(failure_message),
        )
        with capture_logs() as logs, pytest.raises(BaseExceptionGroup) as excinfo:
            await build_runtime_services(app_state, workspace_root=tmp_path)

        # The group's own ``str()`` is only the generic "unhandled errors
        # in a TaskGroup" banner, which is all ``pytest.raises(match=...)``
        # would see. Walk the group's direct members instead to prove the
        # original RuntimeError travelled with it.
        original_error_found = False
        for sub_error in excinfo.value.exceptions:
            if not isinstance(sub_error, RuntimeError):
                continue
            if failure_message in str(sub_error):
                original_error_found = True
                break
        assert original_error_found

        resolve_failures = [
            record
            for record in logs
            if record.get("event") == API_APP_STARTUP
            and record.get("service") == "coordinator"
            and record.get("context") == "resolve_failed"
        ]
        assert resolve_failures, "coordinator resolve_failed log not emitted"
        failure_record = resolve_failures[0]

        # The redacted record carries the exception type name and the
        # canonical ``{Type}: {scrubbed-message}`` text, never a raw
        # traceback: ``exc_info`` would serialise frame-locals (including
        # any in-scope credential) into the record. Because
        # ``_build_runtime_coordinator`` catches the TaskGroup's wrapper,
        # the logged type is ``ExceptionGroup``; the inner ``RuntimeError``
        # was already checked on the propagated exception above.
        assert failure_record["error_type"] == "ExceptionGroup"
        assert failure_record["error"].startswith("ExceptionGroup")
        assert failure_record["note"].startswith(
            "decomposition / routing-scorer / workspace config resolve failed"
        )
        assert "exc_info" not in failure_record


class TestCoordinationMetricsWiring:
"""The coordination-metrics collector is built and shared at boot."""

Expand Down
Loading