From 97f2e3d3133bb1209cdcf3782e6f180773132bb3 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Fri, 12 Dec 2025 17:45:17 +0530 Subject: [PATCH 01/10] Collect cosmos profile metrics --- cosmos/listeners/task_instance_listener.py | 23 ++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py index 0a64c3a61a..d27ae83494 100644 --- a/cosmos/listeners/task_instance_listener.py +++ b/cosmos/listeners/task_instance_listener.py @@ -6,6 +6,7 @@ if TYPE_CHECKING: from airflow.models.taskinstance import TaskInstance + from cosmos import ProfileConfig from cosmos import telemetry from cosmos.constants import InvocationMode @@ -108,6 +109,28 @@ def _has_callback(task_instance: TaskInstance) -> bool: return bool(callback) +def get_profile_metrics(task_instance: TaskInstance) -> tuple[str, str, str]: + profile_config: ProfileConfig = task_instance.task.profile_config + + # Determine strategy + profile_strategy = ( + "yaml_file" + if profile_config.profiles_yml_filepath is not None + else "mapping" + ) + + # Default + profile_mapping_class = "" + + # Populate mapping class only when strategy is "mapping" + if profile_strategy == "mapping": + profile_mapping_class = str(profile_config.profile_mapping) + + # Get database or profile type + database = profile_config.get_profile_type() + + return profile_strategy, profile_mapping_class, database + def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, object]: """Build telemetry payload for task completion events.""" From c020061cceacc61ab7f4a0a340a9e1df28ca99c8 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 12 Dec 2025 12:16:08 +0000 Subject: [PATCH 02/10] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/listeners/task_instance_listener.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py index d27ae83494..c90c8fba15 100644 --- a/cosmos/listeners/task_instance_listener.py +++ b/cosmos/listeners/task_instance_listener.py @@ -113,11 +113,7 @@ def get_profile_metrics(task_instance: TaskInstance) -> tuple[str, str, str]: profile_config: ProfileConfig = task_instance.task.profile_config # Determine strategy - profile_strategy = ( - "yaml_file" - if profile_config.profiles_yml_filepath is not None - else "mapping" - ) + profile_strategy = "yaml_file" if profile_config.profiles_yml_filepath is not None else "mapping" # Default profile_mapping_class = "" From 309487da9ad9452c3b22779a375e0935c63cbb8c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 18 Dec 2025 13:23:33 +0000 Subject: [PATCH 03/10] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/listeners/task_instance_listener.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py index c90c8fba15..3a98f1a2fc 100644 --- a/cosmos/listeners/task_instance_listener.py +++ b/cosmos/listeners/task_instance_listener.py @@ -6,6 +6,7 @@ if TYPE_CHECKING: from airflow.models.taskinstance import TaskInstance + from cosmos import ProfileConfig from cosmos import telemetry @@ -127,6 +128,7 @@ def get_profile_metrics(task_instance: TaskInstance) -> tuple[str, str, str]: return profile_strategy, profile_mapping_class, database + def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, object]: """Build telemetry payload for task completion events.""" From 63ea4c712a6f06674706bcf9c8b98d41180f20d9 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Thu, 18 Dec 2025 19:03:17 +0530 Subject: [PATCH 04/10] Add metrics --- cosmos/listeners/task_instance_listener.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py index 3a98f1a2fc..2e0591e940 100644 --- a/cosmos/listeners/task_instance_listener.py +++ b/cosmos/listeners/task_instance_listener.py @@ -7,7 +7,7 @@ if TYPE_CHECKING: from airflow.models.taskinstance import TaskInstance - from cosmos import ProfileConfig + from cosmos.config import ProfileConfig from cosmos import telemetry from cosmos.constants import InvocationMode @@ -110,8 +110,13 @@ def _has_callback(task_instance: TaskInstance) -> bool: return bool(callback) -def get_profile_metrics(task_instance: TaskInstance) -> tuple[str, str, str]: - profile_config: ProfileConfig = task_instance.task.profile_config +def get_profile_metrics(task_instance: TaskInstance) -> tuple[None, None, None] | tuple[str, str, str]: + + task = task_instance.task + if not isinstance(task, AbstractDbtBase): + return None, None, None + + profile_config: ProfileConfig = getattr(task, "profile_config", None) # Determine strategy profile_strategy = "yaml_file" if profile_config.profiles_yml_filepath is not None else "mapping" @@ -132,6 +137,8 @@ def get_profile_metrics(task_instance: TaskInstance) -> tuple[str, str, str]: def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, object]: """Build telemetry payload for task completion events.""" + profile_strategy, profile_mapping_class, database = get_profile_metrics(task_instance) + metrics: dict[str, object] = { "dag_id": task_instance.dag_id, "task_id": task_instance.task_id, @@ -141,6 +148,9 @@ def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, o "invocation_mode": _invocation_mode(task_instance), "execution_mode": _execution_mode_from_task(task_instance), "map_index": task_instance.map_index, + "profile_strategy": profile_strategy, + "profile_mapping_class": profile_mapping_class, + "database": database, } dbt_command = _dbt_command(task_instance) From bd1344af7cf5e0d4c0538536a16ed9b611a00a5a Mon Sep 17 00:00:00 2001 From: pankajastro Date: Thu, 18 Dec 2025 19:05:04 +0530 Subject: [PATCH 05/10] Add metrics --- cosmos/listeners/task_instance_listener.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py index 2e0591e940..1b99f58b2f 100644 --- a/cosmos/listeners/task_instance_listener.py +++ b/cosmos/listeners/task_instance_listener.py @@ -110,7 +110,7 @@ def _has_callback(task_instance: TaskInstance) -> bool: return bool(callback) -def get_profile_metrics(task_instance: TaskInstance) -> tuple[None, None, None] | tuple[str, str, str]: +def get_profile_metrics(task_instance: TaskInstance) -> tuple[str | None, str | None, str | None]: task = task_instance.task if not isinstance(task, AbstractDbtBase): @@ -122,7 +122,7 @@ def get_profile_metrics(task_instance: TaskInstance) -> tuple[None, None, None] profile_strategy = "yaml_file" if profile_config.profiles_yml_filepath is not None else "mapping" # Default - profile_mapping_class = "" + profile_mapping_class = None # Populate mapping class only when strategy is "mapping" if profile_strategy == "mapping": From 052eddaec8c1191c43ddad7fadb0b83fb89c8443 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Thu, 18 Dec 2025 19:12:17 +0530 Subject: [PATCH 06/10] Add metrics --- cosmos/listeners/task_instance_listener.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py index 1b99f58b2f..c15ef7283d 100644 --- a/cosmos/listeners/task_instance_listener.py +++ b/cosmos/listeners/task_instance_listener.py @@ -7,7 +7,6 @@ if TYPE_CHECKING: from airflow.models.taskinstance import TaskInstance - from cosmos.config import ProfileConfig from cosmos import telemetry from cosmos.constants import InvocationMode @@ -112,11 +111,13 @@ def _has_callback(task_instance: TaskInstance) -> bool: def get_profile_metrics(task_instance: TaskInstance) -> tuple[str | None, str | None, str | None]: - task = task_instance.task - if not isinstance(task, AbstractDbtBase): + if not _is_cosmos_task(task_instance): return None, None, None - profile_config: ProfileConfig = getattr(task, "profile_config", None) + profile_config = getattr(task_instance.task, "profile_config", None) + + if not profile_config: + return None, None, None # Determine strategy profile_strategy = "yaml_file" if profile_config.profiles_yml_filepath is not None else "mapping" From 27ea0c97f02a4baf9d74bf8f1b604810ea44c3e3 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Thu, 18 Dec 2025 19:55:58 +0530 Subject: [PATCH 07/10] Add tests --- cosmos/config.py | 3 + cosmos/listeners/task_instance_listener.py | 2 +- .../listeners/test_task_instance_listener.py | 75 +++++++++++++++++++ 3 files changed, 79 insertions(+), 1 deletion(-) diff --git a/cosmos/config.py b/cosmos/config.py index 3d46ccb9b1..6ae2a527a8 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -353,6 +353,9 @@ def _get_profile_path(self, use_mock_values: bool = False) -> Path: Check if profile object version is exist then reuse it Otherwise, create profile yml for requested object and return the profile path """ + if self.profiles_yml_filepath: + return Path(self.profiles_yml_filepath) + assert self.profile_mapping # To satisfy MyPy current_profile_version = self.profile_mapping.version(self.profile_name, self.target_name, use_mock_values) cached_profile_path = get_cached_profile(current_profile_version) diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py index c15ef7283d..e570a50836 100644 --- a/cosmos/listeners/task_instance_listener.py +++ b/cosmos/listeners/task_instance_listener.py @@ -127,7 +127,7 @@ def get_profile_metrics(task_instance: TaskInstance) -> tuple[str | None, str | # Populate mapping class only when strategy is "mapping" if profile_strategy == "mapping": - profile_mapping_class = str(profile_config.profile_mapping) + profile_mapping_class = profile_config.profile_mapping.__class__.__name__ # Get database or profile type database = profile_config.get_profile_type() diff --git a/tests/listeners/test_task_instance_listener.py b/tests/listeners/test_task_instance_listener.py index bbf6de6184..779e4f445d 100644 --- a/tests/listeners/test_task_instance_listener.py +++ b/tests/listeners/test_task_instance_listener.py @@ -1,11 +1,38 @@ from __future__ import annotations +from pathlib import Path from types import SimpleNamespace from unittest.mock import patch +import pytest +from airflow.models.connection import Connection + +from cosmos import ProfileConfig from cosmos.constants import InvocationMode from cosmos.listeners import task_instance_listener from cosmos.operators.base import AbstractDbtBase +from cosmos.profiles import get_automatic_profile_mapping + +DBT_PROJECT_PROFILE = Path(__file__).parent.parent / "sample/mini/profiles.yml" + + +@pytest.fixture() +def mock_postgres_conn(): # type: ignore + """ + Sets the connection as an environment variable. + """ + conn = Connection( + conn_id="my_postgres_connection", + conn_type="postgres", + host="my_host", + login="my_user", + password="my_password", + port=5432, + schema="my_database", + ) + + with patch("cosmos.profiles.base.BaseHook.get_connection", return_value=conn): + yield conn class DummyDbtOperator(AbstractDbtBase): @@ -18,10 +45,12 @@ def __init__( install_deps: bool | None = True, callback=None, runner_callbacks=None, + profile_config=None, ) -> None: super().__init__(project_dir="/tmp") self.invocation_mode = InvocationMode.DBT_RUNNER self._task_module = module + self.profile_config = profile_config if install_deps is not None: self.install_deps = install_deps if callback is not None: @@ -82,6 +111,52 @@ def _make_task_instance(task, **overrides) -> SimpleNamespace: return SimpleNamespace(**defaults) +def test_profile_mapping_metrics(mock_postgres_conn): + profile_mapping = get_automatic_profile_mapping( + mock_postgres_conn.conn_id, + {"schema": "my_schema"}, + ) + + operator = DummyDbtOperator( + profile_config=ProfileConfig(profile_name="postgres", target_name="test", profile_mapping=profile_mapping) + ) + ti = _make_task_instance(operator) + + metrics = task_instance_listener._build_task_metrics(ti, status="success") + + assert metrics["operator_name"] == "DummyDbtOperator" + assert metrics["dbt_command"] == "run" + assert metrics["install_deps"] is True + assert metrics["invocation_mode"] == InvocationMode.DBT_RUNNER.value + assert metrics["execution_mode"] == "local" + assert metrics["is_cosmos_operator_subclass"] is False + assert metrics["dag_run_id"] == "run-1" + assert metrics["profile_strategy"] == "mapping" + assert metrics["profile_mapping_class"] == "PostgresUserPasswordProfileMapping" + assert metrics["database"] == "postgres" + + +def test_profile_file_metrics(): + + operator = DummyDbtOperator( + profile_config=ProfileConfig(profiles_yml_filepath=DBT_PROJECT_PROFILE, profile_name="mini", target_name="dev") + ) + ti = _make_task_instance(operator) + + metrics = task_instance_listener._build_task_metrics(ti, status="success") + + assert metrics["operator_name"] == "DummyDbtOperator" + assert metrics["dbt_command"] == "run" + assert metrics["install_deps"] is True + assert metrics["invocation_mode"] == InvocationMode.DBT_RUNNER.value + assert metrics["execution_mode"] == "local" + assert metrics["is_cosmos_operator_subclass"] is False + assert metrics["dag_run_id"] == "run-1" + assert metrics["profile_strategy"] == "yaml_file" + assert metrics["profile_mapping_class"] is None + assert metrics["database"] == "postgres" + + def test_build_task_metrics_records_core_fields(): operator = DummyDbtOperator() ti = _make_task_instance(operator) From 6636bc8f80ecac7ec0a14c7a95ddcc33daf185d0 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Thu, 18 Dec 2025 20:05:54 +0530 Subject: [PATCH 08/10] Update cosmos/listeners/task_instance_listener.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- cosmos/listeners/task_instance_listener.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py index e570a50836..5720452d45 100644 --- a/cosmos/listeners/task_instance_listener.py +++ b/cosmos/listeners/task_instance_listener.py @@ -129,8 +129,12 @@ def get_profile_metrics(task_instance: TaskInstance) -> tuple[str | None, str | if profile_strategy == "mapping": profile_mapping_class = profile_config.profile_mapping.__class__.__name__ - # Get database or profile type - database = profile_config.get_profile_type() + # Get database or profile type, but don't let telemetry failures break tasks + try: + database = profile_config.get_profile_type() + except Exception as exc: + logger.debug("Failed to get profile type from profile_config: %s", exc) + database = None return profile_strategy, profile_mapping_class, database From 50dc035b742c3752097fe9a211d6b78ee02b091f Mon Sep 17 00:00:00 2001 From: pankajastro Date: Thu, 18 Dec 2025 20:07:45 +0530 Subject: [PATCH 09/10] Add docstring --- cosmos/listeners/task_instance_listener.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py index 5720452d45..867920be4a 100644 --- a/cosmos/listeners/task_instance_listener.py +++ b/cosmos/listeners/task_instance_listener.py @@ -110,6 +110,7 @@ def _has_callback(task_instance: TaskInstance) -> bool: def get_profile_metrics(task_instance: TaskInstance) -> tuple[str | None, str | None, str | None]: + """Extract dbt profile-related telemetry metrics for a Cosmos-powered task.""" if not _is_cosmos_task(task_instance): return None, None, None From 2194b4ff145750df8a8450122270448577329d16 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Thu, 18 Dec 2025 20:12:56 +0530 Subject: [PATCH 10/10] Apply suggestion from @pankajastro --- cosmos/listeners/task_instance_listener.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py index 469513a301..3263e2dee9 100644 --- a/cosmos/listeners/task_instance_listener.py +++ b/cosmos/listeners/task_instance_listener.py @@ -153,7 +153,6 @@ def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, o "is_cosmos_operator_subclass": _is_cosmos_subclass(task_instance), "invocation_mode": _invocation_mode(task_instance), "execution_mode": _execution_mode_from_task(task_instance), - "map_index": task_instance.map_index, "profile_strategy": profile_strategy, "profile_mapping_class": profile_mapping_class, "database": database,