Skip to content
3 changes: 3 additions & 0 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,9 @@ def _get_profile_path(self, use_mock_values: bool = False) -> Path:
Check if profile object version is exist then reuse it
Otherwise, create profile yml for requested object and return the profile path
"""
if self.profiles_yml_filepath:
return Path(self.profiles_yml_filepath)
Comment thread
pankajastro marked this conversation as resolved.

assert self.profile_mapping # To satisfy MyPy
current_profile_version = self.profile_mapping.version(self.profile_name, self.target_name, use_mock_values)
cached_profile_path = get_cached_profile(current_profile_version)
Expand Down
37 changes: 37 additions & 0 deletions cosmos/listeners/task_instance_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance


from cosmos import telemetry
from cosmos.constants import InvocationMode
from cosmos.log import get_logger
Expand Down Expand Up @@ -108,9 +109,42 @@ def _has_callback(task_instance: TaskInstance) -> bool:
return bool(callback)


def get_profile_metrics(task_instance: TaskInstance) -> tuple[str | None, str | None, str | None]:
"""Extract dbt profile-related telemetry metrics for a Cosmos-powered task."""

Comment thread
pankajastro marked this conversation as resolved.
if not _is_cosmos_task(task_instance):
return None, None, None

profile_config = getattr(task_instance.task, "profile_config", None)

if not profile_config:
return None, None, None

# Determine strategy
profile_strategy = "yaml_file" if profile_config.profiles_yml_filepath is not None else "mapping"

# Default
profile_mapping_class = None

# Populate mapping class only when strategy is "mapping"
if profile_strategy == "mapping":
profile_mapping_class = profile_config.profile_mapping.__class__.__name__

# Get database or profile type, but don't let telemetry failures break tasks
try:
database = profile_config.get_profile_type()
except Exception as exc:
logger.debug("Failed to get profile type from profile_config: %s", exc)
database = None

return profile_strategy, profile_mapping_class, database


def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, object]:
"""Build telemetry payload for task completion events."""

profile_strategy, profile_mapping_class, database = get_profile_metrics(task_instance)

metrics: dict[str, object] = {
"dag_id": task_instance.dag_id,
"task_id": task_instance.task_id,
Expand All @@ -119,6 +153,9 @@ def _build_task_metrics(task_instance: TaskInstance, status: str) -> dict[str, o
"is_cosmos_operator_subclass": _is_cosmos_subclass(task_instance),
"invocation_mode": _invocation_mode(task_instance),
"execution_mode": _execution_mode_from_task(task_instance),
"profile_strategy": profile_strategy,
"profile_mapping_class": profile_mapping_class,
"database": database,
# map_index is -1 for non-mapped tasks, >= 0 for mapped tasks
"is_mapped_task": task_instance.map_index >= 0,
}
Expand Down
75 changes: 75 additions & 0 deletions tests/listeners/test_task_instance_listener.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,38 @@
from __future__ import annotations

from pathlib import Path
from types import SimpleNamespace
from unittest.mock import patch

import pytest
from airflow.models.connection import Connection

from cosmos import ProfileConfig
from cosmos.constants import InvocationMode
from cosmos.listeners import task_instance_listener
from cosmos.operators.base import AbstractDbtBase
from cosmos.profiles import get_automatic_profile_mapping

DBT_PROJECT_PROFILE = Path(__file__).parent.parent / "sample/mini/profiles.yml"


@pytest.fixture()
def mock_postgres_conn(): # type: ignore
"""
Sets the connection as an environment variable.
"""
conn = Connection(
conn_id="my_postgres_connection",
conn_type="postgres",
host="my_host",
login="my_user",
password="my_password",
port=5432,
schema="my_database",
)

with patch("cosmos.profiles.base.BaseHook.get_connection", return_value=conn):
yield conn


class DummyDbtOperator(AbstractDbtBase):
Expand All @@ -18,10 +45,12 @@ def __init__(
install_deps: bool | None = True,
callback=None,
runner_callbacks=None,
profile_config=None,
) -> None:
super().__init__(project_dir="/tmp")
self.invocation_mode = InvocationMode.DBT_RUNNER
self._task_module = module
self.profile_config = profile_config
if install_deps is not None:
self.install_deps = install_deps
if callback is not None:
Expand Down Expand Up @@ -82,6 +111,52 @@ def _make_task_instance(task, **overrides) -> SimpleNamespace:
return SimpleNamespace(**defaults)


def test_profile_mapping_metrics(mock_postgres_conn):
profile_mapping = get_automatic_profile_mapping(
mock_postgres_conn.conn_id,
{"schema": "my_schema"},
)

operator = DummyDbtOperator(
profile_config=ProfileConfig(profile_name="postgres", target_name="test", profile_mapping=profile_mapping)
)
ti = _make_task_instance(operator)

metrics = task_instance_listener._build_task_metrics(ti, status="success")

assert metrics["operator_name"] == "DummyDbtOperator"
assert metrics["dbt_command"] == "run"
assert metrics["install_deps"] is True
assert metrics["invocation_mode"] == InvocationMode.DBT_RUNNER.value
assert metrics["execution_mode"] == "local"
assert metrics["is_cosmos_operator_subclass"] is False
assert metrics["dag_run_id"] == "run-1"
assert metrics["profile_strategy"] == "mapping"
assert metrics["profile_mapping_class"] == "PostgresUserPasswordProfileMapping"
assert metrics["database"] == "postgres"


def test_profile_file_metrics():

operator = DummyDbtOperator(
profile_config=ProfileConfig(profiles_yml_filepath=DBT_PROJECT_PROFILE, profile_name="mini", target_name="dev")
)
ti = _make_task_instance(operator)

metrics = task_instance_listener._build_task_metrics(ti, status="success")

assert metrics["operator_name"] == "DummyDbtOperator"
assert metrics["dbt_command"] == "run"
assert metrics["install_deps"] is True
assert metrics["invocation_mode"] == InvocationMode.DBT_RUNNER.value
assert metrics["execution_mode"] == "local"
assert metrics["is_cosmos_operator_subclass"] is False
assert metrics["dag_run_id"] == "run-1"
assert metrics["profile_strategy"] == "yaml_file"
assert metrics["profile_mapping_class"] is None
assert metrics["database"] == "postgres"


def test_build_task_metrics_records_core_fields():
operator = DummyDbtOperator()
ti = _make_task_instance(operator)
Expand Down
Loading