diff --git a/cosmos/config.py b/cosmos/config.py
index 3d46ccb9b1..6ae2a527a8 100644
--- a/cosmos/config.py
+++ b/cosmos/config.py
@@ -353,6 +353,9 @@ def _get_profile_path(self, use_mock_values: bool = False) -> Path:
         Check if profile object version is exist then reuse it
         Otherwise, create profile yml for requested object and return the profile path
         """
+        if self.profiles_yml_filepath:
+            return Path(self.profiles_yml_filepath)
+
         assert self.profile_mapping  # To satisfy MyPy
         current_profile_version = self.profile_mapping.version(self.profile_name, self.target_name, use_mock_values)
         cached_profile_path = get_cached_profile(current_profile_version)
diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py
index 75381e81cd..3263e2dee9 100644
--- a/cosmos/listeners/task_instance_listener.py
+++ b/cosmos/listeners/task_instance_listener.py
@@ -7,6 +7,7 @@
 if TYPE_CHECKING:
     from airflow.models.taskinstance import TaskInstance
 
+
 from cosmos import telemetry
 from cosmos.constants import InvocationMode
 from cosmos.log import get_logger
@@ -108,9 +109,46 @@ def _has_callback(task_instance: TaskInstance) -> bool:
     return bool(callback)
 
 
+def get_profile_metrics(task_instance: TaskInstance) -> tuple[str | None, str | None, str | None]:
+    """
+    Extract dbt profile-related telemetry metrics for a Cosmos-powered task.
+
+    Returns a ``(profile_strategy, profile_mapping_class, database)`` tuple. All three values are ``None`` when
+    the task is not a Cosmos task or exposes no ``profile_config``.
+    """
+    if not _is_cosmos_task(task_instance):
+        return None, None, None
+
+    profile_config = getattr(task_instance.task, "profile_config", None)
+    if not profile_config:
+        return None, None, None
+
+    # Same truthiness check as ``_get_profile_path`` in cosmos/config.py, so an empty path counts as "not set".
+    # ``getattr`` keeps a non-conforming ``profile_config`` object from raising: telemetry must never break tasks.
+    if getattr(profile_config, "profiles_yml_filepath", None):
+        profile_strategy = "yaml_file"
+        profile_mapping_class = None
+    else:
+        profile_strategy = "mapping"
+        profile_mapping = getattr(profile_config, "profile_mapping", None)
+        # Report None rather than the misleading class name "NoneType" when no mapping is available
+        profile_mapping_class = type(profile_mapping).__name__ if profile_mapping is not None else None
+
+    # Get database or profile type, but don't let telemetry failures break tasks
+    try:
+        database = profile_config.get_profile_type()
+    except Exception as exc:
+        logger.debug("Failed to get profile type from profile_config: %s", exc)
+        database = None
+
+    return profile_strategy, profile_mapping_class, database
+
+
 def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, object]:
     """Build telemetry payload for task completion events."""
 
+    profile_strategy, profile_mapping_class, database = get_profile_metrics(task_instance)
+
     metrics: dict[str, object] = {
         "dag_id": task_instance.dag_id,
         "task_id": task_instance.task_id,
@@ -119,6 +157,9 @@ def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, o
         "is_cosmos_operator_subclass": _is_cosmos_subclass(task_instance),
         "invocation_mode": _invocation_mode(task_instance),
         "execution_mode": _execution_mode_from_task(task_instance),
+        "profile_strategy": profile_strategy,
+        "profile_mapping_class": profile_mapping_class,
+        "database": database,
         # map_index is -1 for non-mapped tasks, >= 0 for mapped tasks
         "is_mapped_task": task_instance.map_index >= 0,
     }
diff --git a/tests/listeners/test_task_instance_listener.py b/tests/listeners/test_task_instance_listener.py
index 3f052837eb..173a1fb7af 100644
--- a/tests/listeners/test_task_instance_listener.py
+++ b/tests/listeners/test_task_instance_listener.py
@@ -1,11 +1,38 @@
 from __future__ import annotations
 
+from pathlib import Path
 from types import SimpleNamespace
 from unittest.mock import patch
 
+import pytest
+from airflow.models.connection import Connection
+
+from cosmos import ProfileConfig
 from cosmos.constants import InvocationMode
 from cosmos.listeners import task_instance_listener
 from cosmos.operators.base import AbstractDbtBase
+from cosmos.profiles import get_automatic_profile_mapping
+
+DBT_PROJECT_PROFILE = Path(__file__).parent.parent / "sample/mini/profiles.yml"
+
+
+@pytest.fixture()
+def mock_postgres_conn():  # type: ignore
+    """
+    Patch ``BaseHook.get_connection`` to return a fake Postgres connection and yield that connection.
+    """
+    conn = Connection(
+        conn_id="my_postgres_connection",
+        conn_type="postgres",
+        host="my_host",
+        login="my_user",
+        password="my_password",
+        port=5432,
+        schema="my_database",
+    )
+
+    with patch("cosmos.profiles.base.BaseHook.get_connection", return_value=conn):
+        yield conn
 
 
 class DummyDbtOperator(AbstractDbtBase):
@@ -18,10 +45,12 @@ def __init__(
         install_deps: bool | None = True,
         callback=None,
         runner_callbacks=None,
+        profile_config=None,
     ) -> None:
         super().__init__(project_dir="/tmp")
         self.invocation_mode = InvocationMode.DBT_RUNNER
         self._task_module = module
+        self.profile_config = profile_config
         if install_deps is not None:
             self.install_deps = install_deps
         if callback is not None:
@@ -82,6 +111,52 @@ def _make_task_instance(task, **overrides) -> SimpleNamespace:
     return SimpleNamespace(**defaults)
 
 
+def test_profile_mapping_metrics(mock_postgres_conn):
+    profile_mapping = get_automatic_profile_mapping(
+        mock_postgres_conn.conn_id,
+        {"schema": "my_schema"},
+    )
+
+    operator = DummyDbtOperator(
+        profile_config=ProfileConfig(profile_name="postgres", target_name="test", profile_mapping=profile_mapping)
+    )
+    ti = _make_task_instance(operator)
+
+    metrics = task_instance_listener._build_task_metrics(ti, status="success")
+
+    assert metrics["operator_name"] == "DummyDbtOperator"
+    assert metrics["dbt_command"] == "run"
+    assert metrics["install_deps"] is True
+    assert metrics["invocation_mode"] == InvocationMode.DBT_RUNNER.value
+    assert metrics["execution_mode"] == "local"
+    assert metrics["is_cosmos_operator_subclass"] is False
+    assert metrics["dag_run_id"] == "run-1"
+    assert metrics["profile_strategy"] == "mapping"
+    assert metrics["profile_mapping_class"] == "PostgresUserPasswordProfileMapping"
+    assert metrics["database"] == "postgres"
+
+
+def test_profile_file_metrics():
+
+    operator = DummyDbtOperator(
+        profile_config=ProfileConfig(profiles_yml_filepath=DBT_PROJECT_PROFILE, profile_name="mini", target_name="dev")
+    )
+    ti = _make_task_instance(operator)
+
+    metrics = task_instance_listener._build_task_metrics(ti, status="success")
+
+    assert metrics["operator_name"] == "DummyDbtOperator"
+    assert metrics["dbt_command"] == "run"
+    assert metrics["install_deps"] is True
+    assert metrics["invocation_mode"] == InvocationMode.DBT_RUNNER.value
+    assert metrics["execution_mode"] == "local"
+    assert metrics["is_cosmos_operator_subclass"] is False
+    assert metrics["dag_run_id"] == "run-1"
+    assert metrics["profile_strategy"] == "yaml_file"
+    assert metrics["profile_mapping_class"] is None
+    assert metrics["database"] == "postgres"
+
+
 def test_build_task_metrics_records_core_fields():
     operator = DummyDbtOperator()
     ti = _make_task_instance(operator)