Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from cosmos import cache, settings
from cosmos.airflow.graph import build_airflow_graph
from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import ExecutionMode, LoadMode
from cosmos.constants import DbtResourceType, ExecutionMode, LoadMode
from cosmos.dbt.graph import DbtGraph
from cosmos.dbt.project import has_non_empty_dependencies_file
from cosmos.dbt.selector import retrieve_by_label
Expand Down Expand Up @@ -280,6 +280,9 @@ def __init__(
cache_identifier = cache._create_cache_identifier(dag, task_group)
cache_dir = cache._obtain_cache_dir_path(cache_identifier=cache_identifier)

# Store the initial load method before it gets resolved by dbt_graph.load()
initial_load_method = render_config.load_method

previous_time = time.perf_counter()
self.dbt_graph = DbtGraph(
project=project_config,
Expand All @@ -294,6 +297,7 @@ def __init__(
self.dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode)

self._add_dbt_project_hash_to_dag_docs(dag)
self._store_cosmos_telemetry_metadata_on_dag(dag, render_config, project_config, initial_load_method)

current_time = time.perf_counter()
elapsed_time = current_time - previous_time
Expand Down Expand Up @@ -372,3 +376,48 @@ def _add_dbt_project_hash_to_dag_docs(self, dag: DAG | None) -> None:
logger.debug(f"Appended dbt project hash {dbt_project_hash} to DAG {dag.dag_id} documentation")
except Exception as e:
logger.warning(f"Failed to append dbt project hash to DAG documentation: {e}")

def _store_cosmos_telemetry_metadata_on_dag(  # noqa: C901
    self,
    dag: DAG | None,
    render_config: RenderConfig,
    project_config: ProjectConfig,
    initial_load_method: LoadMode,
) -> None:
    """
    Store Cosmos configuration metadata on the DAG for telemetry purposes.

    This metadata is used by the DAG run listener to emit telemetry metrics
    about how Cosmos is configured and used. It is written to ``dag.params``
    under the ``__cosmos_telemetry_metadata__`` key.

    Collecting the metadata is best-effort: telemetry must never prevent a DAG
    from rendering, so any failure is logged as a warning and nothing is stored.

    :param dag: The Airflow DAG to store metadata on. If None, no action is taken.
    :param render_config: The render configuration
    :param project_config: The project configuration
    :param initial_load_method: The load method requested by the user, captured
        before ``dbt_graph.load()`` resolved it to the method actually used
    """
    if dag is None:
        return

    try:
        metadata: dict[str, Any] = {"used_automatic_load_mode": initial_load_method == LoadMode.AUTOMATIC}

        if render_config is not None:
            metadata["invocation_mode"] = str(render_config.invocation_mode.value)
            # An explicit RenderConfig.dbt_deps wins; otherwise fall back to the project-level setting.
            metadata["install_deps"] = (
                bool(render_config.dbt_deps) if render_config.dbt_deps is not None else project_config.install_dbt_deps
            )
            metadata["uses_node_converter"] = render_config.node_converters is not None
            metadata["test_behavior"] = str(render_config.test_behavior.value)
            metadata["source_behavior"] = str(render_config.source_rendering_behavior.value)

        if self.dbt_graph is not None:
            metadata["actual_load_mode"] = str(self.dbt_graph.load_method.value)
            metadata["total_dbt_models"] = sum(
                1 for node in self.dbt_graph.nodes.values() if node.resource_type == DbtResourceType.MODEL
            )
            metadata["selected_dbt_models"] = sum(
                1 for node in self.dbt_graph.filtered_nodes.values() if node.resource_type == DbtResourceType.MODEL
            )

        # Store metadata in dag.params which is preserved during serialization
        # Using a key that's unlikely to conflict with user params
        dag.params["__cosmos_telemetry_metadata__"] = metadata
    except Exception as e:
        # Same best-effort policy as _add_dbt_project_hash_to_dag_docs: warn and carry on.
        logger.warning(f"Failed to store Cosmos telemetry metadata in DAG params: {e}")
    else:
        logger.debug(f"Stored Cosmos telemetry metadata in DAG {dag.dag_id} params: {metadata}")
21 changes: 20 additions & 1 deletion cosmos/listeners/dag_run_listener.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import hashlib
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

from airflow.listeners import hookimpl

Expand Down Expand Up @@ -57,6 +57,17 @@ def get_execution_modes(dag: DAG) -> str:
return "__".join(sorted(modes))


def get_cosmos_telemetry_metadata(dag: DAG) -> dict[str, Any]:
    """
    Read back the Cosmos telemetry metadata attached to a DAG.

    The converter stores the metadata in ``dag.params`` (so that it survives
    serialization) under the ``__cosmos_telemetry_metadata__`` key.

    :param dag: The DAG whose params are inspected
    :returns: The stored metadata dictionary, or an empty dict when the key is
        missing or its value is not a dictionary
    """
    stored = dag.params.get("__cosmos_telemetry_metadata__", {})
    if not isinstance(stored, dict):
        # Anything other than a dict cannot be merged into the metrics payload.
        return {}
    return stored


@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str) -> None:
logger.debug("Running on_dag_run_success")
Expand All @@ -82,6 +93,10 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None:
"execution_modes": get_execution_modes(serialized_dag),
}

