From 46ad92aed1b2baf4ed25e27d8eb64922acfbce55 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 19 Dec 2025 08:17:08 +0000 Subject: [PATCH 01/17] Add DAG run telemetry metadata collection Store and emit 9 new Cosmos configuration metrics on DAG runs: - used_automatic_load_mode: Whether LoadMode.AUTOMATIC was used - actual_load_mode: The resolved load method (e.g., dbt_ls, dbt_manifest) - invocation_mode: How dbt is invoked (subprocess, dbt_runner) - install_deps: Whether dependency installation is enabled - uses_node_converter: Whether custom node converters are used - test_behavior: Test rendering behavior (after_each, none, etc.) - source_behavior: Source rendering behavior (all, none, etc.) - total_dbt_models: Total number of dbt models in the project - selected_dbt_models: Number of models selected after filtering Implementation: - Added _store_cosmos_telemetry_metadata_on_dag() in converter to store metadata on DAG object - Added get_cosmos_telemetry_metadata() helper in dag_run_listener to extract metadata - Updated on_dag_run_success and on_dag_run_failed hooks to include metadata in telemetry - Added comprehensive tests for both success and failure scenarios - Fixed existing test that was affected by additional debug logging --- cosmos/converter.py | 96 +++++++++++++++++++ cosmos/listeners/dag_run_listener.py | 19 +++- tests/listeners/test_dag_run_listener.py | 115 ++++++++++++++++++++++- tests/test_converter.py | 6 +- 4 files changed, 232 insertions(+), 4 deletions(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index 854d4f2710..b88dbc5a41 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -33,6 +33,8 @@ logger = get_logger(__name__) +from cosmos.constants import DbtResourceType + def migrate_to_new_interface( execution_config: ExecutionConfig, project_config: ProjectConfig, render_config: RenderConfig @@ -279,6 +281,9 @@ def __init__( cache_identifier = cache._create_cache_identifier(dag, task_group) cache_dir = cache._obtain_cache_dir_path(cache_identifier=cache_identifier) + # Store the initial load method before it gets resolved by dbt_graph.load() + initial_load_method = render_config.load_method + previous_time = time.perf_counter() self.dbt_graph = DbtGraph( project=project_config, @@ -293,6 +298,9 @@ def __init__( self.dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode) self._add_dbt_project_hash_to_dag_docs(dag) + self._store_cosmos_telemetry_metadata_on_dag( + dag, render_config, execution_config, project_config, operator_args, initial_load_method + ) current_time = time.perf_counter() elapsed_time = current_time - previous_time @@ -370,3 +378,91 @@ def _add_dbt_project_hash_to_dag_docs(self, dag: DAG | None) -> None: logger.debug(f"Appended dbt project hash {dbt_project_hash} to DAG {dag.dag_id} documentation") except Exception as e: logger.warning(f"Failed to append dbt project hash to DAG documentation: {e}") + + def _store_cosmos_telemetry_metadata_on_dag( # noqa: C901 + self, + dag: DAG | None, + render_config: RenderConfig, + execution_config: ExecutionConfig, + project_config: ProjectConfig, + operator_args: dict[str, Any], + initial_load_method: LoadMode, + ) -> None: + """ + Store Cosmos configuration metadata on the DAG for telemetry purposes. + + This metadata is used by the DAG run listener to emit telemetry metrics + about how Cosmos is configured and used. + + :param dag: The Airflow DAG to store metadata on. If None, no action is taken. + :param render_config: The render configuration + :param execution_config: The execution configuration + :param project_config: The project configuration + :param operator_args: The operator arguments + :param initial_load_method: The load method specified by the user (before automatic resolution) + """ + if dag is None: + return + + metadata = {} + + # Compute each metric individually with error handling + try: + metadata["used_automatic_load_mode"] = initial_load_method == LoadMode.AUTOMATIC + except Exception as e: + logger.warning(f"Failed to compute used_automatic_load_mode: {e}") + + try: + metadata["actual_load_mode"] = self.dbt_graph.load_method.value + except Exception as e: + logger.warning(f"Failed to compute actual_load_mode: {e}") + + try: + invocation_mode = None + if execution_config.invocation_mode: + invocation_mode = execution_config.invocation_mode.value + elif render_config.invocation_mode: + invocation_mode = render_config.invocation_mode.value + metadata["invocation_mode"] = invocation_mode + except Exception as e: + logger.warning(f"Failed to compute invocation_mode: {e}") + + try: + install_deps = operator_args.get("install_deps") + if install_deps is None: + install_deps = project_config.install_dbt_deps + metadata["install_deps"] = bool(install_deps) if install_deps is not None else True + except Exception as e: + logger.warning(f"Failed to compute install_deps: {e}") + + try: + metadata["uses_node_converter"] = render_config.node_converters is not None + except Exception as e: + logger.warning(f"Failed to compute uses_node_converter: {e}") + + try: + metadata["test_behavior"] = render_config.test_behavior.value + except Exception as e: + logger.warning(f"Failed to compute test_behavior: {e}") + + try: + metadata["source_behavior"] = render_config.source_rendering_behavior.value + except Exception as e: + logger.warning(f"Failed to compute source_behavior: {e}") + + try: + metadata["total_dbt_models"] = sum( + 1 for node in self.dbt_graph.nodes.values() if node.resource_type == DbtResourceType.MODEL + ) + except Exception as e: + logger.warning(f"Failed to compute total_dbt_models: {e}") + + try: + metadata["selected_dbt_models"] = sum( + 1 for node in self.dbt_graph.filtered_nodes.values() if node.resource_type == DbtResourceType.MODEL + ) + except Exception as e: + logger.warning(f"Failed to compute selected_dbt_models: {e}") + + dag._cosmos_telemetry_metadata = metadata + logger.debug(f"Stored Cosmos telemetry metadata on DAG {dag.dag_id}: {metadata}") diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index 4b73481845..6715b01df5 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -1,7 +1,7 @@ from __future__ import annotations import hashlib -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from airflow.listeners import hookimpl @@ -57,6 +57,15 @@ def get_execution_modes(dag: DAG) -> str: return "__".join(sorted(modes)) +def get_cosmos_telemetry_metadata(dag: DAG) -> dict[str, Any]: + """ + Extract Cosmos telemetry metadata from a DAG. + + Returns the metadata dictionary stored by the converter, or an empty dict if not present. + """ + return getattr(dag, "_cosmos_telemetry_metadata", {}) + + @hookimpl def on_dag_run_success(dag_run: DagRun, msg: str) -> None: logger.debug("Running on_dag_run_success") @@ -82,6 +91,10 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: "execution_modes": get_execution_modes(serialized_dag), } + # Add Cosmos telemetry metadata if available + cosmos_metadata = get_cosmos_telemetry_metadata(serialized_dag) + additional_telemetry_metrics.update(cosmos_metadata) + telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) logger.debug("Completed on_dag_run_success") @@ -111,5 +124,9 @@ def on_dag_run_failed(dag_run: DagRun, msg: str) -> None: "execution_modes": get_execution_modes(serialized_dag), } + # Add Cosmos telemetry metadata if available + cosmos_metadata = get_cosmos_telemetry_metadata(serialized_dag) + additional_telemetry_metrics.update(cosmos_metadata) + telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) logger.debug("Completed on_dag_run_failed") diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py index 5ad94430b5..1b1d118e00 100644 --- a/tests/listeners/test_dag_run_listener.py +++ b/tests/listeners/test_dag_run_listener.py @@ -12,7 +12,8 @@ from cosmos import DbtRunLocalOperator, ProfileConfig, ProjectConfig from cosmos.airflow.dag import DbtDag from cosmos.airflow.task_group import DbtTaskGroup -from cosmos.constants import AIRFLOW_VERSION +from cosmos.config import ExecutionConfig, RenderConfig +from cosmos.constants import AIRFLOW_VERSION, InvocationMode, LoadMode, SourceRenderingBehavior, TestBehavior from cosmos.listeners.dag_run_listener import on_dag_run_failed, on_dag_run_success, total_cosmos_tasks from cosmos.profiles import PostgresUserPasswordProfileMapping @@ -183,3 +184,115 @@ def test_on_dag_run_failed(mock_emit_usage_metrics_if_enabled, caplog): assert "Running on_dag_run_failed" in caplog.text assert "Completed on_dag_run_failed" in caplog.text assert mock_emit_usage_metrics_if_enabled.call_count == 1 + + +@pytest.mark.skipif( + AIRFLOW_VERSION >= Version("3.1.0"), + reason="TODO: Fix create_dag_run to work with AF 3.1 and remove this skip.", +) +@pytest.mark.integration +@patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled") +def test_on_dag_run_success_with_telemetry_metadata(mock_emit_usage_metrics_if_enabled, caplog): + """Test that DAG run success includes Cosmos telemetry metadata.""" + caplog.set_level(logging.DEBUG) + + dag = DbtDag( + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + profile_config=profile_config, + execution_config=ExecutionConfig(invocation_mode=InvocationMode.SUBPROCESS), + render_config=RenderConfig( + load_method=LoadMode.AUTOMATIC, + test_behavior=TestBehavior.AFTER_EACH, + source_rendering_behavior=SourceRenderingBehavior.NONE, + ), + operator_args={"install_deps": True}, + start_date=datetime(2023, 1, 1), + dag_id="cosmos_dag_with_metadata", + ) + run_id = str(uuid.uuid1()) + run_after = datetime.now(timezone.utc) - timedelta(seconds=1) + dag_run = create_dag_run(dag, run_id, run_after) + + on_dag_run_success(dag_run, msg="test success") + assert mock_emit_usage_metrics_if_enabled.call_count == 1 + + # Verify telemetry call includes new metrics + call_args = mock_emit_usage_metrics_if_enabled.call_args + metrics = call_args[0][1] # Second argument is the metrics dict + + # Check that Cosmos metadata fields are present + assert "used_automatic_load_mode" in metrics + assert "actual_load_mode" in metrics + assert "invocation_mode" in metrics + assert "install_deps" in metrics + assert "uses_node_converter" in metrics + assert "test_behavior" in metrics + assert "source_behavior" in metrics + assert "total_dbt_models" in metrics + assert "selected_dbt_models" in metrics + + # Verify some expected values + assert metrics["used_automatic_load_mode"] is True + assert metrics["invocation_mode"] == "subprocess" + assert metrics["install_deps"] is True + assert metrics["uses_node_converter"] is False + assert metrics["test_behavior"] == "after_each" + assert metrics["source_behavior"] == "none" + + +@pytest.mark.skipif( + AIRFLOW_VERSION >= Version("3.1.0"), + reason="TODO: Fix create_dag_run to work with AF 3.1 and remove this skip.", +) +@pytest.mark.integration +@patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled") +def test_on_dag_run_failed_with_telemetry_metadata(mock_emit_usage_metrics_if_enabled, caplog): + """Test that DAG run failure includes Cosmos telemetry metadata.""" + caplog.set_level(logging.DEBUG) + + dag = DbtDag( + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + profile_config=profile_config, + execution_config=ExecutionConfig(invocation_mode=InvocationMode.DBT_RUNNER), + render_config=RenderConfig( + load_method=LoadMode.DBT_LS, + test_behavior=TestBehavior.NONE, + source_rendering_behavior=SourceRenderingBehavior.ALL, + ), + operator_args={"install_deps": False}, + start_date=datetime(2023, 1, 1), + dag_id="cosmos_dag_with_metadata_failed", + ) + run_id = str(uuid.uuid1()) + run_after = datetime.now(timezone.utc) - timedelta(seconds=1) + dag_run = create_dag_run(dag, run_id, run_after) + + on_dag_run_failed(dag_run, msg="test failed") + assert mock_emit_usage_metrics_if_enabled.call_count == 1 + + # Verify telemetry call includes new metrics + call_args = mock_emit_usage_metrics_if_enabled.call_args + metrics = call_args[0][1] # Second argument is the metrics dict + + # Check that Cosmos metadata fields are present + assert "used_automatic_load_mode" in metrics + assert "actual_load_mode" in metrics + assert "invocation_mode" in metrics + assert "install_deps" in metrics + assert "uses_node_converter" in metrics + assert "test_behavior" in metrics + assert "source_behavior" in metrics + assert "total_dbt_models" in metrics + assert "selected_dbt_models" in metrics + + # Verify some expected values for failed case + assert metrics["used_automatic_load_mode"] is False + assert metrics["invocation_mode"] == "dbt_runner" + assert metrics["install_deps"] is False + assert metrics["uses_node_converter"] is False + assert metrics["test_behavior"] == "none" + assert metrics["source_behavior"] == "all" diff --git a/tests/test_converter.py b/tests/test_converter.py index 8cf22834ae..3e815cf01b 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -1112,6 +1112,8 @@ def test_dag_versioning_successful_logging(mock_load_dbt_graph, mock_hash_func, execution_config=execution_config, ) - mock_logger.debug.assert_called_once_with( - "Appended dbt project hash test_hash_123 to DAG test_dag_logging documentation" + # Check that the hash logging call was made (there are multiple debug calls now) + debug_calls = [str(call) for call in mock_logger.debug.call_args_list] + assert any( + "Appended dbt project hash test_hash_123 to DAG test_dag_logging documentation" in call for call in debug_calls ) From c09570ae6bfddaabbd82d6c2136da4eabc70b4bb Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 23 Dec 2025 11:51:39 +0000 Subject: [PATCH 02/17] Fix DAG run telemetry metadata serialization issue The metadata was stored as a custom attribute (_cosmos_telemetry_metadata) which is not preserved during Airflow DAG serialization. When the dag_run_listener receives the DAG, it gets a SerializedDAG where custom attributes are lost, resulting in an empty metadata dictionary. Solution: Store metadata in dag.params which is serialized by Airflow and accessible in the listener. Using key __cosmos_telemetry_metadata__ to avoid conflicts with user-defined params. Changes: - Store metadata in dag.params[__cosmos_telemetry_metadata__] in converter - Retrieve from dag.params.get(__cosmos_telemetry_metadata__, {}) in listener - Updated docstrings to reflect the new storage mechanism --- cosmos/converter.py | 6 ++++-- cosmos/listeners/dag_run_listener.py | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index b88dbc5a41..2e128ddcbc 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -464,5 +464,7 @@ def _store_cosmos_telemetry_metadata_on_dag( # noqa: C901 except Exception as e: logger.warning(f"Failed to compute selected_dbt_models: {e}") - dag._cosmos_telemetry_metadata = metadata - logger.debug(f"Stored Cosmos telemetry metadata on DAG {dag.dag_id}: {metadata}") + # Store metadata in dag.params which is preserved during serialization + # Using a key that's unlikely to conflict with user params + dag.params["__cosmos_telemetry_metadata__"] = metadata + logger.debug(f"Stored Cosmos telemetry metadata in DAG {dag.dag_id} params: {metadata}") diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index 6715b01df5..5d8ca2ed49 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -61,9 +61,11 @@ def get_cosmos_telemetry_metadata(dag: DAG) -> dict[str, Any]: """ Extract Cosmos telemetry metadata from a DAG. - Returns the metadata dictionary stored by the converter, or an empty dict if not present. + Returns the metadata dictionary stored by the converter in dag.params, or an empty dict if not present. """ - return getattr(dag, "_cosmos_telemetry_metadata", {}) + # Metadata is stored in dag.params to survive serialization + metadata = dag.params.get("__cosmos_telemetry_metadata__", {}) + return metadata if isinstance(metadata, dict) else {} @hookimpl From 7cee4179469c3169f566751ebae32e22b240a202 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 24 Dec 2025 10:32:15 +0000 Subject: [PATCH 03/17] Fix test validation error in dag_run_listener test --- tests/listeners/test_dag_run_listener.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py index 1b1d118e00..a4be6a9fcf 100644 --- a/tests/listeners/test_dag_run_listener.py +++ b/tests/listeners/test_dag_run_listener.py @@ -260,6 +260,7 @@ def test_on_dag_run_failed_with_telemetry_metadata(mock_emit_usage_metrics_if_en execution_config=ExecutionConfig(invocation_mode=InvocationMode.DBT_RUNNER), render_config=RenderConfig( load_method=LoadMode.DBT_LS, + dbt_deps=False, test_behavior=TestBehavior.NONE, source_rendering_behavior=SourceRenderingBehavior.ALL, ), From 7405a3b8e55ee2c8b9ea521c4db2dd4213462b9d Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 24 Dec 2025 10:45:52 +0000 Subject: [PATCH 04/17] Ensure telemetry metadata values are JSON-serializable --- cosmos/converter.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index 2e128ddcbc..3e9b2aef5b 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -413,16 +413,16 @@ def _store_cosmos_telemetry_metadata_on_dag( # noqa: C901 logger.warning(f"Failed to compute used_automatic_load_mode: {e}") try: - metadata["actual_load_mode"] = self.dbt_graph.load_method.value + metadata["actual_load_mode"] = str(self.dbt_graph.load_method.value) except Exception as e: logger.warning(f"Failed to compute actual_load_mode: {e}") try: invocation_mode = None if execution_config.invocation_mode: - invocation_mode = execution_config.invocation_mode.value + invocation_mode = str(execution_config.invocation_mode.value) elif render_config.invocation_mode: - invocation_mode = render_config.invocation_mode.value + invocation_mode = str(render_config.invocation_mode.value) metadata["invocation_mode"] = invocation_mode except Exception as e: logger.warning(f"Failed to compute invocation_mode: {e}") @@ -441,12 +441,12 @@ def _store_cosmos_telemetry_metadata_on_dag( # noqa: C901 logger.warning(f"Failed to compute uses_node_converter: {e}") try: - metadata["test_behavior"] = render_config.test_behavior.value + metadata["test_behavior"] = str(render_config.test_behavior.value) except Exception as e: logger.warning(f"Failed to compute test_behavior: {e}") try: - metadata["source_behavior"] = render_config.source_rendering_behavior.value + metadata["source_behavior"] = str(render_config.source_rendering_behavior.value) except Exception as e: logger.warning(f"Failed to compute source_behavior: {e}") From 7852e0671b91e6141bd65571b29ee48c0e4f7878 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 24 Dec 2025 11:09:48 +0000 Subject: [PATCH 05/17] Fix integration test to use manifest load mode --- tests/listeners/test_dag_run_listener.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py index a4be6a9fcf..e550f64ca3 100644 --- a/tests/listeners/test_dag_run_listener.py +++ b/tests/listeners/test_dag_run_listener.py @@ -259,8 +259,7 @@ def test_on_dag_run_failed_with_telemetry_metadata(mock_emit_usage_metrics_if_en profile_config=profile_config, execution_config=ExecutionConfig(invocation_mode=InvocationMode.DBT_RUNNER), render_config=RenderConfig( - load_method=LoadMode.DBT_LS, - dbt_deps=False, + load_method=LoadMode.DBT_MANIFEST, test_behavior=TestBehavior.NONE, source_rendering_behavior=SourceRenderingBehavior.ALL, ), From ae15d02c8a5ffa1c94a7085e4b2b538189d38a40 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 24 Dec 2025 12:00:17 +0000 Subject: [PATCH 06/17] Add manifest_path to failed telemetry test --- tests/listeners/test_dag_run_listener.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py index e550f64ca3..953f0a0da1 100644 --- a/tests/listeners/test_dag_run_listener.py +++ b/tests/listeners/test_dag_run_listener.py @@ -255,6 +255,7 @@ def test_on_dag_run_failed_with_telemetry_metadata(mock_emit_usage_metrics_if_en dag = DbtDag( project_config=ProjectConfig( DBT_ROOT_PATH / "jaffle_shop", + manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json", ), profile_config=profile_config, execution_config=ExecutionConfig(invocation_mode=InvocationMode.DBT_RUNNER), From 2f4078e367d4865786e009fe5019274194f2e3dd Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 24 Dec 2025 13:31:46 +0000 Subject: [PATCH 07/17] Add tests for telemetry metadata exception handlers Covers exception handling in _store_cosmos_telemetry_metadata_on_dag to ensure graceful degradation when metrics computation fails. Tests verify that 8 out of 9 exception handlers work correctly for actual_load_mode, invocation_mode, install_deps, uses_node_converter, test_behavior, source_behavior, total_dbt_models, and selected_dbt_models. --- tests/test_converter.py | 135 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 134 insertions(+), 1 deletion(-) diff --git a/tests/test_converter.py b/tests/test_converter.py index 3e815cf01b..ed08324e9b 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -1,7 +1,7 @@ import tempfile from datetime import datetime from pathlib import Path -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, Mock, PropertyMock, patch import pytest from airflow.models import DAG @@ -1117,3 +1117,136 @@ def test_dag_versioning_successful_logging(mock_load_dbt_graph, mock_hash_func, assert any( "Appended dbt project hash test_hash_123 to DAG test_dag_logging documentation" in call for call in debug_calls ) + + +@patch("cosmos.converter.logger") +@patch("cosmos.converter.DbtGraph.load") +def test_telemetry_metadata_exception_handling(mock_load_dbt_graph, mock_logger): + """Test that telemetry metadata computation handles exceptions gracefully.""" + dag = DAG("test_dag_telemetry", start_date=datetime(2024, 1, 1)) + + project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}), + ) + execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL) + render_config = RenderConfig() + + # Create converter first + converter = DbtToAirflowConverter( + dag=dag, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + render_config=render_config, + ) + + # Create a mock graph where properties raise AttributeError when converting to string + mock_graph = Mock() + + # For load_method.value - create a Mock where __str__ raises + mock_value = Mock() + mock_value.__str__ = Mock(side_effect=AttributeError("load_method error")) + mock_load_method = Mock() + mock_load_method.value = mock_value + mock_graph.load_method = mock_load_method + + # For nodes.values() - make it raise when called + mock_graph.nodes = Mock() + mock_graph.nodes.values = Mock(side_effect=AttributeError("nodes error")) + + # For filtered_nodes.values() - make it raise when called + mock_graph.filtered_nodes = Mock() + mock_graph.filtered_nodes.values = Mock(side_effect=AttributeError("filtered_nodes error")) + + converter.dbt_graph = mock_graph + + # Create mock render_config where properties raise + mock_render_config = Mock() + + # invocation_mode.value - __str__ raises + mock_invocation_value = Mock() + mock_invocation_value.__str__ = Mock(side_effect=AttributeError("invocation error")) + mock_invocation = Mock() + mock_invocation.value = mock_invocation_value + # Make invocation_mode truthy so the if condition passes + mock_invocation.__bool__ = Mock(return_value=True) + mock_render_config.invocation_mode = mock_invocation + + # Create a class that raises AttributeError for specific properties + class MockRenderConfigWithRaisingProperties: + @property + def node_converters(self): + raise AttributeError("node_converters error") + + @property + def test_behavior(self): + raise AttributeError("test_behavior error") + + @property + def source_rendering_behavior(self): + raise AttributeError("source_behavior error") + + mock_render_config_final = Mock(wraps=MockRenderConfigWithRaisingProperties()) + mock_render_config_final.invocation_mode = mock_render_config.invocation_mode + + # test_behavior.value - __str__ raises + mock_test_value = Mock() + mock_test_value.__str__ = Mock(side_effect=AttributeError("test_behavior error")) + mock_test = Mock() + mock_test.value = mock_test_value + mock_render_config.test_behavior = mock_test + + # source_rendering_behavior.value - __str__ raises + mock_source_value = Mock() + mock_source_value.__str__ = Mock(side_effect=AttributeError("source_behavior error")) + mock_source = Mock() + mock_source.value = mock_source_value + mock_render_config.source_rendering_behavior = mock_source + + # Mock execution_config - invocation_mode should be falsy initially so render_config is checked + mock_execution_config = Mock() + mock_execution_config.invocation_mode = None + + # Mock project_config - install_dbt_deps raises + mock_project_config = Mock() + type(mock_project_config).install_dbt_deps = PropertyMock(side_effect=AttributeError("install_dbt_deps error")) + + # Mock operator_args - .get() raises + mock_operator_args = Mock() + mock_operator_args.get = Mock(side_effect=AttributeError("operator_args error")) + + # Call _store_cosmos_telemetry_metadata_on_dag with mocked configs + # Create a mock load mode that raises on comparison + mock_load_mode_param = Mock() + mock_load_mode_param.__eq__ = Mock(side_effect=AttributeError("load_method comparison error")) + + converter._store_cosmos_telemetry_metadata_on_dag( + dag=dag, + render_config=mock_render_config_final, + execution_config=mock_execution_config, + project_config=mock_project_config, + operator_args=mock_operator_args, + initial_load_method=mock_load_mode_param, + ) + + # Verify warning logs were called for exceptions + warning_calls = [str(call) for call in mock_logger.warning.call_args_list] + + # Check that all exception handlers logged warnings + assert any( + "Failed to compute used_automatic_load_mode" in call for call in warning_calls + ), f"Warnings: {warning_calls}" + assert any("Failed to compute actual_load_mode" in call for call in warning_calls), f"Warnings: {warning_calls}" + assert any("Failed to compute invocation_mode" in call for call in warning_calls), f"Warnings: {warning_calls}" + assert any("Failed to compute install_deps" in call for call in warning_calls), f"Warnings: {warning_calls}" + assert any("Failed to compute uses_node_converter" in call for call in warning_calls), f"Warnings: {warning_calls}" + assert any("Failed to compute test_behavior" in call for call in warning_calls), f"Warnings: {warning_calls}" + assert any("Failed to compute source_behavior" in call for call in warning_calls), f"Warnings: {warning_calls}" + assert any("Failed to compute total_dbt_models" in call for call in warning_calls), f"Warnings: {warning_calls}" + assert any("Failed to compute selected_dbt_models" in call for call in warning_calls), f"Warnings: {warning_calls}" + + # Metadata should still be stored even with failures + assert "__cosmos_telemetry_metadata__" in dag.params From 32d662c65b142d90908074605140b0c5061ce378 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 30 Dec 2025 11:07:19 +0000 Subject: [PATCH 08/17] Address review comments --- cosmos/converter.py | 30 ++++++++++++------------------ tests/test_converter.py | 13 ++++--------- 2 files changed, 16 insertions(+), 27 deletions(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index 3e9b2aef5b..9e07abeaee 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -299,7 +299,7 @@ def __init__( self._add_dbt_project_hash_to_dag_docs(dag) self._store_cosmos_telemetry_metadata_on_dag( - dag, render_config, execution_config, project_config, operator_args, initial_load_method + dag, render_config, project_config, operator_args, initial_load_method ) current_time = time.perf_counter() @@ -383,7 +383,6 @@ def _store_cosmos_telemetry_metadata_on_dag( # noqa: C901 self, dag: DAG | None, render_config: RenderConfig, - execution_config: ExecutionConfig, project_config: ProjectConfig, operator_args: dict[str, Any], initial_load_method: LoadMode, @@ -396,7 +395,6 @@ def _store_cosmos_telemetry_metadata_on_dag( # noqa: C901 :param dag: The Airflow DAG to store metadata on. If None, no action is taken. :param render_config: The render configuration - :param execution_config: The execution configuration :param project_config: The project configuration :param operator_args: The operator arguments :param initial_load_method: The load method specified by the user (before automatic resolution) @@ -409,59 +407,55 @@ def _store_cosmos_telemetry_metadata_on_dag( # noqa: C901 # Compute each metric individually with error handling try: metadata["used_automatic_load_mode"] = initial_load_method == LoadMode.AUTOMATIC - except Exception as e: + except (AttributeError, TypeError) as e: logger.warning(f"Failed to compute used_automatic_load_mode: {e}") try: metadata["actual_load_mode"] = str(self.dbt_graph.load_method.value) - except Exception as e: + except (AttributeError, TypeError) as e: logger.warning(f"Failed to compute actual_load_mode: {e}") try: invocation_mode = None - if execution_config.invocation_mode: - invocation_mode = str(execution_config.invocation_mode.value) - elif render_config.invocation_mode: + if render_config.invocation_mode: invocation_mode = str(render_config.invocation_mode.value) metadata["invocation_mode"] = invocation_mode - except Exception as e: + except (AttributeError, TypeError) as e: logger.warning(f"Failed to compute invocation_mode: {e}") try: - install_deps = operator_args.get("install_deps") - if install_deps is None: - install_deps = project_config.install_dbt_deps + install_deps = render_config.dbt_deps metadata["install_deps"] = bool(install_deps) if install_deps is not None else True - except Exception as e: + except (AttributeError, TypeError) as e: logger.warning(f"Failed to compute install_deps: {e}") try: metadata["uses_node_converter"] = render_config.node_converters is not None - except Exception as e: + except (AttributeError, TypeError) as e: logger.warning(f"Failed to compute uses_node_converter: {e}") try: metadata["test_behavior"] = str(render_config.test_behavior.value) - except Exception as e: + except (AttributeError, TypeError) as e: logger.warning(f"Failed to compute test_behavior: {e}") try: metadata["source_behavior"] = str(render_config.source_rendering_behavior.value) - except Exception as e: + except (AttributeError, TypeError) as e: logger.warning(f"Failed to compute source_behavior: {e}") try: metadata["total_dbt_models"] = sum( 1 for node in self.dbt_graph.nodes.values() if node.resource_type == DbtResourceType.MODEL ) - except Exception as e: + except (AttributeError, TypeError) as e: logger.warning(f"Failed to compute total_dbt_models: {e}") try: metadata["selected_dbt_models"] = sum( 1 for node in self.dbt_graph.filtered_nodes.values() if node.resource_type == DbtResourceType.MODEL ) - except Exception as e: + except (AttributeError, TypeError) as e: logger.warning(f"Failed to compute selected_dbt_models: {e}") # Store metadata in dag.params which is preserved during serialization diff --git a/tests/test_converter.py b/tests/test_converter.py index ed08324e9b..149213d157 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -1206,17 +1206,13 @@ def source_rendering_behavior(self): mock_source.value = mock_source_value mock_render_config.source_rendering_behavior = mock_source - # Mock execution_config - invocation_mode should be falsy initially so render_config is checked - mock_execution_config = Mock() - mock_execution_config.invocation_mode = None + # Mock render_config.dbt_deps to raise when accessed + dbt_deps_mock = PropertyMock(side_effect=AttributeError("dbt_deps error")) + type(mock_render_config_final).dbt_deps = dbt_deps_mock - # Mock project_config - install_dbt_deps raises mock_project_config = Mock() - type(mock_project_config).install_dbt_deps = PropertyMock(side_effect=AttributeError("install_dbt_deps error")) - # Mock operator_args - .get() raises - mock_operator_args = Mock() - mock_operator_args.get = Mock(side_effect=AttributeError("operator_args error")) + mock_operator_args = {} # Call _store_cosmos_telemetry_metadata_on_dag with mocked configs # Create a mock load mode that raises on comparison @@ -1226,7 +1222,6 @@ def source_rendering_behavior(self): converter._store_cosmos_telemetry_metadata_on_dag( dag=dag, render_config=mock_render_config_final, - execution_config=mock_execution_config, project_config=mock_project_config, operator_args=mock_operator_args, initial_load_method=mock_load_mode_param, From 8560cbc938f9d835b935c6ba2d20137e2d58768a Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 30 Dec 2025 16:42:01 +0530 Subject: [PATCH 09/17] Apply suggestion from @pankajkoti --- tests/listeners/test_dag_run_listener.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py index 953f0a0da1..cb98be0728 100644 --- a/tests/listeners/test_dag_run_listener.py +++ b/tests/listeners/test_dag_run_listener.py @@ -186,10 +186,6 @@ def test_on_dag_run_failed(mock_emit_usage_metrics_if_enabled, caplog): assert mock_emit_usage_metrics_if_enabled.call_count == 1 -@pytest.mark.skipif( - AIRFLOW_VERSION >= Version("3.1.0"), - reason="TODO: Fix create_dag_run to work with AF 3.1 and remove this skip.", -) @pytest.mark.integration @patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled") def test_on_dag_run_success_with_telemetry_metadata(mock_emit_usage_metrics_if_enabled, caplog): From 8cc7dcfcbe3bf90c8bbbcf68f2388bab668c72b0 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 30 Dec 2025 16:42:10 +0530 Subject: [PATCH 10/17] Apply suggestion from @pankajkoti --- tests/listeners/test_dag_run_listener.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py index cb98be0728..b99b97c332 100644 --- a/tests/listeners/test_dag_run_listener.py +++ b/tests/listeners/test_dag_run_listener.py @@ -238,10 +238,6 @@ def test_on_dag_run_success_with_telemetry_metadata(mock_emit_usage_metrics_if_e assert metrics["source_behavior"] == "none" -@pytest.mark.skipif( - AIRFLOW_VERSION >= Version("3.1.0"), - reason="TODO: Fix create_dag_run to work with AF 3.1 and remove this skip.", -) @pytest.mark.integration @patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled") def test_on_dag_run_failed_with_telemetry_metadata(mock_emit_usage_metrics_if_enabled, caplog): From 19f282d1c66cf580c5f5da8069ddb7421331de0d Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 30 Dec 2025 17:28:51 +0530 Subject: [PATCH 11/17] Fix tests --- tests/listeners/test_dag_run_listener.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py index b99b97c332..e123e0cd0d 100644 --- a/tests/listeners/test_dag_run_listener.py +++ b/tests/listeners/test_dag_run_listener.py @@ -186,6 +186,10 @@ def test_on_dag_run_failed(mock_emit_usage_metrics_if_enabled, caplog): assert mock_emit_usage_metrics_if_enabled.call_count == 1 +@pytest.mark.skipif( + AIRFLOW_VERSION >= Version("3.1.0"), + reason="TODO: Fix create_dag_run to work with AF 3.1 and remove this skip.", +) @pytest.mark.integration @patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled") def test_on_dag_run_success_with_telemetry_metadata(mock_emit_usage_metrics_if_enabled, caplog): @@ -231,13 +235,17 @@ def test_on_dag_run_success_with_telemetry_metadata(mock_emit_usage_metrics_if_e # Verify some expected values assert metrics["used_automatic_load_mode"] is True - assert metrics["invocation_mode"] == "subprocess" + assert metrics["invocation_mode"] == "dbt_runner" assert metrics["install_deps"] is True assert metrics["uses_node_converter"] is False assert metrics["test_behavior"] == "after_each" assert metrics["source_behavior"] == "none" +@pytest.mark.skipif( + AIRFLOW_VERSION >= Version("3.1.0"), + reason="TODO: Fix create_dag_run to work with AF 3.1 and remove this skip.", +) @pytest.mark.integration @patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled") def test_on_dag_run_failed_with_telemetry_metadata(mock_emit_usage_metrics_if_enabled, caplog): @@ -285,7 +293,7 @@ def test_on_dag_run_failed_with_telemetry_metadata(mock_emit_usage_metrics_if_en # Verify some expected values for failed case assert metrics["used_automatic_load_mode"] is False assert metrics["invocation_mode"] == "dbt_runner" - assert metrics["install_deps"] is False + assert metrics["install_deps"] is True assert metrics["uses_node_converter"] is False assert metrics["test_behavior"] == "none" assert metrics["source_behavior"] == "all" From 4626bc8dcdf4152885704f22ff1845dca09407e6 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 30 Dec 2025 18:53:38 +0530 Subject: [PATCH 12/17] Address Copilot's review comment --- cosmos/converter.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index ccbeb974a1..5c503a4e7e 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -23,7 +23,7 @@ from cosmos import cache, settings from cosmos.airflow.graph import build_airflow_graph from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig -from cosmos.constants import ExecutionMode, LoadMode +from cosmos.constants import DbtResourceType, ExecutionMode, LoadMode from cosmos.dbt.graph import DbtGraph from cosmos.dbt.project import has_non_empty_dependencies_file from cosmos.dbt.selector import retrieve_by_label @@ -33,8 +33,6 @@ logger = get_logger(__name__) -from cosmos.constants import DbtResourceType - def migrate_to_new_interface( execution_config: ExecutionConfig, project_config: ProjectConfig, render_config: RenderConfig From 3fa83a69e044c101d74f0cd780e19815a480c75b Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 5 Jan 2026 07:46:48 +0000 Subject: [PATCH 13/17] Remove unused operator_args parameter from telemetry metadata method The operator_args parameter in _store_cosmos_telemetry_metadata_on_dag was never used in the method body. All telemetry metadata is collected from render_config and project_config which are already passed as separate parameters. Changes: - Remove operator_args parameter from method signature and docstring - Update method call site to remove the argument - Remove unused mock_operator_args from test --- cosmos/converter.py | 7 +------ tests/test_converter.py | 3 --- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index 5c503a4e7e..6254d98910 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -297,9 +297,7 @@ def __init__( self.dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode) self._add_dbt_project_hash_to_dag_docs(dag) - self._store_cosmos_telemetry_metadata_on_dag( - dag, render_config, project_config, operator_args, initial_load_method - ) + self._store_cosmos_telemetry_metadata_on_dag(dag, render_config, project_config, initial_load_method) current_time = time.perf_counter() elapsed_time = current_time - previous_time @@ -384,7 +382,6 @@ def _store_cosmos_telemetry_metadata_on_dag( # noqa: C901 dag: DAG | None, render_config: RenderConfig, project_config: ProjectConfig, - operator_args: dict[str, Any], initial_load_method: LoadMode, ) -> None: """ @@ -396,8 +393,6 @@ def _store_cosmos_telemetry_metadata_on_dag( # noqa: C901 :param dag: The Airflow DAG to store metadata on. If None, no action is taken. :param render_config: The render configuration :param project_config: The project configuration - :param operator_args: The operator arguments - :param initial_load_method: The load method specified by the user (before automatic resolution) """ if dag is None: return diff --git a/tests/test_converter.py b/tests/test_converter.py index ebb6f6a600..c3e74f396b 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -1243,8 +1243,6 @@ def source_rendering_behavior(self): mock_project_config = Mock() - mock_operator_args = {} - # Call _store_cosmos_telemetry_metadata_on_dag with mocked configs # Create a mock load mode that raises on comparison mock_load_mode_param = Mock() @@ -1254,7 +1252,6 @@ def source_rendering_behavior(self): dag=dag, render_config=mock_render_config_final, project_config=mock_project_config, - operator_args=mock_operator_args, initial_load_method=mock_load_mode_param, ) From d487aeb4339c966b2b8af5c51fdfe67f68c41bd1 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 5 Jan 2026 18:58:19 +0530 Subject: [PATCH 14/17] Address review feedback on exception handling --- cosmos/converter.py | 52 +++++++-------------------------------------- 1 file changed, 8 insertions(+), 44 deletions(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index 6254d98910..23ee819b52 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -397,61 +397,25 @@ def _store_cosmos_telemetry_metadata_on_dag( # noqa: C901 if dag is None: return - metadata = {} + metadata: dict[str, Any] = {"used_automatic_load_mode": initial_load_method == LoadMode.AUTOMATIC} - # Compute each metric individually with error handling - try: - metadata["used_automatic_load_mode"] = initial_load_method == LoadMode.AUTOMATIC - except (AttributeError, TypeError) as e: - logger.warning(f"Failed to compute used_automatic_load_mode: {e}") - - try: - metadata["actual_load_mode"] = str(self.dbt_graph.load_method.value) - except (AttributeError, TypeError) as e: - logger.warning(f"Failed to compute actual_load_mode: {e}") - - try: - invocation_mode = None - if render_config.invocation_mode: - invocation_mode = str(render_config.invocation_mode.value) - metadata["invocation_mode"] = invocation_mode - except (AttributeError, TypeError) as e: - logger.warning(f"Failed to compute invocation_mode: {e}") - - try: - install_deps = render_config.dbt_deps - metadata["install_deps"] = bool(install_deps) if install_deps is not None else True - except (AttributeError, TypeError) as e: - logger.warning(f"Failed to compute install_deps: {e}") - - try: + if render_config is not None: + metadata["invocation_mode"] = str(render_config.invocation_mode.value) + metadata["install_deps"] = ( + bool(render_config.dbt_deps) if render_config.dbt_deps is not None else project_config.install_dbt_deps + ) metadata["uses_node_converter"] = render_config.node_converters is not None - except (AttributeError, TypeError) as e: - logger.warning(f"Failed to compute uses_node_converter: {e}") - - try: metadata["test_behavior"] = str(render_config.test_behavior.value) - except (AttributeError, TypeError) as e: - logger.warning(f"Failed to compute test_behavior: {e}") - - try: metadata["source_behavior"] = str(render_config.source_rendering_behavior.value) - except (AttributeError, TypeError) as e: - logger.warning(f"Failed to compute source_behavior: {e}") - try: + if self.dbt_graph is not None: + metadata["actual_load_mode"] = str(self.dbt_graph.load_method.value) metadata["total_dbt_models"] = sum( 1 for node in self.dbt_graph.nodes.values() if node.resource_type == DbtResourceType.MODEL ) - except (AttributeError, TypeError) as e: - logger.warning(f"Failed to compute total_dbt_models: {e}") - - try: metadata["selected_dbt_models"] = sum( 1 for node in self.dbt_graph.filtered_nodes.values() if node.resource_type == DbtResourceType.MODEL ) - except (AttributeError, TypeError) as e: - logger.warning(f"Failed to compute selected_dbt_models: {e}") # Store metadata in dag.params which is preserved during serialization # Using a key that's unlikely to conflict with user params From 55220c1978f6819b9961b715066b9e8bb70dd215 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 5 Jan 2026 14:14:59 +0000 Subject: [PATCH 15/17] Simplify test to match updated telemetry metadata implementation --- tests/test_converter.py | 118 +++++----------------------------------- 1 file changed, 14 insertions(+), 104 deletions(-) diff --git a/tests/test_converter.py b/tests/test_converter.py index c3e74f396b..d9ed637fb3 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -1,7 +1,7 @@ import tempfile from datetime import datetime from pathlib import Path -from unittest.mock import MagicMock, Mock, PropertyMock, patch +from unittest.mock import MagicMock, patch import pytest from airflow.models import DAG @@ -1150,10 +1150,9 @@ def test_converter_logs_parsing_group_order(mock_load_dbt_graph, mock_logger): assert group_start_idx < group_end_idx -@patch("cosmos.converter.logger") @patch("cosmos.converter.DbtGraph.load") -def test_telemetry_metadata_exception_handling(mock_load_dbt_graph, mock_logger): - """Test that telemetry metadata computation handles exceptions gracefully.""" +def test_telemetry_metadata_storage(mock_load_dbt_graph): + """Test that telemetry metadata is stored correctly in DAG params.""" dag = DAG("test_dag_telemetry", start_date=datetime(2024, 1, 1)) project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT) @@ -1165,8 +1164,7 @@ def test_telemetry_metadata_exception_handling(mock_load_dbt_graph, mock_logger) execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL) render_config = RenderConfig() - # Create converter first - converter = DbtToAirflowConverter( + _ = DbtToAirflowConverter( dag=dag, project_config=project_config, profile_config=profile_config, @@ -1174,102 +1172,14 @@ def test_telemetry_metadata_exception_handling(mock_load_dbt_graph, mock_logger) render_config=render_config, ) - # Create a mock graph where properties raise AttributeError when converting to string - mock_graph = Mock() - - # For load_method.value - create a Mock where __str__ raises - mock_value = Mock() - mock_value.__str__ = Mock(side_effect=AttributeError("load_method error")) - mock_load_method = Mock() - mock_load_method.value = mock_value - mock_graph.load_method = mock_load_method - - # For nodes.values() - make it raise when called - mock_graph.nodes = Mock() - mock_graph.nodes.values = Mock(side_effect=AttributeError("nodes error")) - - # For filtered_nodes.values() - make it raise when called - mock_graph.filtered_nodes = Mock() - mock_graph.filtered_nodes.values = Mock(side_effect=AttributeError("filtered_nodes error")) - - converter.dbt_graph = mock_graph - - # Create mock render_config where properties raise - mock_render_config = Mock() - - # invocation_mode.value - __str__ raises - mock_invocation_value = Mock() - mock_invocation_value.__str__ = Mock(side_effect=AttributeError("invocation error")) - mock_invocation = Mock() - mock_invocation.value = mock_invocation_value - # Make invocation_mode truthy so the if condition passes - mock_invocation.__bool__ = Mock(return_value=True) - mock_render_config.invocation_mode = mock_invocation - - # Create a class that raises AttributeError for specific properties - class MockRenderConfigWithRaisingProperties: - @property - def node_converters(self): - raise AttributeError("node_converters error") - - @property - def test_behavior(self): - raise AttributeError("test_behavior error") - - @property - def source_rendering_behavior(self): - raise AttributeError("source_behavior error") - - mock_render_config_final = Mock(wraps=MockRenderConfigWithRaisingProperties()) - mock_render_config_final.invocation_mode = mock_render_config.invocation_mode - - # test_behavior.value - __str__ raises - mock_test_value = Mock() - mock_test_value.__str__ = Mock(side_effect=AttributeError("test_behavior error")) - mock_test = Mock() - mock_test.value = mock_test_value - mock_render_config.test_behavior = mock_test - - # source_rendering_behavior.value - __str__ raises - mock_source_value = Mock() - mock_source_value.__str__ = Mock(side_effect=AttributeError("source_behavior error")) - mock_source = Mock() - mock_source.value = mock_source_value - mock_render_config.source_rendering_behavior = mock_source - - # Mock render_config.dbt_deps to raise when accessed - dbt_deps_mock = PropertyMock(side_effect=AttributeError("dbt_deps error")) - type(mock_render_config_final).dbt_deps = dbt_deps_mock - - mock_project_config = Mock() - - # Call _store_cosmos_telemetry_metadata_on_dag with mocked configs - # Create a mock load mode that raises on comparison - mock_load_mode_param = Mock() - mock_load_mode_param.__eq__ = Mock(side_effect=AttributeError("load_method comparison error")) - - converter._store_cosmos_telemetry_metadata_on_dag( - dag=dag, - render_config=mock_render_config_final, - project_config=mock_project_config, - initial_load_method=mock_load_mode_param, - ) - - # Verify warning logs were called for exceptions - warning_calls = [str(call) for call in mock_logger.warning.call_args_list] - - # Check that all exception handlers logged warnings - assert any( - "Failed to compute used_automatic_load_mode" in call for call in warning_calls - ), f"Warnings: {warning_calls}" - assert any("Failed to compute actual_load_mode" in call for call in warning_calls), f"Warnings: {warning_calls}" - assert any("Failed to compute invocation_mode" in call for call in warning_calls), f"Warnings: {warning_calls}" - assert any("Failed to compute install_deps" in call for call in warning_calls), f"Warnings: {warning_calls}" - assert any("Failed to compute uses_node_converter" in call for call in warning_calls), f"Warnings: {warning_calls}" - assert any("Failed to compute test_behavior" in call for call in warning_calls), f"Warnings: {warning_calls}" - assert any("Failed to compute source_behavior" in call for call in warning_calls), f"Warnings: {warning_calls}" - assert any("Failed to compute total_dbt_models" in call for call in warning_calls), f"Warnings: {warning_calls}" - assert any("Failed to compute selected_dbt_models" in call for call in warning_calls), f"Warnings: {warning_calls}" - - # Metadata should still be stored even with failures + # Verify metadata is stored in dag.params assert "__cosmos_telemetry_metadata__" in dag.params + metadata = dag.params["__cosmos_telemetry_metadata__"] + + # Verify expected metadata keys are present + assert "used_automatic_load_mode" in metadata + assert "invocation_mode" in metadata + assert "install_deps" in metadata + assert "uses_node_converter" in metadata + assert "test_behavior" in metadata + assert "source_behavior" in metadata From e1df2a716931721cc8624ab9815d81ebce2e3606 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 5 Jan 2026 14:57:43 +0000 Subject: [PATCH 16/17] Verify install_deps is captured from render_config during DAG parsing --- tests/listeners/test_dag_run_listener.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py index e123e0cd0d..40a1a8a90c 100644 --- a/tests/listeners/test_dag_run_listener.py +++ b/tests/listeners/test_dag_run_listener.py @@ -206,6 +206,7 @@ def test_on_dag_run_success_with_telemetry_metadata(mock_emit_usage_metrics_if_e load_method=LoadMode.AUTOMATIC, test_behavior=TestBehavior.AFTER_EACH, source_rendering_behavior=SourceRenderingBehavior.NONE, + dbt_deps=False, ), operator_args={"install_deps": True}, start_date=datetime(2023, 1, 1), @@ -236,7 +237,7 @@ def test_on_dag_run_success_with_telemetry_metadata(mock_emit_usage_metrics_if_e # Verify some expected values assert metrics["used_automatic_load_mode"] is True assert metrics["invocation_mode"] == "dbt_runner" - assert metrics["install_deps"] is True + assert metrics["install_deps"] is False assert metrics["uses_node_converter"] is False assert metrics["test_behavior"] == "after_each" assert metrics["source_behavior"] == "none" From f9262d533c838a4d526f12f1e2fa181088a7935f Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 5 Jan 2026 21:06:02 +0530 Subject: [PATCH 17/17] Fix tests --- tests/listeners/test_dag_run_listener.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py index 40a1a8a90c..1ff3162d78 100644 --- a/tests/listeners/test_dag_run_listener.py +++ b/tests/listeners/test_dag_run_listener.py @@ -206,7 +206,6 @@ def test_on_dag_run_success_with_telemetry_metadata(mock_emit_usage_metrics_if_e load_method=LoadMode.AUTOMATIC, test_behavior=TestBehavior.AFTER_EACH, source_rendering_behavior=SourceRenderingBehavior.NONE, - dbt_deps=False, ), operator_args={"install_deps": True}, start_date=datetime(2023, 1, 1), @@ -237,7 +236,7 @@ def test_on_dag_run_success_with_telemetry_metadata(mock_emit_usage_metrics_if_e # Verify some expected values assert metrics["used_automatic_load_mode"] is True assert metrics["invocation_mode"] == "dbt_runner" - assert metrics["install_deps"] is False + assert metrics["install_deps"] is True assert metrics["uses_node_converter"] is False assert metrics["test_behavior"] == "after_each" assert metrics["source_behavior"] == "none" @@ -264,8 +263,9 @@ def test_on_dag_run_failed_with_telemetry_metadata(mock_emit_usage_metrics_if_en load_method=LoadMode.DBT_MANIFEST, test_behavior=TestBehavior.NONE, source_rendering_behavior=SourceRenderingBehavior.ALL, + dbt_deps=False, ), - operator_args={"install_deps": False}, + operator_args={"install_deps": True}, start_date=datetime(2023, 1, 1), dag_id="cosmos_dag_with_metadata_failed", ) @@ -294,7 +294,7 @@ def test_on_dag_run_failed_with_telemetry_metadata(mock_emit_usage_metrics_if_en # Verify some expected values for failed case assert metrics["used_automatic_load_mode"] is False assert metrics["invocation_mode"] == "dbt_runner" - assert metrics["install_deps"] is True + assert metrics["install_deps"] is False assert metrics["uses_node_converter"] is False assert metrics["test_behavior"] == "none" assert metrics["source_behavior"] == "all"