diff --git a/CLAUDE.md b/CLAUDE.md index e7bbcc36ff..866134f33b 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -114,7 +114,7 @@ curl http://localhost:3000/api/v1/health # backend (via web proxy) ```text src/synthorg/ - api/ # Litestar REST + WebSocket API (controllers, guards, channels, JWT + API key + WS ticket auth, approval gate integration, coordination endpoint, collaboration endpoint, settings endpoint, RFC 9457 structured errors (ErrorCategory, ErrorCode, ErrorDetail, ProblemDetail, CATEGORY_TITLES, category_title, category_type_uri, content negotiation)) + api/ # Litestar REST + WebSocket API (controllers, guards, channels, JWT + API key + WS ticket auth, approval gate integration, coordination endpoint, collaboration endpoint, settings endpoint, RFC 9457 structured errors (ErrorCategory, ErrorCode, ErrorDetail, ProblemDetail, CATEGORY_TITLES, category_title, category_type_uri, content negotiation)), AppState hot-reload slots (provider_registry, model_router with swap methods), settings dispatcher lifecycle auth/ # Authentication subpackage (controller, service, middleware, JWT + API key + WS ticket store, models, config) budget/ # Cost tracking, budget enforcement (pre-flight/in-flight checks, auto-downgrade), billing periods, cost tiers, quota/subscription tracking, CFO cost optimization (anomaly detection, efficiency analysis, downgrade recommendations, approval decisions), spending reports, budget errors (BudgetExhaustedError, DailyLimitExceededError, QuotaExhaustedError) cli/ # Python CLI module (superseded by top-level cli/ Go binary) @@ -128,8 +128,9 @@ src/synthorg/ persistence/ # Operational data persistence — pluggable PersistenceBackend protocol, SQLite initial, SettingsRepository (namespaced settings CRUD) (see Memory & Persistence design page) observability/ # Structured logging, correlation tracking, log sinks providers/ # LLM provider abstraction (LiteLLM adapter) - settings/ # Runtime-editable settings persistence (DB > env > YAML > code defaults), typed definitions (9 namespaces), Fernet encryption for sensitive values, config bridge, ConfigResolver (typed composed reads for controllers), validation, registry, change notifications via message bus + settings/ # Runtime-editable settings persistence (DB > env > YAML > code defaults), typed definitions (9 namespaces), Fernet encryption for sensitive values, config bridge, ConfigResolver (typed composed reads for controllers), validation, registry, change notifications via message bus, SettingsSubscriber protocol (subscriber.py), SettingsChangeDispatcher (dispatcher.py, polls #settings channel, routes to subscribers, restart_required filtering) definitions/ # Per-namespace setting definitions (api, company, providers, memory, budget, security, coordination, observability, backup) + subscribers/ # Concrete settings subscribers (ProviderSettingsSubscriber — rebuilds ModelRouter on strategy change, MemorySettingsSubscriber — advisory logging for memory config) security/ # SecOps agent, rule engine (soft-allow/hard-deny, fail-closed), audit log, output scanner, output scan response policies (redact/withhold/log-only/autonomy-tiered), risk classifier, risk tier classifier, action type registry, ToolInvoker security integration, progressive trust (4 strategies: disabled/weighted/per-category/milestone), autonomy levels (presets, resolver, change strategy), timeout policies (park/resume) templates/ # Pre-built company templates, personality presets, and builder tools/ # Tool registry, built-in tools (file_system/, git, sandbox/, code_runner), git clone SSRF prevention (git_url_validator), MCP bridge (mcp/), role-based access, approval tool (request_human_approval), tool factory (build_default_tools, build_default_tools_from_config) diff --git a/docs/design/operations.md b/docs/design/operations.md index 1875434ebf..f5afe8435b 100644 --- a/docs/design/operations.md +++ b/docs/design/operations.md @@ -973,6 +973,7 @@ future CLI tool are thin clients that call the API -- they contain no business l | `/api/v1/budget` | Spending, limits, projections | | `/api/v1/approvals` | Pending human approvals queue | | `/api/v1/analytics` | Performance metrics, dashboards | +| `/api/v1/settings` | Runtime-editable configuration (9 namespaces), schema discovery | | `/api/v1/providers` | Model provider status, config | | `/api/v1/ws` | WebSocket for real-time updates (ticket auth via `?ticket=`) | | `POST /api/v1/auth/ws-ticket` | Exchange JWT for one-time WebSocket connection ticket | @@ -1041,7 +1042,7 @@ and retry guidance. - **Budget Panel**: Spending charts, per-agent breakdown (projections/alerts planned) - **Meeting Logs**: Placeholder — coming soon - **Artifact Browser**: Placeholder — coming soon -- **Settings**: Runtime-editable configuration via DB-backed settings persistence (9 namespaces: api, company, providers, memory, budget, security, coordination, observability, backup). 4-layer resolution (DB > env > YAML > code defaults), Fernet encryption for sensitive values, REST API (GET/PUT/DELETE + schema endpoints for dynamic UI generation), change notifications via message bus. `ConfigResolver` provides typed composed reads for API controllers (assembles full Pydantic config models from individually resolved settings, using `asyncio.TaskGroup` for parallel resolution) +- **Settings**: Runtime-editable configuration via DB-backed settings persistence (9 namespaces: api, company, providers, memory, budget, security, coordination, observability, backup). 4-layer resolution (DB > env > YAML > code defaults), Fernet encryption for sensitive values, REST API (GET/PUT/DELETE + schema endpoints for dynamic UI generation), change notifications via message bus. `ConfigResolver` provides typed composed reads for API controllers (assembles full Pydantic config models from individually resolved settings, using `asyncio.TaskGroup` for parallel resolution). **Hot-reload**: `SettingsChangeDispatcher` polls the `#settings` bus channel and routes change notifications to registered `SettingsSubscriber` implementations. Settings marked `restart_required=True` are filtered (logged as WARNING, not dispatched). Concrete subscribers: `ProviderSettingsSubscriber` (rebuilds `ModelRouter` on `routing_strategy` change via `AppState.swap_model_router`), `MemorySettingsSubscriber` (advisory logging for non-restart memory settings) ### Human Roles diff --git a/src/synthorg/api/app.py b/src/synthorg/api/app.py index a8b713d6e0..1db9a76037 100644 --- a/src/synthorg/api/app.py +++ b/src/synthorg/api/app.py @@ -60,6 +60,11 @@ from synthorg.persistence.config import PersistenceConfig, SQLiteConfig from synthorg.persistence.factory import create_backend from synthorg.persistence.protocol import PersistenceBackend # noqa: TC001 +from synthorg.settings.dispatcher import SettingsChangeDispatcher +from synthorg.settings.subscribers import ( + MemorySettingsSubscriber, + ProviderSettingsSubscriber, +) if TYPE_CHECKING: from collections.abc import Awaitable, Callable, Sequence @@ -169,6 +174,7 @@ def _build_lifecycle( # noqa: PLR0913 persistence: PersistenceBackend | None, message_bus: MessageBus | None, bridge: MessageBusBridge | None, + settings_dispatcher: SettingsChangeDispatcher | None, task_engine: TaskEngine | None, meeting_scheduler: MeetingScheduler | None, app_state: AppState, @@ -202,6 +208,7 @@ async def on_startup() -> None: persistence, message_bus, bridge, + settings_dispatcher, task_engine, meeting_scheduler, app_state, @@ -223,6 +230,7 @@ async def on_shutdown() -> None: await _safe_shutdown( task_engine, meeting_scheduler, + settings_dispatcher, bridge, message_bus, persistence, @@ -258,6 +266,8 @@ async def _cleanup_on_failure( # noqa: PLR0913 started_bus: bool, bridge: MessageBusBridge | None = None, started_bridge: bool = False, + settings_dispatcher: SettingsChangeDispatcher | None = None, + started_settings_dispatcher: bool = False, task_engine: TaskEngine | None = None, started_task_engine: bool = False, meeting_scheduler: MeetingScheduler | None = None, @@ -276,6 +286,12 @@ async def _cleanup_on_failure( # noqa: PLR0913 API_APP_STARTUP, "Cleanup: failed to stop task engine", ) + if started_settings_dispatcher and settings_dispatcher is not None: + await _try_stop( + settings_dispatcher.stop(), + API_APP_STARTUP, + "Cleanup: failed to stop settings dispatcher", + ) if started_bridge and bridge is not None: await _try_stop( bridge.stop(), @@ -338,15 +354,16 @@ async def _init_persistence( raise -async def _safe_startup( # noqa: PLR0913, C901 +async def _safe_startup( # noqa: PLR0913, PLR0912, PLR0915, C901 persistence: PersistenceBackend | None, message_bus: MessageBus | None, bridge: MessageBusBridge | None, + settings_dispatcher: SettingsChangeDispatcher | None, task_engine: TaskEngine | None, meeting_scheduler: MeetingScheduler | None, app_state: AppState, ) -> None: - """Start all services: persistence, bus, bridge, task engine, scheduler. + """Start all services: persistence, bus, bridge, dispatcher, task engine, scheduler. Executes in order; on failure, cleans up already-started components in reverse order before re-raising. @@ -354,6 +371,7 @@ async def _safe_startup( # noqa: PLR0913, C901 started_bus = False started_bridge = False started_persistence = False + started_settings_dispatcher = False started_task_engine = False started_meeting_scheduler = False try: @@ -391,6 +409,16 @@ async def _safe_startup( # noqa: PLR0913, C901 ) raise started_bridge = True + if settings_dispatcher is not None: + try: + await settings_dispatcher.start() + except Exception: + logger.exception( + API_APP_STARTUP, + error="Failed to start settings dispatcher", + ) + raise + started_settings_dispatcher = True if task_engine is not None: try: task_engine.start() @@ -419,6 +447,8 @@ async def _safe_startup( # noqa: PLR0913, C901 started_bus=started_bus, bridge=bridge, started_bridge=started_bridge, + settings_dispatcher=settings_dispatcher, + started_settings_dispatcher=started_settings_dispatcher, task_engine=task_engine, started_task_engine=started_task_engine, meeting_scheduler=meeting_scheduler, @@ -427,14 +457,15 @@ async def _safe_startup( # noqa: PLR0913, C901 raise -async def _safe_shutdown( +async def _safe_shutdown( # noqa: PLR0913 task_engine: TaskEngine | None, meeting_scheduler: MeetingScheduler | None, + settings_dispatcher: SettingsChangeDispatcher | None, bridge: MessageBusBridge | None, message_bus: MessageBus | None, persistence: PersistenceBackend | None, ) -> None: - """Stop scheduler, task engine, bridge, message bus and disconnect persistence. + """Stop scheduler, task engine, dispatcher, bridge, bus, persistence. Mirrors ``_cleanup_on_failure`` reverse order: scheduler first (depends on orchestrator), then task engine so it can drain queued mutations and @@ -452,6 +483,12 @@ async def _safe_shutdown( API_APP_SHUTDOWN, "Failed to stop task engine", ) + if settings_dispatcher is not None: + await _try_stop( + settings_dispatcher.stop(), + API_APP_SHUTDOWN, + "Failed to stop settings dispatcher", + ) if bridge is not None: await _try_stop( bridge.stop(), @@ -608,6 +645,12 @@ def create_app( # noqa: PLR0913 ) bridge = _build_bridge(message_bus, channels_plugin) + settings_dispatcher = _build_settings_dispatcher( + message_bus, + settings_service, + effective_config, + app_state, + ) plugins: list[ChannelsPlugin] = [channels_plugin] middleware = _build_middleware(api_config) @@ -621,6 +664,7 @@ def create_app( # noqa: PLR0913 persistence, message_bus, bridge, + settings_dispatcher, task_engine, meeting_scheduler, app_state, @@ -666,6 +710,27 @@ def _build_bridge( return MessageBusBridge(message_bus, channels_plugin) +def _build_settings_dispatcher( + message_bus: MessageBus | None, + settings_service: SettingsService | None, + config: RootConfig, + app_state: AppState, +) -> SettingsChangeDispatcher | None: + """Create settings change dispatcher if bus and settings are available.""" + if message_bus is None or settings_service is None: + return None + provider_sub = ProviderSettingsSubscriber( + config=config, + app_state=app_state, + settings_service=settings_service, + ) + memory_sub = MemorySettingsSubscriber() + return SettingsChangeDispatcher( + message_bus=message_bus, + subscribers=(provider_sub, memory_sub), + ) + + def _build_middleware(api_config: ApiConfig) -> list[Middleware]: """Build the middleware stack from configuration.""" rl = api_config.rate_limit diff --git a/src/synthorg/api/state.py b/src/synthorg/api/state.py index 636ebef280..eed3a9be04 100644 --- a/src/synthorg/api/state.py +++ b/src/synthorg/api/state.py @@ -23,7 +23,10 @@ from synthorg.hr.registry import AgentRegistryService # noqa: TC001 from synthorg.observability import get_logger from synthorg.observability.events.api import API_APP_STARTUP, API_SERVICE_UNAVAILABLE +from synthorg.observability.events.settings import SETTINGS_SERVICE_SWAPPED from synthorg.persistence.protocol import PersistenceBackend # noqa: TC001 +from synthorg.providers.registry import ProviderRegistry # noqa: TC001 +from synthorg.providers.routing.router import ModelRouter # noqa: TC001 from synthorg.settings.resolver import ConfigResolver from synthorg.settings.service import SettingsService # noqa: TC001 @@ -33,13 +36,11 @@ class AppState: """Typed application state container. - Service fields (``persistence``, ``message_bus``, ``cost_tracker``, - ``auth_service``, ``task_engine``, ``coordinator``, - ``agent_registry``) accept ``None`` at construction time for - dev/test mode. Property - accessors raise ``ServiceUnavailableError`` (HTTP 503) when the - service is not configured, producing a clear error instead of an - opaque ``AttributeError``. + All service fields accept ``None`` at construction time for + dev/test mode. Property accessors raise + ``ServiceUnavailableError`` (HTTP 503) when the service is not + configured, producing a clear error instead of an opaque + ``AttributeError``. Attributes: config: Root company configuration. @@ -59,8 +60,10 @@ class AppState: "_meeting_orchestrator", "_meeting_scheduler", "_message_bus", + "_model_router", "_performance_tracker", "_persistence", + "_provider_registry", "_settings_service", "_task_engine", "_ticket_store", @@ -86,6 +89,8 @@ def __init__( # noqa: PLR0913 meeting_orchestrator: MeetingOrchestrator | None = None, meeting_scheduler: MeetingScheduler | None = None, settings_service: SettingsService | None = None, + provider_registry: ProviderRegistry | None = None, + model_router: ModelRouter | None = None, startup_time: float = 0.0, ) -> None: self.config = config @@ -102,6 +107,8 @@ def __init__( # noqa: PLR0913 self._meeting_orchestrator = meeting_orchestrator self._meeting_scheduler = meeting_scheduler self._settings_service = settings_service + self._provider_registry = provider_registry + self._model_router = model_router self._config_resolver: ConfigResolver | None = ( ConfigResolver(settings_service=settings_service, config=config) if settings_service is not None @@ -279,3 +286,76 @@ def set_auth_service(self, service: AuthService) -> None: logger.error(API_APP_STARTUP, error=msg) raise RuntimeError(msg) self._auth_service = service + + # ── Swappable provider services (hot-reload) ───────────────── + + @property + def has_provider_registry(self) -> bool: + """Check whether the provider registry is configured.""" + return self._provider_registry is not None + + @property + def provider_registry(self) -> ProviderRegistry: + """Return provider registry or raise 503.""" + return self._require_service( + self._provider_registry, + "provider_registry", + ) + + def swap_provider_registry(self, registry: ProviderRegistry) -> None: + """Replace the provider registry (hot-reload). + + Unlike ``set_*`` methods, this does not guard against + replacement — it is designed for repeated hot-reload swaps. + Atomic under asyncio's cooperative scheduling — no ``await`` + points, so no coroutine can observe a partially-updated state. + + .. note:: + Not yet wired to a subscriber — provided for the provider + runtime CRUD feature (issue #451). + + Args: + registry: New provider registry instance. + """ + old_count = ( + len(self._provider_registry) if self._provider_registry is not None else 0 + ) + self._provider_registry = registry + logger.info( + SETTINGS_SERVICE_SWAPPED, + service="provider_registry", + old_provider_count=old_count, + new_provider_count=len(registry), + ) + + @property + def has_model_router(self) -> bool: + """Check whether the model router is configured.""" + return self._model_router is not None + + @property + def model_router(self) -> ModelRouter: + """Return model router or raise 503.""" + return self._require_service(self._model_router, "model_router") + + def swap_model_router(self, router: ModelRouter) -> None: + """Replace the model router (hot-reload). + + Unlike ``set_*`` methods, this does not guard against + replacement — it is designed for repeated hot-reload swaps. + Atomic under asyncio's cooperative scheduling — no ``await`` + points, so no coroutine can observe a partially-updated state. + + Args: + router: New model router instance. + """ + old_strategy = ( + self._model_router.strategy_name if self._model_router is not None else None + ) + self._model_router = router + logger.info( + SETTINGS_SERVICE_SWAPPED, + service="model_router", + old_strategy=old_strategy, + new_strategy=router.strategy_name, + ) diff --git a/src/synthorg/communication/config.py b/src/synthorg/communication/config.py index 3feb679cc2..3b0340198f 100644 --- a/src/synthorg/communication/config.py +++ b/src/synthorg/communication/config.py @@ -27,6 +27,7 @@ "#design", "#incidents", "#code-review", + "#settings", "#watercooler", ) diff --git a/src/synthorg/observability/events/settings.py b/src/synthorg/observability/events/settings.py index 800fafe13c..5ada9948a4 100644 --- a/src/synthorg/observability/events/settings.py +++ b/src/synthorg/observability/events/settings.py @@ -16,3 +16,18 @@ SETTINGS_NOT_FOUND: Final[str] = "settings.not_found" SETTINGS_REGISTRY_DUPLICATE: Final[str] = "settings.registry.duplicate" SETTINGS_CONFIG_PATH_MISS: Final[str] = "settings.config_bridge.path_miss" + +# ── Dispatcher & subscriber events ──────────────────────────────── + +SETTINGS_DISPATCHER_STARTED: Final[str] = "settings.dispatcher.started" +SETTINGS_DISPATCHER_STOPPED: Final[str] = "settings.dispatcher.stopped" +SETTINGS_DISPATCHER_POLL_ERROR: Final[str] = "settings.dispatcher.poll_error" +SETTINGS_DISPATCHER_CHANNEL_DEAD: Final[str] = "settings.dispatcher.channel_dead" +SETTINGS_SUBSCRIBER_NOTIFIED: Final[str] = "settings.subscriber.notified" +SETTINGS_SUBSCRIBER_ERROR: Final[str] = "settings.subscriber.error" +SETTINGS_SUBSCRIBER_RESTART_REQUIRED: Final[str] = ( + "settings.subscriber.restart_required" +) +SETTINGS_SERVICE_SWAPPED: Final[str] = "settings.service.swapped" +SETTINGS_SERVICE_SWAP_FAILED: Final[str] = "settings.service.swap_failed" +SETTINGS_CHANNEL_CREATED: Final[str] = "settings.channel.created" diff --git a/src/synthorg/settings/__init__.py b/src/synthorg/settings/__init__.py index 8e6cd2409b..45406a6ce7 100644 --- a/src/synthorg/settings/__init__.py +++ b/src/synthorg/settings/__init__.py @@ -19,6 +19,7 @@ from synthorg.settings.models import SettingDefinition, SettingEntry, SettingValue from synthorg.settings.registry import SettingsRegistry, get_registry from synthorg.settings.resolver import ConfigResolver +from synthorg.settings.subscriber import SettingsSubscriber __all__ = [ "ConfigResolver", @@ -34,5 +35,6 @@ "SettingsEncryptionError", "SettingsError", "SettingsRegistry", + "SettingsSubscriber", "get_registry", ] diff --git a/src/synthorg/settings/definitions/providers.py b/src/synthorg/settings/definitions/providers.py index c6ab01d237..f627f3d0a8 100644 --- a/src/synthorg/settings/definitions/providers.py +++ b/src/synthorg/settings/definitions/providers.py @@ -1,5 +1,6 @@ """Providers namespace setting definitions.""" +from synthorg.providers.routing.strategies import STRATEGY_MAP from synthorg.settings.enums import SettingLevel, SettingNamespace, SettingType from synthorg.settings.models import SettingDefinition from synthorg.settings.registry import get_registry @@ -25,13 +26,7 @@ default="cost_aware", description="Model routing strategy", group="Routing", - enum_values=( - "cost_aware", - "latency_aware", - "round_robin", - "priority_chain", - "capability_match", - ), + enum_values=tuple(sorted(STRATEGY_MAP.keys())), yaml_path="routing.strategy", ) ) diff --git a/src/synthorg/settings/dispatcher.py b/src/synthorg/settings/dispatcher.py new file mode 100644 index 0000000000..b471f0139d --- /dev/null +++ b/src/synthorg/settings/dispatcher.py @@ -0,0 +1,256 @@ +"""Settings change dispatcher — polls ``#settings`` and routes to subscribers. + +Follows the same polling-loop pattern as +:class:`~synthorg.api.bus_bridge.MessageBusBridge`. +""" + +import asyncio +import contextlib +from typing import TYPE_CHECKING, Final, NamedTuple + +from synthorg.communication.bus_protocol import MessageBus # noqa: TC001 +from synthorg.communication.channel import Channel +from synthorg.communication.enums import ChannelType +from synthorg.communication.errors import ChannelAlreadyExistsError +from synthorg.observability import get_logger +from synthorg.observability.events.settings import ( + SETTINGS_CHANNEL_CREATED, + SETTINGS_DISPATCHER_CHANNEL_DEAD, + SETTINGS_DISPATCHER_POLL_ERROR, + SETTINGS_DISPATCHER_STARTED, + SETTINGS_DISPATCHER_STOPPED, + SETTINGS_SUBSCRIBER_ERROR, + SETTINGS_SUBSCRIBER_NOTIFIED, + SETTINGS_SUBSCRIBER_RESTART_REQUIRED, +) +from synthorg.settings.subscriber import SettingsSubscriber # noqa: TC001 + +if TYPE_CHECKING: + from synthorg.communication.message import Message + +logger = get_logger(__name__) + +_SUBSCRIBER_ID: Final[str] = "__settings_dispatcher__" +_POLL_TIMEOUT: Final[float] = 1.0 +_ERROR_BACKOFF: Final[float] = 1.0 +_MAX_CONSECUTIVE_ERRORS: Final[int] = 30 +_SETTINGS_CHANNEL: Final[str] = "#settings" + + +class _ChangeMetadata(NamedTuple): + """Structured metadata extracted from a ``#settings`` bus message.""" + + namespace: str + key: str + restart_required: bool + + +class SettingsChangeDispatcher: + """Dispatch ``#settings`` bus messages to registered subscribers. + + On ``start()``, subscribes to the ``#settings`` channel and + begins polling for change notifications published by + :class:`~synthorg.settings.service.SettingsService`. + + Each incoming message is matched against subscribers' + ``watched_keys``. For settings with ``restart_required=True``, + a WARNING is logged and subscribers are **not** called. For all + other settings, matching subscribers' ``on_settings_changed`` + is invoked. Errors in individual subscribers are logged and + swallowed — the poll loop is never interrupted. + + Args: + message_bus: The message bus to poll. + subscribers: Registered settings subscribers. + """ + + def __init__( + self, + message_bus: MessageBus, + subscribers: tuple[SettingsSubscriber, ...], + ) -> None: + self._bus = message_bus + self._subscribers = subscribers + self._task: asyncio.Task[None] | None = None + self._running: bool = False + + async def start(self) -> None: + """Start the polling loop. + + Raises: + RuntimeError: If the dispatcher is already running. + """ + if self._running: + msg = "SettingsChangeDispatcher is already running" + logger.warning(SETTINGS_DISPATCHER_STARTED, error=msg) + raise RuntimeError(msg) + + await self._ensure_channel() + await self._bus.subscribe(_SETTINGS_CHANNEL, _SUBSCRIBER_ID) + + self._running = True + self._task = asyncio.create_task( + self._poll_loop(), + name="settings-dispatcher", + ) + self._task.add_done_callback(self._on_task_done) + logger.info( + SETTINGS_DISPATCHER_STARTED, + subscriber_count=len(self._subscribers), + ) + + async def stop(self) -> None: + """Cancel the polling task. Idempotent.""" + if not self._running: + return + + if self._task is not None: + self._task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._task + self._task = None + + self._running = False + logger.info(SETTINGS_DISPATCHER_STOPPED) + + def _on_task_done(self, task: asyncio.Task[None]) -> None: + """Handle unexpected poll-loop exit. + + Resets ``_running`` so the dispatcher's state is honest, + and logs an error if the loop died with an exception. + """ + if task.cancelled(): + return + self._running = False + exc = task.exception() + if exc is not None: + logger.error( + SETTINGS_DISPATCHER_CHANNEL_DEAD, + error="Settings dispatcher poll loop died unexpectedly", + exc_info=exc, + ) + else: + logger.warning( + SETTINGS_DISPATCHER_STOPPED, + note="Poll loop exited (max consecutive errors or channel dead)", + ) + + async def _ensure_channel(self) -> None: + """Create ``#settings`` channel if it does not exist.""" + try: + await self._bus.create_channel( + Channel(name=_SETTINGS_CHANNEL, type=ChannelType.TOPIC), + ) + logger.debug(SETTINGS_CHANNEL_CREATED, channel=_SETTINGS_CHANNEL) + except ChannelAlreadyExistsError: + pass + + async def _poll_loop(self) -> None: + """Continuously poll ``#settings`` and dispatch to subscribers.""" + consecutive_errors = 0 + + while True: + try: + envelope = await self._bus.receive( + _SETTINGS_CHANNEL, + _SUBSCRIBER_ID, + timeout=_POLL_TIMEOUT, + ) + if envelope is None: + continue + consecutive_errors = 0 + await self._dispatch(envelope.message) + except asyncio.CancelledError: + raise + except MemoryError, RecursionError: + raise + except OSError, TimeoutError: + consecutive_errors += 1 + if consecutive_errors >= _MAX_CONSECUTIVE_ERRORS: + logger.exception( + SETTINGS_DISPATCHER_CHANNEL_DEAD, + consecutive_errors=consecutive_errors, + ) + break + logger.warning( + SETTINGS_DISPATCHER_POLL_ERROR, + consecutive_errors=consecutive_errors, + exc_info=True, + ) + await asyncio.sleep(_ERROR_BACKOFF) + except Exception: + logger.error( + SETTINGS_DISPATCHER_CHANNEL_DEAD, + exc_info=True, + ) + break + + async def _dispatch(self, message: Message) -> None: + """Route a single settings change to matching subscribers.""" + meta = _extract_metadata(message) + if meta is None: + return + + namespace, key, restart_required = meta + + if restart_required: + logger.warning( + SETTINGS_SUBSCRIBER_RESTART_REQUIRED, + namespace=namespace, + key=key, + ) + return + + for subscriber in self._subscribers: + try: + if (namespace, key) not in subscriber.watched_keys: + continue + await subscriber.on_settings_changed(namespace, key) + logger.info( + SETTINGS_SUBSCRIBER_NOTIFIED, + subscriber=subscriber.subscriber_name, + namespace=namespace, + key=key, + ) + except MemoryError, RecursionError: + raise + except Exception: + logger.error( + SETTINGS_SUBSCRIBER_ERROR, + subscriber=getattr(subscriber, "subscriber_name", "unknown"), + namespace=namespace, + key=key, + exc_info=True, + ) + + +def _extract_metadata( + message: Message, +) -> _ChangeMetadata | None: + """Extract structured change metadata from a ``#settings`` message. + + Returns: + A :class:`_ChangeMetadata` or ``None`` if the ``namespace`` or + ``key`` metadata fields are missing. The ``restart_required`` + field defaults to ``True`` when absent — fail-safe to prevent + accidental hot-reload of restart-required settings on metadata + corruption. + """ + extra = dict(message.metadata.extra) + namespace = extra.get("namespace") + key = extra.get("key") + if namespace is None or key is None: + logger.warning( + SETTINGS_DISPATCHER_POLL_ERROR, + error="Received #settings message with missing metadata", + has_namespace=namespace is not None, + has_key=key is not None, + sender=message.sender, + ) + return None + # restart_required is encoded as str(bool) by SettingsService._publish_change. + # Default to True (fail-safe): missing/corrupted metadata prevents hot-reload + # rather than accidentally allowing it for restart-required settings. + restart_raw = extra.get("restart_required", "True") + restart_required = str(restart_raw).lower() != "false" + return _ChangeMetadata(namespace, key, restart_required) diff --git a/src/synthorg/settings/subscriber.py b/src/synthorg/settings/subscriber.py new file mode 100644 index 0000000000..52fdebd93d --- /dev/null +++ b/src/synthorg/settings/subscriber.py @@ -0,0 +1,55 @@ +"""Settings change subscriber protocol. + +Defines the interface for services that react to runtime setting +changes dispatched by :class:`SettingsChangeDispatcher`. +""" + +from typing import Protocol, runtime_checkable + + +@runtime_checkable +class SettingsSubscriber(Protocol): + """Structural interface for settings change subscribers. + + Implementations declare which ``(namespace, key)`` pairs they + watch and provide a callback invoked by the + :class:`~synthorg.settings.dispatcher.SettingsChangeDispatcher` + when a matching change is detected. + + The dispatcher handles ``restart_required`` filtering: if a + setting definition has ``restart_required=True``, the dispatcher + logs a WARNING and does **not** call ``on_settings_changed``. + Subscribers only receive changes for hot-reloadable settings. + + Attributes: + watched_keys: ``(namespace, key)`` pairs this subscriber + cares about. + subscriber_name: Human-readable name for logging. + """ + + @property + def watched_keys(self) -> frozenset[tuple[str, str]]: + """Return the set of (namespace, key) pairs this subscriber watches.""" + ... + + @property + def subscriber_name(self) -> str: + """Human-readable subscriber name for logging.""" + ... + + async def on_settings_changed( + self, + namespace: str, + key: str, + ) -> None: + """Handle a setting change notification. + + Only called for settings where ``restart_required=False``. + Implementations must be idempotent. Errors are caught by the + dispatcher — they do not crash the polling loop. + + Args: + namespace: Changed setting namespace. + key: Changed setting key. + """ + ... diff --git a/src/synthorg/settings/subscribers/__init__.py b/src/synthorg/settings/subscribers/__init__.py new file mode 100644 index 0000000000..481b08d210 --- /dev/null +++ b/src/synthorg/settings/subscribers/__init__.py @@ -0,0 +1,13 @@ +"""Concrete settings change subscribers.""" + +from synthorg.settings.subscribers.memory_subscriber import ( + MemorySettingsSubscriber, +) +from synthorg.settings.subscribers.provider_subscriber import ( + ProviderSettingsSubscriber, +) + +__all__ = [ + "MemorySettingsSubscriber", + "ProviderSettingsSubscriber", +] diff --git a/src/synthorg/settings/subscribers/memory_subscriber.py b/src/synthorg/settings/subscribers/memory_subscriber.py new file mode 100644 index 0000000000..35753cbbca --- /dev/null +++ b/src/synthorg/settings/subscribers/memory_subscriber.py @@ -0,0 +1,57 @@ +"""Memory settings subscriber — operator notification for memory config.""" + +from synthorg.observability import get_logger +from synthorg.observability.events.settings import SETTINGS_SUBSCRIBER_NOTIFIED + +logger = get_logger(__name__) + +# memory/backend has restart_required=True and is filtered by the +# dispatcher (logged as WARNING, subscriber never called). Only +# non-restart-required keys are watched here. +_WATCHED: frozenset[tuple[str, str]] = frozenset( + { + ("memory", "default_level"), + ("memory", "consolidation_interval"), + } +) + + +class MemorySettingsSubscriber: + """React to memory-namespace settings changes. + + The dispatcher filters out ``restart_required=True`` changes + (e.g. ``memory/backend``) by logging a WARNING and skipping + the subscriber callback entirely. This subscriber only sees + non-restart-required keys (``default_level``, + ``consolidation_interval``), for which it logs an INFO-level + notification that the value will take effect on next use. + """ + + @property + def watched_keys(self) -> frozenset[tuple[str, str]]: + """Return memory-namespace keys this subscriber watches.""" + return _WATCHED + + @property + def subscriber_name(self) -> str: + """Human-readable subscriber name.""" + return "memory-settings" + + async def on_settings_changed( + self, + namespace: str, + key: str, + ) -> None: + """Log that a memory setting changed and will take effect on next use. + + Args: + namespace: Changed setting namespace. + key: Changed setting key. + """ + logger.info( + SETTINGS_SUBSCRIBER_NOTIFIED, + subscriber=self.subscriber_name, + namespace=namespace, + key=key, + note="will take effect on next use", + ) diff --git a/src/synthorg/settings/subscribers/provider_subscriber.py b/src/synthorg/settings/subscribers/provider_subscriber.py new file mode 100644 index 0000000000..a08ddbf0cf --- /dev/null +++ b/src/synthorg/settings/subscribers/provider_subscriber.py @@ -0,0 +1,130 @@ +"""Provider settings subscriber — rebuilds ModelRouter on strategy change.""" + +from typing import TYPE_CHECKING + +from synthorg.observability import get_logger +from synthorg.observability.events.settings import ( + SETTINGS_SERVICE_SWAP_FAILED, + SETTINGS_SUBSCRIBER_NOTIFIED, +) +from synthorg.providers.routing.router import ModelRouter + +if TYPE_CHECKING: + from synthorg.api.state import AppState + from synthorg.config.schema import RootConfig + from synthorg.settings.service import SettingsService + +logger = get_logger(__name__) + +_WATCHED: frozenset[tuple[str, str]] = frozenset( + { + ("providers", "default_provider"), + ("providers", "routing_strategy"), + ("providers", "retry_max_attempts"), + } +) + + +class ProviderSettingsSubscriber: + """React to provider-namespace settings changes. + + On ``routing_strategy`` change, rebuilds :class:`ModelRouter` + with the new strategy and swaps it into ``AppState``. + + ``default_provider`` and ``retry_max_attempts`` are advisory-only: + they are read through :class:`ConfigResolver` at use time and do + not require a service rebuild. They are watched so the operator + sees a log entry confirming the change was detected. + + Errors during rebuild propagate to the dispatcher, which logs + them with full subscriber context and continues to the next + subscriber. The old ``ModelRouter`` remains in ``AppState``. + + Args: + config: Root company configuration (providers + routing). + app_state: Application state for service swap. + settings_service: Settings service for reading new values. + """ + + def __init__( + self, + config: RootConfig, + app_state: AppState, + settings_service: SettingsService, + ) -> None: + self._config = config + self._app_state = app_state + self._settings_service = settings_service + + @property + def watched_keys(self) -> frozenset[tuple[str, str]]: + """Return provider-namespace keys this subscriber watches.""" + return _WATCHED + + @property + def subscriber_name(self) -> str: + """Human-readable subscriber name.""" + return "provider-settings" + + async def on_settings_changed( + self, + namespace: str, + key: str, + ) -> None: + """Handle a provider setting change. + + Only ``routing_strategy`` triggers a :class:`ModelRouter` + rebuild. Other keys are advisory and logged at INFO level. + + Args: + namespace: Changed setting namespace. + key: Changed setting key. + """ + if key == "routing_strategy": + await self._rebuild_router() + else: + logger.info( + SETTINGS_SUBSCRIBER_NOTIFIED, + subscriber=self.subscriber_name, + namespace=namespace, + key=key, + note="advisory — read through ConfigResolver at use time", + ) + + async def _rebuild_router(self) -> None: + """Build a new ModelRouter from current settings and swap it in. + + Reads the current ``routing_strategy`` value from + :class:`SettingsService`, extracts the string value from the + returned :class:`SettingValue`, and constructs a new router. + On failure, the existing ``ModelRouter`` in ``AppState`` + remains unchanged. Errors are logged with actionable context + via ``SETTINGS_SERVICE_SWAP_FAILED`` before re-raising to the + dispatcher. + """ + attempted_strategy: str | None = None + try: + result = await self._settings_service.get( + "providers", + "routing_strategy", + ) + attempted_strategy = result.value + config = self._app_state.config + new_routing = config.routing.model_copy( + update={"strategy": attempted_strategy}, + ) + new_router = ModelRouter( + new_routing, + dict(config.providers), + ) + except MemoryError, RecursionError: + raise + except Exception: + logger.error( + SETTINGS_SERVICE_SWAP_FAILED, + service="model_router", + attempted_strategy=attempted_strategy, + exc_info=True, + ) + raise + self._app_state.swap_model_router(new_router) diff --git a/tests/unit/api/test_app.py b/tests/unit/api/test_app.py index e47cebd787..307c463733 100644 --- a/tests/unit/api/test_app.py +++ b/tests/unit/api/test_app.py @@ -107,7 +107,7 @@ async def failing_start() -> None: bus.start = failing_start # type: ignore[method-assign] with pytest.raises(RuntimeError, match="bus boom"): - await _safe_startup(persistence, bus, None, None, None, app_state) + await _safe_startup(persistence, bus, None, None, None, None, app_state) # Persistence should have been disconnected during cleanup assert not persistence.is_connected @@ -125,7 +125,7 @@ async def failing_disconnect() -> None: persistence.disconnect = failing_disconnect # type: ignore[method-assign] # Should not raise even when disconnect fails - await _safe_shutdown(None, None, None, None, persistence) + await _safe_shutdown(None, None, None, None, None, persistence) async def test_task_engine_failure_cleans_up( self, @@ -155,7 +155,51 @@ async def test_task_engine_failure_cleans_up( ) with pytest.raises(RuntimeError, match="engine boom"): - await _safe_startup(persistence, bus, None, mock_te, None, app_state) + await _safe_startup(persistence, bus, None, None, mock_te, None, app_state) + + # Persistence and bus should be cleaned up + assert not persistence.is_connected + assert not bus.is_running + + async def test_settings_dispatcher_failure_cleans_up( + self, + root_config: Any, + ) -> None: + """Settings dispatcher start fails → persistence + bus cleaned up.""" + from unittest.mock import AsyncMock, MagicMock + + from synthorg.api.app import _safe_startup + from synthorg.api.approval_store import ApprovalStore + from synthorg.api.state import AppState + from tests.unit.api.conftest import ( + FakeMessageBus, + FakePersistenceBackend, + ) + + persistence = FakePersistenceBackend() + bus = FakeMessageBus() + mock_dispatcher = MagicMock() + mock_dispatcher.start = AsyncMock( + side_effect=RuntimeError("dispatcher boom"), + ) + mock_dispatcher.stop = AsyncMock() + + app_state = AppState( + config=root_config, + approval_store=ApprovalStore(), + persistence=persistence, + ) + + with pytest.raises(RuntimeError, match="dispatcher boom"): + await _safe_startup( + persistence, + bus, + None, + mock_dispatcher, + None, + None, + app_state, + ) # Persistence and bus should be cleaned up assert not persistence.is_connected @@ -171,7 +215,7 @@ async def test_shutdown_task_engine_failure_does_not_propagate(self) -> None: mock_te.stop = AsyncMock(side_effect=RuntimeError("stop boom")) # Should not raise even when task engine stop fails - await _safe_shutdown(mock_te, None, None, None, None) + await _safe_shutdown(mock_te, None, None, None, None, None) async def test_meeting_scheduler_lifecycle( self, @@ -205,12 +249,13 @@ async def test_meeting_scheduler_lifecycle( bus, None, None, + None, mock_sched, app_state, ) mock_sched.start.assert_awaited_once() - await _safe_shutdown(None, mock_sched, None, None, None) + await _safe_shutdown(None, mock_sched, None, None, None, None) mock_sched.stop.assert_awaited_once() diff --git a/tests/unit/api/test_state_swap.py b/tests/unit/api/test_state_swap.py new file mode 100644 index 0000000000..c928a19714 --- /dev/null +++ b/tests/unit/api/test_state_swap.py @@ -0,0 +1,106 @@ +"""Tests for AppState service swap methods (hot-reload).""" + +import pytest + +from synthorg.api.approval_store import ApprovalStore +from synthorg.api.errors import ServiceUnavailableError +from synthorg.api.state import AppState +from synthorg.config.schema import RootConfig +from synthorg.providers.registry import ProviderRegistry +from synthorg.providers.routing.router import ModelRouter + + +def _make_state(**overrides: object) -> AppState: + defaults: dict[str, object] = { + "config": RootConfig(company_name="test"), + "approval_store": ApprovalStore(), + } + defaults.update(overrides) + return AppState(**defaults) # type: ignore[arg-type] + + +def _make_registry() -> ProviderRegistry: + """Build an empty ProviderRegistry.""" + return ProviderRegistry({}) + + +def _make_router(config: RootConfig | None = None) -> ModelRouter: + """Build a ModelRouter from default config.""" + cfg = config or RootConfig(company_name="test") + return ModelRouter(cfg.routing, dict(cfg.providers)) + + +@pytest.mark.unit +class TestAppStateProviderRegistrySwap: + """Tests for provider_registry slot and swap.""" + + def test_provider_registry_raises_when_none(self) -> None: + state = _make_state(provider_registry=None) + with pytest.raises(ServiceUnavailableError): + _ = state.provider_registry + + def test_provider_registry_returns_when_set(self) -> None: + registry = _make_registry() + state = _make_state(provider_registry=registry) + assert state.provider_registry is registry + + def test_has_provider_registry_false_when_none(self) -> None: + state = _make_state() + assert state.has_provider_registry is False + + def test_has_provider_registry_true_when_set(self) -> None: + registry = _make_registry() + state = _make_state(provider_registry=registry) + assert state.has_provider_registry is True + + def test_swap_provider_registry_replaces_reference(self) -> None: + old = _make_registry() + new = _make_registry() + state = _make_state(provider_registry=old) + state.swap_provider_registry(new) + assert state.provider_registry is new + assert state.provider_registry is not old + + def test_swap_provider_registry_from_none(self) -> None: + new = _make_registry() + state = _make_state() + state.swap_provider_registry(new) + assert state.provider_registry is new + + +@pytest.mark.unit +class TestAppStateModelRouterSwap: + """Tests for model_router slot and swap.""" + + def test_model_router_raises_when_none(self) -> None: + state = _make_state(model_router=None) + with pytest.raises(ServiceUnavailableError): + _ = state.model_router + + def test_model_router_returns_when_set(self) -> None: + router = _make_router() + state = _make_state(model_router=router) + assert state.model_router is router + + def test_has_model_router_false_when_none(self) -> None: + state = _make_state() + assert state.has_model_router is False + + def test_has_model_router_true_when_set(self) -> None: + router = _make_router() + state = _make_state(model_router=router) + assert state.has_model_router is True + + def test_swap_model_router_replaces_reference(self) -> None: + old = _make_router() + new = _make_router() + state = _make_state(model_router=old) + state.swap_model_router(new) + assert state.model_router is new + assert state.model_router is not old + + def test_swap_model_router_from_none(self) -> None: + new = _make_router() + state = _make_state() + state.swap_model_router(new) + assert state.model_router is new diff --git a/tests/unit/settings/test_dispatcher.py b/tests/unit/settings/test_dispatcher.py new file mode 100644 index 0000000000..4e7a456169 --- /dev/null +++ b/tests/unit/settings/test_dispatcher.py @@ -0,0 +1,613 @@ +"""Tests for SettingsChangeDispatcher.""" + +import asyncio +import contextlib +from collections.abc import AsyncGenerator +from datetime import UTC, datetime + +import pytest + +from synthorg.communication.channel import Channel +from synthorg.communication.enums import ChannelType, MessageType +from synthorg.communication.errors import ChannelAlreadyExistsError +from synthorg.communication.message import Message, MessageMetadata +from synthorg.communication.subscription import DeliveryEnvelope, Subscription +from synthorg.settings.dispatcher import SettingsChangeDispatcher + +# ── Helpers ────────────────────────────────────────────────────── + + +def _settings_message( + namespace: str, + key: str, + restart_required: bool = False, +) -> Message: + """Build a #settings channel message matching SettingsService format.""" + return Message( + timestamp=datetime.now(UTC), + sender="system", + to="#settings", + type=MessageType.ANNOUNCEMENT, + channel="#settings", + content=f"Setting changed: {namespace}/{key}", + metadata=MessageMetadata( + extra=( + ("namespace", namespace), + ("key", key), + ("restart_required", str(restart_required)), + ), + ), + ) + + +def _envelope(msg: Message) -> DeliveryEnvelope: + return DeliveryEnvelope( + message=msg, + channel_name="#settings", + delivered_at=datetime.now(UTC), + ) + + +class _FakeSubscriber: + """Test subscriber that records calls and signals completion.""" + + def __init__( + self, + name: str, + keys: frozenset[tuple[str, str]], + ) -> None: + self._name = name + self._keys = keys + self.calls: list[tuple[str, str]] = [] + self.notified: asyncio.Event = asyncio.Event() + + @property + def watched_keys(self) -> frozenset[tuple[str, str]]: + return self._keys + + @property + def subscriber_name(self) -> str: + return self._name + + async def on_settings_changed(self, namespace: str, key: str) -> None: + self.calls.append((namespace, key)) + self.notified.set() + + +class _ErrorSubscriber(_FakeSubscriber): + """Subscriber that raises on every call.""" + + async def on_settings_changed(self, namespace: str, key: str) -> None: + msg = f"boom from {self._name}" + raise RuntimeError(msg) + + +class _FakeBus: + """Controllable message bus for dispatcher tests. + + Feed messages via ``enqueue(envelope)``; the dispatcher's polling + loop will consume them in order. + """ + + def __init__(self) -> None: + self._running = True + self._queue: asyncio.Queue[DeliveryEnvelope | None] = asyncio.Queue() + self._channels_created: list[str] = [] + self._subscriptions: list[tuple[str, str]] = [] + self._stop_event = asyncio.Event() + + @property + def is_running(self) -> bool: + return self._running + + async def start(self) -> None: + self._running = True + + async def stop(self) -> None: + self._running = False + self._stop_event.set() + + def enqueue(self, envelope: DeliveryEnvelope) -> None: + self._queue.put_nowait(envelope) + + async def subscribe(self, channel_name: str, subscriber_id: str) -> Subscription: + self._subscriptions.append((channel_name, subscriber_id)) + return Subscription( + channel_name=channel_name, + subscriber_id=subscriber_id, + subscribed_at=datetime.now(UTC), + ) + + async def unsubscribe(self, channel_name: str, subscriber_id: str) -> None: + pass + + async def receive( + self, + channel_name: str, + subscriber_id: str, + *, + timeout: float | None = None, # noqa: ASYNC109 + ) -> DeliveryEnvelope | None: + try: + return await asyncio.wait_for( + self._queue.get(), + timeout=timeout, + ) + except TimeoutError: + return None + + async def create_channel(self, channel: Channel) -> Channel: + self._channels_created.append(channel.name) + return channel + + async def get_channel(self, channel_name: str) -> Channel: + return Channel(name=channel_name, type=ChannelType.TOPIC) + + async def list_channels(self) -> tuple[Channel, ...]: + return () + + async def publish(self, message: Message) -> None: + pass + + async def send_direct(self, message: Message, *, recipient: str) -> None: + pass + + async def get_channel_history( + self, channel_name: str, *, limit: int | None = None + ) -> tuple[Message, ...]: + return () + + +@pytest.fixture +def bus() -> _FakeBus: + return _FakeBus() + + +@pytest.fixture +def provider_sub() -> _FakeSubscriber: + return _FakeSubscriber( + "provider-sub", + frozenset({("providers", "routing_strategy")}), + ) + + +@pytest.fixture +def memory_sub() -> _FakeSubscriber: + return _FakeSubscriber( + "memory-sub", + frozenset({("memory", "backend"), ("memory", "default_level")}), + ) + + +@pytest.fixture +def dispatcher( + bus: _FakeBus, + provider_sub: _FakeSubscriber, + memory_sub: _FakeSubscriber, +) -> SettingsChangeDispatcher: + return SettingsChangeDispatcher( + message_bus=bus, + subscribers=(provider_sub, memory_sub), + ) + + +@pytest.fixture +async def started_dispatcher( + dispatcher: SettingsChangeDispatcher, +) -> AsyncGenerator[SettingsChangeDispatcher]: + """Start the dispatcher and stop it on teardown.""" + await dispatcher.start() + yield dispatcher + await dispatcher.stop() + + +async def _wait_for_subscriber( + subscriber: _FakeSubscriber, + *, + timeout: float = 2.0, # noqa: ASYNC109 +) -> None: + """Wait until the subscriber's ``on_settings_changed`` has been called. + + Event-driven: blocks on ``subscriber.notified`` rather than polling + or sleeping, so the test wakes deterministically as soon as the + dispatcher finishes dispatching to this subscriber. + """ + await asyncio.wait_for(subscriber.notified.wait(), timeout=timeout) + # Reset for the next wait + subscriber.notified.clear() + + +async def _wait_for_queue_drain( + bus: _FakeBus, + *, + timeout: float = 2.0, # noqa: ASYNC109 +) -> None: + """Wait for the bus queue to empty (for negative/skip assertions). + + Used when no subscriber is expected to be called — we wait for the + dispatcher to consume the message from the queue, then give it a + tick to finish the dispatch decision (skip/restart_required). + """ + loop = asyncio.get_running_loop() + deadline = loop.time() + timeout + while bus._queue.qsize() > 0: + if loop.time() > deadline: + msg = "Queue drain timed out" + raise TimeoutError(msg) + await asyncio.sleep(0.01) + # One event-loop tick for the dispatcher to finish processing + await asyncio.sleep(0) + + +# ── Lifecycle Tests ────────────────────────────────────────────── + + +@pytest.mark.unit +class TestDispatcherLifecycle: + async def test_start_subscribes_to_settings_channel( + self, + started_dispatcher: SettingsChangeDispatcher, + bus: _FakeBus, + ) -> None: + assert ("#settings", "__settings_dispatcher__") in bus._subscriptions + + async def test_double_start_raises( + self, + started_dispatcher: SettingsChangeDispatcher, + ) -> None: + with pytest.raises(RuntimeError, match="already running"): + await started_dispatcher.start() + + async def test_stop_is_idempotent( + self, + dispatcher: SettingsChangeDispatcher, + ) -> None: + await dispatcher.start() + await dispatcher.stop() + await dispatcher.stop() # should not raise + + async def test_stop_without_start( + self, + dispatcher: SettingsChangeDispatcher, + ) -> None: + # Should not raise + await dispatcher.stop() + + +# ── Dispatch Tests ─────────────────────────────────────────────── + + +@pytest.mark.unit +class TestDispatchRouting: + async def test_dispatches_to_matching_subscriber( + self, + started_dispatcher: SettingsChangeDispatcher, + bus: _FakeBus, + provider_sub: _FakeSubscriber, + ) -> None: + msg = _settings_message("providers", "routing_strategy") + bus.enqueue(_envelope(msg)) + await _wait_for_subscriber(provider_sub) + assert ("providers", "routing_strategy") in provider_sub.calls + + async def test_does_not_dispatch_to_non_matching_subscriber( + self, + started_dispatcher: SettingsChangeDispatcher, + bus: _FakeBus, + provider_sub: _FakeSubscriber, + memory_sub: _FakeSubscriber, + ) -> None: + msg = _settings_message("providers", "routing_strategy") + bus.enqueue(_envelope(msg)) + # provider_sub matches and gets called — wait on it + await _wait_for_subscriber(provider_sub) + assert len(memory_sub.calls) == 0 + + async def test_dispatches_to_multiple_matching_subscribers( + self, + bus: _FakeBus, + ) -> None: + sub_a = _FakeSubscriber("a", frozenset({("ns", "k")})) + sub_b = _FakeSubscriber("b", frozenset({("ns", "k")})) + d = SettingsChangeDispatcher( + message_bus=bus, + subscribers=(sub_a, sub_b), + ) + await d.start() + try: + bus.enqueue(_envelope(_settings_message("ns", "k"))) + await _wait_for_subscriber(sub_b) + assert ("ns", "k") in sub_a.calls + assert ("ns", "k") in sub_b.calls + finally: + await d.stop() + + async def test_skips_restart_required_settings( + self, + started_dispatcher: SettingsChangeDispatcher, + bus: _FakeBus, + memory_sub: _FakeSubscriber, + ) -> None: + msg = _settings_message("memory", "backend", restart_required=True) + bus.enqueue(_envelope(msg)) + await _wait_for_queue_drain(bus) + assert len(memory_sub.calls) == 0 + + async def test_dispatches_non_restart_required_memory_settings( + self, + started_dispatcher: SettingsChangeDispatcher, + bus: _FakeBus, + memory_sub: _FakeSubscriber, + ) -> None: + msg = _settings_message("memory", "default_level", restart_required=False) + bus.enqueue(_envelope(msg)) + await _wait_for_subscriber(memory_sub) + assert ("memory", "default_level") in memory_sub.calls + + +# ── Error Isolation Tests ──────────────────────────────────────── + + +@pytest.mark.unit +class TestDispatcherErrorIsolation: + async def test_continues_after_subscriber_error( + self, + bus: _FakeBus, + ) -> None: + """A failing subscriber does not prevent others from being notified.""" + error_sub = _ErrorSubscriber("boom", frozenset({("ns", "k")})) + good_sub = _FakeSubscriber("ok", frozenset({("ns", "k")})) + d = SettingsChangeDispatcher( + message_bus=bus, + subscribers=(error_sub, good_sub), + ) + await d.start() + try: + bus.enqueue(_envelope(_settings_message("ns", "k"))) + await _wait_for_subscriber(good_sub) + assert ("ns", "k") in good_sub.calls + finally: + await d.stop() + + async def test_poll_loop_survives_subscriber_error( + self, + bus: _FakeBus, + ) -> None: + """After one error, the loop keeps processing subsequent messages.""" + error_sub = _ErrorSubscriber("boom", frozenset({("ns", "k")})) + good_sub = _FakeSubscriber("ok", frozenset({("ns", "k")})) + d = SettingsChangeDispatcher( + message_bus=bus, + subscribers=(error_sub, good_sub), + ) + await d.start() + try: + bus.enqueue(_envelope(_settings_message("ns", "k"))) + await _wait_for_subscriber(good_sub) + good_sub.calls.clear() + + bus.enqueue(_envelope(_settings_message("ns", "k"))) + await _wait_for_subscriber(good_sub) + assert ("ns", "k") in good_sub.calls + finally: + await d.stop() + + +# ── Metadata Extraction Tests ──────────────────────────────────── + + +@pytest.mark.unit +class TestMetadataExtraction: + async def test_ignores_message_with_missing_metadata( + self, + started_dispatcher: SettingsChangeDispatcher, + bus: _FakeBus, + provider_sub: _FakeSubscriber, + ) -> None: + """Messages without namespace/key in metadata are skipped.""" + msg = Message( + timestamp=datetime.now(UTC), + sender="system", + to="#settings", + type=MessageType.ANNOUNCEMENT, + channel="#settings", + content="bad message", + metadata=MessageMetadata(extra=()), + ) + bus.enqueue(_envelope(msg)) + await _wait_for_queue_drain(bus) + assert len(provider_sub.calls) == 0 + + async def test_partial_metadata_namespace_only( + self, + started_dispatcher: SettingsChangeDispatcher, + bus: _FakeBus, + provider_sub: _FakeSubscriber, + ) -> None: + """Message with namespace but no key is skipped.""" + msg = Message( + timestamp=datetime.now(UTC), + sender="system", + to="#settings", + type=MessageType.ANNOUNCEMENT, + channel="#settings", + content="partial", + metadata=MessageMetadata( + extra=(("namespace", "providers"),), + ), + ) + bus.enqueue(_envelope(msg)) + await _wait_for_queue_drain(bus) + assert len(provider_sub.calls) == 0 + + async def test_restart_required_defaults_to_true_when_absent( + self, + bus: _FakeBus, + ) -> None: + """Missing restart_required metadata defaults to True (fail-safe).""" + sub = _FakeSubscriber("sub", frozenset({("ns", "k")})) + d = SettingsChangeDispatcher( + message_bus=bus, + subscribers=(sub,), + ) + # Message with namespace and key but NO restart_required field + msg = Message( + timestamp=datetime.now(UTC), + sender="system", + to="#settings", + type=MessageType.ANNOUNCEMENT, + channel="#settings", + content="no restart flag", + metadata=MessageMetadata( + extra=(("namespace", "ns"), ("key", "k")), + ), + ) + await d.start() + try: + bus.enqueue(_envelope(msg)) + await _wait_for_queue_drain(bus) + # Fail-safe: missing restart_required treated as True → not dispatched + assert len(sub.calls) == 0 + finally: + await d.stop() + + +# ── Done Callback Tests ────────────────────────────────────────── + + +@pytest.mark.unit +class TestDoneCallback: + async def test_running_flag_cleared_on_unexpected_exit( + self, + ) -> None: + """_running is set to False when poll loop exits unexpectedly.""" + sub = _FakeSubscriber("sub", frozenset()) + + class _ErrorBus(_FakeBus): + async def receive( + self, + channel_name: str, + subscriber_id: str, + *, + timeout: float | None = None, # noqa: ASYNC109 + ) -> DeliveryEnvelope | None: + msg = "unexpected bus error" + raise ValueError(msg) + + err_bus = _ErrorBus() + d = SettingsChangeDispatcher( + message_bus=err_bus, + subscribers=(sub,), + ) + await d.start() + # Wait for the task to complete deterministically + assert d._task is not None + with contextlib.suppress(Exception): + await asyncio.wait_for(d._task, timeout=2.0) + # done_callback should have set _running to False + assert d._running is False + + +@pytest.mark.unit +class TestEnsureChannel: + async def test_start_succeeds_when_channel_already_exists( + self, + ) -> None: + """Dispatcher starts cleanly even if #settings channel pre-exists.""" + sub = _FakeSubscriber("sub", frozenset()) + + class _ExistingChannelBus(_FakeBus): + async def create_channel(self, channel: Channel) -> Channel: + raise ChannelAlreadyExistsError(channel.name) + + bus = _ExistingChannelBus() + d = SettingsChangeDispatcher( + message_bus=bus, + subscribers=(sub,), + ) + await d.start() + try: + assert d._running is True + finally: + await d.stop() + + +@pytest.mark.unit +class TestConsecutiveErrors: + async def test_transient_errors_do_not_kill_loop( + self, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """OSError/TimeoutError are tolerated below the threshold.""" + import synthorg.settings.dispatcher as _mod + + monkeypatch.setattr(_mod, "_ERROR_BACKOFF", 0.01) + monkeypatch.setattr(_mod, "_MAX_CONSECUTIVE_ERRORS", 5) + + sub = _FakeSubscriber("sub", frozenset({("ns", "k")})) + call_count = 0 + + class _TransientBus(_FakeBus): + async def receive( + self, + channel_name: str, + subscriber_id: str, + *, + timeout: float | None = None, # noqa: ASYNC109 + ) -> DeliveryEnvelope | None: + nonlocal call_count + call_count += 1 + if call_count <= 3: + msg = "transient" + raise OSError(msg) + if call_count == 4: + # After 3 errors, return a valid message once + return _envelope(_settings_message("ns", "k")) + # Then block (normal poll timeout) + await asyncio.sleep(timeout or 1.0) + return None + + bus = _TransientBus() + d = SettingsChangeDispatcher( + message_bus=bus, + subscribers=(sub,), + ) + await d.start() + try: + await _wait_for_subscriber(sub, timeout=10.0) + assert ("ns", "k") in sub.calls + finally: + await d.stop() + + async def test_max_consecutive_errors_kills_loop( + self, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Loop exits after _MAX_CONSECUTIVE_ERRORS OSErrors.""" + import synthorg.settings.dispatcher as _mod + + monkeypatch.setattr(_mod, "_ERROR_BACKOFF", 0.01) + monkeypatch.setattr(_mod, "_MAX_CONSECUTIVE_ERRORS", 5) + + class _PermanentErrorBus(_FakeBus): + async def receive( + self, + channel_name: str, + subscriber_id: str, + *, + timeout: float | None = None, # noqa: ASYNC109 + ) -> DeliveryEnvelope | None: + msg = "permanent" + raise OSError(msg) + + bus = _PermanentErrorBus() + sub = _FakeSubscriber("sub", frozenset()) + d = SettingsChangeDispatcher( + message_bus=bus, + subscribers=(sub,), + ) + await d.start() + assert d._task is not None + with contextlib.suppress(Exception): + await asyncio.wait_for(d._task, timeout=10.0) + assert d._running is False diff --git a/tests/unit/settings/test_memory_subscriber.py b/tests/unit/settings/test_memory_subscriber.py new file mode 100644 index 0000000000..69fb4bcbad --- /dev/null +++ b/tests/unit/settings/test_memory_subscriber.py @@ -0,0 +1,46 @@ +"""Tests for MemorySettingsSubscriber.""" + +import pytest + +from synthorg.settings.subscriber import SettingsSubscriber +from synthorg.settings.subscribers.memory_subscriber import ( + MemorySettingsSubscriber, +) + + +@pytest.mark.unit +class TestMemorySubscriberProtocol: + """MemorySettingsSubscriber conforms to SettingsSubscriber.""" + + def test_isinstance_check(self) -> None: + sub = MemorySettingsSubscriber() + assert isinstance(sub, SettingsSubscriber) + + def test_watched_keys(self) -> None: + sub = MemorySettingsSubscriber() + assert ("memory", "default_level") in sub.watched_keys + assert ("memory", "consolidation_interval") in sub.watched_keys + # memory/backend has restart_required=True — filtered by + # dispatcher, not watched by subscriber + assert ("memory", "backend") not in sub.watched_keys + + def test_subscriber_name(self) -> None: + sub = MemorySettingsSubscriber() + assert sub.subscriber_name == "memory-settings" + + +@pytest.mark.unit +class TestMemorySubscriberBehavior: + """on_settings_changed logs info (does not rebuild).""" + + async def test_on_settings_changed_does_not_raise(self) -> None: + sub = MemorySettingsSubscriber() + # Should not raise — just logs INFO + await sub.on_settings_changed("memory", "default_level") + await sub.on_settings_changed("memory", "consolidation_interval") + + async def test_on_settings_changed_is_idempotent(self) -> None: + sub = MemorySettingsSubscriber() + await sub.on_settings_changed("memory", "default_level") + await sub.on_settings_changed("memory", "default_level") + # No side effects, no error diff --git a/tests/unit/settings/test_provider_subscriber.py b/tests/unit/settings/test_provider_subscriber.py new file mode 100644 index 0000000000..e2197f3db6 --- /dev/null +++ b/tests/unit/settings/test_provider_subscriber.py @@ -0,0 +1,162 @@ +"""Tests for ProviderSettingsSubscriber.""" + +from unittest.mock import AsyncMock + +import pytest + +from synthorg.api.approval_store import ApprovalStore +from synthorg.api.state import AppState +from synthorg.config.schema import RootConfig +from synthorg.providers.routing.errors import UnknownStrategyError +from synthorg.providers.routing.router import ModelRouter +from synthorg.settings.enums import SettingNamespace, SettingSource +from synthorg.settings.models import SettingValue +from synthorg.settings.subscriber import SettingsSubscriber +from synthorg.settings.subscribers.provider_subscriber import ( + ProviderSettingsSubscriber, +) + + +def _setting_value(value: str) -> SettingValue: + """Build a SettingValue matching SettingsService.get() return type.""" + return SettingValue( + namespace=SettingNamespace.PROVIDERS, + key="routing_strategy", + value=value, + source=SettingSource.DEFAULT, + ) + + +def _make_state(config: RootConfig | None = None) -> AppState: + cfg = config or RootConfig(company_name="test") + router = ModelRouter(cfg.routing, dict(cfg.providers)) + return AppState( + config=cfg, + approval_store=ApprovalStore(), + model_router=router, + ) + + +def _make_subscriber( + config: RootConfig | None = None, + app_state: AppState | None = None, + settings_service: AsyncMock | None = None, +) -> tuple[ProviderSettingsSubscriber, AppState]: + cfg = config or RootConfig(company_name="test") + state = app_state or _make_state(cfg) + svc = settings_service or AsyncMock() + svc.get = AsyncMock(return_value=_setting_value("cost_aware")) + sub = ProviderSettingsSubscriber( + config=cfg, + app_state=state, + settings_service=svc, + ) + return sub, state + + +@pytest.mark.unit +class TestProviderSubscriberProtocol: + """ProviderSettingsSubscriber conforms to SettingsSubscriber.""" + + def test_isinstance_check(self) -> None: + sub, _ = _make_subscriber() + assert isinstance(sub, SettingsSubscriber) + + def test_watched_keys(self) -> None: + sub, _ = _make_subscriber() + assert ("providers", "routing_strategy") in sub.watched_keys + assert ("providers", "default_provider") in sub.watched_keys + assert ("providers", "retry_max_attempts") in sub.watched_keys + + def test_subscriber_name(self) -> None: + sub, _ = _make_subscriber() + assert sub.subscriber_name == "provider-settings" + + +@pytest.mark.unit +class TestProviderSubscriberRebuild: + """on_settings_changed rebuilds ModelRouter when strategy changes.""" + + async def test_routing_strategy_change_swaps_router(self) -> None: + cfg = RootConfig(company_name="test") + state = _make_state(cfg) + old_router = state.model_router + + svc = AsyncMock() + svc.get = AsyncMock(return_value=_setting_value("cost_aware")) + sub = ProviderSettingsSubscriber( + config=cfg, + app_state=state, + settings_service=svc, + ) + await sub.on_settings_changed("providers", "routing_strategy") + assert state.model_router is not old_router + + async def test_rebuild_failure_propagates(self) -> None: + """Errors in _rebuild_router propagate to the dispatcher.""" + cfg = RootConfig(company_name="test") + state = _make_state(cfg) + old_router = state.model_router + + svc = AsyncMock() + svc.get = AsyncMock( + return_value=_setting_value("nonexistent_strategy"), + ) + sub = ProviderSettingsSubscriber( + config=cfg, + app_state=state, + settings_service=svc, + ) + # Error propagates (dispatcher catches it for logging) + with pytest.raises(UnknownStrategyError): + await sub.on_settings_changed("providers", "routing_strategy") + # Old router is still in place (swap never called) + assert state.model_router is old_router + + async def test_default_provider_change_is_noop(self) -> None: + cfg = RootConfig(company_name="test") + state = _make_state(cfg) + old_router = state.model_router + + svc = AsyncMock() + svc.get = AsyncMock(return_value=_setting_value("some-provider")) + sub = ProviderSettingsSubscriber( + config=cfg, + app_state=state, + settings_service=svc, + ) + await sub.on_settings_changed("providers", "default_provider") + # Router not swapped for advisory-only settings + assert state.model_router is old_router + + async def test_retry_max_attempts_change_is_noop(self) -> None: + cfg = RootConfig(company_name="test") + state = _make_state(cfg) + old_router = state.model_router + + svc = AsyncMock() + svc.get = AsyncMock(return_value=_setting_value("5")) + sub = ProviderSettingsSubscriber( + config=cfg, + app_state=state, + settings_service=svc, + ) + await sub.on_settings_changed("providers", "retry_max_attempts") + assert state.model_router is old_router + + async def test_settings_service_failure_preserves_old_router(self) -> None: + """When SettingsService.get() fails, old router stays in place.""" + cfg = RootConfig(company_name="test") + state = _make_state(cfg) + old_router = state.model_router + + svc = AsyncMock() + svc.get = AsyncMock(side_effect=RuntimeError("db down")) + sub = ProviderSettingsSubscriber( + config=cfg, + app_state=state, + settings_service=svc, + ) + with pytest.raises(RuntimeError, match="db down"): + await sub.on_settings_changed("providers", "routing_strategy") + assert state.model_router is old_router diff --git a/tests/unit/settings/test_subscriber_protocol.py b/tests/unit/settings/test_subscriber_protocol.py new file mode 100644 index 0000000000..194ccc0b19 --- /dev/null +++ b/tests/unit/settings/test_subscriber_protocol.py @@ -0,0 +1,69 @@ +"""Tests for SettingsSubscriber protocol.""" + +import pytest + +from synthorg.settings.subscriber import SettingsSubscriber + + +class _ConformingSubscriber: + """Minimal class that satisfies the SettingsSubscriber protocol.""" + + @property + def watched_keys(self) -> frozenset[tuple[str, str]]: + return frozenset({("ns", "key")}) + + @property + def subscriber_name(self) -> str: + return "test-subscriber" + + async def on_settings_changed( + self, + namespace: str, + key: str, + ) -> None: + pass + + +class _MissingMethod: + """Missing on_settings_changed method.""" + + @property + def watched_keys(self) -> frozenset[tuple[str, str]]: + return frozenset() + + @property + def subscriber_name(self) -> str: + return "broken" + + +@pytest.mark.unit +class TestSettingsSubscriberProtocol: + """SettingsSubscriber protocol conformance.""" + + def test_runtime_checkable(self) -> None: + """Protocol is runtime-checkable via isinstance.""" + sub = _ConformingSubscriber() + assert isinstance(sub, SettingsSubscriber) + + def test_non_conforming_fails_isinstance(self) -> None: + """Class missing on_settings_changed is not a subscriber.""" + broken = _MissingMethod() + assert not isinstance(broken, SettingsSubscriber) + + def test_watched_keys_returns_frozenset(self) -> None: + """watched_keys returns a frozenset of (namespace, key) tuples.""" + sub = _ConformingSubscriber() + keys = sub.watched_keys + assert isinstance(keys, frozenset) + assert ("ns", "key") in keys + + def test_subscriber_name(self) -> None: + """subscriber_name returns a string.""" + sub = _ConformingSubscriber() + assert sub.subscriber_name == "test-subscriber" + + async def test_on_settings_changed_is_async(self) -> None: + """on_settings_changed is awaitable.""" + sub = _ConformingSubscriber() + # Should not raise + await sub.on_settings_changed("ns", "key")