diff --git a/cosmos/converter.py b/cosmos/converter.py index 1bd1d4c92d..23ee819b52 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -23,7 +23,7 @@ from cosmos import cache, settings from cosmos.airflow.graph import build_airflow_graph from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig -from cosmos.constants import ExecutionMode, LoadMode +from cosmos.constants import DbtResourceType, ExecutionMode, LoadMode from cosmos.dbt.graph import DbtGraph from cosmos.dbt.project import has_non_empty_dependencies_file from cosmos.dbt.selector import retrieve_by_label @@ -280,6 +280,9 @@ def __init__( cache_identifier = cache._create_cache_identifier(dag, task_group) cache_dir = cache._obtain_cache_dir_path(cache_identifier=cache_identifier) + # Store the initial load method before it gets resolved by dbt_graph.load() + initial_load_method = render_config.load_method + previous_time = time.perf_counter() self.dbt_graph = DbtGraph( project=project_config, @@ -294,6 +297,7 @@ def __init__( self.dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode) self._add_dbt_project_hash_to_dag_docs(dag) + self._store_cosmos_telemetry_metadata_on_dag(dag, render_config, project_config, initial_load_method) current_time = time.perf_counter() elapsed_time = current_time - previous_time @@ -372,3 +376,48 @@ def _add_dbt_project_hash_to_dag_docs(self, dag: DAG | None) -> None: logger.debug(f"Appended dbt project hash {dbt_project_hash} to DAG {dag.dag_id} documentation") except Exception as e: logger.warning(f"Failed to append dbt project hash to DAG documentation: {e}") + + def _store_cosmos_telemetry_metadata_on_dag( # noqa: C901 + self, + dag: DAG | None, + render_config: RenderConfig, + project_config: ProjectConfig, + initial_load_method: LoadMode, + ) -> None: + """ + Store Cosmos configuration metadata on the DAG for telemetry purposes. + + This metadata is used by the DAG run listener to emit telemetry metrics + about how Cosmos is configured and used. + + :param dag: The Airflow DAG to store metadata on. If None, no action is taken. + :param render_config: The render configuration + :param project_config: The project configuration + """ + if dag is None: + return + + metadata: dict[str, Any] = {"used_automatic_load_mode": initial_load_method == LoadMode.AUTOMATIC} + + if render_config is not None: + metadata["invocation_mode"] = str(render_config.invocation_mode.value) + metadata["install_deps"] = ( + bool(render_config.dbt_deps) if render_config.dbt_deps is not None else project_config.install_dbt_deps + ) + metadata["uses_node_converter"] = render_config.node_converters is not None + metadata["test_behavior"] = str(render_config.test_behavior.value) + metadata["source_behavior"] = str(render_config.source_rendering_behavior.value) + + if self.dbt_graph is not None: + metadata["actual_load_mode"] = str(self.dbt_graph.load_method.value) + metadata["total_dbt_models"] = sum( + 1 for node in self.dbt_graph.nodes.values() if node.resource_type == DbtResourceType.MODEL + ) + metadata["selected_dbt_models"] = sum( + 1 for node in self.dbt_graph.filtered_nodes.values() if node.resource_type == DbtResourceType.MODEL + ) + + # Store metadata in dag.params which is preserved during serialization + # Using a key that's unlikely to conflict with user params + dag.params["__cosmos_telemetry_metadata__"] = metadata + logger.debug(f"Stored Cosmos telemetry metadata in DAG {dag.dag_id} params: {metadata}") diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index 4b73481845..5d8ca2ed49 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -1,7 +1,7 @@ from __future__ import annotations import hashlib -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from airflow.listeners import hookimpl @@ -57,6 +57,17 @@ def get_execution_modes(dag: DAG) -> str: return "__".join(sorted(modes)) +def get_cosmos_telemetry_metadata(dag: DAG) -> dict[str, Any]: + """ + Extract Cosmos telemetry metadata from a DAG. + + Returns the metadata dictionary stored by the converter in dag.params, or an empty dict if not present. + """ + # Metadata is stored in dag.params to survive serialization + metadata = dag.params.get("__cosmos_telemetry_metadata__", {}) + return metadata if isinstance(metadata, dict) else {} + + @hookimpl def on_dag_run_success(dag_run: DagRun, msg: str) -> None: logger.debug("Running on_dag_run_success") @@ -82,6 +93,10 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: "execution_modes": get_execution_modes(serialized_dag), } + # Add Cosmos telemetry metadata if available + cosmos_metadata = get_cosmos_telemetry_metadata(serialized_dag) + additional_telemetry_metrics.update(cosmos_metadata) + telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) logger.debug("Completed on_dag_run_success") @@ -111,5 +126,9 @@ def on_dag_run_failed(dag_run: DagRun, msg: str) -> None: "execution_modes": get_execution_modes(serialized_dag), } + # Add Cosmos telemetry metadata if available + cosmos_metadata = get_cosmos_telemetry_metadata(serialized_dag) + additional_telemetry_metrics.update(cosmos_metadata) + telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) logger.debug("Completed on_dag_run_failed") diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py index 5ad94430b5..1ff3162d78 100644 --- a/tests/listeners/test_dag_run_listener.py +++ b/tests/listeners/test_dag_run_listener.py @@ -12,7 +12,8 @@ from cosmos import DbtRunLocalOperator, ProfileConfig, ProjectConfig from cosmos.airflow.dag import DbtDag from cosmos.airflow.task_group import DbtTaskGroup -from cosmos.constants import AIRFLOW_VERSION +from cosmos.config import ExecutionConfig, RenderConfig +from cosmos.constants import AIRFLOW_VERSION, InvocationMode, LoadMode, SourceRenderingBehavior, TestBehavior from cosmos.listeners.dag_run_listener import on_dag_run_failed, on_dag_run_success, total_cosmos_tasks from cosmos.profiles import PostgresUserPasswordProfileMapping @@ -183,3 +184,117 @@ def test_on_dag_run_failed(mock_emit_usage_metrics_if_enabled, caplog): assert "Running on_dag_run_failed" in caplog.text assert "Completed on_dag_run_failed" in caplog.text assert mock_emit_usage_metrics_if_enabled.call_count == 1 + + +@pytest.mark.skipif( + AIRFLOW_VERSION >= Version("3.1.0"), + reason="TODO: Fix create_dag_run to work with AF 3.1 and remove this skip.", +) +@pytest.mark.integration +@patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled") +def test_on_dag_run_success_with_telemetry_metadata(mock_emit_usage_metrics_if_enabled, caplog): + """Test that DAG run success includes Cosmos telemetry metadata.""" + caplog.set_level(logging.DEBUG) + + dag = DbtDag( + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + profile_config=profile_config, + execution_config=ExecutionConfig(invocation_mode=InvocationMode.SUBPROCESS), + render_config=RenderConfig( + load_method=LoadMode.AUTOMATIC, + test_behavior=TestBehavior.AFTER_EACH, + source_rendering_behavior=SourceRenderingBehavior.NONE, + ), + operator_args={"install_deps": True}, + start_date=datetime(2023, 1, 1), + dag_id="cosmos_dag_with_metadata", + ) + run_id = str(uuid.uuid1()) + run_after = datetime.now(timezone.utc) - timedelta(seconds=1) + dag_run = create_dag_run(dag, run_id, run_after) + + on_dag_run_success(dag_run, msg="test success") + assert mock_emit_usage_metrics_if_enabled.call_count == 1 + + # Verify telemetry call includes new metrics + call_args = mock_emit_usage_metrics_if_enabled.call_args + metrics = call_args[0][1] # Second argument is the metrics dict + + # Check that Cosmos metadata fields are present + assert "used_automatic_load_mode" in metrics + assert "actual_load_mode" in metrics + assert "invocation_mode" in metrics + assert "install_deps" in metrics + assert "uses_node_converter" in metrics + assert "test_behavior" in metrics + assert "source_behavior" in metrics + assert "total_dbt_models" in metrics + assert "selected_dbt_models" in metrics + + # Verify some expected values + assert metrics["used_automatic_load_mode"] is True + assert metrics["invocation_mode"] == "dbt_runner" + assert metrics["install_deps"] is True + assert metrics["uses_node_converter"] is False + assert metrics["test_behavior"] == "after_each" + assert metrics["source_behavior"] == "none" + + +@pytest.mark.skipif( + AIRFLOW_VERSION >= Version("3.1.0"), + reason="TODO: Fix create_dag_run to work with AF 3.1 and remove this skip.", +) +@pytest.mark.integration +@patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled") +def test_on_dag_run_failed_with_telemetry_metadata(mock_emit_usage_metrics_if_enabled, caplog): + """Test that DAG run failure includes Cosmos telemetry metadata.""" + caplog.set_level(logging.DEBUG) + + dag = DbtDag( + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json", + ), + profile_config=profile_config, + execution_config=ExecutionConfig(invocation_mode=InvocationMode.DBT_RUNNER), + render_config=RenderConfig( + load_method=LoadMode.DBT_MANIFEST, + test_behavior=TestBehavior.NONE, + source_rendering_behavior=SourceRenderingBehavior.ALL, + dbt_deps=False, + ), + operator_args={"install_deps": True}, + start_date=datetime(2023, 1, 1), + dag_id="cosmos_dag_with_metadata_failed", + ) + run_id = str(uuid.uuid1()) + run_after = datetime.now(timezone.utc) - timedelta(seconds=1) + dag_run = create_dag_run(dag, run_id, run_after) + + on_dag_run_failed(dag_run, msg="test failed") + assert mock_emit_usage_metrics_if_enabled.call_count == 1 + + # Verify telemetry call includes new metrics + call_args = mock_emit_usage_metrics_if_enabled.call_args + metrics = call_args[0][1] # Second argument is the metrics dict + + # Check that Cosmos metadata fields are present + assert "used_automatic_load_mode" in metrics + assert "actual_load_mode" in metrics + assert "invocation_mode" in metrics + assert "install_deps" in metrics + assert "uses_node_converter" in metrics + assert "test_behavior" in metrics + assert "source_behavior" in metrics + assert "total_dbt_models" in metrics + assert "selected_dbt_models" in metrics + + # Verify some expected values for failed case + assert metrics["used_automatic_load_mode"] is False + assert metrics["invocation_mode"] == "dbt_runner" + assert metrics["install_deps"] is False + assert metrics["uses_node_converter"] is False + assert metrics["test_behavior"] == "none" + assert metrics["source_behavior"] == "all" diff --git a/tests/test_converter.py b/tests/test_converter.py index af4fdf7369..d9ed637fb3 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -1112,8 +1112,10 @@ def test_dag_versioning_successful_logging(mock_load_dbt_graph, mock_hash_func, execution_config=execution_config, ) - mock_logger.debug.assert_called_once_with( - "Appended dbt project hash test_hash_123 to DAG test_dag_logging documentation" + # Check that the hash logging call was made (there are multiple debug calls now) + debug_calls = [str(call) for call in mock_logger.debug.call_args_list] + assert any( + "Appended dbt project hash test_hash_123 to DAG test_dag_logging documentation" in call for call in debug_calls ) @@ -1146,3 +1148,38 @@ def test_converter_logs_parsing_group_order(mock_load_dbt_graph, mock_logger): # Verify that start comes before end assert group_start_idx < group_end_idx + + +@patch("cosmos.converter.DbtGraph.load") +def test_telemetry_metadata_storage(mock_load_dbt_graph): + """Test that telemetry metadata is stored correctly in DAG params.""" + dag = DAG("test_dag_telemetry", start_date=datetime(2024, 1, 1)) + + project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}), + ) + execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL) + render_config = RenderConfig() + + _ = DbtToAirflowConverter( + dag=dag, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + render_config=render_config, + ) + + # Verify metadata is stored in dag.params + assert "__cosmos_telemetry_metadata__" in dag.params + metadata = dag.params["__cosmos_telemetry_metadata__"] + + # Verify expected metadata keys are present + assert "used_automatic_load_mode" in metadata + assert "invocation_mode" in metadata + assert "install_deps" in metadata + assert "uses_node_converter" in metadata + assert "test_behavior" in metadata + assert "source_behavior" in metadata