-
Notifications
You must be signed in to change notification settings - Fork 1
feat: wire all modules into observability system #97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,5 @@ | ||
| """YAML configuration loader with layered merging and validation.""" | ||
|
|
||
| import logging | ||
| import os | ||
| import re | ||
| from pathlib import Path | ||
|
|
@@ -18,8 +17,18 @@ | |
| ) | ||
| from ai_company.config.schema import RootConfig | ||
| from ai_company.config.utils import deep_merge | ||
| from ai_company.observability import get_logger | ||
| from ai_company.observability.events import ( | ||
| CONFIG_DISCOVERY_FOUND, | ||
| CONFIG_DISCOVERY_STARTED, | ||
| CONFIG_ENV_VAR_RESOLVED, | ||
| CONFIG_LOADED, | ||
| CONFIG_OVERRIDE_APPLIED, | ||
| CONFIG_PARSE_FAILED, | ||
| CONFIG_VALIDATION_FAILED, | ||
| ) | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
| logger = get_logger(__name__) | ||
|
|
||
| _ENV_VAR_PATTERN = re.compile(r"\$\{([^}:]+?)(?::-([^}]*))?\}") | ||
|
|
||
|
|
@@ -109,6 +118,12 @@ def _parse_yaml_string( | |
| line = exc.problem_mark.line + 1 | ||
| col = exc.problem_mark.column + 1 | ||
| msg = f"YAML syntax error in {source_name}: {exc}" | ||
| logger.warning( | ||
| CONFIG_PARSE_FAILED, | ||
| source=source_name, | ||
| line=line, | ||
| column=col, | ||
| ) | ||
| raise ConfigParseError( | ||
| msg, | ||
| locations=( | ||
|
|
@@ -158,8 +173,8 @@ def _walk_node( | |
| _walk_node(value_node, path, result) | ||
| else: | ||
| logger.debug( | ||
| "Skipping non-scalar YAML key: %s", | ||
| type(key_node).__name__, | ||
| "config.yaml.non_scalar_key", | ||
| key_type=type(key_node).__name__, | ||
| ) | ||
|
Comment on lines
177
to
180
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This file has a couple of log calls using bare strings for event names, which goes against the new logging conventions. Please define constants in
For example, for the first case, you could add logger.debug(
CONFIG_YAML_NON_SCALAR_KEY,
key_type=type(key_node).__name__,
)
Comment on lines
177
to
180
|
||
| elif isinstance(node, yaml.SequenceNode): | ||
| for idx, item_node in enumerate(node.value): | ||
|
|
@@ -189,9 +204,8 @@ def _build_line_map(yaml_text: str) -> dict[str, tuple[int, int]]: | |
| root = yaml.compose(yaml_text, Loader=yaml.SafeLoader) | ||
| except yaml.YAMLError as exc: | ||
| logger.warning( | ||
| "Failed to compose YAML AST for line mapping; " | ||
| "validation errors will lack line/column information: %s", | ||
| exc, | ||
| "config.line_map.compose_failed", | ||
| error=str(exc), | ||
| ) | ||
|
coderabbitai[bot] marked this conversation as resolved.
Comment on lines
208
to
211
|
||
| return {} | ||
| if root is None or not isinstance(root, yaml.MappingNode): | ||
|
|
@@ -223,6 +237,11 @@ def _validate_config_dict( | |
| try: | ||
| return RootConfig(**data) | ||
| except ValidationError as exc: | ||
| logger.warning( | ||
| CONFIG_VALIDATION_FAILED, | ||
| source=source_file, | ||
| error_count=len(exc.errors()), | ||
| ) | ||
| if line_map is None: | ||
| line_map = {} | ||
| locations: list[ConfigLocation] = [] | ||
|
|
@@ -270,6 +289,7 @@ def _resolve_env_var_match( | |
| default = match.group(2) | ||
| value = os.environ.get(var_name) | ||
| if value is not None: | ||
| logger.debug(CONFIG_ENV_VAR_RESOLVED, var_name=var_name) | ||
| return value | ||
| if default is not None: | ||
| return default | ||
|
|
@@ -354,9 +374,15 @@ def discover_config() -> Path: | |
| at any searched location. | ||
| """ | ||
| candidates = [*_CWD_CONFIG_LOCATIONS, Path.home() / _HOME_CONFIG_RELATIVE] | ||
| logger.debug( | ||
| CONFIG_DISCOVERY_STARTED, | ||
| searched_paths=[str(c) for c in candidates], | ||
| ) | ||
| for candidate in candidates: | ||
| if candidate.is_file(): | ||
| return candidate.resolve() | ||
| resolved = candidate.resolve() | ||
| logger.info(CONFIG_DISCOVERY_FOUND, config_path=str(resolved)) | ||
| return resolved | ||
|
|
||
| searched = [str(c) for c in candidates] | ||
| msg = "No configuration file found. Searched:\n" + "\n".join( | ||
|
|
@@ -426,6 +452,10 @@ def load_config( | |
| for override_path in override_paths: | ||
| override = _parse_yaml_file(Path(override_path)) | ||
| merged = deep_merge(merged, override) | ||
| logger.debug( | ||
| CONFIG_OVERRIDE_APPLIED, | ||
| override_path=str(override_path), | ||
| ) | ||
|
|
||
| # 4. Substitute environment variables on the fully merged config. | ||
| # Use a neutral label so env-var errors aren't misattributed solely | ||
|
|
@@ -436,11 +466,35 @@ def load_config( | |
| line_map = _build_line_map(yaml_text) | ||
|
|
||
| # Validate merged config | ||
| return _validate_config_dict( | ||
| result = _validate_config_dict( | ||
| merged, | ||
| source_file=str(config_path), | ||
| line_map=line_map, | ||
| ) | ||
| logger.info( | ||
| CONFIG_LOADED, | ||
| config_path=str(config_path), | ||
| override_count=len(override_paths), | ||
| ) | ||
| return result | ||
|
|
||
|
|
||
| def bootstrap_logging(config: RootConfig | None = None) -> None: | ||
| """Activate the observability pipeline after config is loaded. | ||
|
|
||
| Calls :func:`~ai_company.observability.configure_logging` with | ||
| ``config.logging``, or sensible defaults if *config* is ``None``. | ||
| Should be called **once** at startup after :func:`load_config` | ||
| returns. | ||
|
|
||
| Args: | ||
| config: Validated root configuration. When ``None``, the | ||
| logging system uses default settings. | ||
| """ | ||
| from ai_company.observability import configure_logging # noqa: PLC0415 | ||
|
|
||
| log_cfg = config.logging if config is not None else None | ||
| configure_logging(log_cfg) | ||
|
|
||
|
|
||
| def load_config_from_string( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,8 +9,6 @@ | |
| code because Python's :mod:`contextvars` is natively async-aware. | ||
| """ | ||
|
|
||
| # TODO: Add with_correlation_async() for async functions (engine/API) | ||
|
|
||
| import functools | ||
| import inspect | ||
| import uuid | ||
|
|
@@ -19,7 +17,7 @@ | |
| import structlog | ||
|
|
||
| if TYPE_CHECKING: | ||
| from collections.abc import Callable | ||
| from collections.abc import Callable, Coroutine | ||
|
|
||
|
Comment on lines
15
to
21
|
||
| _P = ParamSpec("_P") | ||
| _T = TypeVar("_T") | ||
|
|
@@ -131,8 +129,7 @@ def decorator(func: Callable[_P, _T]) -> Callable[_P, _T]: | |
| if inspect.iscoroutinefunction(func): | ||
| msg = ( | ||
| "with_correlation() does not support async functions. " | ||
| "Manually call bind_correlation_id/unbind_correlation_id " | ||
| "in a try/finally block." | ||
| "Use with_correlation_async() instead." | ||
| ) | ||
| raise TypeError(msg) | ||
|
|
||
|
|
@@ -152,3 +149,63 @@ def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T: | |
| return wrapper | ||
|
|
||
| return decorator | ||
|
|
||
|
|
||
| def with_correlation_async( | ||
| *, | ||
| request_id: str | None = None, | ||
| task_id: str | None = None, | ||
| agent_id: str | None = None, | ||
| ) -> Callable[ | ||
| [Callable[_P, Coroutine[object, object, _T]]], | ||
| Callable[_P, Coroutine[object, object, _T]], | ||
| ]: | ||
| """Decorator that binds correlation IDs for an async function's duration. | ||
|
|
||
| Correlation IDs are bound before the coroutine executes and unbound | ||
| after it returns or raises. Only non-``None`` IDs are managed. | ||
|
|
||
| Note: | ||
| This decorator is for **async** functions only. Applying it to | ||
| a synchronous function raises :exc:`TypeError`. For sync | ||
| functions use :func:`with_correlation`. | ||
|
|
||
| Args: | ||
| request_id: Request correlation identifier to bind. | ||
| task_id: Task correlation identifier to bind. | ||
| agent_id: Agent correlation identifier to bind. | ||
|
|
||
| Returns: | ||
| A decorator that manages correlation ID lifecycle for async | ||
| functions. | ||
|
|
||
| Raises: | ||
| TypeError: If the decorated function is not a coroutine function. | ||
| """ | ||
|
|
||
| def decorator( | ||
| func: Callable[_P, Coroutine[object, object, _T]], | ||
| ) -> Callable[_P, Coroutine[object, object, _T]]: | ||
| if not inspect.iscoroutinefunction(func): | ||
| msg = ( | ||
| "with_correlation_async() requires an async function. " | ||
| "Use with_correlation() for synchronous functions." | ||
| ) | ||
| raise TypeError(msg) | ||
|
|
||
| @functools.wraps(func) | ||
| async def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T: | ||
| bindings: dict[str, str] = {} | ||
| if request_id is not None: | ||
| bindings["request_id"] = request_id | ||
| if task_id is not None: | ||
| bindings["task_id"] = task_id | ||
| if agent_id is not None: | ||
| bindings["agent_id"] = agent_id | ||
|
|
||
| with structlog.contextvars.bound_contextvars(**bindings): | ||
| return await func(*args, **kwargs) | ||
|
|
||
| return wrapper | ||
|
|
||
| return decorator | ||
Uh oh!
There was an error while loading. Please reload this page.