diff --git a/docs/api/observability.md b/docs/api/observability.md index 97069de72b..2b0a6a0168 100644 --- a/docs/api/observability.md +++ b/docs/api/observability.md @@ -34,6 +34,10 @@ Structured logging, event constants, correlation tracking, and log sinks. ::: synthorg.observability.sinks +## Sink Config Builder + +::: synthorg.observability.sink_config_builder + ## Enums ::: synthorg.observability.enums diff --git a/docs/design/operations.md b/docs/design/operations.md index ffa1ad4061..eb89b96289 100644 --- a/docs/design/operations.md +++ b/docs/design/operations.md @@ -1398,12 +1398,21 @@ the console stream for `docker logs` access. ### Runtime Settings -Two observability settings are runtime-editable via `SettingsService`: +Four observability settings are runtime-editable via `SettingsService`: - `root_log_level` (enum: debug/info/warning/error/critical) -- changes the root logger level - `enable_correlation` (boolean) -- toggles correlation ID injection +- `sink_overrides` (JSON) -- per-sink overrides keyed by sink identifier (`__console__` for the + console sink, file path for file sinks). Each value is an object with optional fields: + `enabled` (bool), `level` (string), `json_format` (bool), `rotation` (object with `max_bytes`, + `backup_count`, `strategy`). The console sink cannot be disabled (`enabled: false` is rejected). +- `custom_sinks` (JSON) -- additional file sinks as a JSON array. Each entry requires `file_path` + and optionally: `level` (default info), `json_format` (default true), `rotation` (object), + `routing_prefixes` (array of logger name prefix strings for targeted routing). Console sink level can also be overridden via `SYNTHORG_LOG_LEVEL` env var. -Full sink CRUD via SettingsService (add/remove/reconfigure sinks at runtime) is planned as a -future enhancement. +Changes take effect without restart -- the `ObservabilitySettingsSubscriber` rebuilds the entire +logging pipeline via `configure_logging()` (idempotent) when any of the four observability +settings change (`root_log_level`, `enable_correlation`, `sink_overrides`, or `custom_sinks`). +Custom sink file paths cannot collide with default sink paths (reserved even if disabled). diff --git a/src/synthorg/api/app.py b/src/synthorg/api/app.py index f91a4509b1..a8d1d14fe7 100644 --- a/src/synthorg/api/app.py +++ b/src/synthorg/api/app.py @@ -86,6 +86,7 @@ from synthorg.settings.subscribers import ( BackupSettingsSubscriber, MemorySettingsSubscriber, + ObservabilitySettingsSubscriber, ProviderSettingsSubscriber, ) from synthorg.tools.invocation_tracker import ToolInvocationTracker # noqa: TC001 @@ -399,25 +400,10 @@ async def on_shutdown() -> None: return [on_startup], [on_shutdown] -# ── 2-Phase Initialisation ──────────────────────────────────────── -# -# Phase 1 (construct): Litestar bakes middleware, CORS, and routes -# into the app at construction time -- these read directly from -# RootConfig and are immutable after construction. Bootstrap-only -# settings (server_host, server_port, api_prefix, cors_allowed_origins, -# rate_limit_exclude_paths, auth_exclude_paths) are therefore NOT -# resolved through SettingsService. -# -# Phase 2 (on_startup): After persistence connects and migrations -# run, SettingsService + ConfigResolver become available. Runtime- -# editable settings (rate_limit_max_requests, rate_limit_time_unit, -# jwt_expiry_minutes, min_password_length) are resolved through -# ConfigResolver.get_api_config() by consumers that need current -# values post-startup. -# -# Note: Litestar's rate-limit middleware reads max_requests and -# time_unit at construction; runtime DB changes are visible only -# to code calling get_api_config(), not to the middleware itself. +# 2-Phase Init: Phase 1 (construct) bakes immutable middleware/CORS/routes +# from RootConfig. Phase 2 (on_startup) wires SettingsService + ConfigResolver +# for runtime-editable settings. Litestar rate-limit middleware reads config at +# construction; runtime DB changes only affect code calling get_api_config(). def _bootstrap_app_logging(effective_config: RootConfig) -> RootConfig: @@ -628,7 +614,11 @@ def create_app( # noqa: PLR0913 startup_time=time.monotonic(), ) - bridge = _build_bridge(message_bus, channels_plugin) + bridge = ( + MessageBusBridge(message_bus, channels_plugin) + if message_bus is not None + else None + ) backup_service = build_backup_service( effective_config, resolved_db_path=resolved_db_path, @@ -717,16 +707,6 @@ def create_app( # noqa: PLR0913 ) -def _build_bridge( - message_bus: MessageBus | None, - channels_plugin: ChannelsPlugin, -) -> MessageBusBridge | None: - """Create message bus bridge if bus is available.""" - if message_bus is None: - return None - return MessageBusBridge(message_bus, channels_plugin) - - def _build_settings_dispatcher( message_bus: MessageBus | None, settings_service: SettingsService | None, @@ -743,7 +723,12 @@ def _build_settings_dispatcher( settings_service=settings_service, ) memory_sub = MemorySettingsSubscriber() - subs: list[SettingsSubscriber] = [provider_sub, memory_sub] + log_dir = config.logging.log_dir if config.logging is not None else "logs" + observability_sub = ObservabilitySettingsSubscriber( + settings_service=settings_service, + log_dir=log_dir, + ) + subs: list[SettingsSubscriber] = [provider_sub, memory_sub, observability_sub] if backup_service is not None: subs.append( BackupSettingsSubscriber( diff --git a/src/synthorg/observability/events/settings.py b/src/synthorg/observability/events/settings.py index 5ada9948a4..9d2e6fac59 100644 --- a/src/synthorg/observability/events/settings.py +++ b/src/synthorg/observability/events/settings.py @@ -31,3 +31,15 @@ SETTINGS_SERVICE_SWAPPED: Final[str] = "settings.service.swapped" SETTINGS_SERVICE_SWAP_FAILED: Final[str] = "settings.service.swap_failed" SETTINGS_CHANNEL_CREATED: Final[str] = "settings.channel.created" + +# ── Observability subscriber events ────────────────────────────── + +SETTINGS_OBSERVABILITY_PIPELINE_REBUILT: Final[str] = ( + "settings.observability.pipeline_rebuilt" +) +SETTINGS_OBSERVABILITY_REBUILD_FAILED: Final[str] = ( + "settings.observability.rebuild_failed" +) +SETTINGS_OBSERVABILITY_VALIDATION_FAILED: Final[str] = ( + "settings.observability.validation_failed" +) diff --git a/src/synthorg/observability/setup.py b/src/synthorg/observability/setup.py index 829bad79ca..09a3e3ac97 100644 --- a/src/synthorg/observability/setup.py +++ b/src/synthorg/observability/setup.py @@ -8,14 +8,18 @@ import os import sys from pathlib import Path -from typing import Any +from types import MappingProxyType +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from collections.abc import Mapping import structlog from synthorg.observability.config import DEFAULT_SINKS, LogConfig, SinkConfig from synthorg.observability.enums import LogLevel, SinkType from synthorg.observability.processors import sanitize_sensitive_fields -from synthorg.observability.sinks import build_handler +from synthorg.observability.sinks import SINK_ROUTING, build_handler # Default per-logger levels applied when no config overrides are given. _DEFAULT_LOGGER_LEVELS: tuple[tuple[str, LogLevel], ...] = ( @@ -176,6 +180,8 @@ def _attach_handlers( config: LogConfig, root_logger: logging.Logger, shared_processors: list[Any], + *, + routing_overrides: Mapping[str, tuple[str, ...]] | None = None, ) -> None: """Build and attach a handler for each configured sink. @@ -187,10 +193,19 @@ def _attach_handlers( config: The logging configuration. root_logger: The stdlib root logger. shared_processors: Processor chain for the foreign pre-chain. + routing_overrides: Optional extra routing entries (e.g. from + custom sinks) merged with the default ``SINK_ROUTING``. + An empty mapping is treated the same as ``None``. Raises: RuntimeError: If a critical sink fails to initialise. """ + effective_routing: Mapping[str, tuple[str, ...]] | None = None + if routing_overrides: + merged = dict(SINK_ROUTING) + merged.update(routing_overrides) + effective_routing = MappingProxyType(merged) + log_dir = Path(config.log_dir) for sink in config.sinks: try: @@ -198,6 +213,7 @@ def _attach_handlers( sink=sink, log_dir=log_dir, foreign_pre_chain=shared_processors, + routing=effective_routing, ) root_logger.addHandler(handler) except (OSError, RuntimeError, ValueError) as exc: @@ -308,7 +324,11 @@ def _apply_console_level_override(config: LogConfig) -> LogConfig: return config.model_copy(update={"sinks": tuple(new_sinks)}) -def configure_logging(config: LogConfig | None = None) -> None: +def configure_logging( + config: LogConfig | None = None, + *, + routing_overrides: Mapping[str, tuple[str, ...]] | None = None, +) -> None: """Configure the structured logging system. Sets up structlog processor chains, stdlib handlers, and per-logger @@ -321,6 +341,9 @@ def configure_logging(config: LogConfig | None = None) -> None: Args: config: Logging configuration. When ``None``, uses sensible defaults with all standard sinks. + routing_overrides: Optional extra logger-name routing entries + (e.g. from custom sinks) merged with the default + ``SINK_ROUTING`` table. Raises: RuntimeError: If a critical sink fails to initialise. @@ -349,7 +372,12 @@ def configure_logging(config: LogConfig | None = None) -> None: _configure_structlog(enable_correlation=config.enable_correlation) # 6. Build and attach handlers for each sink - _attach_handlers(config, root_logger, shared) + _attach_handlers( + config, + root_logger, + shared, + routing_overrides=routing_overrides, + ) # 7. Tame third-party loggers (clear duplicate handlers, set defaults) _tame_third_party_loggers() diff --git a/src/synthorg/observability/sink_config_builder.py b/src/synthorg/observability/sink_config_builder.py new file mode 100644 index 0000000000..e54d275530 --- /dev/null +++ b/src/synthorg/observability/sink_config_builder.py @@ -0,0 +1,484 @@ +# ruff: noqa: TRY004 -- all type-validation paths deliberately raise ValueError +# for a consistent public API contract (callers use `except ValueError`). +"""Build a LogConfig from DEFAULT_SINKS + runtime overrides + custom sinks. + +Pure-function module that merges static defaults with runtime settings +to produce a validated :class:`LogConfig` suitable for +:func:`configure_logging`. + +The two JSON inputs come from ``SettingsService`` settings: + +- ``sink_overrides``: JSON object keyed by sink identifier + (``__console__`` for the console sink, file path for file sinks). + Each value is an object with optional fields: ``enabled``, ``level``, + ``json_format``, ``rotation``. +- ``custom_sinks``: JSON array of objects, each describing a new FILE + sink with ``file_path`` (required) and optional ``level``, + ``json_format``, ``rotation``, ``routing_prefixes``. +""" + +import json +from dataclasses import dataclass +from types import MappingProxyType +from typing import Any, cast + +from synthorg.observability import get_logger +from synthorg.observability.config import ( + DEFAULT_SINKS, + LogConfig, + RotationConfig, + SinkConfig, +) +from synthorg.observability.enums import LogLevel, RotationStrategy, SinkType + +logger = get_logger(__name__) + +_CONSOLE_ID: str = "__console__" + +# Set of file paths belonging to DEFAULT_SINKS (reserved, even if disabled). +_DEFAULT_FILE_PATHS: frozenset[str] = frozenset( + s.file_path for s in DEFAULT_SINKS if s.file_path is not None +) + +# Valid sink identifiers for overrides. +_VALID_OVERRIDE_KEYS: frozenset[str] = _DEFAULT_FILE_PATHS | {_CONSOLE_ID} + +_LEVEL_MAP: dict[str, LogLevel] = {level.value.lower(): level for level in LogLevel} + +_STRATEGY_MAP: dict[str, RotationStrategy] = { + s.value.lower(): s for s in RotationStrategy +} + +# Allowed field names for strict validation. +_OVERRIDE_FIELDS: frozenset[str] = frozenset( + {"enabled", "level", "json_format", "rotation"}, +) +_CUSTOM_SINK_FIELDS: frozenset[str] = frozenset( + {"file_path", "level", "json_format", "rotation", "routing_prefixes"}, +) +_ROTATION_FIELDS: frozenset[str] = frozenset( + {"strategy", "max_bytes", "backup_count"}, +) + +_MAX_CUSTOM_SINKS: int = 20 +_MAX_ROUTING_PREFIXES: int = 50 + + +@dataclass(frozen=True, slots=True) +class SinkBuildResult: + """Result of building a LogConfig from settings. + + Attributes: + config: The fully validated logging configuration. + routing_overrides: Custom sink routing entries keyed by + file_path, mapping to logger name prefix tuples. + """ + + config: LogConfig + routing_overrides: MappingProxyType[str, tuple[str, ...]] + + +# -- Validation helpers -------------------------------------------- + + +def _reject_unknown_fields( + fields: dict[str, Any], + allowed: frozenset[str], + context: str, +) -> None: + """Raise ValueError if *fields* contains keys not in *allowed*.""" + unknown = set(fields) - allowed + if unknown: + msg = f"Unknown fields in {context}: {sorted(unknown)}" + raise ValueError(msg) + + +def _parse_bool(raw: Any, *, field_name: str) -> bool: + """Require an actual JSON boolean. + + Raises: + ValueError: If *raw* is not a ``bool``. + """ + if not isinstance(raw, bool): + msg = f"{field_name} must be a boolean, got {type(raw).__name__}" + raise ValueError(msg) + return raw + + +# -- JSON parsing helpers ------------------------------------------ + + +def _parse_json(raw: str, label: str) -> Any: + """Parse a JSON string, raising ValueError on failure.""" + try: + return json.loads(raw) + except json.JSONDecodeError as exc: + msg = f"Invalid JSON for {label}: {exc}" + raise ValueError(msg) from exc + + +def _parse_sink_overrides(raw: str) -> dict[str, dict[str, Any]]: + """Parse and validate the ``sink_overrides`` JSON string. + + Returns: + A dict mapping sink identifiers to override dicts. + + Raises: + ValueError: On invalid JSON, wrong structure, unknown sink + identifiers, or unknown override fields. + """ + data = _parse_json(raw, "sink_overrides") + if not isinstance(data, dict): + msg = "sink_overrides must be a JSON object" + raise ValueError(msg) + + for key, value in data.items(): + if key not in _VALID_OVERRIDE_KEYS: + msg = ( + f"Unknown sink identifier in sink_overrides: {key!r}. " + f"Valid keys: {sorted(_VALID_OVERRIDE_KEYS)}" + ) + raise ValueError(msg) + if not isinstance(value, dict): + msg = ( + f"Override value for {key!r} must be a JSON object, " + f"got {type(value).__name__}" + ) + raise ValueError(msg) + _reject_unknown_fields( + value, + _OVERRIDE_FIELDS, + f"sink_overrides[{key!r}]", + ) + return data + + +def _parse_custom_sinks(raw: str) -> list[dict[str, Any]]: + """Parse and validate the ``custom_sinks`` JSON string. + + Returns: + A list of custom sink definition dicts. + + Raises: + ValueError: On invalid JSON, wrong structure, too many entries, + or unknown fields. + """ + data = _parse_json(raw, "custom_sinks") + if not isinstance(data, list): + msg = "custom_sinks must be a JSON array" + raise ValueError(msg) + + if len(data) > _MAX_CUSTOM_SINKS: + msg = f"custom_sinks exceeds maximum of {_MAX_CUSTOM_SINKS} entries" + raise ValueError(msg) + + for i, entry in enumerate(data): + if not isinstance(entry, dict): + msg = f"custom_sinks[{i}] must be a JSON object, got {type(entry).__name__}" + raise ValueError(msg) + _reject_unknown_fields( + entry, + _CUSTOM_SINK_FIELDS, + f"custom_sinks[{i}]", + ) + return data + + +# -- Level / rotation helpers -------------------------------------- + + +def _parse_level(raw: Any) -> LogLevel: + """Convert a level value to LogLevel (case-insensitive). + + Raises: + ValueError: If *raw* is not a string or not a recognized level. + """ + if not isinstance(raw, str): + msg = f"level must be a string, got {type(raw).__name__}" + raise ValueError(msg) + level = _LEVEL_MAP.get(raw.lower()) + if level is None: + valid = ", ".join(sorted(_LEVEL_MAP)) + msg = f"Invalid level {raw!r}. Valid levels: {valid}" + raise ValueError(msg) + return level + + +def _parse_rotation_override( + raw: Any, + base: RotationConfig | None, +) -> RotationConfig: + """Merge a rotation override dict into an existing RotationConfig. + + Only fields present in *raw* are overridden; others are preserved + from *base* (or defaults if base is None). + + Raises: + ValueError: If *raw* is not a dict, contains unknown fields, + or field values are invalid. + """ + if not isinstance(raw, dict): + msg = f"rotation must be a JSON object, got {type(raw).__name__}" + raise ValueError(msg) + _reject_unknown_fields(raw, _ROTATION_FIELDS, "rotation") + + base = base or RotationConfig() + updates: dict[str, Any] = {} + + if "strategy" in raw: + strategy = _STRATEGY_MAP.get(str(raw["strategy"]).lower()) + if strategy is None: + valid = ", ".join(sorted(_STRATEGY_MAP)) + msg = f"Invalid rotation strategy {raw['strategy']!r}. Valid: {valid}" + raise ValueError(msg) + updates["strategy"] = strategy + + if "max_bytes" in raw: + val = raw["max_bytes"] + if not isinstance(val, int) or isinstance(val, bool): + msg = f"Invalid max_bytes value {val!r}: must be an integer" + raise ValueError(msg) + updates["max_bytes"] = val + + if "backup_count" in raw: + val = raw["backup_count"] + if not isinstance(val, int) or isinstance(val, bool): + msg = f"Invalid backup_count value {val!r}: must be an integer" + raise ValueError(msg) + updates["backup_count"] = val + + return base.model_copy(update=updates) if updates else base + + +# -- Override application ------------------------------------------ + + +def _apply_override( + sink: SinkConfig, + override: dict[str, Any], + identifier: str, +) -> SinkConfig | None: + """Apply an override dict to a single SinkConfig. + + Returns: + The updated SinkConfig, or ``None`` if the sink is disabled. + + Raises: + ValueError: If the console sink is disabled, types are wrong, + or fields are invalid. + """ + if "enabled" in override: + enabled = _parse_bool( + override["enabled"], + field_name=f"sink_overrides[{identifier!r}].enabled", + ) + if not enabled: + if identifier == _CONSOLE_ID: + msg = ( + "Cannot disable the console sink -- at least one output must remain" + ) + raise ValueError(msg) + return None + + updates: dict[str, Any] = {} + + if "level" in override: + updates["level"] = _parse_level(override["level"]) + + if "json_format" in override: + updates["json_format"] = _parse_bool( + override["json_format"], + field_name=f"sink_overrides[{identifier!r}].json_format", + ) + + if "rotation" in override: + updates["rotation"] = _parse_rotation_override( + override["rotation"], + sink.rotation, + ) + + return sink.model_copy(update=updates) if updates else sink + + +# -- Custom sink construction -------------------------------------- + + +def _build_custom_sink( + entry: dict[str, Any], + index: int, +) -> SinkConfig: + """Construct a SinkConfig from a custom sink definition dict. + + Raises: + ValueError: If ``file_path`` is missing or fields are invalid. + """ + if "file_path" not in entry: + msg = f"custom_sinks[{index}] is missing required field 'file_path'" + raise ValueError(msg) + + raw_path = entry["file_path"] + if not isinstance(raw_path, str) or not raw_path.strip(): + msg = ( + f"custom_sinks[{index}].file_path must be a non-empty string, " + f"got {raw_path!r}" + ) + raise ValueError(msg) + file_path = raw_path + level = _parse_level(entry["level"]) if "level" in entry else LogLevel.INFO + + json_format = True + if "json_format" in entry: + json_format = _parse_bool( + entry["json_format"], + field_name=f"custom_sinks[{index}].json_format", + ) + + rotation: RotationConfig | None = None + if "rotation" in entry: + rotation = _parse_rotation_override(entry["rotation"], None) + else: + rotation = RotationConfig() + + # SinkConfig's own validator handles path safety (absolute, traversal). + return SinkConfig( + sink_type=SinkType.FILE, + level=level, + file_path=file_path, + rotation=rotation, + json_format=json_format, + ) + + +def _extract_routing( + entry: dict[str, Any], + file_path: str, +) -> tuple[str, ...] | None: + """Extract and validate routing prefixes from a custom sink entry. + + Returns: + A tuple of prefix strings, or ``None`` if no routing specified. + + Raises: + ValueError: If prefixes are invalid, not an array, or too many. + """ + raw = entry.get("routing_prefixes") + if raw is None: + return None + if not isinstance(raw, list): + msg = f"routing_prefixes for {file_path!r} must be an array" + raise ValueError(msg) + + if len(raw) > _MAX_ROUTING_PREFIXES: + msg = ( + f"routing_prefixes for {file_path!r} exceeds " + f"maximum of {_MAX_ROUTING_PREFIXES} entries" + ) + raise ValueError(msg) + + prefixes: list[str] = [] + for i, prefix in enumerate(raw): + if not isinstance(prefix, str) or not prefix.strip(): + msg = f"routing_prefixes[{i}] for {file_path!r} must be a non-empty string" + raise ValueError(msg) + prefixes.append(prefix.strip()) + + return tuple(prefixes) if prefixes else None + + +# -- Main builder -------------------------------------------------- + + +def _merge_default_sinks( + overrides: dict[str, dict[str, Any]], +) -> list[SinkConfig]: + """Apply overrides to DEFAULT_SINKS, returning the merged list.""" + merged: list[SinkConfig] = [] + for sink in DEFAULT_SINKS: + identifier = cast( + "str", + _CONSOLE_ID if sink.sink_type == SinkType.CONSOLE else sink.file_path, + ) + override = overrides.get(identifier) + if override is not None: + result = _apply_override(sink, override, identifier) + if result is not None: + merged.append(result) + else: + merged.append(sink) + return merged + + +def _process_custom_entries( + custom_entries: list[dict[str, Any]], + merged: list[SinkConfig], +) -> MappingProxyType[str, tuple[str, ...]]: + """Build custom sinks, append to *merged*, return routing overrides.""" + used_paths = _DEFAULT_FILE_PATHS # reserved even if disabled + custom_paths: set[str] = set() + routing_overrides: dict[str, tuple[str, ...]] = {} + + for i, entry in enumerate(custom_entries): + sink = _build_custom_sink(entry, i) + path = cast("str", sink.file_path) + + if path in used_paths: + msg = ( + f"custom_sinks[{i}] file_path {path!r} conflicts " + "with a default sink (reserved even if disabled)" + ) + raise ValueError(msg) + if path in custom_paths: + msg = ( + f"custom_sinks[{i}] file_path {path!r} is duplicated " + "within custom_sinks" + ) + raise ValueError(msg) + + custom_paths.add(path) + merged.append(sink) + + prefixes = _extract_routing(entry, path) + if prefixes is not None: + routing_overrides[path] = prefixes + + return MappingProxyType(routing_overrides) + + +def build_log_config_from_settings( + *, + root_level: LogLevel, + enable_correlation: bool, + sink_overrides_json: str, + custom_sinks_json: str, + log_dir: str = "logs", +) -> SinkBuildResult: + """Merge DEFAULT_SINKS with runtime overrides and custom sinks. + + Args: + root_level: Root logger level. + enable_correlation: Whether to enable correlation ID tracking. + sink_overrides_json: JSON object of per-sink overrides. + custom_sinks_json: JSON array of custom sink definitions. + log_dir: Directory for log files. + + Returns: + A :class:`SinkBuildResult` containing the validated + :class:`LogConfig` and any routing overrides for custom sinks. + + Raises: + ValueError: On invalid JSON, validation failures, or + attempts to disable the console sink. + """ + overrides = _parse_sink_overrides(sink_overrides_json) + custom_entries = _parse_custom_sinks(custom_sinks_json) + + merged = _merge_default_sinks(overrides) + routing = _process_custom_entries(custom_entries, merged) + + config = LogConfig( + root_level=root_level, + enable_correlation=enable_correlation, + sinks=tuple(merged), + log_dir=log_dir, + ) + return SinkBuildResult(config=config, routing_overrides=routing) diff --git a/src/synthorg/observability/sinks.py b/src/synthorg/observability/sinks.py index e0f12a8440..f65fe84586 100644 --- a/src/synthorg/observability/sinks.py +++ b/src/synthorg/observability/sinks.py @@ -12,6 +12,7 @@ from typing import TYPE_CHECKING, Any if TYPE_CHECKING: + from collections.abc import Mapping from pathlib import Path import structlog @@ -53,7 +54,7 @@ def emit(self, record: logging.LogRecord) -> None: # Maps sink file_path to the logger name prefixes that should be # routed to that sink. Sinks not listed here are catch-all sinks # (no name filter attached). -_SINK_ROUTING: MappingProxyType[str, tuple[str, ...]] = MappingProxyType( +SINK_ROUTING: MappingProxyType[str, tuple[str, ...]] = MappingProxyType( { "audit.log": ( "synthorg.security.", @@ -203,6 +204,8 @@ def build_handler( sink: SinkConfig, log_dir: Path, foreign_pre_chain: list[Any], + *, + routing: Mapping[str, tuple[str, ...]] | None = None, ) -> logging.Handler: """Build a stdlib logging handler from a sink configuration. @@ -214,10 +217,15 @@ def build_handler( sink: The sink configuration describing the handler to build. log_dir: Base directory for log files. foreign_pre_chain: Processor chain for stdlib-originated logs. + routing: Optional routing table to use instead of the + module-level ``SINK_ROUTING``. When ``None``, the + default routing is used. Returns: A configured :class:`logging.Handler` with formatter attached. """ + effective_routing = routing if routing is not None else SINK_ROUTING + if sink.sink_type == SinkType.CONSOLE: handler: logging.Handler = logging.StreamHandler(sys.stderr) else: @@ -226,9 +234,9 @@ def build_handler( handler.setLevel(sink.level.value) handler.setFormatter(_build_formatter(sink, foreign_pre_chain)) - if sink.file_path is not None and sink.file_path in _SINK_ROUTING: + if sink.file_path is not None and sink.file_path in effective_routing: name_filter = _LoggerNameFilter( - include_prefixes=_SINK_ROUTING[sink.file_path], + include_prefixes=effective_routing[sink.file_path], ) handler.addFilter(name_filter) diff --git a/src/synthorg/settings/definitions/observability.py b/src/synthorg/settings/definitions/observability.py index d8bef3ab0c..7c33a3face 100644 --- a/src/synthorg/settings/definitions/observability.py +++ b/src/synthorg/settings/definitions/observability.py @@ -31,3 +31,40 @@ yaml_path="logging.enable_correlation", ) ) + +_r.register( + SettingDefinition( + namespace=SettingNamespace.OBSERVABILITY, + key="sink_overrides", + type=SettingType.JSON, + default="{}", + description=( + "Per-sink overrides keyed by sink identifier " + "(__console__ or file path). Each value is an object with " + "optional fields: enabled (bool), level (string), " + "json_format (bool), rotation (object with max_bytes, " + "backup_count, strategy)" + ), + group="Sinks", + level=SettingLevel.ADVANCED, + yaml_path="logging.sink_overrides", + ) +) + +_r.register( + SettingDefinition( + namespace=SettingNamespace.OBSERVABILITY, + key="custom_sinks", + type=SettingType.JSON, + default="[]", + description=( + "Additional file sinks as JSON array. Each entry: " + "file_path (required), level (string, default info), " + "json_format (bool, default true), rotation (object), " + "routing_prefixes (array of logger name prefix strings)" + ), + group="Sinks", + level=SettingLevel.ADVANCED, + yaml_path="logging.custom_sinks", + ) +) diff --git a/src/synthorg/settings/subscribers/__init__.py b/src/synthorg/settings/subscribers/__init__.py index 4ba66e3acc..dd507cb5a8 100644 --- a/src/synthorg/settings/subscribers/__init__.py +++ b/src/synthorg/settings/subscribers/__init__.py @@ -6,6 +6,9 @@ from synthorg.settings.subscribers.memory_subscriber import ( MemorySettingsSubscriber, ) +from synthorg.settings.subscribers.observability_subscriber import ( + ObservabilitySettingsSubscriber, +) from synthorg.settings.subscribers.provider_subscriber import ( ProviderSettingsSubscriber, ) @@ -13,5 +16,6 @@ __all__ = [ "BackupSettingsSubscriber", "MemorySettingsSubscriber", + "ObservabilitySettingsSubscriber", "ProviderSettingsSubscriber", ] diff --git a/src/synthorg/settings/subscribers/observability_subscriber.py b/src/synthorg/settings/subscribers/observability_subscriber.py new file mode 100644 index 0000000000..c5b6a945af --- /dev/null +++ b/src/synthorg/settings/subscribers/observability_subscriber.py @@ -0,0 +1,229 @@ +"""Observability settings subscriber -- reconfigure log pipeline at runtime.""" + +import asyncio +import sys +from typing import TYPE_CHECKING, Any + +from synthorg.observability import get_logger +from synthorg.observability.enums import LogLevel +from synthorg.observability.events.settings import ( + SETTINGS_OBSERVABILITY_PIPELINE_REBUILT, + SETTINGS_OBSERVABILITY_REBUILD_FAILED, + SETTINGS_OBSERVABILITY_VALIDATION_FAILED, + SETTINGS_SUBSCRIBER_NOTIFIED, +) +from synthorg.observability.setup import configure_logging +from synthorg.observability.sink_config_builder import ( + SinkBuildResult, + build_log_config_from_settings, +) + +if TYPE_CHECKING: + from synthorg.settings.service import SettingsService + +logger = get_logger(__name__) + +_WATCHED: frozenset[tuple[str, str]] = frozenset( + { + ("observability", "root_log_level"), + ("observability", "enable_correlation"), + ("observability", "sink_overrides"), + ("observability", "custom_sinks"), + } +) + +_VALID_BOOL_STRINGS: frozenset[str] = frozenset({"true", "false"}) + + +class ObservabilitySettingsSubscriber: + """React to observability-namespace settings changes. + + Any watched key change triggers a full logging pipeline rebuild + via :func:`configure_logging`. The subscriber reads all current + settings, merges defaults with overrides, and reconfigures. + + On settings-read or validation failure, the existing logging + configuration is preserved. On ``configure_logging`` failure, + the pipeline may be in a degraded state because old handlers are + torn down before new ones are attached (not atomic). + + Rapid successive changes are serialized by an ``asyncio.Lock`` + so the final configuration reflects the last completed rebuild + (last-write-wins). + + Args: + settings_service: Settings service for reading current values. + log_dir: Log file directory (fixed at construction time). + """ + + def __init__( + self, + settings_service: SettingsService, + log_dir: str = "logs", + ) -> None: + self._settings_service = settings_service + self._log_dir = log_dir + self._rebuild_lock = asyncio.Lock() + + @property + def watched_keys(self) -> frozenset[tuple[str, str]]: + """Return observability-namespace keys this subscriber watches.""" + return _WATCHED + + @property + def subscriber_name(self) -> str: + """Human-readable subscriber name.""" + return "observability-settings" + + async def on_settings_changed( + self, + namespace: str, + key: str, + ) -> None: + """Handle an observability setting change. + + Acquires the rebuild lock to serialize concurrent rebuilds, + then delegates to :meth:`_rebuild_pipeline`. + + Args: + namespace: Changed setting namespace. + key: Changed setting key. + """ + if namespace != "observability": + logger.warning( + SETTINGS_SUBSCRIBER_NOTIFIED, + subscriber=self.subscriber_name, + namespace=namespace, + key=key, + note="ignored unexpected namespace", + ) + return + + async with self._rebuild_lock: + await self._rebuild_pipeline(key) + + async def _read_all_settings(self) -> tuple[Any, ...]: + """Read all 4 observability settings in parallel.""" + return await asyncio.gather( + self._settings_service.get("observability", "root_log_level"), + self._settings_service.get( + "observability", + "enable_correlation", + ), + self._settings_service.get("observability", "sink_overrides"), + self._settings_service.get("observability", "custom_sinks"), + ) + + def _parse_and_build( + self, + results: tuple[Any, ...], + key: str, + ) -> SinkBuildResult | None: + """Parse settings and build log config. ``None`` on failure.""" + root_result, corr_result, over_result, cust_result = results + + try: + root_level = LogLevel(root_result.value.upper()) + except ValueError, AttributeError: + logger.error( + SETTINGS_OBSERVABILITY_VALIDATION_FAILED, + subscriber=self.subscriber_name, + key=key, + note=f"invalid root_log_level: {root_result.value!r}", + exc_info=True, + ) + return None + + raw_corr = str(corr_result.value).strip().lower() + if raw_corr not in _VALID_BOOL_STRINGS: + logger.error( + SETTINGS_OBSERVABILITY_VALIDATION_FAILED, + subscriber=self.subscriber_name, + key=key, + note=f"invalid enable_correlation: {corr_result.value!r}", + ) + return None + enable_correlation = raw_corr == "true" + + try: + return build_log_config_from_settings( + root_level=root_level, + enable_correlation=enable_correlation, + sink_overrides_json=over_result.value, + custom_sinks_json=cust_result.value, + log_dir=self._log_dir, + ) + except MemoryError, RecursionError: + raise + except Exception: + logger.error( + SETTINGS_OBSERVABILITY_VALIDATION_FAILED, + subscriber=self.subscriber_name, + key=key, + note="invalid sink configuration -- keeping existing config", + exc_info=True, + ) + return None + + def _apply_config( + self, + build_result: SinkBuildResult, + key: str, + ) -> None: + """Call ``configure_logging`` with stderr fallback on failure.""" + routing = build_result.routing_overrides or None + try: + configure_logging( + build_result.config, + routing_overrides=routing, + ) + except MemoryError, RecursionError: + raise + except Exception: + # Pipeline may be degraded -- stderr as fallback. + sys.stderr.write( + f"WARNING: configure_logging failed during hot reload " + f"for key={key!r}; logging may be degraded\n", + ) + sys.stderr.flush() + logger.error( + SETTINGS_OBSERVABILITY_REBUILD_FAILED, + subscriber=self.subscriber_name, + key=key, + note=( + "configure_logging failed -- old pipeline was already " + "torn down; logging may be degraded" + ), + exc_info=True, + ) + return + + logger.info( + SETTINGS_OBSERVABILITY_PIPELINE_REBUILT, + subscriber=self.subscriber_name, + key=key, + sink_count=len(build_result.config.sinks), + custom_routing_count=len(build_result.routing_overrides), + ) + + async def _rebuild_pipeline(self, key: str) -> None: + """Full pipeline rebuild: read, parse, build, apply.""" + try: + results = await self._read_all_settings() + except MemoryError, RecursionError: + raise + except Exception: + logger.error( + SETTINGS_OBSERVABILITY_REBUILD_FAILED, + subscriber=self.subscriber_name, + key=key, + note="failed to read settings", + exc_info=True, + ) + return + + build_result = self._parse_and_build(results, key) + if build_result is None: + return + + self._apply_config(build_result, key) diff --git a/tests/integration/observability/test_runtime_sink_config_integration.py b/tests/integration/observability/test_runtime_sink_config_integration.py new file mode 100644 index 0000000000..14d96bb359 --- /dev/null +++ b/tests/integration/observability/test_runtime_sink_config_integration.py @@ -0,0 +1,228 @@ +"""Integration tests for runtime sink configuration (hot reload).""" + +import json +import logging +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + from pathlib import Path + +from synthorg.observability.config import DEFAULT_SINKS, LogConfig +from synthorg.observability.enums import LogLevel +from synthorg.observability.setup import configure_logging +from synthorg.observability.sink_config_builder import build_log_config_from_settings + + +def _read_log(path: Path) -> str: + """Read a log file, returning empty string if not found.""" + if path.is_file(): + return path.read_text(encoding="utf-8") + return "" + + +def _configure_defaults(log_dir: Path) -> None: + """Configure logging with all DEFAULT_SINKS.""" + config = LogConfig( + root_level=LogLevel.DEBUG, + sinks=DEFAULT_SINKS, + log_dir=str(log_dir), + ) + configure_logging(config) + + +@pytest.fixture +def log_dir(tmp_path: Path) -> Path: + """Provide a temp directory for log files.""" + return tmp_path / "logs" + + +@pytest.mark.integration +class TestHotReloadDisableSink: + """Disabling a file sink stops messages from reaching it.""" + + def test_disable_audit_sink_stops_messages(self, log_dir: Path) -> None: + _configure_defaults(log_dir) + + security_logger = logging.getLogger("synthorg.security.audit") + security_logger.info("before disable") + + content = _read_log(log_dir / "audit.log") + assert "before disable" in content + + # Hot reload: disable audit.log + overrides = json.dumps({"audit.log": {"enabled": False}}) + result = build_log_config_from_settings( + root_level=LogLevel.DEBUG, + enable_correlation=True, + sink_overrides_json=overrides, + custom_sinks_json="[]", + log_dir=str(log_dir), + ) + configure_logging(result.config) + + security_logger.info("after disable") + + content = _read_log(log_dir / "audit.log") + assert "before disable" in content + assert "after disable" not in content + + +@pytest.mark.integration +class TestHotReloadLevelChange: + """Changing a sink's level filters messages appropriately.""" + + def test_raise_level_filters_lower_messages(self, log_dir: Path) -> None: + _configure_defaults(log_dir) + + security_logger = logging.getLogger("synthorg.security.scanner") + security_logger.info("info before level change") + + content = _read_log(log_dir / "audit.log") + assert "info before level change" in content + + # Hot reload: raise audit.log to ERROR + overrides = json.dumps({"audit.log": {"level": "error"}}) + result = build_log_config_from_settings( + root_level=LogLevel.DEBUG, + enable_correlation=True, + sink_overrides_json=overrides, + custom_sinks_json="[]", + log_dir=str(log_dir), + ) + configure_logging(result.config) + + security_logger.info("info after level change") + security_logger.error("error after level change") + + content = _read_log(log_dir / "audit.log") + assert "info after level change" not in content + assert "error after level change" in content + + +@pytest.mark.integration +class TestHotReloadAddCustomSink: + """Adding a custom sink routes messages to the new file.""" + + def test_custom_sink_receives_messages(self, log_dir: Path) -> None: + _configure_defaults(log_dir) + + custom = json.dumps( + [ + { + "file_path": "custom_test.log", + "level": "debug", + } + ] + ) + result = build_log_config_from_settings( + root_level=LogLevel.DEBUG, + enable_correlation=True, + sink_overrides_json="{}", + custom_sinks_json=custom, + log_dir=str(log_dir), + ) + configure_logging(result.config) + + test_logger = logging.getLogger("synthorg.test.custom") + test_logger.info("custom sink message") + + content = _read_log(log_dir / "custom_test.log") + assert "custom sink message" in content + + def test_custom_sink_with_routing_filters(self, log_dir: Path) -> None: + custom = json.dumps( + [ + { + "file_path": "routed_custom.log", + "level": "debug", + "routing_prefixes": ["synthorg.tools."], + } + ] + ) + result = build_log_config_from_settings( + root_level=LogLevel.DEBUG, + enable_correlation=True, + sink_overrides_json="{}", + custom_sinks_json=custom, + log_dir=str(log_dir), + ) + configure_logging( + result.config, + routing_overrides=result.routing_overrides, + ) + + tools_logger = logging.getLogger("synthorg.tools.invoker") + engine_logger = logging.getLogger("synthorg.engine.run") + tools_logger.info("tool invoked") + engine_logger.info("engine event") + + content = _read_log(log_dir / "routed_custom.log") + assert "tool invoked" in content + assert "engine event" not in content + + +@pytest.mark.integration +class TestHotReloadModuleLevelLoggers: + """Module-level loggers route correctly after pipeline rebuild.""" + + def test_module_level_logger_routes_after_rebuild( + self, + log_dir: Path, + ) -> None: + # Create logger BEFORE first configure + budget_logger = logging.getLogger("synthorg.budget.tracker") + + _configure_defaults(log_dir) + budget_logger.info("first message") + + content = _read_log(log_dir / "cost_usage.log") + assert "first message" in content + + # Rebuild with modified config + overrides = json.dumps({"cost_usage.log": {"level": "error"}}) + result = build_log_config_from_settings( + root_level=LogLevel.DEBUG, + enable_correlation=True, + sink_overrides_json=overrides, + custom_sinks_json="[]", + log_dir=str(log_dir), + ) + configure_logging(result.config) + + # Same logger still works after rebuild + budget_logger.info("info after rebuild") + budget_logger.error("error after rebuild") + + content = _read_log(log_dir / "cost_usage.log") + assert "info after rebuild" not in content + assert "error after rebuild" in content + + +@pytest.mark.integration +class TestHotReloadPreservesMessages: + """Messages emitted before and after rebuild both reach their sinks.""" + + def test_messages_before_and_after_rebuild(self, log_dir: Path) -> None: + _configure_defaults(log_dir) + + main_logger = logging.getLogger("synthorg.core.test") + main_logger.info("message before rebuild") + + # Rebuild with same config (no changes) + result = build_log_config_from_settings( + root_level=LogLevel.DEBUG, + enable_correlation=True, + sink_overrides_json="{}", + custom_sinks_json="[]", + log_dir=str(log_dir), + ) + configure_logging(result.config) + + main_logger.info("message after rebuild") + + # Both messages in the catch-all synthorg.log + content = _read_log(log_dir / "synthorg.log") + assert "message before rebuild" in content + assert "message after rebuild" in content diff --git a/tests/unit/observability/test_sink_config_builder.py b/tests/unit/observability/test_sink_config_builder.py new file mode 100644 index 0000000000..3dc5e05556 --- /dev/null +++ b/tests/unit/observability/test_sink_config_builder.py @@ -0,0 +1,546 @@ +"""Tests for sink configuration builder (runtime sink overrides + custom sinks).""" + +import json + +import pytest + +from synthorg.observability.config import DEFAULT_SINKS +from synthorg.observability.enums import LogLevel, RotationStrategy, SinkType +from synthorg.observability.sink_config_builder import ( + SinkBuildResult, + build_log_config_from_settings, +) + +_DEFAULTS_COUNT = len(DEFAULT_SINKS) + + +def _build( + *, + overrides: str = "{}", + custom: str = "[]", + root_level: LogLevel = LogLevel.DEBUG, + enable_correlation: bool = True, +) -> SinkBuildResult: + """Shorthand builder with sensible defaults.""" + return build_log_config_from_settings( + root_level=root_level, + enable_correlation=enable_correlation, + sink_overrides_json=overrides, + custom_sinks_json=custom, + ) + + +# ── Empty / default behaviour ──────────────────────────────────── + + +@pytest.mark.unit +class TestEmptyOverrides: + """Empty overrides and custom sinks produce unmodified DEFAULT_SINKS.""" + + def test_empty_overrides_returns_all_default_sinks(self) -> None: + result = _build() + assert len(result.config.sinks) == _DEFAULTS_COUNT + + def test_default_sink_levels_preserved(self) -> None: + result = _build() + levels = {s.file_path or "__console__": s.level for s in result.config.sinks} + assert levels["__console__"] == LogLevel.INFO + assert levels["errors.log"] == LogLevel.ERROR + assert levels["debug.log"] == LogLevel.DEBUG + + def test_root_level_propagated(self) -> None: + result = _build(root_level=LogLevel.WARNING) + assert result.config.root_level == LogLevel.WARNING + + def test_enable_correlation_propagated(self) -> None: + result = _build(enable_correlation=False) + assert result.config.enable_correlation is False + + def test_routing_overrides_empty(self) -> None: + result = _build() + assert result.routing_overrides == {} + + +# ── Disable file sinks ─────────────────────────────────────────── + + +@pytest.mark.unit +class TestDisableSinks: + """Disabling sinks via sink_overrides.""" + + def test_disable_file_sink_removes_it(self) -> None: + overrides = json.dumps({"synthorg.log": {"enabled": False}}) + result = _build(overrides=overrides) + paths = {s.file_path for s in result.config.sinks if s.file_path} + assert "synthorg.log" not in paths + assert len(result.config.sinks) == _DEFAULTS_COUNT - 1 + + def test_disable_console_raises(self) -> None: + overrides = json.dumps({"__console__": {"enabled": False}}) + with pytest.raises(ValueError, match=r"[Cc]onsole"): + _build(overrides=overrides) + + def test_disable_all_file_sinks_leaves_console(self) -> None: + disabled = { + s.file_path: {"enabled": False} + for s in DEFAULT_SINKS + if s.sink_type == SinkType.FILE + } + result = _build(overrides=json.dumps(disabled)) + assert len(result.config.sinks) == 1 + assert result.config.sinks[0].sink_type == SinkType.CONSOLE + + def test_disable_multiple_sinks(self) -> None: + overrides = json.dumps( + { + "audit.log": {"enabled": False}, + "debug.log": {"enabled": False}, + } + ) + result = _build(overrides=overrides) + paths = {s.file_path for s in result.config.sinks if s.file_path} + assert "audit.log" not in paths + assert "debug.log" not in paths + assert len(result.config.sinks) == _DEFAULTS_COUNT - 2 + + +# ── Level overrides ────────────────────────────────────────────── + + +@pytest.mark.unit +class TestLevelOverrides: + """Per-sink log level overrides.""" + + def test_level_override_applied(self) -> None: + overrides = json.dumps({"errors.log": {"level": "debug"}}) + result = _build(overrides=overrides) + errors_sink = next( + s for s in result.config.sinks if s.file_path == "errors.log" + ) + assert errors_sink.level == LogLevel.DEBUG + + def test_console_level_override_applied(self) -> None: + overrides = json.dumps({"__console__": {"level": "error"}}) + result = _build(overrides=overrides) + console = next( + s for s in result.config.sinks if s.sink_type == SinkType.CONSOLE + ) + assert console.level == LogLevel.ERROR + + def test_invalid_level_raises(self) -> None: + overrides = json.dumps({"audit.log": {"level": "nonexistent"}}) + with pytest.raises(ValueError, match=r"[Ll]evel"): + _build(overrides=overrides) + + +# ── JSON format overrides ──────────────────────────────────────── + + +@pytest.mark.unit +class TestJsonFormatOverrides: + """Per-sink JSON format toggle.""" + + def test_json_format_override_applied(self) -> None: + overrides = json.dumps({"__console__": {"json_format": True}}) + result = _build(overrides=overrides) + console = next( + s for s in result.config.sinks if s.sink_type == SinkType.CONSOLE + ) + assert console.json_format is True + + def test_disable_json_on_file_sink(self) -> None: + overrides = json.dumps({"synthorg.log": {"json_format": False}}) + result = _build(overrides=overrides) + main_sink = next( + s for s in result.config.sinks if s.file_path == "synthorg.log" + ) + assert main_sink.json_format is False + + +# ── Rotation overrides ─────────────────────────────────────────── + + +@pytest.mark.unit +class TestRotationOverrides: + """Per-sink rotation configuration overrides.""" + + def test_rotation_max_bytes_override(self) -> None: + overrides = json.dumps( + { + "audit.log": {"rotation": {"max_bytes": 20_971_520}}, + } + ) + result = _build(overrides=overrides) + audit = next(s for s in result.config.sinks if s.file_path == "audit.log") + assert audit.rotation is not None + assert audit.rotation.max_bytes == 20_971_520 + # backup_count preserved from default + assert audit.rotation.backup_count == 5 + + def test_rotation_backup_count_override(self) -> None: + overrides = json.dumps( + { + "synthorg.log": {"rotation": {"backup_count": 10}}, + } + ) + result = _build(overrides=overrides) + main = next(s for s in result.config.sinks if s.file_path == "synthorg.log") + assert main.rotation is not None + assert main.rotation.backup_count == 10 + + def test_rotation_strategy_override(self) -> None: + overrides = json.dumps( + { + "debug.log": {"rotation": {"strategy": "external"}}, + } + ) + result = _build(overrides=overrides) + debug = next(s for s in result.config.sinks if s.file_path == "debug.log") + assert debug.rotation is not None + assert debug.rotation.strategy == RotationStrategy.EXTERNAL + + def test_invalid_rotation_strategy_raises(self) -> None: + overrides = json.dumps({"audit.log": {"rotation": {"strategy": "daily"}}}) + with pytest.raises(ValueError, match=r"[Ss]trategy"): + _build(overrides=overrides) + + +# ── Validation ─────────────────────────────────────────────────── + + +@pytest.mark.unit +class TestOverrideValidation: + """Validation of sink_overrides JSON structure.""" + + def test_unknown_sink_identifier_raises(self) -> None: + overrides = json.dumps({"nonexistent.log": {"level": "info"}}) + with pytest.raises(ValueError, match=r"nonexistent\.log"): + _build(overrides=overrides) + + def test_invalid_json_raises(self) -> None: + with pytest.raises(ValueError, match=r"[Jj]SON"): + _build(overrides="not-json") + + def test_non_object_top_level_raises(self) -> None: + with pytest.raises(ValueError, match=r"[Oo]bject"): + _build(overrides="[]") + + def test_override_value_must_be_object(self) -> None: + overrides = json.dumps({"audit.log": "not-an-object"}) + with pytest.raises(ValueError, match=r"[Oo]bject"): + _build(overrides=overrides) + + +# ── Custom sinks ───────────────────────────────────────────────── + + +@pytest.mark.unit +class TestCustomSinks: + """Adding custom file sinks via custom_sinks JSON.""" + + def test_custom_sink_added(self) -> None: + custom = json.dumps([{"file_path": "my_custom.log"}]) + result = _build(custom=custom) + assert len(result.config.sinks) == _DEFAULTS_COUNT + 1 + custom_sink = next( + s for s in result.config.sinks if s.file_path == "my_custom.log" + ) + assert custom_sink.sink_type == SinkType.FILE + assert custom_sink.level == LogLevel.INFO + assert custom_sink.json_format is True + assert custom_sink.rotation is not None + + def test_custom_sink_with_level(self) -> None: + custom = json.dumps( + [ + { + "file_path": "debug_custom.log", + "level": "debug", + } + ] + ) + result = _build(custom=custom) + sink = next(s for s in result.config.sinks if s.file_path == "debug_custom.log") + assert sink.level == LogLevel.DEBUG + + def test_custom_sink_with_routing_prefixes(self) -> None: + custom = json.dumps( + [ + { + "file_path": "custom_routed.log", + "routing_prefixes": ["synthorg.tools.", "synthorg.memory."], + } + ] + ) + result = _build(custom=custom) + assert "custom_routed.log" in result.routing_overrides + assert result.routing_overrides["custom_routed.log"] == ( + "synthorg.tools.", + "synthorg.memory.", + ) + + def test_custom_sink_duplicate_path_with_default_raises(self) -> None: + custom = json.dumps([{"file_path": "audit.log"}]) + with pytest.raises(ValueError, match=r"audit\.log"): + _build(custom=custom) + + def test_custom_sink_duplicate_path_between_custom_raises(self) -> None: + custom = json.dumps( + [ + {"file_path": "dup.log"}, + {"file_path": "dup.log"}, + ] + ) + with pytest.raises(ValueError, match=r"dup\.log"): + _build(custom=custom) + + def test_custom_sink_path_traversal_raises(self) -> None: + custom = json.dumps([{"file_path": "../evil.log"}]) + with pytest.raises(ValueError, match="\\.\\."): + _build(custom=custom) + + def test_custom_sink_absolute_path_raises(self) -> None: + custom = json.dumps([{"file_path": "/var/log/evil.log"}]) + with pytest.raises(ValueError, match=r"[Rr]elative|[Aa]bsolute"): + _build(custom=custom) + + def test_custom_sink_missing_file_path_raises(self) -> None: + custom = json.dumps([{"level": "info"}]) + with pytest.raises(ValueError, match="file_path"): + _build(custom=custom) + + def test_custom_sink_empty_routing_prefixes_ignored(self) -> None: + custom = json.dumps( + [ + { + "file_path": "no_route.log", + "routing_prefixes": [], + } + ] + ) + result = _build(custom=custom) + assert "no_route.log" not in result.routing_overrides + + +# ── Custom sinks JSON validation ───────────────────────────────── + + +@pytest.mark.unit +class TestCustomSinksValidation: + """Validation of custom_sinks JSON structure.""" + + def test_invalid_json_raises(self) -> None: + with pytest.raises(ValueError, match=r"[Jj]SON"): + _build(custom="not-json") + + def test_non_array_top_level_raises(self) -> None: + with pytest.raises(ValueError, match=r"[Aa]rray"): + _build(custom="{}") + + def test_non_object_entry_raises(self) -> None: + with pytest.raises(ValueError, match=r"[Oo]bject"): + _build(custom='["not-an-object"]') + + def test_invalid_routing_prefix_raises(self) -> None: + custom = json.dumps( + [ + { + "file_path": "bad_route.log", + "routing_prefixes": [""], + } + ] + ) + with pytest.raises(ValueError, match=r"[Pp]refix"): + _build(custom=custom) + + def test_routing_prefixes_non_array_raises(self) -> None: + custom = json.dumps( + [{"file_path": "x.log", "routing_prefixes": "not-an-array"}] + ) + with pytest.raises(ValueError, match=r"[Aa]rray"): + _build(custom=custom) + + def test_non_string_file_path_raises(self) -> None: + custom = json.dumps([{"file_path": 123}]) + with pytest.raises(ValueError, match=r"non-empty string"): + _build(custom=custom) + + def test_null_file_path_raises(self) -> None: + custom = json.dumps([{"file_path": None}]) + with pytest.raises(ValueError, match=r"non-empty string"): + _build(custom=custom) + + +# ── Combined overrides + custom ────────────────────────────────── + + +@pytest.mark.unit +class TestCombined: + """Overrides and custom sinks combined.""" + + def test_disable_sink_and_add_custom(self) -> None: + overrides = json.dumps({"debug.log": {"enabled": False}}) + custom = json.dumps([{"file_path": "my_debug.log", "level": "debug"}]) + result = _build(overrides=overrides, custom=custom) + paths = {s.file_path for s in result.config.sinks if s.file_path} + assert "debug.log" not in paths + assert "my_debug.log" in paths + # Same count: removed one, added one + assert len(result.config.sinks) == _DEFAULTS_COUNT + + def test_custom_sink_cannot_reuse_disabled_default_path(self) -> None: + """Even if a default sink is disabled, its path is reserved.""" + overrides = json.dumps({"audit.log": {"enabled": False}}) + custom = json.dumps([{"file_path": "audit.log"}]) + with pytest.raises(ValueError, match=r"audit\.log"): + _build(overrides=overrides, custom=custom) + + def test_multiple_fields_overridden_simultaneously(self) -> None: + overrides = json.dumps( + { + "synthorg.log": { + "level": "warning", + "json_format": False, + "rotation": {"max_bytes": 20_000_000}, + }, + } + ) + result = _build(overrides=overrides) + sink = next(s for s in result.config.sinks if s.file_path == "synthorg.log") + assert sink.level == LogLevel.WARNING + assert sink.json_format is False + assert sink.rotation is not None + assert sink.rotation.max_bytes == 20_000_000 + + def test_custom_sink_with_rotation_override(self) -> None: + custom = json.dumps( + [ + { + "file_path": "large.log", + "rotation": {"max_bytes": 50_000_000, "backup_count": 3}, + } + ] + ) + result = _build(custom=custom) + sink = next(s for s in result.config.sinks if s.file_path == "large.log") + assert sink.rotation is not None + assert sink.rotation.max_bytes == 50_000_000 + assert sink.rotation.backup_count == 3 + + def test_custom_log_dir_propagated(self) -> None: + result = build_log_config_from_settings( + root_level=LogLevel.DEBUG, + enable_correlation=True, + sink_overrides_json="{}", + custom_sinks_json="[]", + log_dir="custom_logs", + ) + assert result.config.log_dir == "custom_logs" + + +# -- Strict type validation ---------------------------------------- + + +@pytest.mark.unit +class TestStrictTypeValidation: + """Strict type checks for boolean and level fields.""" + + def test_enabled_string_false_raises(self) -> None: + overrides = json.dumps({"audit.log": {"enabled": "false"}}) + with pytest.raises(ValueError, match=r"boolean"): + _build(overrides=overrides) + + def test_enabled_int_zero_raises(self) -> None: + overrides = json.dumps({"audit.log": {"enabled": 0}}) + with pytest.raises(ValueError, match=r"boolean"): + _build(overrides=overrides) + + def test_json_format_string_false_raises(self) -> None: + overrides = json.dumps({"synthorg.log": {"json_format": "false"}}) + with pytest.raises(ValueError, match=r"boolean"): + _build(overrides=overrides) + + def test_custom_sink_json_format_string_raises(self) -> None: + custom = json.dumps([{"file_path": "x.log", "json_format": "true"}]) + with pytest.raises(ValueError, match=r"boolean"): + _build(custom=custom) + + def test_level_null_raises(self) -> None: + overrides = json.dumps({"audit.log": {"level": None}}) + with pytest.raises(ValueError, match=r"string"): + _build(overrides=overrides) + + def test_level_number_raises(self) -> None: + overrides = json.dumps({"audit.log": {"level": 42}}) + with pytest.raises(ValueError, match=r"string"): + _build(overrides=overrides) + + def test_rotation_non_object_raises(self) -> None: + overrides = json.dumps({"audit.log": {"rotation": "disabled"}}) + with pytest.raises(ValueError, match=r"[Oo]bject"): + _build(overrides=overrides) + + def test_rotation_array_raises(self) -> None: + overrides = json.dumps({"audit.log": {"rotation": []}}) + with pytest.raises(ValueError, match=r"[Oo]bject"): + _build(overrides=overrides) + + def test_invalid_max_bytes_raises(self) -> None: + overrides = json.dumps( + {"audit.log": {"rotation": {"max_bytes": "not-a-number"}}}, + ) + with pytest.raises(ValueError, match=r"max_bytes"): + _build(overrides=overrides) + + def test_invalid_backup_count_raises(self) -> None: + overrides = json.dumps( + {"audit.log": {"rotation": {"backup_count": None}}}, + ) + with pytest.raises(ValueError, match=r"backup_count"): + _build(overrides=overrides) + + +# -- Unknown field rejection --------------------------------------- + + +@pytest.mark.unit +class TestUnknownFieldRejection: + """Unknown fields in override/custom sink dicts are rejected.""" + + def test_unknown_override_field_raises(self) -> None: + overrides = json.dumps({"audit.log": {"levle": "debug"}}) + with pytest.raises(ValueError, match=r"Unknown fields"): + _build(overrides=overrides) + + def test_unknown_custom_sink_field_raises(self) -> None: + custom = json.dumps([{"file_path": "x.log", "routing_prefix": ["a."]}]) + with pytest.raises(ValueError, match=r"Unknown fields"): + _build(custom=custom) + + def test_unknown_rotation_field_raises(self) -> None: + overrides = json.dumps( + {"audit.log": {"rotation": {"max_size": 1000}}}, + ) + with pytest.raises(ValueError, match=r"Unknown fields"): + _build(overrides=overrides) + + +# -- Limits -------------------------------------------------------- + + +@pytest.mark.unit +class TestLimits: + """Caps on custom sinks and routing prefixes.""" + + def test_too_many_custom_sinks_raises(self) -> None: + sinks = [{"file_path": f"sink_{i}.log"} for i in range(21)] + with pytest.raises(ValueError, match=r"exceeds maximum"): + _build(custom=json.dumps(sinks)) + + def test_too_many_routing_prefixes_raises(self) -> None: + prefixes = [f"synthorg.mod{i}." for i in range(51)] + custom = json.dumps( + [{"file_path": "x.log", "routing_prefixes": prefixes}], + ) + with pytest.raises(ValueError, match=r"exceeds maximum"): + _build(custom=custom) diff --git a/tests/unit/observability/test_sink_routing.py b/tests/unit/observability/test_sink_routing.py index 81f6c157e9..5273cac284 100644 --- a/tests/unit/observability/test_sink_routing.py +++ b/tests/unit/observability/test_sink_routing.py @@ -4,7 +4,7 @@ import pytest -from synthorg.observability.sinks import _SINK_ROUTING, _LoggerNameFilter +from synthorg.observability.sinks import SINK_ROUTING, _LoggerNameFilter def _make_record(name: str) -> logging.LogRecord: @@ -67,24 +67,24 @@ def test_multiple_include_prefixes(self) -> None: @pytest.mark.unit class TestSinkRoutingTable: def test_audit_routes_security(self) -> None: - assert "audit.log" in _SINK_ROUTING - assert "synthorg.security." in _SINK_ROUTING["audit.log"] + assert "audit.log" in SINK_ROUTING + assert "synthorg.security." in SINK_ROUTING["audit.log"] def test_cost_usage_routes_budget_and_providers(self) -> None: - assert "cost_usage.log" in _SINK_ROUTING - prefixes = _SINK_ROUTING["cost_usage.log"] + assert "cost_usage.log" in SINK_ROUTING + prefixes = SINK_ROUTING["cost_usage.log"] assert "synthorg.budget." in prefixes assert "synthorg.providers." in prefixes def test_agent_activity_routes_engine_and_core(self) -> None: - assert "agent_activity.log" in _SINK_ROUTING - prefixes = _SINK_ROUTING["agent_activity.log"] + assert "agent_activity.log" in SINK_ROUTING + prefixes = SINK_ROUTING["agent_activity.log"] assert "synthorg.engine." in prefixes assert "synthorg.core." in prefixes def test_access_routes_api(self) -> None: - assert "access.log" in _SINK_ROUTING - assert "synthorg.api." in _SINK_ROUTING["access.log"] + assert "access.log" in SINK_ROUTING + assert "synthorg.api." in SINK_ROUTING["access.log"] @pytest.mark.parametrize( ("sink", "prefix"), @@ -112,10 +112,10 @@ def test_access_routes_api(self) -> None: ], ) def test_sink_routes_prefix(self, sink: str, prefix: str) -> None: - assert prefix in _SINK_ROUTING[sink] + assert prefix in SINK_ROUTING[sink] def test_routing_table_has_exactly_expected_sinks(self) -> None: - assert set(_SINK_ROUTING.keys()) == { + assert set(SINK_ROUTING.keys()) == { "audit.log", "cost_usage.log", "agent_activity.log", @@ -127,4 +127,4 @@ def test_routing_table_has_exactly_expected_sinks(self) -> None: def test_catchall_sinks_not_in_routing(self) -> None: for name in ("synthorg.log", "errors.log", "debug.log"): - assert name not in _SINK_ROUTING + assert name not in SINK_ROUTING diff --git a/tests/unit/settings/test_observability_subscriber.py b/tests/unit/settings/test_observability_subscriber.py new file mode 100644 index 0000000000..7b6e14c523 --- /dev/null +++ b/tests/unit/settings/test_observability_subscriber.py @@ -0,0 +1,309 @@ +"""Tests for ObservabilitySettingsSubscriber.""" + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from synthorg.settings.subscriber import SettingsSubscriber +from synthorg.settings.subscribers.observability_subscriber import ( + ObservabilitySettingsSubscriber, +) + + +def _make_subscriber( + *, + root_log_level: str = "debug", + enable_correlation: str = "true", + sink_overrides: str = "{}", + custom_sinks: str = "[]", +) -> tuple[ObservabilitySettingsSubscriber, MagicMock]: + """Create a subscriber with a mock SettingsService. + + Returns: + Tuple of (subscriber, mock_settings_service). + """ + settings_service = MagicMock() + + async def _mock_get(namespace: str, key: str) -> MagicMock: + result = MagicMock() + values = { + "root_log_level": root_log_level, + "enable_correlation": enable_correlation, + "sink_overrides": sink_overrides, + "custom_sinks": custom_sinks, + } + result.value = values.get(key, "") + return result + + settings_service.get = AsyncMock(side_effect=_mock_get) + + sub = ObservabilitySettingsSubscriber( + settings_service=settings_service, + log_dir="logs", + ) + return sub, settings_service + + +# ── Protocol conformance ───────────────────────────────────────── + + +@pytest.mark.unit +class TestObservabilitySubscriberProtocol: + """ObservabilitySettingsSubscriber conforms to SettingsSubscriber.""" + + def test_isinstance_check(self) -> None: + sub, _ = _make_subscriber() + assert isinstance(sub, SettingsSubscriber) + + def test_watched_keys_returns_expected_frozenset(self) -> None: + sub, _ = _make_subscriber() + expected = frozenset( + { + ("observability", "root_log_level"), + ("observability", "enable_correlation"), + ("observability", "sink_overrides"), + ("observability", "custom_sinks"), + } + ) + assert sub.watched_keys == expected + + def test_subscriber_name(self) -> None: + sub, _ = _make_subscriber() + assert sub.subscriber_name == "observability-settings" + + +# ── Pipeline rebuild on changes ────────────────────────────────── + + +@pytest.mark.unit +class TestObservabilitySubscriberRebuild: + """on_settings_changed rebuilds the logging pipeline.""" + + @pytest.mark.parametrize( + "key", + [ + "root_log_level", + "enable_correlation", + "sink_overrides", + "custom_sinks", + ], + ) + async def test_rebuilds_pipeline_on_any_watched_key( + self, + key: str, + ) -> None: + sub, _ = _make_subscriber() + with patch( + "synthorg.settings.subscribers.observability_subscriber.configure_logging", + ) as mock_configure: + await sub.on_settings_changed("observability", key) + mock_configure.assert_called_once() + + async def test_passes_correct_root_level(self) -> None: + sub, _ = _make_subscriber(root_log_level="warning") + with patch( + "synthorg.settings.subscribers.observability_subscriber.configure_logging", + ) as mock_configure: + await sub.on_settings_changed("observability", "root_log_level") + call_kwargs = mock_configure.call_args + config = call_kwargs[0][0] + assert config.root_level.value == "WARNING" + + async def test_passes_correct_enable_correlation(self) -> None: + sub, _ = _make_subscriber(enable_correlation="false") + with patch( + "synthorg.settings.subscribers.observability_subscriber.configure_logging", + ) as mock_configure: + await sub.on_settings_changed( + "observability", + "enable_correlation", + ) + config = mock_configure.call_args[0][0] + assert config.enable_correlation is False + + async def test_passes_routing_overrides_for_custom_sinks(self) -> None: + custom = ( + '[{"file_path": "custom.log", "routing_prefixes": ["synthorg.tools."]}]' + ) + sub, _ = _make_subscriber(custom_sinks=custom) + with patch( + "synthorg.settings.subscribers.observability_subscriber.configure_logging", + ) as mock_configure: + await sub.on_settings_changed("observability", "custom_sinks") + call_kwargs = mock_configure.call_args + routing = call_kwargs[1]["routing_overrides"] + assert "custom.log" in routing + assert routing["custom.log"] == ("synthorg.tools.",) + + +# ── Error handling ─────────────────────────────────────────────── + + +@pytest.mark.unit +class TestObservabilitySubscriberErrorHandling: + """Error handling preserves existing config.""" + + async def test_settings_read_failure_preserves_config(self) -> None: + sub, settings_service = _make_subscriber() + settings_service.get = AsyncMock( + side_effect=RuntimeError("DB unavailable"), + ) + with patch( + "synthorg.settings.subscribers.observability_subscriber.configure_logging", + ) as mock_configure: + # Should not raise -- error is caught internally + await sub.on_settings_changed("observability", "sink_overrides") + mock_configure.assert_not_called() + + async def test_validation_failure_preserves_config(self) -> None: + sub, _ = _make_subscriber(sink_overrides="not-valid-json") + with patch( + "synthorg.settings.subscribers.observability_subscriber.configure_logging", + ) as mock_configure: + await sub.on_settings_changed("observability", "sink_overrides") + mock_configure.assert_not_called() + + async def test_configure_logging_failure_does_not_raise(self) -> None: + sub, _ = _make_subscriber() + with patch( + "synthorg.settings.subscribers.observability_subscriber.configure_logging", + side_effect=RuntimeError("Critical sink failed"), + ): + # Should not raise -- error is caught internally + await sub.on_settings_changed("observability", "root_log_level") + + async def test_invalid_root_level_preserves_config(self) -> None: + sub, _ = _make_subscriber(root_log_level="verbose") + with patch( + "synthorg.settings.subscribers.observability_subscriber.configure_logging", + ) as mock_configure: + await sub.on_settings_changed("observability", "root_log_level") + mock_configure.assert_not_called() + + +# ── Namespace guard ────────────────────────────────────────────── + + +@pytest.mark.unit +class TestObservabilitySubscriberNamespaceGuard: + """Ignores unexpected namespaces.""" + + async def test_ignores_unexpected_namespace(self) -> None: + sub, settings_service = _make_subscriber() + with patch( + "synthorg.settings.subscribers.observability_subscriber.configure_logging", + ) as mock_configure: + await sub.on_settings_changed("budget", "total_monthly") + mock_configure.assert_not_called() + # Should not have read any settings + settings_service.get.assert_not_awaited() + + +# ── Idempotency ────────────────────────────────────────────────── + + +@pytest.mark.unit +class TestObservabilitySubscriberIdempotency: + """Calling on_settings_changed multiple times is safe.""" + + async def test_idempotent_repeated_calls(self) -> None: + sub, _ = _make_subscriber() + with patch( + "synthorg.settings.subscribers.observability_subscriber.configure_logging", + ) as mock_configure: + await sub.on_settings_changed("observability", "sink_overrides") + await sub.on_settings_changed("observability", "sink_overrides") + assert mock_configure.call_count == 2 + + +# -- enable_correlation validation --------------------------------- + + +@pytest.mark.unit +class TestObservabilitySubscriberCorrelationValidation: + """Invalid enable_correlation values are rejected.""" + + @pytest.mark.parametrize( + "value", + ["yes", "1", "on", "banana", ""], + ) + async def test_invalid_correlation_value_preserves_config( + self, + value: str, + ) -> None: + sub, _ = _make_subscriber(enable_correlation=value) + with patch( + "synthorg.settings.subscribers.observability_subscriber.configure_logging", + ) as mock_configure: + await sub.on_settings_changed("observability", "enable_correlation") + mock_configure.assert_not_called() + + async def test_true_string_accepted(self) -> None: + sub, _ = _make_subscriber(enable_correlation="true") + with patch( + "synthorg.settings.subscribers.observability_subscriber.configure_logging", + ) as mock_configure: + await sub.on_settings_changed("observability", "enable_correlation") + config = mock_configure.call_args[0][0] + assert config.enable_correlation is True + + async def test_false_string_accepted(self) -> None: + sub, _ = _make_subscriber(enable_correlation="false") + with patch( + "synthorg.settings.subscribers.observability_subscriber.configure_logging", + ) as mock_configure: + await sub.on_settings_changed("observability", "enable_correlation") + config = mock_configure.call_args[0][0] + assert config.enable_correlation is False + + +# -- MemoryError/RecursionError re-raise --------------------------- + + +@pytest.mark.unit +class TestObservabilitySubscriberFatalErrors: + """MemoryError and RecursionError propagate through the subscriber.""" + + async def test_memory_error_from_settings_read_propagates(self) -> None: + sub, settings_service = _make_subscriber() + settings_service.get = AsyncMock(side_effect=MemoryError) + with pytest.raises(MemoryError): + await sub.on_settings_changed("observability", "sink_overrides") + + async def test_recursion_error_from_build_propagates(self) -> None: + sub, _ = _make_subscriber() + with ( + patch( + "synthorg.settings.subscribers.observability_subscriber" + ".build_log_config_from_settings", + side_effect=RecursionError, + ), + pytest.raises(RecursionError), + ): + await sub.on_settings_changed("observability", "sink_overrides") + + async def test_memory_error_from_configure_propagates(self) -> None: + sub, _ = _make_subscriber() + with ( + patch( + "synthorg.settings.subscribers.observability_subscriber" + ".configure_logging", + side_effect=MemoryError, + ), + pytest.raises(MemoryError), + ): + await sub.on_settings_changed("observability", "sink_overrides") + + +# -- Rebuild lock -------------------------------------------------- + + +@pytest.mark.unit +class TestObservabilitySubscriberLock: + """Subscriber has a rebuild lock for serialization.""" + + def test_has_rebuild_lock(self) -> None: + import asyncio + + sub, _ = _make_subscriber() + assert isinstance(sub._rebuild_lock, asyncio.Lock) diff --git a/web/public/mockServiceWorker.js b/web/public/mockServiceWorker.js index b17fcd650c..0b1bcdd4dc 100644 --- a/web/public/mockServiceWorker.js +++ b/web/public/mockServiceWorker.js @@ -21,6 +21,11 @@ addEventListener('activate', function (event) { }) addEventListener('message', async function (event) { + // Verify same-origin (CWE-020/CWE-940) + if (event.origin && event.origin !== self.location.origin) { + return + } + const clientId = Reflect.get(event.source || {}, 'id') if (!clientId || !self.clients) {