Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ PYTHONPATH=. uv run zensical build # docs
- Two phases: **construction** (`create_app` body) wires synchronous services; **on_startup** (`_build_lifecycle.on_startup`) wires services that need a connected persistence backend.
- Construction-phase ordering invariants: `agent_registry` must be built BEFORE `auto_wire_meetings`; `tunnel_provider` is wired unconditionally (not gated by `integrations.enabled`).
- On-startup ordering invariants: `SettingsService` auto-wire must precede `WorkflowExecutionObserver` registration (so it picks up resolver-driven `max_subworkflow_depth` instead of the seed default); `OntologyService` wires after `persistence.connect()` via `_wire_ontology_service`.
- Setup completion: `post_setup_reinit()` (provider reload + agent bootstrap, defined in `src/synthorg/api/controllers/setup/agent_helpers.py`) propagates failures, and `settings_svc.set("api", "setup_complete", "true")` only runs if reinit returns clean. The whole check/validate/reinit/persist sequence is serialised under `COMPLETE_LOCK` in the same module so two concurrent `/setup/complete` requests cannot race on the flag write. A half-configured runtime presenting itself as "complete" is worse than a clear error the operator can retry after fixing the underlying provider config.
- Worker execution service: `synthorg.workers.runtime_builder.build_worker_execution_service` selects behind the provider-present switch (`AgentEngineExecutionService` with a provider, `NoProviderExecutionService` empty-company backstop) and installs via the `AppState.worker_execution_service` seam. The boot install hook is appended FIRST after the persistence/SettingsService hooks so the once-only `set_worker_execution_service` cannot lose the race with the property's lazy `LifecycleAdvancingExecutionService` default. Empty-company also rejects task creation at the controller (`AgentRuntimeNotConfiguredError`, 4014). `swap_worker_execution_service` / `swap_provider_registry` hold a lock (synchronised against lazy reads).
- Setup completion: `post_setup_reinit()` (provider reload, agent bootstrap, AND worker-execution-service rebuild + hot-swap, defined in `src/synthorg/api/controllers/setup/agent_helpers.py`) propagates failures, and `settings_svc.set("api", "setup_complete", "true")` only runs if reinit returns clean. The whole check/validate/reinit/persist sequence is serialised under `COMPLETE_LOCK` in the same module so two concurrent `/setup/complete` requests cannot race on the flag write. A half-configured runtime presenting itself as "complete" is worse than a clear error the operator can retry after fixing the underlying provider config.

## MCP / Telemetry / Resilience

Expand Down
2 changes: 1 addition & 1 deletion scripts/_ghost_wiring_manifest.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
# Symbol = the class or factory-function name (the thing that is
# instantiated/called). Bare name match on ast.Call(func=Name|Attribute).

PENDING AgentEngine #1956 -- runtime root; construct at boot behind the provider switch
ENFORCED AgentEngine #1956 -- runtime root; construct at boot behind the provider switch
PENDING build_coordinator #1958 -- call at boot to populate app_state.coordinator
PENDING BaselineStore #1959 -- construct at boot (window from budget.baseline_window_size)
PENDING CoordinationMetricsCollector #1959 -- construct at boot, thread into execution
Expand Down
51 changes: 51 additions & 0 deletions src/synthorg/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
_make_expire_callback,
_make_meeting_publisher,
_resolve_artifact_dir_env,
resolve_agent_workspace_root_env,
)
from synthorg.api.approval_store import ApprovalStore
from synthorg.api.auth.controller_helpers import require_password_changed
Expand Down Expand Up @@ -976,6 +977,56 @@ def create_app( # noqa: C901, PLR0912, PLR0913, PLR0915
effective_config=effective_config,
)

_worker_service_installed = False

async def _install_worker_execution_service() -> None:
# Installs the worker execution service behind the
# provider-present switch. Appended first (runs immediately
# after the core startup hooks that connect persistence and
# wire SettingsService / ConfigResolver), and before any other
# appended hook, so the once-only ``set_worker_execution_service``
# cannot lose a race with the property's lazy lifecycle-only
# default. With no provider this installs the empty-company
# backstop; a provider added later swaps in the live service via
# ``post_setup_reinit`` (no restart). The closure flag keeps the
# one-shot ``set_`` idempotent across a lifespan re-entry
# (shared-app test fixtures), mirroring ``_wire_chief_of_staff_chat``.
nonlocal _worker_service_installed
if _worker_service_installed:
return
from synthorg.workers.runtime_builder import ( # noqa: PLC0415
build_worker_execution_service,
)

