diff --git a/tests/listeners/test_task_instance_listener.py b/tests/listeners/test_task_instance_listener.py index 2afce2809c..1ac1c5a395 100644 --- a/tests/listeners/test_task_instance_listener.py +++ b/tests/listeners/test_task_instance_listener.py @@ -2,17 +2,29 @@ from pathlib import Path from types import SimpleNamespace +from typing import Any from unittest.mock import patch import pytest from airflow.models.connection import Connection +try: # Airflow 3 + from airflow.sdk import Context +except ImportError: # Airflow 2 + from airflow.utils.context import Context + + from cosmos import ProfileConfig from cosmos.constants import InvocationMode from cosmos.listeners import task_instance_listener from cosmos.operators.base import AbstractDbtBase from cosmos.profiles import get_automatic_profile_mapping +try: + from airflow.sdk.bases.operator import BaseOperator # Airflow 3 +except ImportError: + from airflow.models import BaseOperator # Airflow 2 + DBT_PROJECT_PROFILE = Path(__file__).parent.parent / "sample/mini/profiles.yml" @@ -35,6 +47,11 @@ def mock_postgres_conn(): # type: ignore yield conn +class DummyOperator(BaseOperator): + def execute(self, context: Context) -> Any: + pass + + class DummyDbtOperator(AbstractDbtBase): base_cmd = ["run"] @@ -155,6 +172,18 @@ def test_profile_file_metrics(): assert metrics["database"] == "postgres" +def test_profile_metrics_with_non_cosmos_operator(): + operator = DummyOperator(task_id="test") + ti = _make_task_instance(operator) + metrics = task_instance_listener._build_task_metrics(ti, status="success") + + assert metrics["operator_name"] == "DummyOperator" + assert metrics["is_cosmos_operator_subclass"] is False + assert metrics["profile_strategy"] is None + assert metrics["profile_mapping_class"] is None + assert metrics["database"] is None + + def test_build_task_metrics_records_core_fields(): operator = DummyDbtOperator() ti = _make_task_instance(operator)