Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions tests/listeners/test_task_instance_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,29 @@

from pathlib import Path
from types import SimpleNamespace
from typing import Any
from unittest.mock import patch

import pytest
from airflow.models.connection import Connection

try: # Airflow 3
from airflow.sdk import Context
except ImportError: # Airflow 2
from airflow.utils.context import Context


from cosmos import ProfileConfig
from cosmos.constants import InvocationMode
from cosmos.listeners import task_instance_listener
from cosmos.operators.base import AbstractDbtBase
from cosmos.profiles import get_automatic_profile_mapping

try:
from airflow.sdk.bases.operator import BaseOperator # Airflow 3
except ImportError:
from airflow.models import BaseOperator # Airflow 2

DBT_PROJECT_PROFILE = Path(__file__).parent.parent / "sample/mini/profiles.yml"


Expand All @@ -35,6 +47,11 @@ def mock_postgres_conn(): # type: ignore
yield conn


class DummyOperator(BaseOperator):
def execute(self, context: Context) -> Any:
pass


class DummyDbtOperator(AbstractDbtBase):
base_cmd = ["run"]

Expand Down Expand Up @@ -155,6 +172,18 @@ def test_profile_file_metrics():
assert metrics["database"] == "postgres"


def test_profile_metrics_with_non_cosmos_operator():
operator = DummyOperator(task_id="test")
ti = _make_task_instance(operator)
metrics = task_instance_listener._build_task_metrics(ti, status="success")

assert metrics["operator_name"] == "DummyOperator"
assert metrics["is_cosmos_operator_subclass"] is False
assert metrics["profile_strategy"] is None
assert metrics["profile_mapping_class"] is None
assert metrics["database"] is None


def test_build_task_metrics_records_core_fields():
operator = DummyDbtOperator()
ti = _make_task_instance(operator)
Expand Down