From 6f1e929d85ac6faa0643c467013a9264ec53fe49 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 11 Dec 2025 19:13:29 +0000 Subject: [PATCH 01/15] task-instance telemetry listener --- cosmos/listeners/task_instance_listener.py | 98 ++++++++++++++++++++++ cosmos/plugin/airflow2.py | 4 +- cosmos/plugin/airflow3.py | 4 +- 3 files changed, 102 insertions(+), 4 deletions(-) create mode 100644 cosmos/listeners/task_instance_listener.py diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py new file mode 100644 index 0000000000..96506c74c0 --- /dev/null +++ b/cosmos/listeners/task_instance_listener.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from airflow.listeners import hookimpl + +if TYPE_CHECKING: + from airflow.models.taskinstance import TaskInstance + +from cosmos import telemetry +from cosmos.constants import InvocationMode +from cosmos.operators.base import AbstractDbtBase +from cosmos.log import get_logger + +logger = get_logger(__name__) + +TASK_INSTANCE_EVENT = "task_instance" + + +def _is_cosmos_task(task_instance: TaskInstance) -> bool: + """Return True if the task instance is powered by Cosmos operators.""" + + task = task_instance.task + module = getattr(task, "_task_module", None) or task.__class__.__module__ + return module.startswith("cosmos.") or isinstance(task, AbstractDbtBase) + + +def _execution_mode_from_task(task_instance: TaskInstance) -> str | None: + """Extract Cosmos execution mode from the task's module path.""" + + module = getattr(task_instance.task, "_task_module", None) or task_instance.task.__class__.__module__ + parts = module.split(".") + if len(parts) >= 3 and parts[0] == "cosmos" and parts[1] == "operators": + return parts[2] + # TODO: When users subclass Cosmos operators in external modules, encode execution mode directly on the task + # so telemetry does not rely on module inspection. + return None + + +def _invocation_mode(task_instance: TaskInstance) -> str | None: + """Return the invocation mode recorded in Cosmos operators.""" + + mode = getattr(task_instance.task, "invocation_mode", None) + if mode is None: + return None + if isinstance(mode, InvocationMode): + return mode.value + return str(mode) + + +def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, object]: + """Build telemetry payload for task completion events.""" + + metrics: dict[str, object] = { + "dag_id": task_instance.dag_id, + "task_id": task_instance.task_id, + "status": status, + "operator_name": task_instance.task.__class__.__name__, + "is_cosmos_operator_subclass": isinstance(task_instance.task, AbstractDbtBase), + "invocation_mode": _invocation_mode(task_instance), + "execution_mode": _execution_mode_from_task(task_instance), + "queue": task_instance.queue, + "priority_weight": task_instance.priority_weight, + "map_index": task_instance.map_index, + } + + dag_run = getattr(task_instance, "dag_run", None) + if dag_run is not None: + metrics["dag_run_id"] = dag_run.run_id + dag_hash = getattr(dag_run, "dag_hash", None) + if dag_hash is not None: + metrics["dag_hash"] = dag_hash + + duration = getattr(task_instance, "duration", None) + if duration is not None: + metrics["duration"] = duration + + return metrics + + +@hookimpl +def on_task_instance_success(previous_state, task_instance, session): # type: ignore[override] + if not _is_cosmos_task(task_instance): + return + + logger.debug("Telemetry task listener success for %s.%s", task_instance.dag_id, task_instance.task_id) + metrics = _build_task_metrics(task_instance, "success") + telemetry.emit_usage_metrics_if_enabled(TASK_INSTANCE_EVENT, metrics) + + +@hookimpl +def on_task_instance_failed(previous_state, task_instance, error, session): # type: ignore[override] + if not _is_cosmos_task(task_instance): + return + + logger.debug("Telemetry task listener failure for %s.%s", task_instance.dag_id, task_instance.task_id) + metrics = _build_task_metrics(task_instance, "failed") + telemetry.emit_usage_metrics_if_enabled(TASK_INSTANCE_EVENT, metrics) diff --git a/cosmos/plugin/airflow2.py b/cosmos/plugin/airflow2.py index 16f0fb62d5..9ae26c4dd6 100644 --- a/cosmos/plugin/airflow2.py +++ b/cosmos/plugin/airflow2.py @@ -10,7 +10,7 @@ from flask import abort from flask_appbuilder import AppBuilder, expose -from cosmos.listeners import dag_run_listener +from cosmos.listeners import dag_run_listener, task_instance_listener from cosmos.plugin.snippets import IFRAME_SCRIPT from cosmos.settings import dbt_docs_conn_id, dbt_docs_dir, dbt_docs_index_file_name, in_astro_cloud @@ -190,4 +190,4 @@ class CosmosPlugin(AirflowPlugin): "href": conf.get("webserver", "base_url") + "/cosmos/dbt_docs", } appbuilder_views = [item] - listeners = [dag_run_listener] + listeners = [dag_run_listener, task_instance_listener] diff --git a/cosmos/plugin/airflow3.py b/cosmos/plugin/airflow3.py index 3fe73f58e4..10f142c878 100644 --- a/cosmos/plugin/airflow3.py +++ b/cosmos/plugin/airflow3.py @@ -20,7 +20,7 @@ from packaging.version import Version from cosmos.constants import AIRFLOW_OBJECT_STORAGE_PATH_URL_SCHEMES -from cosmos.listeners import dag_run_listener +from cosmos.listeners import dag_run_listener, task_instance_listener from cosmos.plugin.snippets import IFRAME_SCRIPT # Airflow version gating: External views feature for the plugins used here (CosmosAF3Plugin) exist only in >= 3.1 @@ -265,7 +265,7 @@ class CosmosAF3Plugin(AirflowPlugin): # Register external views for navigation external_views: list[dict[str, Any]] = [] - listeners = [dag_run_listener] + listeners = [dag_run_listener, task_instance_listener] def __init__(self) -> None: super().__init__() From 7a33994e35ce8b403a9162411dd101f58dd712ff Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 11 Dec 2025 20:03:43 +0000 Subject: [PATCH 02/15] Refine cosmos subclass detection in task listener --- cosmos/listeners/task_instance_listener.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py index 96506c74c0..33f0c719fe 100644 --- a/cosmos/listeners/task_instance_listener.py +++ b/cosmos/listeners/task_instance_listener.py @@ -21,14 +21,14 @@ def _is_cosmos_task(task_instance: TaskInstance) -> bool: """Return True if the task instance is powered by Cosmos operators.""" task = task_instance.task - module = getattr(task, "_task_module", None) or task.__class__.__module__ + module = _operator_module(task_instance) return module.startswith("cosmos.") or isinstance(task, AbstractDbtBase) def _execution_mode_from_task(task_instance: TaskInstance) -> str | None: """Extract Cosmos execution mode from the task's module path.""" - module = getattr(task_instance.task, "_task_module", None) or task_instance.task.__class__.__module__ + module = _operator_module(task_instance) parts = module.split(".") if len(parts) >= 3 and parts[0] == "cosmos" and parts[1] == "operators": return parts[2] @@ -37,6 +37,18 @@ def _execution_mode_from_task(task_instance: TaskInstance) -> str | None: return None +def _operator_module(task_instance: TaskInstance) -> str: + """Return the module path for the operator backing the given task instance.""" + + return getattr(task_instance.task, "_task_module", None) or task_instance.task.__class__.__module__ + + +def _is_cosmos_subclass(task_instance: TaskInstance) -> bool: + """Return True when the task is a custom subclass extending Cosmos operators.""" + + return isinstance(task_instance.task, AbstractDbtBase) and not _operator_module(task_instance).startswith("cosmos.") + + def _invocation_mode(task_instance: TaskInstance) -> str | None: """Return the invocation mode recorded in Cosmos operators.""" @@ -56,7 +68,7 @@ def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, o "task_id": task_instance.task_id, "status": status, "operator_name": task_instance.task.__class__.__name__, - "is_cosmos_operator_subclass": isinstance(task_instance.task, AbstractDbtBase), + "is_cosmos_operator_subclass": _is_cosmos_subclass(task_instance), "invocation_mode": _invocation_mode(task_instance), "execution_mode": _execution_mode_from_task(task_instance), "queue": task_instance.queue, From 7bf4072a75147fada74c961ec7cdf2b40df49877 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 11 Dec 2025 20:27:56 +0000 Subject: [PATCH 03/15] Add dbt command to task telemetry metrics --- cosmos/listeners/task_instance_listener.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py index 33f0c719fe..826b39ebbc 100644 --- a/cosmos/listeners/task_instance_listener.py +++ b/cosmos/listeners/task_instance_listener.py @@ -60,6 +60,23 @@ def _invocation_mode(task_instance: TaskInstance) -> str | None: return str(mode) +def _dbt_command(task_instance: TaskInstance) -> str | None: + """Return the dbt sub-command encoded on Cosmos operators.""" + + task = task_instance.task + if not isinstance(task, AbstractDbtBase): + return None + + command = getattr(task, "base_cmd", None) + if command is None: + return None + + if isinstance(command, (list, tuple)): + return " ".join(str(part) for part in command if part is not None) + + return str(command) + + def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, object]: """Build telemetry payload for task completion events.""" @@ -76,6 +93,10 @@ def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, o "map_index": task_instance.map_index, } + dbt_command = _dbt_command(task_instance) + if dbt_command: + metrics["dbt_command"] = dbt_command + dag_run = getattr(task_instance, "dag_run", None) if dag_run is not None: metrics["dag_run_id"] = dag_run.run_id From d1153c3ff0c24e9fd3c716ab64e1efce14a0a43b Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 11 Dec 2025 20:59:18 +0000 Subject: [PATCH 04/15] Add install_deps flag to task telemetry metrics --- cosmos/listeners/task_instance_listener.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py index 826b39ebbc..dfc4ab688f 100644 --- a/cosmos/listeners/task_instance_listener.py +++ b/cosmos/listeners/task_instance_listener.py @@ -77,6 +77,20 @@ def _dbt_command(task_instance: TaskInstance) -> str | None: return str(command) +def _install_deps(task_instance: TaskInstance) -> bool | None: + """Return the effective install_deps flag when available.""" + + task = task_instance.task + if not isinstance(task, AbstractDbtBase): + return None + + install_deps = getattr(task, "install_deps", None) + if install_deps is None: + return None + + return bool(install_deps) + + def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, object]: """Build telemetry payload for task completion events.""" @@ -97,6 +111,10 @@ def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, o if dbt_command: metrics["dbt_command"] = dbt_command + install_deps = _install_deps(task_instance) + if install_deps is not None: + metrics["install_deps"] = install_deps + dag_run = getattr(task_instance, "dag_run", None) if dag_run is not None: metrics["dag_run_id"] = dag_run.run_id From ae61d97d877817608b4db932d306195891fd965b Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 12 Dec 2025 08:23:44 +0000 Subject: [PATCH 05/15] Add tests for task telemetry metrics --- .../listeners/test_task_instance_listener.py | 132 ++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 tests/listeners/test_task_instance_listener.py diff --git a/tests/listeners/test_task_instance_listener.py b/tests/listeners/test_task_instance_listener.py new file mode 100644 index 0000000000..9fc28f3d3b --- /dev/null +++ b/tests/listeners/test_task_instance_listener.py @@ -0,0 +1,132 @@ +from __future__ import annotations + +from types import SimpleNamespace +from unittest.mock import patch + +from cosmos.constants import InvocationMode +from cosmos.listeners import task_instance_listener +from cosmos.operators.base import AbstractDbtBase + + +class DummyDbtOperator(AbstractDbtBase): + base_cmd = ["run"] + + def __init__(self, *, module: str = "cosmos.operators.local.fake", install_deps: bool | None = True) -> None: + super().__init__(project_dir="/tmp") + self.invocation_mode = InvocationMode.DBT_RUNNER + self._task_module = module + if install_deps is not None: + self.install_deps = install_deps + + def build_and_run_cmd(self, context, cmd_flags, run_as_async=False, async_context=None, **kwargs): # pragma: no cover + return None + + +class DummyDbtOperatorNoDeps(DummyDbtOperator): + base_cmd = ["seed"] + + def __init__(self) -> None: + super().__init__(module="cosmos.operators.kubernetes.fake", install_deps=None) + self.invocation_mode = InvocationMode.SUBPROCESS + if hasattr(self, "install_deps"): + delattr(self, "install_deps") + + +class CustomDbtSubclass(DummyDbtOperator): + def __init__(self) -> None: + super().__init__(module="custom.pipeline.dummy") + + +class NonCosmosOperator: + __module__ = "airflow.operators.bash" + + def __init__(self) -> None: + self._task_module = "airflow.operators.bash" + + +def _make_task_instance(task, **overrides) -> SimpleNamespace: + defaults = dict( + dag_id="example_dag", + task_id="example_task", + task=task, + queue="default", + priority_weight=5, + map_index=-1, + dag_run=SimpleNamespace(run_id="run-1", dag_hash="hash-123"), + duration=7.0, + ) + defaults.update(overrides) + return SimpleNamespace(**defaults) + + +def test_build_task_metrics_records_core_fields(): + operator = DummyDbtOperator() + ti = _make_task_instance(operator) + + metrics = task_instance_listener._build_task_metrics(ti, status="success") + + assert metrics["operator_name"] == "DummyDbtOperator" + assert metrics["dbt_command"] == "run" + assert metrics["install_deps"] is True + assert metrics["invocation_mode"] == InvocationMode.DBT_RUNNER.value + assert metrics["execution_mode"] == "local" + assert metrics["is_cosmos_operator_subclass"] is False + assert metrics["dag_run_id"] == "run-1" + assert metrics["dag_hash"] == "hash-123" + + +def test_build_task_metrics_ignores_missing_install_deps(): + operator = DummyDbtOperatorNoDeps() + ti = _make_task_instance(operator) + + metrics = task_instance_listener._build_task_metrics(ti, status="failed") + + assert metrics["dbt_command"] == "seed" + assert "install_deps" not in metrics + assert metrics["execution_mode"] == "kubernetes" + + +def test_build_task_metrics_marks_custom_subclasses(): + operator = CustomDbtSubclass() + ti = _make_task_instance(operator) + + metrics = task_instance_listener._build_task_metrics(ti, status="success") + + assert metrics["is_cosmos_operator_subclass"] is True + assert metrics["execution_mode"] is None + + +@patch("cosmos.listeners.task_instance_listener.telemetry.emit_usage_metrics_if_enabled") +def test_on_task_instance_success_emits_for_cosmos_task(mock_emit): + operator = DummyDbtOperator() + ti = _make_task_instance(operator) + + task_instance_listener.on_task_instance_success(None, ti, None) + + mock_emit.assert_called_once() + args, _ = mock_emit.call_args + assert args[0] == task_instance_listener.TASK_INSTANCE_EVENT + assert args[1]["status"] == "success" + assert args[1]["dbt_command"] == "run" + + +@patch("cosmos.listeners.task_instance_listener.telemetry.emit_usage_metrics_if_enabled") +def test_on_task_instance_failed_emits_failed_status(mock_emit): + operator = DummyDbtOperator() + ti = _make_task_instance(operator) + + task_instance_listener.on_task_instance_failed(None, ti, RuntimeError("boom"), None) + + mock_emit.assert_called_once() + args, _ = mock_emit.call_args + assert args[0] == task_instance_listener.TASK_INSTANCE_EVENT + assert args[1]["status"] == "failed" + + +@patch("cosmos.listeners.task_instance_listener.telemetry.emit_usage_metrics_if_enabled") +def test_on_task_instance_success_skips_non_cosmos_task(mock_emit): + ti = _make_task_instance(NonCosmosOperator()) + + task_instance_listener.on_task_instance_success(None, ti, None) + + mock_emit.assert_not_called() From 7bc580a5626242256e176ab94f01252e4713ff14 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 12 Dec 2025 08:48:03 +0000 Subject: [PATCH 06/15] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/listeners/task_instance_listener.py | 2 +- tests/listeners/test_task_instance_listener.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py index dfc4ab688f..1b94730748 100644 --- a/cosmos/listeners/task_instance_listener.py +++ b/cosmos/listeners/task_instance_listener.py @@ -9,8 +9,8 @@ from cosmos import telemetry from cosmos.constants import InvocationMode -from cosmos.operators.base import AbstractDbtBase from cosmos.log import get_logger +from cosmos.operators.base import AbstractDbtBase logger = get_logger(__name__) diff --git a/tests/listeners/test_task_instance_listener.py b/tests/listeners/test_task_instance_listener.py index 9fc28f3d3b..7d5e227c5e 100644 --- a/tests/listeners/test_task_instance_listener.py +++ b/tests/listeners/test_task_instance_listener.py @@ -18,7 +18,9 @@ def __init__(self, *, module: str = "cosmos.operators.local.fake", install_deps: if install_deps is not None: self.install_deps = install_deps - def build_and_run_cmd(self, context, cmd_flags, run_as_async=False, async_context=None, **kwargs): # pragma: no cover + def build_and_run_cmd( + self, context, cmd_flags, run_as_async=False, async_context=None, **kwargs + ): # pragma: no cover return None From a2a441a020b043efa61a0a43bdb46a202598b8ed Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 12 Dec 2025 16:18:45 +0000 Subject: [PATCH 07/15] Track user callbacks in task telemetry metrics --- cosmos/listeners/task_instance_listener.py | 19 ++++++++++++++ .../listeners/test_task_instance_listener.py | 25 ++++++++++++++++++- 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py index 1b94730748..915d820fa6 100644 --- a/cosmos/listeners/task_instance_listener.py +++ b/cosmos/listeners/task_instance_listener.py @@ -91,6 +91,23 @@ def _install_deps(task_instance: TaskInstance) -> bool | None: return bool(install_deps) +def _has_callback(task_instance: TaskInstance) -> bool: + """Return True when a Cosmos operator includes user-defined callbacks.""" + + task = task_instance.task + if not isinstance(task, AbstractDbtBase): + return False + + callback = getattr(task, "callback", None) + if callback is None: + return False + + if isinstance(callback, (list, tuple)): + return any(callback) + + return bool(callback) + + def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, object]: """Build telemetry payload for task completion events.""" @@ -115,6 +132,8 @@ def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, o if install_deps is not None: metrics["install_deps"] = install_deps + metrics["has_callback"] = _has_callback(task_instance) + dag_run = getattr(task_instance, "dag_run", None) if dag_run is not None: metrics["dag_run_id"] = dag_run.run_id diff --git a/tests/listeners/test_task_instance_listener.py b/tests/listeners/test_task_instance_listener.py index 7d5e227c5e..afa8e37f95 100644 --- a/tests/listeners/test_task_instance_listener.py +++ b/tests/listeners/test_task_instance_listener.py @@ -11,12 +11,23 @@ class DummyDbtOperator(AbstractDbtBase): base_cmd = ["run"] - def __init__(self, *, module: str = "cosmos.operators.local.fake", install_deps: bool | None = True) -> None: + def __init__( + self, + *, + module: str = "cosmos.operators.local.fake", + install_deps: bool | None = True, + callback=None, + runner_callbacks=None, + ) -> None: super().__init__(project_dir="/tmp") self.invocation_mode = InvocationMode.DBT_RUNNER self._task_module = module if install_deps is not None: self.install_deps = install_deps + if callback is not None: + self.callback = callback + if runner_callbacks is not None: + self._dbt_runner_callbacks = runner_callbacks def build_and_run_cmd( self, context, cmd_flags, run_as_async=False, async_context=None, **kwargs @@ -96,6 +107,18 @@ def test_build_task_metrics_marks_custom_subclasses(): assert metrics["is_cosmos_operator_subclass"] is True assert metrics["execution_mode"] is None + assert metrics["has_callback"] is False + + +def test_build_task_metrics_sets_has_callback_for_callable(): + operator = DummyDbtOperator(callback=lambda *_: None) + ti = _make_task_instance(operator) + + metrics = task_instance_listener._build_task_metrics(ti, status="success") + + assert metrics["has_callback"] is True + + @patch("cosmos.listeners.task_instance_listener.telemetry.emit_usage_metrics_if_enabled") From cbb9fa8a3d0384e760de9e6c3a69e16528432b8d Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 12 Dec 2025 16:31:42 +0000 Subject: [PATCH 08/15] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/listeners/test_task_instance_listener.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/listeners/test_task_instance_listener.py b/tests/listeners/test_task_instance_listener.py index afa8e37f95..cd2e319376 100644 --- a/tests/listeners/test_task_instance_listener.py +++ b/tests/listeners/test_task_instance_listener.py @@ -119,8 +119,6 @@ def test_build_task_metrics_sets_has_callback_for_callable(): assert metrics["has_callback"] is True - - @patch("cosmos.listeners.task_instance_listener.telemetry.emit_usage_metrics_if_enabled") def test_on_task_instance_success_emits_for_cosmos_task(mock_emit): operator = DummyDbtOperator() From 42047a14732ab4b29d9d0d86e88fea0f82091f9e Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 15 Dec 2025 14:33:55 +0000 Subject: [PATCH 09/15] Fix task instance failed hook signature --- cosmos/listeners/task_instance_listener.py | 2 +- tests/listeners/test_task_instance_listener.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py index 915d820fa6..eb593c4b4a 100644 --- a/cosmos/listeners/task_instance_listener.py +++ b/cosmos/listeners/task_instance_listener.py @@ -159,7 +159,7 @@ def on_task_instance_success(previous_state, task_instance, session): # type: i @hookimpl -def on_task_instance_failed(previous_state, task_instance, error, session): # type: ignore[override] +def on_task_instance_failed(previous_state, task_instance, session): # type: ignore[override] if not _is_cosmos_task(task_instance): return diff --git a/tests/listeners/test_task_instance_listener.py b/tests/listeners/test_task_instance_listener.py index cd2e319376..90b9180738 100644 --- a/tests/listeners/test_task_instance_listener.py +++ b/tests/listeners/test_task_instance_listener.py @@ -138,7 +138,7 @@ def test_on_task_instance_failed_emits_failed_status(mock_emit): operator = DummyDbtOperator() ti = _make_task_instance(operator) - task_instance_listener.on_task_instance_failed(None, ti, RuntimeError("boom"), None) + task_instance_listener.on_task_instance_failed(None, ti, None) mock_emit.assert_called_once() args, _ = mock_emit.call_args From 987d70f0be5d7a1590b5264f109c1ee769dfca48 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 15 Dec 2025 16:04:36 +0000 Subject: [PATCH 10/15] Annotate telemetry hooks for mypy --- cosmos/listeners/task_instance_listener.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py index eb593c4b4a..759bb3c8b4 100644 --- a/cosmos/listeners/task_instance_listener.py +++ b/cosmos/listeners/task_instance_listener.py @@ -1,11 +1,12 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from airflow.listeners import hookimpl if TYPE_CHECKING: from airflow.models.taskinstance import TaskInstance + from sqlalchemy.orm import Session from cosmos import telemetry from cosmos.constants import InvocationMode @@ -149,7 +150,9 @@ def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, o @hookimpl -def on_task_instance_success(previous_state, task_instance, session): # type: ignore[override] +def on_task_instance_success( + previous_state: Any, task_instance: TaskInstance, session: "Session" +) -> None: # type: ignore[override] if not _is_cosmos_task(task_instance): return @@ -159,7 +162,9 @@ def on_task_instance_success(previous_state, task_instance, session): # type: i @hookimpl -def on_task_instance_failed(previous_state, task_instance, session): # type: ignore[override] +def on_task_instance_failed( + previous_state: Any, task_instance: TaskInstance, session: "Session" +) -> None: # type: ignore[override] if not _is_cosmos_task(task_instance): return From 100f2beb2df816fadd9bee80020203f2b4f01848 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 15 Dec 2025 16:05:10 +0000 Subject: [PATCH 11/15] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/listeners/task_instance_listener.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py index 759bb3c8b4..00ea6035a4 100644 --- a/cosmos/listeners/task_instance_listener.py +++ b/cosmos/listeners/task_instance_listener.py @@ -151,7 +151,7 @@ def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, o @hookimpl def on_task_instance_success( - previous_state: Any, task_instance: TaskInstance, session: "Session" + previous_state: Any, task_instance: TaskInstance, session: Session ) -> None: # type: ignore[override] if not _is_cosmos_task(task_instance): return @@ -163,7 +163,7 @@ def on_task_instance_success( @hookimpl def on_task_instance_failed( - previous_state: Any, task_instance: TaskInstance, session: "Session" + previous_state: Any, task_instance: TaskInstance, session: Session ) -> None: # type: ignore[override] if not _is_cosmos_task(task_instance): return From 831a5994f672ee01a45de4df152613597fd67170 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 16 Dec 2025 13:58:25 +0000 Subject: [PATCH 12/15] test: close task listener telemetry coverage gaps --- .../listeners/test_task_instance_listener.py | 95 +++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/tests/listeners/test_task_instance_listener.py b/tests/listeners/test_task_instance_listener.py index 90b9180738..04c048e937 100644 --- a/tests/listeners/test_task_instance_listener.py +++ b/tests/listeners/test_task_instance_listener.py @@ -50,6 +50,18 @@ def __init__(self) -> None: super().__init__(module="custom.pipeline.dummy") +class DummyDbtOperatorNoCommand(DummyDbtOperator): + base_cmd = None + + +class DummyDbtOperatorStringCommand(DummyDbtOperator): + base_cmd = "deps" + + +class DummyDbtOperatorTupleCommand(DummyDbtOperator): + base_cmd = ("run", None, "--full-refresh") + + class NonCosmosOperator: __module__ = "airflow.operators.bash" @@ -119,6 +131,80 @@ def test_build_task_metrics_sets_has_callback_for_callable(): assert metrics["has_callback"] is True +def test_build_task_metrics_interprets_tuple_callbacks(): + operator = DummyDbtOperator(callback=(None, lambda *_: None)) + ti = _make_task_instance(operator) + + metrics = task_instance_listener._build_task_metrics(ti, status="success") + + assert metrics["has_callback"] is True + + +def test_build_task_metrics_skips_dbt_command_when_missing(): + operator = DummyDbtOperatorNoCommand() + ti = _make_task_instance(operator) + + metrics = task_instance_listener._build_task_metrics(ti, status="success") + + assert "dbt_command" not in metrics + + +def test_build_task_metrics_handles_string_dbt_command(): + operator = DummyDbtOperatorStringCommand() + ti = _make_task_instance(operator) + + metrics = task_instance_listener._build_task_metrics(ti, status="success") + + assert metrics["dbt_command"] == "deps" + + +def test_build_task_metrics_flattens_iterable_commands(): + operator = DummyDbtOperatorTupleCommand() + ti = _make_task_instance(operator) + + metrics = task_instance_listener._build_task_metrics(ti, status="success") + + assert metrics["dbt_command"] == "run --full-refresh" + + +def test_build_task_metrics_handles_missing_invocation_mode(): + operator = DummyDbtOperator() + delattr(operator, "invocation_mode") + ti = _make_task_instance(operator) + + metrics = task_instance_listener._build_task_metrics(ti, status="success") + + assert metrics["invocation_mode"] is None + + +def test_build_task_metrics_handles_custom_invocation_mode_string(): + operator = DummyDbtOperator() + operator.invocation_mode = "custom-mode" + ti = _make_task_instance(operator) + + metrics = task_instance_listener._build_task_metrics(ti, status="success") + + assert metrics["invocation_mode"] == "custom-mode" + + +def test_has_callback_returns_false_for_non_cosmos_task(): + ti = _make_task_instance(NonCosmosOperator()) + + assert task_instance_listener._has_callback(ti) is False + + +def test_install_deps_returns_none_for_non_cosmos_task(): + ti = _make_task_instance(NonCosmosOperator()) + + assert task_instance_listener._install_deps(ti) is None + + +def test_dbt_command_returns_none_for_non_cosmos_task(): + ti = _make_task_instance(NonCosmosOperator()) + + assert task_instance_listener._dbt_command(ti) is None + + @patch("cosmos.listeners.task_instance_listener.telemetry.emit_usage_metrics_if_enabled") def test_on_task_instance_success_emits_for_cosmos_task(mock_emit): operator = DummyDbtOperator() @@ -153,3 +239,12 @@ def test_on_task_instance_success_skips_non_cosmos_task(mock_emit): task_instance_listener.on_task_instance_success(None, ti, None) mock_emit.assert_not_called() + + +@patch("cosmos.listeners.task_instance_listener.telemetry.emit_usage_metrics_if_enabled") +def test_on_task_instance_failed_skips_non_cosmos_task(mock_emit): + ti = _make_task_instance(NonCosmosOperator()) + + task_instance_listener.on_task_instance_failed(None, ti, None) + + mock_emit.assert_not_called() From 167d02097024f532d9877634d035e134ad7e46c1 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 17 Dec 2025 13:06:50 +0000 Subject: [PATCH 13/15] Fix Airflow 3 compatibility for task instance listener hooks --- cosmos/listeners/task_instance_listener.py | 13 +++++-------- tests/listeners/test_task_instance_listener.py | 8 ++++---- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py index 00ea6035a4..659ebff347 100644 --- a/cosmos/listeners/task_instance_listener.py +++ b/cosmos/listeners/task_instance_listener.py @@ -6,10 +6,9 @@ if TYPE_CHECKING: from airflow.models.taskinstance import TaskInstance - from sqlalchemy.orm import Session from cosmos import telemetry -from cosmos.constants import InvocationMode +from cosmos.constants import InvocationMode, _AIRFLOW3_MAJOR_VERSION, AIRFLOW_VERSION from cosmos.log import get_logger from cosmos.operators.base import AbstractDbtBase @@ -150,9 +149,8 @@ def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, o @hookimpl -def on_task_instance_success( - previous_state: Any, task_instance: TaskInstance, session: Session -) -> None: # type: ignore[override] +def on_task_instance_success(previous_state: Any, task_instance: TaskInstance, **kwargs: Any) -> None: # type: ignore[override] + """Handle task instance success for both Airflow 2 (with session) and Airflow 3 (without session).""" if not _is_cosmos_task(task_instance): return @@ -162,9 +160,8 @@ def on_task_instance_success( @hookimpl -def on_task_instance_failed( - previous_state: Any, task_instance: TaskInstance, session: Session -) -> None: # type: ignore[override] +def on_task_instance_failed(previous_state: Any, task_instance: TaskInstance, **kwargs: Any) -> None: # type: ignore[override] + """Handle task instance failure for both Airflow 2 (with session) and Airflow 3 (with error and without session).""" if not _is_cosmos_task(task_instance): return diff --git a/tests/listeners/test_task_instance_listener.py b/tests/listeners/test_task_instance_listener.py index 04c048e937..7c079c4386 100644 --- a/tests/listeners/test_task_instance_listener.py +++ b/tests/listeners/test_task_instance_listener.py @@ -210,7 +210,7 @@ def test_on_task_instance_success_emits_for_cosmos_task(mock_emit): operator = DummyDbtOperator() ti = _make_task_instance(operator) - task_instance_listener.on_task_instance_success(None, ti, None) + task_instance_listener.on_task_instance_success(None, ti, session=None) mock_emit.assert_called_once() args, _ = mock_emit.call_args @@ -224,7 +224,7 @@ def test_on_task_instance_failed_emits_failed_status(mock_emit): operator = DummyDbtOperator() ti = _make_task_instance(operator) - task_instance_listener.on_task_instance_failed(None, ti, None) + task_instance_listener.on_task_instance_failed(None, ti, error=None, session=None) mock_emit.assert_called_once() args, _ = mock_emit.call_args @@ -236,7 +236,7 @@ def test_on_task_instance_failed_emits_failed_status(mock_emit): def test_on_task_instance_success_skips_non_cosmos_task(mock_emit): ti = _make_task_instance(NonCosmosOperator()) - task_instance_listener.on_task_instance_success(None, ti, None) + task_instance_listener.on_task_instance_success(None, ti, session=None) mock_emit.assert_not_called() @@ -245,6 +245,6 @@ def test_on_task_instance_success_skips_non_cosmos_task(mock_emit): def test_on_task_instance_failed_skips_non_cosmos_task(mock_emit): ti = _make_task_instance(NonCosmosOperator()) - task_instance_listener.on_task_instance_failed(None, ti, None) + task_instance_listener.on_task_instance_failed(None, ti, error=None, session=None) mock_emit.assert_not_called() From 4558bbb6986f36b1ad30545588d97aaae4d083ee Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 17 Dec 2025 13:07:22 +0000 Subject: [PATCH 14/15] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/listeners/task_instance_listener.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py index 659ebff347..ae015cc9a9 100644 --- a/cosmos/listeners/task_instance_listener.py +++ b/cosmos/listeners/task_instance_listener.py @@ -8,7 +8,7 @@ from airflow.models.taskinstance import TaskInstance from cosmos import telemetry -from cosmos.constants import InvocationMode, _AIRFLOW3_MAJOR_VERSION, AIRFLOW_VERSION +from cosmos.constants import InvocationMode from cosmos.log import get_logger from cosmos.operators.base import AbstractDbtBase From f07ea0ef30873397c910192feb3d21f4b15c7b66 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 17 Dec 2025 13:41:24 +0000 Subject: [PATCH 15/15] Add *args to hooks and remove queue, priority_weight, dag_hash metrics --- cosmos/listeners/task_instance_listener.py | 9 ++------- tests/listeners/test_task_instance_listener.py | 5 +---- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py index ae015cc9a9..0a64c3a61a 100644 --- a/cosmos/listeners/task_instance_listener.py +++ b/cosmos/listeners/task_instance_listener.py @@ -119,8 +119,6 @@ def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, o "is_cosmos_operator_subclass": _is_cosmos_subclass(task_instance), "invocation_mode": _invocation_mode(task_instance), "execution_mode": _execution_mode_from_task(task_instance), - "queue": task_instance.queue, - "priority_weight": task_instance.priority_weight, "map_index": task_instance.map_index, } @@ -137,9 +135,6 @@ def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, o dag_run = getattr(task_instance, "dag_run", None) if dag_run is not None: metrics["dag_run_id"] = dag_run.run_id - dag_hash = getattr(dag_run, "dag_hash", None) - if dag_hash is not None: - metrics["dag_hash"] = dag_hash duration = getattr(task_instance, "duration", None) if duration is not None: @@ -149,7 +144,7 @@ def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, o @hookimpl -def on_task_instance_success(previous_state: Any, task_instance: TaskInstance, **kwargs: Any) -> None: # type: ignore[override] +def on_task_instance_success(previous_state: Any, task_instance: TaskInstance, *args: Any, **kwargs: Any) -> None: # type: ignore[override] """Handle task instance success for both Airflow 2 (with session) and Airflow 3 (without session).""" if not _is_cosmos_task(task_instance): return @@ -160,7 +155,7 @@ def on_task_instance_success(previous_state: Any, task_instance: TaskInstance, * @hookimpl -def on_task_instance_failed(previous_state: Any, task_instance: TaskInstance, **kwargs: Any) -> None: # type: ignore[override] +def on_task_instance_failed(previous_state: Any, task_instance: TaskInstance, *args: Any, **kwargs: Any) -> None: # type: ignore[override] """Handle task instance failure for both Airflow 2 (with session) and Airflow 3 (with error and without session).""" if not _is_cosmos_task(task_instance): return diff --git a/tests/listeners/test_task_instance_listener.py b/tests/listeners/test_task_instance_listener.py index 7c079c4386..bbf6de6184 100644 --- a/tests/listeners/test_task_instance_listener.py +++ b/tests/listeners/test_task_instance_listener.py @@ -74,10 +74,8 @@ def _make_task_instance(task, **overrides) -> SimpleNamespace: dag_id="example_dag", task_id="example_task", task=task, - queue="default", - priority_weight=5, map_index=-1, - dag_run=SimpleNamespace(run_id="run-1", dag_hash="hash-123"), + dag_run=SimpleNamespace(run_id="run-1"), duration=7.0, ) defaults.update(overrides) @@ -97,7 +95,6 @@ def test_build_task_metrics_records_core_fields(): assert metrics["execution_mode"] == "local" assert metrics["is_cosmos_operator_subclass"] is False assert metrics["dag_run_id"] == "run-1" - assert metrics["dag_hash"] == "hash-123" def test_build_task_metrics_ignores_missing_install_deps():