# Add Cosmos telemetry metadata if available
cosmos_metadata = get_cosmos_telemetry_metadata(serialized_dag)
additional_telemetry_metrics.update(cosmos_metadata)

telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics)
logger.debug("Completed on_dag_run_success")

Expand Down Expand Up @@ -111,5 +126,9 @@ def on_dag_run_failed(dag_run: DagRun, msg: str) -> None:
"execution_modes": get_execution_modes(serialized_dag),
}

# Add Cosmos telemetry metadata if available
cosmos_metadata = get_cosmos_telemetry_metadata(serialized_dag)
additional_telemetry_metrics.update(cosmos_metadata)

telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics)
logger.debug("Completed on_dag_run_failed")
117 changes: 116 additions & 1 deletion tests/listeners/test_dag_run_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
from cosmos import DbtRunLocalOperator, ProfileConfig, ProjectConfig
from cosmos.airflow.dag import DbtDag
from cosmos.airflow.task_group import DbtTaskGroup
from cosmos.constants import AIRFLOW_VERSION
from cosmos.config import ExecutionConfig, RenderConfig
from cosmos.constants import AIRFLOW_VERSION, InvocationMode, LoadMode, SourceRenderingBehavior, TestBehavior
from cosmos.listeners.dag_run_listener import on_dag_run_failed, on_dag_run_success, total_cosmos_tasks
from cosmos.profiles import PostgresUserPasswordProfileMapping

Expand Down Expand Up @@ -183,3 +184,117 @@ def test_on_dag_run_failed(mock_emit_usage_metrics_if_enabled, caplog):
assert "Running on_dag_run_failed" in caplog.text
assert "Completed on_dag_run_failed" in caplog.text
assert mock_emit_usage_metrics_if_enabled.call_count == 1


@pytest.mark.skipif(
    AIRFLOW_VERSION >= Version("3.1.0"),
    reason="TODO: Fix create_dag_run to work with AF 3.1 and remove this skip.",
)
@pytest.mark.integration
@patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled")
def test_on_dag_run_success_with_telemetry_metadata(mock_emit_usage_metrics_if_enabled, caplog):
    """The success listener must forward the Cosmos metadata stored on the DAG."""
    caplog.set_level(logging.DEBUG)

    project = ProjectConfig(
        DBT_ROOT_PATH / "jaffle_shop",
    )
    execution = ExecutionConfig(invocation_mode=InvocationMode.SUBPROCESS)
    rendering = RenderConfig(
        load_method=LoadMode.AUTOMATIC,
        test_behavior=TestBehavior.AFTER_EACH,
        source_rendering_behavior=SourceRenderingBehavior.NONE,
    )
    dag = DbtDag(
        project_config=project,
        profile_config=profile_config,
        execution_config=execution,
        render_config=rendering,
        operator_args={"install_deps": True},
        start_date=datetime(2023, 1, 1),
        dag_id="cosmos_dag_with_metadata",
    )
    run_id = str(uuid.uuid1())
    run_after = datetime.now(timezone.utc) - timedelta(seconds=1)
    dag_run = create_dag_run(dag, run_id, run_after)

    on_dag_run_success(dag_run, msg="test success")
    assert mock_emit_usage_metrics_if_enabled.call_count == 1

    # The metrics dict is the second positional argument of the telemetry call.
    positional_args = mock_emit_usage_metrics_if_enabled.call_args[0]
    metrics = positional_args[1]

    # Every Cosmos metadata field must be part of the emitted payload.
    for expected_key in (
        "used_automatic_load_mode",
        "actual_load_mode",
        "invocation_mode",
        "install_deps",
        "uses_node_converter",
        "test_behavior",
        "source_behavior",
        "total_dbt_models",
        "selected_dbt_models",
    ):
        assert expected_key in metrics

    # Spot-check values.
    assert metrics["used_automatic_load_mode"] is True
    # The metric is taken from RenderConfig.invocation_mode, not from the ExecutionConfig above
    # (presumably the RenderConfig default is dbt_runner -- confirm against cosmos.config).
    assert metrics["invocation_mode"] == "dbt_runner"
    assert metrics["install_deps"] is True
    assert metrics["uses_node_converter"] is False
    assert metrics["test_behavior"] == "after_each"
    assert metrics["source_behavior"] == "none"


