diff --git a/cosmos/listeners/task_instance_listener.py b/cosmos/listeners/task_instance_listener.py new file mode 100644 index 0000000000..0a64c3a61a --- /dev/null +++ b/cosmos/listeners/task_instance_listener.py @@ -0,0 +1,165 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from airflow.listeners import hookimpl + +if TYPE_CHECKING: + from airflow.models.taskinstance import TaskInstance + +from cosmos import telemetry +from cosmos.constants import InvocationMode +from cosmos.log import get_logger +from cosmos.operators.base import AbstractDbtBase + +logger = get_logger(__name__) + +TASK_INSTANCE_EVENT = "task_instance" + + +def _is_cosmos_task(task_instance: TaskInstance) -> bool: + """Return True if the task instance is powered by Cosmos operators.""" + + task = task_instance.task + module = _operator_module(task_instance) + return module.startswith("cosmos.") or isinstance(task, AbstractDbtBase) + + +def _execution_mode_from_task(task_instance: TaskInstance) -> str | None: + """Extract Cosmos execution mode from the task's module path.""" + + module = _operator_module(task_instance) + parts = module.split(".") + if len(parts) >= 3 and parts[0] == "cosmos" and parts[1] == "operators": + return parts[2] + # TODO: When users subclass Cosmos operators in external modules, encode execution mode directly on the task + # so telemetry does not rely on module inspection. + return None + + +def _operator_module(task_instance: TaskInstance) -> str: + """Return the module path for the operator backing the given task instance.""" + + return getattr(task_instance.task, "_task_module", None) or task_instance.task.__class__.__module__ + + +def _is_cosmos_subclass(task_instance: TaskInstance) -> bool: + """Return True when the task is a custom subclass extending Cosmos operators.""" + + return isinstance(task_instance.task, AbstractDbtBase) and not _operator_module(task_instance).startswith("cosmos.") + + +def _invocation_mode(task_instance: TaskInstance) -> str | None: + """Return the invocation mode recorded in Cosmos operators.""" + + mode = getattr(task_instance.task, "invocation_mode", None) + if mode is None: + return None + if isinstance(mode, InvocationMode): + return mode.value + return str(mode) + + +def _dbt_command(task_instance: TaskInstance) -> str | None: + """Return the dbt sub-command encoded on Cosmos operators.""" + + task = task_instance.task + if not isinstance(task, AbstractDbtBase): + return None + + command = getattr(task, "base_cmd", None) + if command is None: + return None + + if isinstance(command, (list, tuple)): + return " ".join(str(part) for part in command if part is not None) + + return str(command) + + +def _install_deps(task_instance: TaskInstance) -> bool | None: + """Return the effective install_deps flag when available.""" + + task = task_instance.task + if not isinstance(task, AbstractDbtBase): + return None + + install_deps = getattr(task, "install_deps", None) + if install_deps is None: + return None + + return bool(install_deps) + + +def _has_callback(task_instance: TaskInstance) -> bool: + """Return True when a Cosmos operator includes user-defined callbacks.""" + + task = task_instance.task + if not isinstance(task, AbstractDbtBase): + return False + + callback = getattr(task, "callback", None) + if callback is None: + return False + + if isinstance(callback, (list, tuple)): + return any(callback) + + return bool(callback) + + +def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, object]: + """Build telemetry payload for task completion events.""" + + metrics: dict[str, object] = { + "dag_id": task_instance.dag_id, + "task_id": task_instance.task_id, + "status": status, + "operator_name": task_instance.task.__class__.__name__, + "is_cosmos_operator_subclass": _is_cosmos_subclass(task_instance), + "invocation_mode": _invocation_mode(task_instance), + "execution_mode": _execution_mode_from_task(task_instance), + "map_index": task_instance.map_index, + } + + dbt_command = _dbt_command(task_instance) + if dbt_command: + metrics["dbt_command"] = dbt_command + + install_deps = _install_deps(task_instance) + if install_deps is not None: + metrics["install_deps"] = install_deps + + metrics["has_callback"] = _has_callback(task_instance) + + dag_run = getattr(task_instance, "dag_run", None) + if dag_run is not None: + metrics["dag_run_id"] = dag_run.run_id + + duration = getattr(task_instance, "duration", None) + if duration is not None: + metrics["duration"] = duration + + return metrics + + +@hookimpl +def on_task_instance_success(previous_state: Any, task_instance: TaskInstance, *args: Any, **kwargs: Any) -> None: # type: ignore[override] + """Handle task instance success for both Airflow 2 (with session) and Airflow 3 (without session).""" + if not _is_cosmos_task(task_instance): + return + + logger.debug("Telemetry task listener success for %s.%s", task_instance.dag_id, task_instance.task_id) + metrics = _build_task_metrics(task_instance, "success") + telemetry.emit_usage_metrics_if_enabled(TASK_INSTANCE_EVENT, metrics) + + +@hookimpl +def on_task_instance_failed(previous_state: Any, task_instance: TaskInstance, *args: Any, **kwargs: Any) -> None: # type: ignore[override] + """Handle task instance failure for both Airflow 2 (with session) and Airflow 3 (with error and without session).""" + if not _is_cosmos_task(task_instance): + return + + logger.debug("Telemetry task listener failure for %s.%s", task_instance.dag_id, task_instance.task_id) + metrics = _build_task_metrics(task_instance, "failed") + telemetry.emit_usage_metrics_if_enabled(TASK_INSTANCE_EVENT, metrics) diff --git a/cosmos/plugin/airflow2.py b/cosmos/plugin/airflow2.py index 16f0fb62d5..9ae26c4dd6 100644 --- a/cosmos/plugin/airflow2.py +++ b/cosmos/plugin/airflow2.py @@ -10,7 +10,7 @@ from flask import abort from flask_appbuilder import AppBuilder, expose -from cosmos.listeners import dag_run_listener +from cosmos.listeners import dag_run_listener, task_instance_listener from cosmos.plugin.snippets import IFRAME_SCRIPT from cosmos.settings import dbt_docs_conn_id, dbt_docs_dir, dbt_docs_index_file_name, in_astro_cloud @@ -190,4 +190,4 @@ class CosmosPlugin(AirflowPlugin): "href": conf.get("webserver", "base_url") + "/cosmos/dbt_docs", } appbuilder_views = [item] - listeners = [dag_run_listener] + listeners = [dag_run_listener, task_instance_listener] diff --git a/cosmos/plugin/airflow3.py b/cosmos/plugin/airflow3.py index 3fe73f58e4..10f142c878 100644 --- a/cosmos/plugin/airflow3.py +++ b/cosmos/plugin/airflow3.py @@ -20,7 +20,7 @@ from packaging.version import Version from cosmos.constants import AIRFLOW_OBJECT_STORAGE_PATH_URL_SCHEMES -from cosmos.listeners import dag_run_listener +from cosmos.listeners import dag_run_listener, task_instance_listener from cosmos.plugin.snippets import IFRAME_SCRIPT # Airflow version gating: External views feature for the plugins used here (CosmosAF3Plugin) exist only in >= 3.1 @@ -265,7 +265,7 @@ class CosmosAF3Plugin(AirflowPlugin): # Register external views for navigation external_views: list[dict[str, Any]] = [] - listeners = [dag_run_listener] + listeners = [dag_run_listener, task_instance_listener] def __init__(self) -> None: super().__init__() diff --git a/tests/listeners/test_task_instance_listener.py b/tests/listeners/test_task_instance_listener.py new file mode 100644 index 0000000000..bbf6de6184 --- /dev/null +++ b/tests/listeners/test_task_instance_listener.py @@ -0,0 +1,247 @@ +from __future__ import annotations + +from types import SimpleNamespace +from unittest.mock import patch + +from cosmos.constants import InvocationMode +from cosmos.listeners import task_instance_listener +from cosmos.operators.base import AbstractDbtBase + + +class DummyDbtOperator(AbstractDbtBase): + base_cmd = ["run"] + + def __init__( + self, + *, + module: str = "cosmos.operators.local.fake", + install_deps: bool | None = True, + callback=None, + runner_callbacks=None, + ) -> None: + super().__init__(project_dir="/tmp") + self.invocation_mode = InvocationMode.DBT_RUNNER + self._task_module = module + if install_deps is not None: + self.install_deps = install_deps + if callback is not None: + self.callback = callback + if runner_callbacks is not None: + self._dbt_runner_callbacks = runner_callbacks + + def build_and_run_cmd( + self, context, cmd_flags, run_as_async=False, async_context=None, **kwargs + ): # pragma: no cover + return None + + +class DummyDbtOperatorNoDeps(DummyDbtOperator): + base_cmd = ["seed"] + + def __init__(self) -> None: + super().__init__(module="cosmos.operators.kubernetes.fake", install_deps=None) + self.invocation_mode = InvocationMode.SUBPROCESS + if hasattr(self, "install_deps"): + delattr(self, "install_deps") + + +class CustomDbtSubclass(DummyDbtOperator): + def __init__(self) -> None: + super().__init__(module="custom.pipeline.dummy") + + +class DummyDbtOperatorNoCommand(DummyDbtOperator): + base_cmd = None + + +class DummyDbtOperatorStringCommand(DummyDbtOperator): + base_cmd = "deps" + + +class DummyDbtOperatorTupleCommand(DummyDbtOperator): + base_cmd = ("run", None, "--full-refresh") + + +class NonCosmosOperator: + __module__ = "airflow.operators.bash" + + def __init__(self) -> None: + self._task_module = "airflow.operators.bash" + + +def _make_task_instance(task, **overrides) -> SimpleNamespace: + defaults = dict( + dag_id="example_dag", + task_id="example_task", + task=task, + map_index=-1, + dag_run=SimpleNamespace(run_id="run-1"), + duration=7.0, + ) + defaults.update(overrides) + return SimpleNamespace(**defaults) + + +def test_build_task_metrics_records_core_fields(): + operator = DummyDbtOperator() + ti = _make_task_instance(operator) + + metrics = task_instance_listener._build_task_metrics(ti, status="success") + + assert metrics["operator_name"] == "DummyDbtOperator" + assert metrics["dbt_command"] == "run" + assert metrics["install_deps"] is True + assert metrics["invocation_mode"] == InvocationMode.DBT_RUNNER.value + assert metrics["execution_mode"] == "local" + assert metrics["is_cosmos_operator_subclass"] is False + assert metrics["dag_run_id"] == "run-1" + + +def test_build_task_metrics_ignores_missing_install_deps(): + operator = DummyDbtOperatorNoDeps() + ti = _make_task_instance(operator) + + metrics = task_instance_listener._build_task_metrics(ti, status="failed") + + assert metrics["dbt_command"] == "seed" + assert "install_deps" not in metrics + assert metrics["execution_mode"] == "kubernetes" + + +def test_build_task_metrics_marks_custom_subclasses(): + operator = CustomDbtSubclass() + ti = _make_task_instance(operator) + + metrics = task_instance_listener._build_task_metrics(ti, status="success") + + assert metrics["is_cosmos_operator_subclass"] is True + assert metrics["execution_mode"] is None + assert metrics["has_callback"] is False + + +def test_build_task_metrics_sets_has_callback_for_callable(): + operator = DummyDbtOperator(callback=lambda *_: None) + ti = _make_task_instance(operator) + + metrics = task_instance_listener._build_task_metrics(ti, status="success") + + assert metrics["has_callback"] is True + + +def test_build_task_metrics_interprets_tuple_callbacks(): + operator = DummyDbtOperator(callback=(None, lambda *_: None)) + ti = _make_task_instance(operator) + + metrics = task_instance_listener._build_task_metrics(ti, status="success") + + assert metrics["has_callback"] is True + + +def test_build_task_metrics_skips_dbt_command_when_missing(): + operator = DummyDbtOperatorNoCommand() + ti = _make_task_instance(operator) + + metrics = task_instance_listener._build_task_metrics(ti, status="success") + + assert "dbt_command" not in metrics + + +def test_build_task_metrics_handles_string_dbt_command(): + operator = DummyDbtOperatorStringCommand() + ti = _make_task_instance(operator) + + metrics = task_instance_listener._build_task_metrics(ti, status="success") + + assert metrics["dbt_command"] == "deps" + + +def test_build_task_metrics_flattens_iterable_commands(): + operator = DummyDbtOperatorTupleCommand() + ti = _make_task_instance(operator) + + metrics = task_instance_listener._build_task_metrics(ti, status="success") + + assert metrics["dbt_command"] == "run --full-refresh" + + +def test_build_task_metrics_handles_missing_invocation_mode(): + operator = DummyDbtOperator() + delattr(operator, "invocation_mode") + ti = _make_task_instance(operator) + + metrics = task_instance_listener._build_task_metrics(ti, status="success") + + assert metrics["invocation_mode"] is None + + +def test_build_task_metrics_handles_custom_invocation_mode_string(): + operator = DummyDbtOperator() + operator.invocation_mode = "custom-mode" + ti = _make_task_instance(operator) + + metrics = task_instance_listener._build_task_metrics(ti, status="success") + + assert metrics["invocation_mode"] == "custom-mode" + + +def test_has_callback_returns_false_for_non_cosmos_task(): + ti = _make_task_instance(NonCosmosOperator()) + + assert task_instance_listener._has_callback(ti) is False + + +def test_install_deps_returns_none_for_non_cosmos_task(): + ti = _make_task_instance(NonCosmosOperator()) + + assert task_instance_listener._install_deps(ti) is None + + +def test_dbt_command_returns_none_for_non_cosmos_task(): + ti = _make_task_instance(NonCosmosOperator()) + + assert task_instance_listener._dbt_command(ti) is None + + +@patch("cosmos.listeners.task_instance_listener.telemetry.emit_usage_metrics_if_enabled") +def test_on_task_instance_success_emits_for_cosmos_task(mock_emit): + operator = DummyDbtOperator() + ti = _make_task_instance(operator) + + task_instance_listener.on_task_instance_success(None, ti, session=None) + + mock_emit.assert_called_once() + args, _ = mock_emit.call_args + assert args[0] == task_instance_listener.TASK_INSTANCE_EVENT + assert args[1]["status"] == "success" + assert args[1]["dbt_command"] == "run" + + +@patch("cosmos.listeners.task_instance_listener.telemetry.emit_usage_metrics_if_enabled") +def test_on_task_instance_failed_emits_failed_status(mock_emit): + operator = DummyDbtOperator() + ti = _make_task_instance(operator) + + task_instance_listener.on_task_instance_failed(None, ti, error=None, session=None) + + mock_emit.assert_called_once() + args, _ = mock_emit.call_args + assert args[0] == task_instance_listener.TASK_INSTANCE_EVENT + assert args[1]["status"] == "failed" + + +@patch("cosmos.listeners.task_instance_listener.telemetry.emit_usage_metrics_if_enabled") +def test_on_task_instance_success_skips_non_cosmos_task(mock_emit): + ti = _make_task_instance(NonCosmosOperator()) + + task_instance_listener.on_task_instance_success(None, ti, session=None) + + mock_emit.assert_not_called() + + +@patch("cosmos.listeners.task_instance_listener.telemetry.emit_usage_metrics_if_enabled") +def test_on_task_instance_failed_skips_non_cosmos_task(mock_emit): + ti = _make_task_instance(NonCosmosOperator()) + + task_instance_listener.on_task_instance_failed(None, ti, error=None, session=None) + + mock_emit.assert_not_called()