Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ PYTHONPATH=. uv run zensical build # docs
- Two phases: **construction** (`create_app` body) wires synchronous services; **on_startup** (`_build_lifecycle.on_startup`) wires services that need a connected persistence backend.
- Construction-phase ordering invariants: `agent_registry` must be built BEFORE `auto_wire_meetings`; `tunnel_provider` is wired unconditionally (not gated by `integrations.enabled`).
- On-startup ordering invariants: `SettingsService` auto-wire must precede `WorkflowExecutionObserver` registration (so it picks up resolver-driven `max_subworkflow_depth` instead of the seed default); `OntologyService` wires after `persistence.connect()` via `_wire_ontology_service`.
- Setup completion: `post_setup_reinit()` (provider reload + agent bootstrap, defined in `src/synthorg/api/controllers/setup/agent_helpers.py`) propagates failures, and `settings_svc.set("api", "setup_complete", "true")` only runs if reinit returns clean. The whole check/validate/reinit/persist sequence is serialised under `COMPLETE_LOCK` in the same module so two concurrent `/setup/complete` requests cannot race on the flag write. A half-configured runtime presenting itself as "complete" is worse than a clear error the operator can retry after fixing the underlying provider config.
- Worker execution service: `synthorg.workers.runtime_builder.build_worker_execution_service` selects behind the provider-present switch (`AgentEngineExecutionService` with a provider, `NoProviderExecutionService` empty-company backstop) and installs via the `AppState.worker_execution_service` seam. The boot install hook is appended FIRST after the persistence/SettingsService hooks so the once-only `set_worker_execution_service` cannot lose the race with the property's lazy `LifecycleAdvancingExecutionService` default. Empty-company also rejects task creation at the controller (`AgentRuntimeNotConfiguredError`, 4014). `swap_worker_execution_service` / `swap_provider_registry` hold a lock (synchronised against lazy reads).
- Setup completion: `post_setup_reinit()` (provider reload, agent bootstrap, AND worker-execution-service rebuild + hot-swap, defined in `src/synthorg/api/controllers/setup/agent_helpers.py`) propagates failures, and `settings_svc.set("api", "setup_complete", "true")` only runs if reinit returns clean. The whole check/validate/reinit/persist sequence is serialised under `COMPLETE_LOCK` in the same module so two concurrent `/setup/complete` requests cannot race on the flag write. A half-configured runtime presenting itself as "complete" is worse than a clear error the operator can retry after fixing the underlying provider config.

## MCP / Telemetry / Resilience

Expand Down
2 changes: 1 addition & 1 deletion scripts/_ghost_wiring_manifest.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
# Symbol = the class or factory-function name (the thing that is
# instantiated/called). Bare name match on ast.Call(func=Name|Attribute).

PENDING AgentEngine #1956 -- runtime root; construct at boot behind the provider switch
ENFORCED AgentEngine #1956 -- runtime root; construct at boot behind the provider switch
PENDING build_coordinator #1958 -- call at boot to populate app_state.coordinator
PENDING BaselineStore #1959 -- construct at boot (window from budget.baseline_window_size)
PENDING CoordinationMetricsCollector #1959 -- construct at boot, thread into execution
Expand Down
51 changes: 51 additions & 0 deletions src/synthorg/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
_make_expire_callback,
_make_meeting_publisher,
_resolve_artifact_dir_env,
resolve_agent_workspace_root_env,
)
from synthorg.api.approval_store import ApprovalStore
from synthorg.api.auth.controller_helpers import require_password_changed
Expand Down Expand Up @@ -976,6 +977,56 @@ def create_app( # noqa: C901, PLR0912, PLR0913, PLR0915
effective_config=effective_config,
)

_worker_service_installed = False

async def _install_worker_execution_service() -> None:
# Installs the worker execution service behind the
# provider-present switch. Appended first (runs immediately
# after the core startup hooks that connect persistence and
# wire SettingsService / ConfigResolver), and before any other
# appended hook, so the once-only ``set_worker_execution_service``
# cannot lose a race with the property's lazy lifecycle-only
# default. With no provider this installs the empty-company
# backstop; a provider added later swaps in the live service via
# ``post_setup_reinit`` (no restart). The closure flag keeps the
# one-shot ``set_`` idempotent across a lifespan re-entry
# (shared-app test fixtures), mirroring ``_wire_chief_of_staff_chat``.
nonlocal _worker_service_installed
if _worker_service_installed:
return
from synthorg.workers.runtime_builder import ( # noqa: PLC0415
build_worker_execution_service,
)