@pytest.mark.skipif(
    AIRFLOW_VERSION >= Version("3.1.0"),
    reason="TODO: Fix create_dag_run to work with AF 3.1 and remove this skip.",
)
@pytest.mark.integration
@patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled")
def test_on_dag_run_failed_with_telemetry_metadata(mock_emit_usage_metrics_if_enabled, caplog):
    """The failure listener must forward the Cosmos metadata stored on the DAG."""
    caplog.set_level(logging.DEBUG)

    project = ProjectConfig(
        DBT_ROOT_PATH / "jaffle_shop",
        manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json",
    )
    execution = ExecutionConfig(invocation_mode=InvocationMode.DBT_RUNNER)
    rendering = RenderConfig(
        load_method=LoadMode.DBT_MANIFEST,
        test_behavior=TestBehavior.NONE,
        source_rendering_behavior=SourceRenderingBehavior.ALL,
        dbt_deps=False,
    )
    dag = DbtDag(
        project_config=project,
        profile_config=profile_config,
        execution_config=execution,
        render_config=rendering,
        operator_args={"install_deps": True},
        start_date=datetime(2023, 1, 1),
        dag_id="cosmos_dag_with_metadata_failed",
    )
    run_id = str(uuid.uuid1())
    run_after = datetime.now(timezone.utc) - timedelta(seconds=1)
    dag_run = create_dag_run(dag, run_id, run_after)

    on_dag_run_failed(dag_run, msg="test failed")
    assert mock_emit_usage_metrics_if_enabled.call_count == 1

    # The metrics dict is the second positional argument of the telemetry call.
    positional_args = mock_emit_usage_metrics_if_enabled.call_args[0]
    metrics = positional_args[1]

    # Every Cosmos metadata field must be part of the emitted payload.
    for expected_key in (
        "used_automatic_load_mode",
        "actual_load_mode",
        "invocation_mode",
        "install_deps",
        "uses_node_converter",
        "test_behavior",
        "source_behavior",
        "total_dbt_models",
        "selected_dbt_models",
    ):
        assert expected_key in metrics

    # Spot-check values for the failed case.
    assert metrics["used_automatic_load_mode"] is False
    assert metrics["invocation_mode"] == "dbt_runner"
    # RenderConfig.dbt_deps=False is explicit, so it decides the metric regardless of operator_args.
    assert metrics["install_deps"] is False
    assert metrics["uses_node_converter"] is False
    assert metrics["test_behavior"] == "none"
    assert metrics["source_behavior"] == "all"
41 changes: 39 additions & 2 deletions tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -1112,8 +1112,10 @@ def test_dag_versioning_successful_logging(mock_load_dbt_graph, mock_hash_func,
execution_config=execution_config,
)

mock_logger.debug.assert_called_once_with(
"Appended dbt project hash test_hash_123 to DAG test_dag_logging documentation"
# Check that the hash logging call was made (there are multiple debug calls now)
debug_calls = [str(call) for call in mock_logger.debug.call_args_list]
assert any(
"Appended dbt project hash test_hash_123 to DAG test_dag_logging documentation" in call for call in debug_calls
)


Expand Down Expand Up @@ -1146,3 +1148,38 @@ def test_converter_logs_parsing_group_order(mock_load_dbt_graph, mock_logger):

# Verify that start comes before end
assert group_start_idx < group_end_idx


@patch("cosmos.converter.DbtGraph.load")
def test_telemetry_metadata_storage(mock_load_dbt_graph):
    """The converter must attach its telemetry metadata to ``dag.params``."""
    dag = DAG("test_dag_telemetry", start_date=datetime(2024, 1, 1))

    DbtToAirflowConverter(
        dag=dag,
        project_config=ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT),
        profile_config=ProfileConfig(
            profile_name="test",
            target_name="test",
            profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}),
        ),
        execution_config=ExecutionConfig(execution_mode=ExecutionMode.LOCAL),
        render_config=RenderConfig(),
    )

    # The metadata lives under a dedicated key in dag.params.
    assert "__cosmos_telemetry_metadata__" in dag.params
    stored = dag.params["__cosmos_telemetry_metadata__"]

    # Keys derived from the load method and the render configuration must be present.
    for expected_key in (
        "used_automatic_load_mode",
        "invocation_mode",
        "install_deps",
        "uses_node_converter",
        "test_behavior",
        "source_behavior",
    ):
        assert expected_key in stored