# Pin the sandbox workspace onto the mounted data volume in an
# env-driven deployment so agent file/sandbox tools persist with
# the runtime data, not a process temp dir. Injected/dev apps
# return None and keep the documented temp fallback.
env_workspace_root = resolve_agent_workspace_root_env()
if env_workspace_root is not None:
app_state.set_agent_workspace_root(env_workspace_root)

try:
service = await build_worker_execution_service(
app_state,
workspace_root=app_state.agent_workspace_root,
)
except MemoryError, RecursionError:
raise
except Exception as exc:
logger.error(
API_APP_STARTUP,
service="worker_execution_service",
note="failed to build the worker execution service at boot",
error_type=type(exc).__name__,
error=safe_error_description(exc),
)
raise
app_state.set_worker_execution_service(service)
_worker_service_installed = True

startup = [*startup, _install_worker_execution_service]

# Project telemetry: build collector (reads SYNTHORG_TELEMETRY_ENABLED env for
# opt-in, defaults to disabled). Attach to app_state so the health
# endpoint can report the state, and hook start()/shutdown() into the
Expand Down
42 changes: 40 additions & 2 deletions src/synthorg/api/app_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from collections.abc import Callable # noqa: TC003
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from typing import Any, Final

# ``ChannelsPlugin`` appears in the public signatures of the helpers
# below. Under PEP 649 lazy annotations, ``typing.get_type_hints()``
Expand Down Expand Up @@ -45,6 +45,9 @@

logger = get_logger(__name__)

_AGENT_WORKSPACES_SUBDIR: Final[str] = "agent-workspaces"
_POSTGRES_VOLUME_DATA_DIR: Final[str] = "/data"