# Pin the sandbox workspace onto the mounted data volume in an
# env-driven deployment so agent file/sandbox tools persist with
# the runtime data, not a process temp dir. Injected/dev apps
# return None and keep the documented temp fallback.
env_workspace_root = resolve_agent_workspace_root_env()
if env_workspace_root is not None:
app_state.set_agent_workspace_root(env_workspace_root)

try:
service = await build_worker_execution_service(
app_state,
workspace_root=app_state.agent_workspace_root,
)
except MemoryError, RecursionError:
raise
except Exception as exc:
logger.error(
API_APP_STARTUP,
service="worker_execution_service",
note="failed to build the worker execution service at boot",
error_type=type(exc).__name__,
error=safe_error_description(exc),
)
raise
app_state.set_worker_execution_service(service)
_worker_service_installed = True

startup = [*startup, _install_worker_execution_service]

# Project telemetry: build collector (reads SYNTHORG_TELEMETRY_ENABLED env for
# opt-in, defaults to disabled). Attach to app_state so the health
# endpoint can report the state, and hook start()/shutdown() into the
Expand Down
42 changes: 40 additions & 2 deletions src/synthorg/api/app_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from collections.abc import Callable # noqa: TC003
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from typing import Any, Final

# ``ChannelsPlugin`` appears in the public signatures of the helpers
# below. Under PEP 649 lazy annotations, ``typing.get_type_hints()``
Expand Down Expand Up @@ -45,6 +45,9 @@

logger = get_logger(__name__)

_AGENT_WORKSPACES_SUBDIR: Final[str] = "agent-workspaces"
_POSTGRES_VOLUME_DATA_DIR: Final[str] = "/data"


def _make_expire_callback(
channels_plugin: ChannelsPlugin,
Expand Down Expand Up @@ -104,7 +107,7 @@ def _resolve_artifact_dir_env() -> str:
"""
artifact_dir_str = os.environ.get("SYNTHORG_ARTIFACT_DIR", "").strip()
if not artifact_dir_str:
return "/data"
return _POSTGRES_VOLUME_DATA_DIR
artifact_path = Path(artifact_dir_str)
if not artifact_path.is_absolute():
msg = (
Expand All @@ -123,6 +126,41 @@ def _resolve_artifact_dir_env() -> str:
return artifact_dir_str


def resolve_agent_workspace_root_env() -> Path | None:
"""Resolve the agent sandbox workspace root from the environment.

Returns ``<runtime data dir>/agent-workspaces`` when an env-driven
deployment is in effect, so the agent's file-system / sandbox tools
write onto the mounted data volume rather than a process temp dir.
Returns ``None`` for injected / dev apps (no deployment env vars),
where :attr:`AppState.agent_workspace_root` keeps its documented
process-stable temp fallback.

Precedence mirrors the persistence env resolution:
``SYNTHORG_ARTIFACT_DIR`` (explicit), then ``SYNTHORG_DB_PATH``
parent (sqlite volume), then ``/data`` when only
``SYNTHORG_DATABASE_URL`` is set (postgres compose volume).
"""
artifact_dir = os.environ.get("SYNTHORG_ARTIFACT_DIR", "").strip()
if artifact_dir:
return Path(_resolve_artifact_dir_env()) / _AGENT_WORKSPACES_SUBDIR
db_path = os.environ.get("SYNTHORG_DB_PATH", "").strip()
if db_path:
db_path_obj = Path(db_path)
if not db_path_obj.is_absolute():
msg = (
f"SYNTHORG_DB_PATH={db_path!r} must be an absolute path when "
f"deriving the agent workspace root so sandbox writes land on "
f"the mounted data volume, not the process working directory"
)
logger.warning(API_APP_STARTUP, error=msg, reason="non_absolute_db_path")
raise ValueError(msg)
return db_path_obj.parent / _AGENT_WORKSPACES_SUBDIR
if os.environ.get("SYNTHORG_DATABASE_URL", "").strip():
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return Path(_POSTGRES_VOLUME_DATA_DIR) / _AGENT_WORKSPACES_SUBDIR
return None
Comment on lines +129 to +161

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The string "agent-workspaces" is hardcoded and repeated three times in this function. Additionally, "/data" is a hardcoded path. Per the "no-hardcoded-values" rule in CLAUDE.md, these should be defined as constants to improve maintainability and adhere to project standards.

_AGENT_WORKSPACE_DIR: Final[str] = "agent-workspaces"
_DEFAULT_DATA_DIR: Final[str] = "/data"


def resolve_agent_workspace_root_env() -> Path | None:
    """Resolve the agent sandbox workspace root from the environment.

    Returns <runtime data dir>/agent-workspaces when an env-driven
    deployment is in effect, so the agent's file-system / sandbox tools
    write onto the mounted data volume rather than a process temp dir.
    Returns None for injected / dev apps (no deployment env vars),
    where AppState.agent_workspace_root keeps its documented
    process-stable temp fallback.

    Precedence mirrors the persistence env resolution:
    SYNTHORG_ARTIFACT_DIR (explicit), then SYNTHORG_DB_PATH
    parent (sqlite volume), then /data when only
    SYNTHORG_DATABASE_URL is set (postgres compose volume).
    """
    artifact_dir = os.environ.get("SYNTHORG_ARTIFACT_DIR", "").strip()
    if artifact_dir:
        return Path(_resolve_artifact_dir_env()) / _AGENT_WORKSPACE_DIR
    db_path = os.environ.get("SYNTHORG_DB_PATH", "").strip()
    if db_path:
        return Path(db_path).parent / _AGENT_WORKSPACE_DIR
    if os.environ.get("SYNTHORG_DATABASE_URL", "").strip():
        return Path(_DEFAULT_DATA_DIR) / _AGENT_WORKSPACE_DIR
    return None
References
  1. Avoid hardcoded values; define them as constants. (link)



def _make_meeting_publisher(
channels_plugin: ChannelsPlugin,
) -> Callable[[str, dict[str, Any]], None]:
Expand Down
26 changes: 26 additions & 0 deletions src/synthorg/api/controllers/setup/agent_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,32 @@ async def post_setup_reinit(app_state: AppState) -> None:
)
raise

# 3. Rebuild + hot-swap the worker execution service so a provider
# added after an empty-company start wakes the agent runtime
# live, with no process restart. Raise on failure so the caller
# keeps ``setup_complete=false`` rather than presenting a
# half-configured runtime as complete.
try:
from synthorg.workers.runtime_builder import ( # noqa: PLC0415
build_worker_execution_service,
)

service = await build_worker_execution_service(
app_state,
workspace_root=app_state.agent_workspace_root,
)
app_state.swap_worker_execution_service(service)
except MemoryError, RecursionError:
raise
except Exception as exc:
logger.warning(
SETUP_AGENT_BOOTSTRAP_FAILED,
context="worker_execution_service_rebuild",
error_type=type(exc).__name__,
error=safe_error_description(exc),
)
raise


async def check_needs_admin(
persistence: PersistenceBackend,
Expand Down
9 changes: 9 additions & 0 deletions src/synthorg/api/controllers/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from synthorg.api.rate_limits import per_op_rate_limit_from_policy
from synthorg.api.responses import require_resource_or_404
from synthorg.api.state import AppState # noqa: TC001
from synthorg.core.domain_errors import AgentRuntimeNotConfiguredError
from synthorg.core.enums import TaskStatus # noqa: TC001
from synthorg.core.error_taxonomy import ErrorCode
from synthorg.core.task import Task # noqa: TC001
Expand All @@ -33,6 +34,7 @@
API_TASK_CANCELLED,
API_TASK_CREATED_BY_MISMATCH,
API_TASK_DELETED,
API_TASK_REJECTED_NO_PROVIDER,
API_TASK_UPDATED,
)
from synthorg.observability.events.task import (
Expand Down Expand Up @@ -177,6 +179,13 @@ async def create_task(
"""
app_state: AppState = state.app_state
requester = _extract_requester(state)
if not app_state.has_active_provider:
logger.warning(
API_TASK_REJECTED_NO_PROVIDER,
title=data.title,
requester=requester,
)
raise AgentRuntimeNotConfiguredError
task_data = CreateTaskData(
title=data.title,
description=data.description,
Expand Down
100 changes: 98 additions & 2 deletions src/synthorg/api/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
"""

import asyncio
import tempfile
import threading
from collections import OrderedDict
from typing import TYPE_CHECKING
from pathlib import Path
from typing import TYPE_CHECKING, Final

from synthorg.api.auth.presence import UserPresence
from synthorg.api.auth.service import AuthService # noqa: TC001
Expand Down Expand Up @@ -79,7 +81,11 @@
NotificationDispatcher, # noqa: TC001
)
from synthorg.observability import get_logger
from synthorg.observability.events.api import API_APP_STARTUP, API_SERVICE_UNAVAILABLE
from synthorg.observability.events.api import (
API_APP_STARTUP,
API_SERVICE_AUTO_WIRED,
API_SERVICE_UNAVAILABLE,
)
from synthorg.observability.prometheus_collector import (
PrometheusCollector, # noqa: TC001
)
Expand Down Expand Up @@ -151,6 +157,8 @@

logger = get_logger(__name__)

_DEFAULT_WORKSPACE_TEMP_SUBDIR: Final[str] = "synthorg-agent-workspaces"


class AppState(AppStateServicesMixin):
"""Typed application state container.
Expand All @@ -167,6 +175,7 @@ class AppState(AppStateServicesMixin):
"_agent_health_service",
"_agent_registry",
"_agent_version_service",
"_agent_workspace_root",
"_analytics_service",
"_api_bridge_config",
"_api_bridge_config_lock",
Expand Down Expand Up @@ -418,12 +427,19 @@ def __init__( # noqa: PLR0913, PLR0915
# deployments may swap the implementation to invoke the full
# AgentEngine instead of the baseline lifecycle walk.
self._worker_execution_service: WorkerExecutionService | None = None
# Filesystem root the agent's file-system / sandbox tools use.
# Pinned once at startup from the runtime data dir via
# ``set_agent_workspace_root``; the property falls back to a
# process-stable temp directory so dev / empty-company runs
# still have a valid absolute workspace.
self._agent_workspace_root: Path | None = None
# Guards the double-checked locking on first-access lazy wiring
# of worker_execution_service / experiment_service. Both
# properties may be invoked from concurrent request handlers
# before any explicit ``set_*`` call, so the bare None check
# without a lock could construct two instances and lose state.
self._lazy_service_lock: threading.Lock = threading.Lock()
self._provider_registry_lock: threading.Lock = threading.Lock()
# Lazily constructed against an in-memory repository so the
# ``/experiments`` controller works out of the box; deployments
# swap in a durable repository via ``set_experiment_service``.
Expand Down Expand Up @@ -841,6 +857,12 @@ def worker_execution_service(self) -> WorkerExecutionService:
self._worker_execution_service = LifecycleAdvancingExecutionService(
task_engine=self.task_engine,
)
logger.info(
API_SERVICE_AUTO_WIRED,
service="worker_execution_service",
implementation="LifecycleAdvancingExecutionService",
note="lazy baseline; boot install did not run first",
)
return self._worker_execution_service

def set_worker_execution_service(
Expand All @@ -858,6 +880,74 @@ def set_worker_execution_service(
"Worker execution service",
)

def swap_worker_execution_service(
self,
service: WorkerExecutionService,
) -> None:
"""Replace the worker execution service (hot-reload).

Distinct from :meth:`set_worker_execution_service`, which is
once-only: this method replaces an already-wired service so a
provider configured against an empty company brings the runtime
online without a restart. The swap goes through this seam, and
the ``WorkerExecutionService`` contract is unchanged.

Holds ``_lazy_service_lock`` so the write is synchronised
against the property's lazy-construction read; otherwise an
in-flight execute could race the reinit-wake swap.
"""
with self._lazy_service_lock:
previous = self._worker_execution_service
if previous is service:
transition = "noop"
elif previous is None:
transition = "attached"
else:
transition = "replaced"
self._worker_execution_service = service
logger.info(
API_APP_STARTUP,
service="worker_execution_service",
transition=transition,
)

@property
def agent_workspace_root(self) -> Path:
"""Filesystem root the agent's file-system / sandbox tools use.

The env-driven deployment startup path pins this once to the
runtime data directory via :meth:`set_agent_workspace_root`
(see ``resolve_agent_workspace_root_env``). Injected / dev /
empty-company apps set no env data dir and fall back to a
process-stable temp directory, so the workspace is always a
valid absolute path and the reinit-wake rebuild resolves the
same directory.
"""
if self._agent_workspace_root is not None:
return self._agent_workspace_root
return Path(tempfile.gettempdir()) / _DEFAULT_WORKSPACE_TEMP_SUBDIR

def set_agent_workspace_root(self, path: Path) -> None:
"""Pin the agent workspace root (once-only, startup).

Rejects relative paths so agent filesystem/sandbox tools cannot
be routed to a cwd-relative location instead of the mounted
data volume.
"""
if not path.is_absolute():
msg = f"Agent workspace root must be an absolute path, got {path!r}"
logger.warning(
API_APP_STARTUP,
service="agent_workspace_root",
reason="non_absolute_workspace_root",
)
raise ValueError(msg)
self._set_once(
"_agent_workspace_root",
path,
"Agent workspace root",
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

@property
def experiment_service(self) -> ExperimentService:
"""Return the A/B experiment service, auto-wiring the default.
Expand All @@ -875,6 +965,12 @@ def experiment_service(self) -> ExperimentService:
repository=InMemoryExperimentRepository(),
clock=self.clock,
)
logger.info(
API_SERVICE_AUTO_WIRED,
service="experiment_service",
implementation="ExperimentService",
note="lazy in-memory repository; no durable backend set",
)
return self._experiment_service

def set_experiment_service(self, service: ExperimentService) -> None:
Expand Down
Loading
Loading