def _make_expire_callback(
channels_plugin: ChannelsPlugin,
Expand Down Expand Up @@ -104,7 +107,7 @@ def _resolve_artifact_dir_env() -> str:
"""
artifact_dir_str = os.environ.get("SYNTHORG_ARTIFACT_DIR", "").strip()
if not artifact_dir_str:
return "/data"
return _POSTGRES_VOLUME_DATA_DIR
artifact_path = Path(artifact_dir_str)
if not artifact_path.is_absolute():
msg = (
Expand All @@ -123,6 +126,41 @@ def _resolve_artifact_dir_env() -> str:
return artifact_dir_str


def resolve_agent_workspace_root_env() -> Path | None:
"""Resolve the agent sandbox workspace root from the environment.

Returns ``<runtime data dir>/agent-workspaces`` when an env-driven
deployment is in effect, so the agent's file-system / sandbox tools
write onto the mounted data volume rather than a process temp dir.
Returns ``None`` for injected / dev apps (no deployment env vars),
where :attr:`AppState.agent_workspace_root` keeps its documented
process-stable temp fallback.

Precedence mirrors the persistence env resolution:
``SYNTHORG_ARTIFACT_DIR`` (explicit), then ``SYNTHORG_DB_PATH``
parent (sqlite volume), then ``/data`` when only
``SYNTHORG_DATABASE_URL`` is set (postgres compose volume).
"""
artifact_dir = os.environ.get("SYNTHORG_ARTIFACT_DIR", "").strip()
if artifact_dir:
return Path(_resolve_artifact_dir_env()) / _AGENT_WORKSPACES_SUBDIR
db_path = os.environ.get("SYNTHORG_DB_PATH", "").strip()
if db_path:
db_path_obj = Path(db_path)
if not db_path_obj.is_absolute():
msg = (
f"SYNTHORG_DB_PATH={db_path!r} must be an absolute path when "
f"deriving the agent workspace root so sandbox writes land on "
f"the mounted data volume, not the process working directory"
)
logger.warning(API_APP_STARTUP, error=msg, reason="non_absolute_db_path")
raise ValueError(msg)
return db_path_obj.parent / _AGENT_WORKSPACES_SUBDIR
if os.environ.get("SYNTHORG_DATABASE_URL", "").strip():
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return Path(_POSTGRES_VOLUME_DATA_DIR) / _AGENT_WORKSPACES_SUBDIR
return None
Comment on lines +129 to +161

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The string "agent-workspaces" is hardcoded and repeated three times in this function. Additionally, "/data" is a hardcoded path. Per the "no-hardcoded-values" rule in CLAUDE.md, these should be defined as constants to improve maintainability and adhere to project standards.

_AGENT_WORKSPACE_DIR: Final[str] = "agent-workspaces"
_DEFAULT_DATA_DIR: Final[str] = "/data"


def resolve_agent_workspace_root_env() -> Path | None:
    """Resolve the agent sandbox workspace root from the environment.

    Returns <runtime data dir>/agent-workspaces when an env-driven
    deployment is in effect, so the agent's file-system / sandbox tools
    write onto the mounted data volume rather than a process temp dir.
    Returns None for injected / dev apps (no deployment env vars),
    where AppState.agent_workspace_root keeps its documented
    process-stable temp fallback.

    Precedence mirrors the persistence env resolution:
    SYNTHORG_ARTIFACT_DIR (explicit), then SYNTHORG_DB_PATH
    parent (sqlite volume), then /data when only
    SYNTHORG_DATABASE_URL is set (postgres compose volume).
    """
    artifact_dir = os.environ.get("SYNTHORG_ARTIFACT_DIR", "").strip()
    if artifact_dir:
        return Path(_resolve_artifact_dir_env()) / _AGENT_WORKSPACE_DIR
    db_path = os.environ.get("SYNTHORG_DB_PATH", "").strip()
    if db_path:
        return Path(db_path).parent / _AGENT_WORKSPACE_DIR
    if os.environ.get("SYNTHORG_DATABASE_URL", "").strip():
        return Path(_DEFAULT_DATA_DIR) / _AGENT_WORKSPACE_DIR
    return None
References
  1. Avoid hardcoded values; define them as constants. (link)



def _make_meeting_publisher(
channels_plugin: ChannelsPlugin,
) -> Callable[[str, dict[str, Any]], None]:
Expand Down
26 changes: 26 additions & 0 deletions src/synthorg/api/controllers/setup/agent_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,32 @@ async def post_setup_reinit(app_state: AppState) -> None:
)
raise

# 3. Rebuild + hot-swap the worker execution service so a provider
# added after an empty-company start wakes the agent runtime
# live, with no process restart. Raise on failure so the caller
# keeps ``setup_complete=false`` rather than presenting a
# half-configured runtime as complete.
try:
from synthorg.workers.runtime_builder import ( # noqa: PLC0415
build_worker_execution_service,
)

service = await build_worker_execution_service(
app_state,
workspace_root=app_state.agent_workspace_root,
)
app_state.swap_worker_execution_service(service)
except MemoryError, RecursionError:
raise
except Exception as exc:
logger.warning(
SETUP_AGENT_BOOTSTRAP_FAILED,
context="worker_execution_service_rebuild",
error_type=type(exc).__name__,
error=safe_error_description(exc),
)
raise


async def check_needs_admin(
persistence: PersistenceBackend,
Expand Down
9 changes: 9 additions & 0 deletions src/synthorg/api/controllers/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from synthorg.api.rate_limits import per_op_rate_limit_from_policy
from synthorg.api.responses import require_resource_or_404
from synthorg.api.state import AppState # noqa: TC001
from synthorg.core.domain_errors import AgentRuntimeNotConfiguredError
from synthorg.core.enums import TaskStatus # noqa: TC001
from synthorg.core.error_taxonomy import ErrorCode
from synthorg.core.task import Task # noqa: TC001
Expand All @@ -33,6 +34,7 @@
API_TASK_CANCELLED,
API_TASK_CREATED_BY_MISMATCH,
API_TASK_DELETED,
API_TASK_REJECTED_NO_PROVIDER,
API_TASK_UPDATED,
)
from synthorg.observability.events.task import (
Expand Down Expand Up @@ -177,6 +179,13 @@ async def create_task(
"""
app_state: AppState = state.app_state
requester = _extract_requester(state)
if not app_state.has_active_provider:
logger.warning(
API_TASK_REJECTED_NO_PROVIDER,
title=data.title,
requester=requester,
)
raise AgentRuntimeNotConfiguredError
task_data = CreateTaskData(
title=data.title,
description=data.description,
Expand Down
100 changes: 98 additions & 2 deletions src/synthorg/api/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
"""

import asyncio
import tempfile
import threading
from collections import OrderedDict
from typing import TYPE_CHECKING
from pathlib import Path
from typing import TYPE_CHECKING, Final

from synthorg.api.auth.presence import UserPresence
from synthorg.api.auth.service import AuthService # noqa: TC001
Expand Down Expand Up @@ -79,7 +81,11 @@
NotificationDispatcher, # noqa: TC001
)
from synthorg.observability import get_logger
from synthorg.observability.events.api import API_APP_STARTUP, API_SERVICE_UNAVAILABLE
from synthorg.observability.events.api import (
API_APP_STARTUP,
API_SERVICE_AUTO_WIRED,
API_SERVICE_UNAVAILABLE,
)
from synthorg.observability.prometheus_collector import (
PrometheusCollector, # noqa: TC001
)
Expand Down Expand Up @@ -151,6 +157,8 @@

logger = get_logger(__name__)

_DEFAULT_WORKSPACE_TEMP_SUBDIR: Final[str] = "synthorg-agent-workspaces"


class AppState(AppStateServicesMixin):
"""Typed application state container.
Expand All @@ -167,6 +175,7 @@ class AppState(AppStateServicesMixin):
"_agent_health_service",
"_agent_registry",
"_agent_version_service",
"_agent_workspace_root",
"_analytics_service",
"_api_bridge_config",
"_api_bridge_config_lock",
Expand Down Expand Up @@ -418,12 +427,19 @@ def __init__( # noqa: PLR0913, PLR0915
# deployments may swap the implementation to invoke the full
# AgentEngine instead of the baseline lifecycle walk.
self._worker_execution_service: WorkerExecutionService | None = None
# Filesystem root the agent's file-system / sandbox tools use.
# Pinned once at startup from the runtime data dir via
# ``set_agent_workspace_root``; the property falls back to a
# process-stable temp directory so dev / empty-company runs
# still have a valid absolute workspace.
self._agent_workspace_root: Path | None = None
# Guards the double-checked locking on first-access lazy wiring
# of worker_execution_service / experiment_service. Both
# properties may be invoked from concurrent request handlers
# before any explicit ``set_*`` call, so the bare None check
# without a lock could construct two instances and lose state.
self._lazy_service_lock: threading.Lock = threading.Lock()
self._provider_registry_lock: threading.Lock = threading.Lock()
# Lazily constructed against an in-memory repository so the
# ``/experiments`` controller works out of the box; deployments
# swap in a durable repository via ``set_experiment_service``.
Expand Down Expand Up @@ -841,6 +857,12 @@ def worker_execution_service(self) -> WorkerExecutionService:
self._worker_execution_service = LifecycleAdvancingExecutionService(
task_engine=self.task_engine,
)
logger.info(
API_SERVICE_AUTO_WIRED,
service="worker_execution_service",
implementation="LifecycleAdvancingExecutionService",
note="lazy baseline; boot install did not run first",
)
return self._worker_execution_service

def set_worker_execution_service(
Expand All @@ -858,6 +880,74 @@ def set_worker_execution_service(
"Worker execution service",
)

def swap_worker_execution_service(
self,
service: WorkerExecutionService,
) -> None:
"""Replace the worker execution service (hot-reload).

Distinct from :meth:`set_worker_execution_service`, which is
once-only: this method replaces an already-wired service so a
provider configured against an empty company brings the runtime
online without a restart. The swap goes through this seam, and
the ``WorkerExecutionService`` contract is unchanged.

Holds ``_lazy_service_lock`` so the write is synchronised
against the property's lazy-construction read; otherwise an
in-flight execute could race the reinit-wake swap.
"""
with self._lazy_service_lock:
previous = self._worker_execution_service
if previous is service:
transition = "noop"
elif previous is None:
transition = "attached"
else:
transition = "replaced"
self._worker_execution_service = service
logger.info(
API_APP_STARTUP,
service="worker_execution_service",
transition=transition,
)

@property
def agent_workspace_root(self) -> Path:
"""Filesystem root the agent's file-system / sandbox tools use.

The env-driven deployment startup path pins this once to the
runtime data directory via :meth:`set_agent_workspace_root`
(see ``resolve_agent_workspace_root_env``). Injected / dev /
empty-company apps set no env data dir and fall back to a
process-stable temp directory, so the workspace is always a
valid absolute path and the reinit-wake rebuild resolves the
same directory.
"""
if self._agent_workspace_root is not None:
return self._agent_workspace_root
return Path(tempfile.gettempdir()) / _DEFAULT_WORKSPACE_TEMP_SUBDIR

def set_agent_workspace_root(self, path: Path) -> None:
"""Pin the agent workspace root (once-only, startup).

Rejects relative paths so agent filesystem/sandbox tools cannot
be routed to a cwd-relative location instead of the mounted
data volume.
"""
if not path.is_absolute():
msg = f"Agent workspace root must be an absolute path, got {path!r}"
logger.warning(
API_APP_STARTUP,
service="agent_workspace_root",
reason="non_absolute_workspace_root",
)
raise ValueError(msg)
self._set_once(
"_agent_workspace_root",
path,
"Agent workspace root",
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

@property
def experiment_service(self) -> ExperimentService:
"""Return the A/B experiment service, auto-wiring the default.
Expand All @@ -875,6 +965,12 @@ def experiment_service(self) -> ExperimentService:
repository=InMemoryExperimentRepository(),
clock=self.clock,
)
logger.info(
API_SERVICE_AUTO_WIRED,
service="experiment_service",
implementation="ExperimentService",
note="lazy in-memory repository; no durable backend set",
)
return self._experiment_service

def set_experiment_service(self, service: ExperimentService) -> None:
Expand Down
Loading
Loading