From b6e1c1e6dbaebbeb3838708d6c55bcaee899e7e9 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 5 Feb 2026 13:56:57 +0000 Subject: [PATCH 01/10] Add debug mode feature to track memory utilisation --- cosmos/debug.py | 137 +++++++++++++ cosmos/operators/base.py | 15 +- cosmos/settings.py | 4 + tests/test_debug.py | 416 +++++++++++++++++++++++++++++++++++++++ tests/test_settings.py | 16 ++ 5 files changed, 587 insertions(+), 1 deletion(-) create mode 100644 cosmos/debug.py create mode 100644 tests/test_debug.py diff --git a/cosmos/debug.py b/cosmos/debug.py new file mode 100644 index 0000000000..53f8df68d9 --- /dev/null +++ b/cosmos/debug.py @@ -0,0 +1,137 @@ +""" +Debug utilities for Cosmos. + +When debug mode is enabled via the `enable_debug_mode` setting, Cosmos will track +memory utilization during task execution and push the maximum memory usage to XCom. +""" + +from __future__ import annotations + +import os +import threading +import time +from typing import TYPE_CHECKING + +try: + import psutil + + PSUTIL_AVAILABLE = True +except ImportError: + PSUTIL_AVAILABLE = False + +from cosmos import settings +from cosmos.log import get_logger + +if TYPE_CHECKING: + try: + from airflow.sdk.definitions.context import Context + except ImportError: + from airflow.utils.context import Context # type: ignore[attr-defined] + +logger = get_logger(__name__) + +# Global dictionary to store memory trackers per task +_memory_trackers: dict[str, MemoryTracker] = {} + + +class MemoryTracker: + """ + Tracks maximum RSS memory (bytes) for a process and all of its children. + Sampling-based to work across Airflow 2 & 3 without executor internals. + """ + + def __init__(self, pid: int, poll_interval: float = 0.5): + self.pid = pid + self.poll_interval = poll_interval + self.max_rss_bytes = 0 + self._stop_event = threading.Event() + self._thread = threading.Thread(target=self._run, daemon=True) + + def start(self) -> None: + """Start the memory tracking thread.""" + self._thread.start() + + def stop(self) -> None: + """Stop the memory tracking thread.""" + self._stop_event.set() + if self._thread.is_alive(): + self._thread.join(timeout=5) + + def _run(self) -> None: + """Background thread that polls memory usage.""" + if not PSUTIL_AVAILABLE: + return + + try: + parent = psutil.Process(self.pid) + except psutil.NoSuchProcess: + return + + while not self._stop_event.is_set(): + rss = 0 + try: + processes = [parent] + parent.children(recursive=True) + for p in processes: + try: + rss += p.memory_info().rss + except psutil.NoSuchProcess: + continue + self.max_rss_bytes = max(self.max_rss_bytes, rss) + except psutil.NoSuchProcess: + break + + time.sleep(self.poll_interval) + + +def start_memory_tracking(context: Context) -> None: + """ + Callback to start memory tracking for a task. + + This function should be used as an `on_execute_callback` for Cosmos operators + when debug mode is enabled. + + :param context: The Airflow task context. + """ + if not settings.enable_debug_mode: + return + + if not PSUTIL_AVAILABLE: + logger.warning( + "psutil is not available. Memory tracking is disabled. Install psutil to enable memory tracking." + ) + return + + ti = context["ti"] + task_key = f"{ti.dag_id}.{ti.task_id}.{ti.run_id}" + pid = os.getpid() + tracker = MemoryTracker(pid=pid, poll_interval=settings.debug_memory_poll_interval_seconds) + _memory_trackers[task_key] = tracker + tracker.start() + logger.debug("Started memory tracking for task %s (PID: %s)", task_key, pid) + + +def stop_memory_tracking(context: Context) -> None: + """ + Callback to stop memory tracking for a task and push the result to XCom. + + This function should be used as an `on_success_callback` or `on_failure_callback` + for Cosmos operators when debug mode is enabled. + + :param context: The Airflow task context. + """ + if not settings.enable_debug_mode: + return + + if not PSUTIL_AVAILABLE: + return + + ti = context["ti"] + task_key = f"{ti.dag_id}.{ti.task_id}.{ti.run_id}" + tracker = _memory_trackers.pop(task_key, None) + + if tracker: + tracker.stop() + max_mb = tracker.max_rss_bytes / 1024 / 1024 + logger.info("Max memory usage (RSS, incl. children): %.2f MB", max_mb) + # Persist to XCom for observability + ti.xcom_push(key="cosmos_debug_max_memory_mb", value=round(max_mb, 2)) diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index 7ebab3ccf8..e78979e676 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -24,6 +24,7 @@ from airflow.utils.strings import to_boolean +from cosmos import settings from cosmos.dbt.executable import get_system_dbt from cosmos.log import get_logger @@ -315,7 +316,19 @@ def execute(self, context: Context, **kwargs) -> Any | None: # type: ignore if self.extra_context: context_merge(context, self.extra_context) - self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags(), **kwargs) + if settings.enable_debug_mode: + from cosmos.debug import start_memory_tracking, stop_memory_tracking + + start_memory_tracking(context) + try: + result = self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags(), **kwargs) + stop_memory_tracking(context) + return result + except Exception: + stop_memory_tracking(context) + raise + else: + self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags(), **kwargs) class DbtBuildMixin: diff --git a/cosmos/settings.py b/cosmos/settings.py index e002d953ee..285c50999d 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -85,3 +85,7 @@ def convert_to_boolean(value: str | None) -> bool: enable_telemetry = conf.getboolean("cosmos", "enable_telemetry", fallback=True) do_not_track = convert_to_boolean(os.getenv("DO_NOT_TRACK")) no_analytics = convert_to_boolean(os.getenv("SCARF_NO_ANALYTICS")) + +# Debug mode - when enabled, Cosmos will track and push memory utilization to XCom +enable_debug_mode = conf.getboolean("cosmos", "enable_debug_mode", fallback=False) +debug_memory_poll_interval_seconds = conf.getfloat("cosmos", "debug_memory_poll_interval_seconds", fallback=0.5) diff --git a/tests/test_debug.py b/tests/test_debug.py new file mode 100644 index 0000000000..869992abe8 --- /dev/null +++ b/tests/test_debug.py @@ -0,0 +1,416 @@ +"""Tests for the cosmos.debug module.""" + +from __future__ import annotations + +import os +import time +from importlib import reload +from unittest.mock import MagicMock, patch + +import pytest + +from cosmos import debug, settings + + +class TestMemoryTracker: + """Tests for the MemoryTracker class.""" + + @pytest.fixture + def reset_settings(self): + """Reset settings after each test.""" + yield + reload(settings) + reload(debug) + + def test_memory_tracker_initialization(self): + """Test MemoryTracker initializes with correct values.""" + tracker = debug.MemoryTracker(pid=os.getpid(), poll_interval=0.1) + assert tracker.pid == os.getpid() + assert tracker.poll_interval == 0.1 + assert tracker.max_rss_bytes == 0 + + @pytest.mark.skipif(not debug.PSUTIL_AVAILABLE, reason="psutil not available") + def test_memory_tracker_tracks_memory(self): + """Test MemoryTracker actually tracks memory usage.""" + tracker = debug.MemoryTracker(pid=os.getpid(), poll_interval=0.05) + tracker.start() + # Give it time to sample + time.sleep(0.2) + tracker.stop() + # Memory should be tracked (current process uses some memory) + assert tracker.max_rss_bytes > 0 + + def test_memory_tracker_stop_without_start(self): + """Test MemoryTracker.stop() doesn't raise if not started.""" + tracker = debug.MemoryTracker(pid=os.getpid()) + # Should not raise + tracker.stop() + + @pytest.mark.skipif(not debug.PSUTIL_AVAILABLE, reason="psutil not available") + def test_memory_tracker_with_nonexistent_pid(self): + """Test MemoryTracker handles non-existent PID gracefully.""" + # Use a very high PID that's unlikely to exist + tracker = debug.MemoryTracker(pid=999999999, poll_interval=0.05) + tracker.start() + time.sleep(0.1) + tracker.stop() + # Should have 0 bytes since process doesn't exist + assert tracker.max_rss_bytes == 0 + + +class TestStartMemoryTracking: + """Tests for the start_memory_tracking function.""" + + @pytest.fixture + def mock_context(self): + """Create a mock Airflow context.""" + mock_ti = MagicMock() + mock_ti.dag_id = "test_dag" + mock_ti.task_id = "test_task" + mock_ti.run_id = "test_run_123" + return {"ti": mock_ti} + + def test_start_memory_tracking_disabled(self, mock_context): + """Test start_memory_tracking does nothing when debug mode is disabled.""" + with patch.object(debug.settings, "enable_debug_mode", False): + debug.start_memory_tracking(mock_context) + # No tracker should be created + task_key = f"{mock_context['ti'].dag_id}.{mock_context['ti'].task_id}.{mock_context['ti'].run_id}" + assert task_key not in debug._memory_trackers + + @pytest.mark.skipif(not debug.PSUTIL_AVAILABLE, reason="psutil not available") + def test_start_memory_tracking_enabled(self, mock_context): + """Test start_memory_tracking creates tracker when debug mode is enabled.""" + with patch.object(debug.settings, "enable_debug_mode", True): + debug.start_memory_tracking(mock_context) + task_key = f"{mock_context['ti'].dag_id}.{mock_context['ti'].task_id}.{mock_context['ti'].run_id}" + assert task_key in debug._memory_trackers + # Cleanup + tracker = debug._memory_trackers.pop(task_key) + tracker.stop() + + +class TestStopMemoryTracking: + """Tests for the stop_memory_tracking function.""" + + @pytest.fixture + def mock_context(self): + """Create a mock Airflow context.""" + mock_ti = MagicMock() + mock_ti.dag_id = "test_dag" + mock_ti.task_id = "test_task" + mock_ti.run_id = "test_run_123" + return {"ti": mock_ti} + + def test_stop_memory_tracking_disabled(self, mock_context): + """Test stop_memory_tracking does nothing when debug mode is disabled.""" + with patch.object(debug.settings, "enable_debug_mode", False): + debug.stop_memory_tracking(mock_context) + # Should not raise and xcom_push should not be called + mock_context["ti"].xcom_push.assert_not_called() + + @pytest.mark.skipif(not debug.PSUTIL_AVAILABLE, reason="psutil not available") + def test_stop_memory_tracking_pushes_xcom(self, mock_context): + """Test stop_memory_tracking pushes memory data to XCom.""" + with patch.object(debug.settings, "enable_debug_mode", True): + # Start tracking first + debug.start_memory_tracking(mock_context) + time.sleep(0.1) # Let it sample + # Stop and check XCom push + debug.stop_memory_tracking(mock_context) + mock_context["ti"].xcom_push.assert_called_once() + call_args = mock_context["ti"].xcom_push.call_args + assert call_args[1]["key"] == "cosmos_debug_max_memory_mb" + assert isinstance(call_args[1]["value"], float) + assert call_args[1]["value"] > 0 + + def test_stop_memory_tracking_no_tracker(self, mock_context): + """Test stop_memory_tracking handles missing tracker gracefully.""" + with patch.object(debug.settings, "enable_debug_mode", True): + # Don't start tracking, just stop + debug.stop_memory_tracking(mock_context) + # Should not raise and xcom_push should not be called + mock_context["ti"].xcom_push.assert_not_called() + + +class TestIntegration: + """Integration tests for the full debug flow.""" + + @pytest.fixture + def mock_context(self): + """Create a mock Airflow context.""" + mock_ti = MagicMock() + mock_ti.dag_id = "test_dag" + mock_ti.task_id = "test_task" + mock_ti.run_id = "test_run_integration" + return {"ti": mock_ti} + + @pytest.mark.skipif(not debug.PSUTIL_AVAILABLE, reason="psutil not available") + def test_full_tracking_lifecycle(self, mock_context): + """Test complete memory tracking lifecycle.""" + with patch.object(debug.settings, "enable_debug_mode", True): + # Start + debug.start_memory_tracking(mock_context) + task_key = f"{mock_context['ti'].dag_id}.{mock_context['ti'].task_id}.{mock_context['ti'].run_id}" + assert task_key in debug._memory_trackers + + # Simulate some work + time.sleep(0.2) + + # Stop + debug.stop_memory_tracking(mock_context) + assert task_key not in debug._memory_trackers + mock_context["ti"].xcom_push.assert_called_once() + + +class TestDbtLocalRunOperatorDebugIntegration: + """Integration tests for DbtRunLocalOperator with debug mode enabled.""" + + @pytest.mark.skipif(not debug.PSUTIL_AVAILABLE, reason="psutil not available") + def test_dbt_run_local_operator_stores_memory_in_xcom_when_debug_enabled(self): + """ + Test that DbtRunLocalOperator pushes peak memory utilization to XCom + when debug mode is enabled. + """ + from pathlib import Path + + from cosmos.config import ProfileConfig + from cosmos.operators.local import DbtRunLocalOperator + + # Use the mini project for testing + mini_project_dir = Path(__file__).parent / "sample" / "mini" + mini_profile_path = mini_project_dir / "profiles.yml" + + profile_config = ProfileConfig( + profile_name="mini", + target_name="dev", + profiles_yml_filepath=mini_profile_path, + ) + + operator = DbtRunLocalOperator( + task_id="test_debug_memory", + project_dir=str(mini_project_dir), + profile_config=profile_config, + emit_datasets=False, + ) + + # Create mock context + mock_ti = MagicMock() + mock_ti.dag_id = "test_dag" + mock_ti.task_id = "test_debug_memory" + mock_ti.run_id = "test_run_debug" + mock_context = { + "ti": mock_ti, + "run_id": "test_run_debug", + "execution_date": MagicMock(), + "ds": "2024-01-01", + "ds_nodash": "20240101", + "ts": "2024-01-01T00:00:00+00:00", + "ts_nodash": "20240101T000000", + "ts_nodash_with_tz": "20240101T000000+0000", + "prev_ds": None, + "prev_ds_nodash": None, + "next_ds": None, + "next_ds_nodash": None, + "yesterday_ds": "2023-12-31", + "yesterday_ds_nodash": "20231231", + "tomorrow_ds": "2024-01-02", + "tomorrow_ds_nodash": "20240102", + "prev_execution_date": None, + "prev_execution_date_success": None, + "next_execution_date": None, + "dag": MagicMock(), + "task": MagicMock(), + "macros": MagicMock(), + "params": {}, + "var": MagicMock(), + "inlets": [], + "outlets": [], + "templates_dict": None, + "conf": MagicMock(), + "dag_run": MagicMock(), + "test_mode": True, + "outlet_events": MagicMock(), + } + + # Patch settings to enable debug mode and mock the build_and_run_cmd to avoid actual dbt execution + with ( + patch.object(settings, "enable_debug_mode", True), + patch.object(operator, "build_and_run_cmd", return_value=None), + ): + operator.execute(mock_context) + + # Verify that xcom_push was called with the debug memory key + xcom_calls = mock_ti.xcom_push.call_args_list + memory_xcom_calls = [call for call in xcom_calls if call[1].get("key") == "cosmos_debug_max_memory_mb"] + + assert len(memory_xcom_calls) == 1, "Expected exactly one XCom push for cosmos_debug_max_memory_mb" + memory_value = memory_xcom_calls[0][1]["value"] + assert isinstance(memory_value, float), "Memory value should be a float" + assert memory_value > 0, "Memory value should be greater than 0" + + @pytest.mark.skipif(not debug.PSUTIL_AVAILABLE, reason="psutil not available") + def test_dbt_run_local_operator_does_not_store_memory_when_debug_disabled(self): + """ + Test that DbtRunLocalOperator does NOT push memory utilization to XCom + when debug mode is disabled (default behavior). + """ + from pathlib import Path + + from cosmos.config import ProfileConfig + from cosmos.operators.local import DbtRunLocalOperator + + # Use the mini project for testing + mini_project_dir = Path(__file__).parent / "sample" / "mini" + mini_profile_path = mini_project_dir / "profiles.yml" + + profile_config = ProfileConfig( + profile_name="mini", + target_name="dev", + profiles_yml_filepath=mini_profile_path, + ) + + operator = DbtRunLocalOperator( + task_id="test_debug_memory_disabled", + project_dir=str(mini_project_dir), + profile_config=profile_config, + emit_datasets=False, + ) + + # Create mock context + mock_ti = MagicMock() + mock_ti.dag_id = "test_dag" + mock_ti.task_id = "test_debug_memory_disabled" + mock_ti.run_id = "test_run_debug_disabled" + mock_context = { + "ti": mock_ti, + "run_id": "test_run_debug_disabled", + "execution_date": MagicMock(), + "ds": "2024-01-01", + "ds_nodash": "20240101", + "ts": "2024-01-01T00:00:00+00:00", + "ts_nodash": "20240101T000000", + "ts_nodash_with_tz": "20240101T000000+0000", + "prev_ds": None, + "prev_ds_nodash": None, + "next_ds": None, + "next_ds_nodash": None, + "yesterday_ds": "2023-12-31", + "yesterday_ds_nodash": "20231231", + "tomorrow_ds": "2024-01-02", + "tomorrow_ds_nodash": "20240102", + "prev_execution_date": None, + "prev_execution_date_success": None, + "next_execution_date": None, + "dag": MagicMock(), + "task": MagicMock(), + "macros": MagicMock(), + "params": {}, + "var": MagicMock(), + "inlets": [], + "outlets": [], + "templates_dict": None, + "conf": MagicMock(), + "dag_run": MagicMock(), + "test_mode": True, + "outlet_events": MagicMock(), + } + + # Patch settings to disable debug mode (default) and mock build_and_run_cmd + with ( + patch.object(settings, "enable_debug_mode", False), + patch.object(operator, "build_and_run_cmd", return_value=None), + ): + operator.execute(mock_context) + + # Verify that xcom_push was NOT called with the debug memory key + xcom_calls = mock_ti.xcom_push.call_args_list + memory_xcom_calls = [call for call in xcom_calls if call[1].get("key") == "cosmos_debug_max_memory_mb"] + + assert ( + len(memory_xcom_calls) == 0 + ), "Expected no XCom push for cosmos_debug_max_memory_mb when debug is disabled" + + @pytest.mark.skipif(not debug.PSUTIL_AVAILABLE, reason="psutil not available") + def test_dbt_run_local_operator_stores_memory_even_on_failure(self): + """ + Test that DbtRunLocalOperator pushes memory utilization to XCom + even when the task execution fails. + """ + from pathlib import Path + + from cosmos.config import ProfileConfig + from cosmos.operators.local import DbtRunLocalOperator + + # Use the mini project for testing + mini_project_dir = Path(__file__).parent / "sample" / "mini" + mini_profile_path = mini_project_dir / "profiles.yml" + + profile_config = ProfileConfig( + profile_name="mini", + target_name="dev", + profiles_yml_filepath=mini_profile_path, + ) + + operator = DbtRunLocalOperator( + task_id="test_debug_memory_failure", + project_dir=str(mini_project_dir), + profile_config=profile_config, + emit_datasets=False, + ) + + # Create mock context + mock_ti = MagicMock() + mock_ti.dag_id = "test_dag" + mock_ti.task_id = "test_debug_memory_failure" + mock_ti.run_id = "test_run_debug_failure" + mock_context = { + "ti": mock_ti, + "run_id": "test_run_debug_failure", + "execution_date": MagicMock(), + "ds": "2024-01-01", + "ds_nodash": "20240101", + "ts": "2024-01-01T00:00:00+00:00", + "ts_nodash": "20240101T000000", + "ts_nodash_with_tz": "20240101T000000+0000", + "prev_ds": None, + "prev_ds_nodash": None, + "next_ds": None, + "next_ds_nodash": None, + "yesterday_ds": "2023-12-31", + "yesterday_ds_nodash": "20231231", + "tomorrow_ds": "2024-01-02", + "tomorrow_ds_nodash": "20240102", + "prev_execution_date": None, + "prev_execution_date_success": None, + "next_execution_date": None, + "dag": MagicMock(), + "task": MagicMock(), + "macros": MagicMock(), + "params": {}, + "var": MagicMock(), + "inlets": [], + "outlets": [], + "templates_dict": None, + "conf": MagicMock(), + "dag_run": MagicMock(), + "test_mode": True, + "outlet_events": MagicMock(), + } + + # Patch settings to enable debug mode and mock build_and_run_cmd to raise an exception + with ( + patch.object(settings, "enable_debug_mode", True), + patch.object(operator, "build_and_run_cmd", side_effect=Exception("Simulated failure")), + ): + with pytest.raises(Exception, match="Simulated failure"): + operator.execute(mock_context) + + # Verify that xcom_push was still called with the debug memory key + xcom_calls = mock_ti.xcom_push.call_args_list + memory_xcom_calls = [call for call in xcom_calls if call[1].get("key") == "cosmos_debug_max_memory_mb"] + + assert len(memory_xcom_calls) == 1, "Expected XCom push for cosmos_debug_max_memory_mb even on failure" + memory_value = memory_xcom_calls[0][1]["value"] + assert isinstance(memory_value, float), "Memory value should be a float" + assert memory_value > 0, "Memory value should be greater than 0" diff --git a/tests/test_settings.py b/tests/test_settings.py index 1b8f79fa33..8efe66f1de 100644 --- a/tests/test_settings.py +++ b/tests/test_settings.py @@ -37,3 +37,19 @@ def test_enable_memory_optimised_imports_false(monkeypatch): result = subprocess.run(["python", "-c", script], capture_output=True, text=True) assert result.returncode == 0, result.stderr + + +@patch.dict(os.environ, {"AIRFLOW__COSMOS__ENABLE_DEBUG_MODE": "True"}, clear=True) +def test_enable_debug_mode_env_var(): + reload(settings) + assert settings.enable_debug_mode is True + + +@patch.dict( + os.environ, + {"AIRFLOW__COSMOS__DEBUG_MEMORY_POLL_INTERVAL_SECONDS": "0.25"}, + clear=True, +) +def test_debug_memory_poll_interval_env_var(): + reload(settings) + assert settings.debug_memory_poll_interval_seconds == 0.25 From fa534101d954c9cee20a942af51a1701a3b0b5a1 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 5 Feb 2026 15:56:50 +0000 Subject: [PATCH 02/10] improv tests --- tests/test_debug.py | 290 +++++++------------------------------------- 1 file changed, 41 insertions(+), 249 deletions(-) diff --git a/tests/test_debug.py b/tests/test_debug.py index 869992abe8..5466ad256c 100644 --- a/tests/test_debug.py +++ b/tests/test_debug.py @@ -4,12 +4,18 @@ import os import time +from datetime import datetime from importlib import reload +from pathlib import Path from unittest.mock import MagicMock, patch import pytest +from airflow import DAG from cosmos import debug, settings +from cosmos.config import ProfileConfig +from cosmos.operators.local import DbtRunLocalOperator +from tests.utils import test_dag as run_test_dag class TestMemoryTracker: @@ -163,254 +169,40 @@ def test_full_tracking_lifecycle(self, mock_context): mock_context["ti"].xcom_push.assert_called_once() -class TestDbtLocalRunOperatorDebugIntegration: - """Integration tests for DbtRunLocalOperator with debug mode enabled.""" - - @pytest.mark.skipif(not debug.PSUTIL_AVAILABLE, reason="psutil not available") - def test_dbt_run_local_operator_stores_memory_in_xcom_when_debug_enabled(self): - """ - Test that DbtRunLocalOperator pushes peak memory utilization to XCom - when debug mode is enabled. - """ - from pathlib import Path - - from cosmos.config import ProfileConfig - from cosmos.operators.local import DbtRunLocalOperator - - # Use the mini project for testing - mini_project_dir = Path(__file__).parent / "sample" / "mini" - mini_profile_path = mini_project_dir / "profiles.yml" - - profile_config = ProfileConfig( - profile_name="mini", - target_name="dev", - profiles_yml_filepath=mini_profile_path, - ) - - operator = DbtRunLocalOperator( - task_id="test_debug_memory", - project_dir=str(mini_project_dir), - profile_config=profile_config, - emit_datasets=False, - ) - - # Create mock context - mock_ti = MagicMock() - mock_ti.dag_id = "test_dag" - mock_ti.task_id = "test_debug_memory" - mock_ti.run_id = "test_run_debug" - mock_context = { - "ti": mock_ti, - "run_id": "test_run_debug", - "execution_date": MagicMock(), - "ds": "2024-01-01", - "ds_nodash": "20240101", - "ts": "2024-01-01T00:00:00+00:00", - "ts_nodash": "20240101T000000", - "ts_nodash_with_tz": "20240101T000000+0000", - "prev_ds": None, - "prev_ds_nodash": None, - "next_ds": None, - "next_ds_nodash": None, - "yesterday_ds": "2023-12-31", - "yesterday_ds_nodash": "20231231", - "tomorrow_ds": "2024-01-02", - "tomorrow_ds_nodash": "20240102", - "prev_execution_date": None, - "prev_execution_date_success": None, - "next_execution_date": None, - "dag": MagicMock(), - "task": MagicMock(), - "macros": MagicMock(), - "params": {}, - "var": MagicMock(), - "inlets": [], - "outlets": [], - "templates_dict": None, - "conf": MagicMock(), - "dag_run": MagicMock(), - "test_mode": True, - "outlet_events": MagicMock(), - } - - # Patch settings to enable debug mode and mock the build_and_run_cmd to avoid actual dbt execution - with ( - patch.object(settings, "enable_debug_mode", True), - patch.object(operator, "build_and_run_cmd", return_value=None), - ): - operator.execute(mock_context) - - # Verify that xcom_push was called with the debug memory key - xcom_calls = mock_ti.xcom_push.call_args_list - memory_xcom_calls = [call for call in xcom_calls if call[1].get("key") == "cosmos_debug_max_memory_mb"] - - assert len(memory_xcom_calls) == 1, "Expected exactly one XCom push for cosmos_debug_max_memory_mb" - memory_value = memory_xcom_calls[0][1]["value"] - assert isinstance(memory_value, float), "Memory value should be a float" - assert memory_value > 0, "Memory value should be greater than 0" - - @pytest.mark.skipif(not debug.PSUTIL_AVAILABLE, reason="psutil not available") - def test_dbt_run_local_operator_does_not_store_memory_when_debug_disabled(self): - """ - Test that DbtRunLocalOperator does NOT push memory utilization to XCom - when debug mode is disabled (default behavior). - """ - from pathlib import Path - - from cosmos.config import ProfileConfig - from cosmos.operators.local import DbtRunLocalOperator - - # Use the mini project for testing - mini_project_dir = Path(__file__).parent / "sample" / "mini" - mini_profile_path = mini_project_dir / "profiles.yml" - - profile_config = ProfileConfig( - profile_name="mini", - target_name="dev", - profiles_yml_filepath=mini_profile_path, - ) - - operator = DbtRunLocalOperator( - task_id="test_debug_memory_disabled", - project_dir=str(mini_project_dir), - profile_config=profile_config, - emit_datasets=False, - ) - - # Create mock context - mock_ti = MagicMock() - mock_ti.dag_id = "test_dag" - mock_ti.task_id = "test_debug_memory_disabled" - mock_ti.run_id = "test_run_debug_disabled" - mock_context = { - "ti": mock_ti, - "run_id": "test_run_debug_disabled", - "execution_date": MagicMock(), - "ds": "2024-01-01", - "ds_nodash": "20240101", - "ts": "2024-01-01T00:00:00+00:00", - "ts_nodash": "20240101T000000", - "ts_nodash_with_tz": "20240101T000000+0000", - "prev_ds": None, - "prev_ds_nodash": None, - "next_ds": None, - "next_ds_nodash": None, - "yesterday_ds": "2023-12-31", - "yesterday_ds_nodash": "20231231", - "tomorrow_ds": "2024-01-02", - "tomorrow_ds_nodash": "20240102", - "prev_execution_date": None, - "prev_execution_date_success": None, - "next_execution_date": None, - "dag": MagicMock(), - "task": MagicMock(), - "macros": MagicMock(), - "params": {}, - "var": MagicMock(), - "inlets": [], - "outlets": [], - "templates_dict": None, - "conf": MagicMock(), - "dag_run": MagicMock(), - "test_mode": True, - "outlet_events": MagicMock(), - } - - # Patch settings to disable debug mode (default) and mock build_and_run_cmd - with ( - patch.object(settings, "enable_debug_mode", False), - patch.object(operator, "build_and_run_cmd", return_value=None), - ): - operator.execute(mock_context) - - # Verify that xcom_push was NOT called with the debug memory key - xcom_calls = mock_ti.xcom_push.call_args_list - memory_xcom_calls = [call for call in xcom_calls if call[1].get("key") == "cosmos_debug_max_memory_mb"] - - assert ( - len(memory_xcom_calls) == 0 - ), "Expected no XCom push for cosmos_debug_max_memory_mb when debug is disabled" - - @pytest.mark.skipif(not debug.PSUTIL_AVAILABLE, reason="psutil not available") - def test_dbt_run_local_operator_stores_memory_even_on_failure(self): - """ - Test that DbtRunLocalOperator pushes memory utilization to XCom - even when the task execution fails. - """ - from pathlib import Path - - from cosmos.config import ProfileConfig - from cosmos.operators.local import DbtRunLocalOperator - - # Use the mini project for testing - mini_project_dir = Path(__file__).parent / "sample" / "mini" - mini_profile_path = mini_project_dir / "profiles.yml" - - profile_config = ProfileConfig( - profile_name="mini", - target_name="dev", - profiles_yml_filepath=mini_profile_path, - ) - - operator = DbtRunLocalOperator( - task_id="test_debug_memory_failure", - project_dir=str(mini_project_dir), - profile_config=profile_config, - emit_datasets=False, - ) - - # Create mock context - mock_ti = MagicMock() - mock_ti.dag_id = "test_dag" - mock_ti.task_id = "test_debug_memory_failure" - mock_ti.run_id = "test_run_debug_failure" - mock_context = { - "ti": mock_ti, - "run_id": "test_run_debug_failure", - "execution_date": MagicMock(), - "ds": "2024-01-01", - "ds_nodash": "20240101", - "ts": "2024-01-01T00:00:00+00:00", - "ts_nodash": "20240101T000000", - "ts_nodash_with_tz": "20240101T000000+0000", - "prev_ds": None, - "prev_ds_nodash": None, - "next_ds": None, - "next_ds_nodash": None, - "yesterday_ds": "2023-12-31", - "yesterday_ds_nodash": "20231231", - "tomorrow_ds": "2024-01-02", - "tomorrow_ds_nodash": "20240102", - "prev_execution_date": None, - "prev_execution_date_success": None, - "next_execution_date": None, - "dag": MagicMock(), - "task": MagicMock(), - "macros": MagicMock(), - "params": {}, - "var": MagicMock(), - "inlets": [], - "outlets": [], - "templates_dict": None, - "conf": MagicMock(), - "dag_run": MagicMock(), - "test_mode": True, - "outlet_events": MagicMock(), - } - - # Patch settings to enable debug mode and mock build_and_run_cmd to raise an exception - with ( - patch.object(settings, "enable_debug_mode", True), - patch.object(operator, "build_and_run_cmd", side_effect=Exception("Simulated failure")), - ): - with pytest.raises(Exception, match="Simulated failure"): - operator.execute(mock_context) - - # Verify that xcom_push was still called with the debug memory key - xcom_calls = mock_ti.xcom_push.call_args_list - memory_xcom_calls = [call for call in xcom_calls if call[1].get("key") == "cosmos_debug_max_memory_mb"] - - assert len(memory_xcom_calls) == 1, "Expected XCom push for cosmos_debug_max_memory_mb even on failure" - memory_value = memory_xcom_calls[0][1]["value"] +MINI_DBT_PROJ_DIR = Path(__file__).parent / "sample" / "mini" +MINI_DBT_PROJ_PROFILE = MINI_DBT_PROJ_DIR / "profiles.yml" + +mini_profile_config = ProfileConfig( + profile_name="mini", + target_name="dev", + profiles_yml_filepath=MINI_DBT_PROJ_PROFILE, +) + + +@pytest.mark.integration +@pytest.mark.skipif(not debug.PSUTIL_AVAILABLE, reason="psutil not available") +def test_dbt_run_local_operator_stores_memory_in_xcom_when_debug_enabled(): + """ + Integration test that DbtRunLocalOperator pushes peak memory utilization to XCom + when debug mode is enabled. + """ + with patch.object(settings, "enable_debug_mode", True): + with DAG("test-debug-memory", start_date=datetime(2022, 1, 1)) as dag: + run_operator = DbtRunLocalOperator( + profile_config=mini_profile_config, + project_dir=MINI_DBT_PROJ_DIR, + task_id="run", + append_env=True, + emit_datasets=False, + ) + run_operator + + dag_run = run_test_dag(dag) + + # Get the task instance to check XCom + ti = dag_run.get_task_instance(task_id="run") + memory_value = ti.xcom_pull(key="cosmos_debug_max_memory_mb") + + assert memory_value is not None, "Expected cosmos_debug_max_memory_mb in XCom" assert isinstance(memory_value, float), "Memory value should be a float" assert memory_value > 0, "Memory value should be greater than 0" From f2d9e4b0582300156c0318afad6b31a688177baa Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 5 Feb 2026 16:03:02 +0000 Subject: [PATCH 03/10] Add docs --- docs/configuration/cosmos-conf.rst | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/docs/configuration/cosmos-conf.rst b/docs/configuration/cosmos-conf.rst index f8db11e593..1ed4c7bb56 100644 --- a/docs/configuration/cosmos-conf.rst +++ b/docs/configuration/cosmos-conf.rst @@ -261,6 +261,24 @@ This page lists all available Airflow configurations that affect ``astronomer-co :start-after: [START cosmos_init_imports] :end-before: [END cosmos_init_imports] +.. _enable_debug_mode: + +`enable_debug_mode`_: + Enable or disable debug mode. When enabled, Cosmos will track memory utilization for its tasks and push the peak + memory usage (in MB) to XCom under the key ``cosmos_debug_max_memory_mb``. This is useful for profiling and + optimizing resource allocation for dbt tasks. Requires ``psutil`` to be installed. + + - Default: ``False`` + - Environment Variable: ``AIRFLOW__COSMOS__ENABLE_DEBUG_MODE`` + +.. _debug_memory_poll_interval_seconds: + +`debug_memory_poll_interval_seconds`_: + The interval (in seconds) at which memory utilization is polled when debug mode is enabled. Lower values provide + more accurate peak memory measurements but may add slight overhead. + + - Default: ``0.5`` + - Environment Variable: ``AIRFLOW__COSMOS__DEBUG_MEMORY_POLL_INTERVAL_SECONDS`` [openlineage] ~~~~~~~~~~~~~ From 93a70fa69823074c77a04fe09f4c4601a6ea70cc Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 5 Feb 2026 17:44:57 +0000 Subject: [PATCH 04/10] Try to improve test coverage --- tests/test_debug.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/tests/test_debug.py b/tests/test_debug.py index 5466ad256c..3f4190fc58 100644 --- a/tests/test_debug.py +++ b/tests/test_debug.py @@ -139,6 +139,48 @@ def test_stop_memory_tracking_no_tracker(self, mock_context): mock_context["ti"].xcom_push.assert_not_called() +class TestPsutilNotAvailable: + """Tests for when psutil is not available.""" + + @pytest.fixture + def mock_context(self): + """Create a mock Airflow context.""" + mock_ti = MagicMock() + mock_ti.dag_id = "test_dag" + mock_ti.task_id = "test_task" + mock_ti.run_id = "test_run_no_psutil" + return {"ti": mock_ti} + + def test_memory_tracker_run_without_psutil(self): + """Test MemoryTracker._run() returns early when psutil is not available.""" + tracker = debug.MemoryTracker(pid=os.getpid(), poll_interval=0.05) + with patch.object(debug, "PSUTIL_AVAILABLE", False): + # Call _run directly to test the branch + tracker._run() + # Should return immediately without tracking any memory + assert tracker.max_rss_bytes == 0 + + def test_start_memory_tracking_without_psutil_logs_warning(self, mock_context): + """Test start_memory_tracking logs warning when psutil is not available.""" + with patch.object(debug.settings, "enable_debug_mode", True): + with patch.object(debug, "PSUTIL_AVAILABLE", False): + with patch.object(debug.logger, "warning") as mock_warning: + debug.start_memory_tracking(mock_context) + mock_warning.assert_called_once() + assert "psutil is not available" in mock_warning.call_args[0][0] + # No tracker should be created + task_key = f"{mock_context['ti'].dag_id}.{mock_context['ti'].task_id}.{mock_context['ti'].run_id}" + assert task_key not in debug._memory_trackers + + def test_stop_memory_tracking_without_psutil_returns_early(self, mock_context): + """Test stop_memory_tracking returns early when psutil is not available.""" + with patch.object(debug.settings, "enable_debug_mode", True): + with patch.object(debug, "PSUTIL_AVAILABLE", False): + debug.stop_memory_tracking(mock_context) + # Should not push to XCom + mock_context["ti"].xcom_push.assert_not_called() + + class TestIntegration: """Integration tests for the full debug flow.""" From 2e2aa7ab6b580ef9b563e8a0b21fac2c5f8283ef Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 6 Feb 2026 09:59:02 +0000 Subject: [PATCH 05/10] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- cosmos/operators/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index e78979e676..1b5153f1da 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -328,7 +328,7 @@ def execute(self, context: Context, **kwargs) -> Any | None: # type: ignore stop_memory_tracking(context) raise else: - self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags(), **kwargs) + return self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags(), **kwargs) class DbtBuildMixin: From 97ab6643de06adc129302bd38f25df7d1a3397b9 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 6 Feb 2026 12:14:20 +0000 Subject: [PATCH 06/10] Define Xcom memory key as constant --- cosmos/debug.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cosmos/debug.py b/cosmos/debug.py index 53f8df68d9..68e606ccff 100644 --- a/cosmos/debug.py +++ b/cosmos/debug.py @@ -30,6 +30,9 @@ logger = get_logger(__name__) +# XCom key for storing maximum memory usage in debug mode +XCOM_DEBUG_MAX_MEMORY_MB_KEY = "cosmos_debug_max_memory_mb" + # Global dictionary to store memory trackers per task _memory_trackers: dict[str, MemoryTracker] = {} @@ -134,4 +137,4 @@ def stop_memory_tracking(context: Context) -> None: max_mb = tracker.max_rss_bytes / 1024 / 1024 logger.info("Max memory usage (RSS, incl. children): %.2f MB", max_mb) # Persist to XCom for observability - ti.xcom_push(key="cosmos_debug_max_memory_mb", value=round(max_mb, 2)) + ti.xcom_push(key=XCOM_DEBUG_MAX_MEMORY_MB_KEY, value=round(max_mb, 2)) From aa0f93c6abe5f12ca60ba98f07307f81bd0d6036 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 6 Feb 2026 12:17:25 +0000 Subject: [PATCH 07/10] Fix serialisation issue --- cosmos/operators/base.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index 1b5153f1da..b486a476f7 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -321,14 +321,13 @@ def execute(self, context: Context, **kwargs) -> Any | None: # type: ignore start_memory_tracking(context) try: - result = self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags(), **kwargs) + self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags(), **kwargs) stop_memory_tracking(context) - return result except Exception: stop_memory_tracking(context) raise else: - return self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags(), **kwargs) + self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags(), **kwargs) class DbtBuildMixin: From 0a863eec1fc10c2c8ff917eb72a40b7a2c770d58 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 6 Feb 2026 13:08:16 +0000 Subject: [PATCH 08/10] Improve test coverage --- tests/test_debug.py | 139 ++++++++++---------------------------------- 1 file changed, 32 insertions(+), 107 deletions(-) diff --git a/tests/test_debug.py b/tests/test_debug.py index 3f4190fc58..cd34912050 100644 --- a/tests/test_debug.py +++ b/tests/test_debug.py @@ -5,7 +5,6 @@ import os import time from datetime import datetime -from importlib import reload from pathlib import Path from unittest.mock import MagicMock, patch @@ -21,13 +20,6 @@ class TestMemoryTracker: """Tests for the MemoryTracker class.""" - @pytest.fixture - def reset_settings(self): - """Reset settings after each test.""" - yield - reload(settings) - reload(debug) - def test_memory_tracker_initialization(self): """Test MemoryTracker initializes with correct values.""" tracker = debug.MemoryTracker(pid=os.getpid(), poll_interval=0.1) @@ -35,7 +27,6 @@ def test_memory_tracker_initialization(self): assert tracker.poll_interval == 0.1 assert tracker.max_rss_bytes == 0 - @pytest.mark.skipif(not debug.PSUTIL_AVAILABLE, reason="psutil not available") def test_memory_tracker_tracks_memory(self): """Test MemoryTracker actually tracks memory usage.""" tracker = debug.MemoryTracker(pid=os.getpid(), poll_interval=0.05) @@ -52,7 +43,6 @@ def test_memory_tracker_stop_without_start(self): # Should not raise tracker.stop() - @pytest.mark.skipif(not debug.PSUTIL_AVAILABLE, reason="psutil not available") def test_memory_tracker_with_nonexistent_pid(self): """Test MemoryTracker handles non-existent PID gracefully.""" # Use a very high PID that's unlikely to exist @@ -76,24 +66,14 @@ def mock_context(self): mock_ti.run_id = "test_run_123" return {"ti": mock_ti} - def test_start_memory_tracking_disabled(self, mock_context): - """Test start_memory_tracking does nothing when debug mode is disabled.""" - with patch.object(debug.settings, "enable_debug_mode", False): - debug.start_memory_tracking(mock_context) - # No tracker should be created - task_key = f"{mock_context['ti'].dag_id}.{mock_context['ti'].task_id}.{mock_context['ti'].run_id}" - assert task_key not in debug._memory_trackers - - @pytest.mark.skipif(not debug.PSUTIL_AVAILABLE, reason="psutil not available") - def test_start_memory_tracking_enabled(self, mock_context): - """Test start_memory_tracking creates tracker when debug mode is enabled.""" - with patch.object(debug.settings, "enable_debug_mode", True): - debug.start_memory_tracking(mock_context) - task_key = f"{mock_context['ti'].dag_id}.{mock_context['ti'].task_id}.{mock_context['ti'].run_id}" - assert task_key in debug._memory_trackers - # Cleanup - tracker = debug._memory_trackers.pop(task_key) - tracker.stop() + def test_start_memory_tracking_creates_tracker(self, mock_context): + """Test start_memory_tracking creates tracker.""" + debug.start_memory_tracking(mock_context) + task_key = f"{mock_context['ti'].dag_id}.{mock_context['ti'].task_id}.{mock_context['ti'].run_id}" + assert task_key in debug._memory_trackers + # Cleanup + tracker = debug._memory_trackers.pop(task_key) + tracker.stop() class TestStopMemoryTracking: @@ -108,77 +88,25 @@ def mock_context(self): mock_ti.run_id = "test_run_123" return {"ti": mock_ti} - def test_stop_memory_tracking_disabled(self, mock_context): - """Test stop_memory_tracking does nothing when debug mode is disabled.""" - with patch.object(debug.settings, "enable_debug_mode", False): - debug.stop_memory_tracking(mock_context) - # Should not raise and xcom_push should not be called - mock_context["ti"].xcom_push.assert_not_called() - - @pytest.mark.skipif(not debug.PSUTIL_AVAILABLE, reason="psutil not available") def test_stop_memory_tracking_pushes_xcom(self, mock_context): """Test stop_memory_tracking pushes memory data to XCom.""" - with patch.object(debug.settings, "enable_debug_mode", True): - # Start tracking first - debug.start_memory_tracking(mock_context) - time.sleep(0.1) # Let it sample - # Stop and check XCom push - debug.stop_memory_tracking(mock_context) - mock_context["ti"].xcom_push.assert_called_once() - call_args = mock_context["ti"].xcom_push.call_args - assert call_args[1]["key"] == "cosmos_debug_max_memory_mb" - assert isinstance(call_args[1]["value"], float) - assert call_args[1]["value"] > 0 + # Start tracking first + debug.start_memory_tracking(mock_context) + time.sleep(0.1) # Let it sample + # Stop and check XCom push + debug.stop_memory_tracking(mock_context) + mock_context["ti"].xcom_push.assert_called_once() + call_args = mock_context["ti"].xcom_push.call_args + assert call_args[1]["key"] == "cosmos_debug_max_memory_mb" + assert isinstance(call_args[1]["value"], float) + assert call_args[1]["value"] > 0 def test_stop_memory_tracking_no_tracker(self, mock_context): """Test stop_memory_tracking handles missing tracker gracefully.""" - with patch.object(debug.settings, "enable_debug_mode", True): - # Don't start tracking, just stop - debug.stop_memory_tracking(mock_context) - # Should not raise and xcom_push should not be called - mock_context["ti"].xcom_push.assert_not_called() - - -class TestPsutilNotAvailable: - """Tests for when psutil is not available.""" - - @pytest.fixture - def mock_context(self): - """Create a mock Airflow context.""" - mock_ti = MagicMock() - mock_ti.dag_id = "test_dag" - mock_ti.task_id = "test_task" - mock_ti.run_id = "test_run_no_psutil" - return {"ti": mock_ti} - - def test_memory_tracker_run_without_psutil(self): - """Test MemoryTracker._run() returns early when psutil is not available.""" - tracker = debug.MemoryTracker(pid=os.getpid(), poll_interval=0.05) - with patch.object(debug, "PSUTIL_AVAILABLE", False): - # Call _run directly to test the branch - tracker._run() - # Should return immediately without tracking any memory - assert tracker.max_rss_bytes == 0 - - def test_start_memory_tracking_without_psutil_logs_warning(self, mock_context): - """Test start_memory_tracking logs warning when psutil is not available.""" - with patch.object(debug.settings, "enable_debug_mode", True): - with patch.object(debug, "PSUTIL_AVAILABLE", False): - with patch.object(debug.logger, "warning") as mock_warning: - debug.start_memory_tracking(mock_context) - mock_warning.assert_called_once() - assert "psutil is not available" in mock_warning.call_args[0][0] - # No tracker should be created - task_key = f"{mock_context['ti'].dag_id}.{mock_context['ti'].task_id}.{mock_context['ti'].run_id}" - assert task_key not in debug._memory_trackers - - def test_stop_memory_tracking_without_psutil_returns_early(self, mock_context): - """Test stop_memory_tracking returns early when psutil is not available.""" - with patch.object(debug.settings, "enable_debug_mode", True): - with patch.object(debug, "PSUTIL_AVAILABLE", False): - debug.stop_memory_tracking(mock_context) - # Should not push to XCom - mock_context["ti"].xcom_push.assert_not_called() + # Don't start tracking, just stop + debug.stop_memory_tracking(mock_context) + # Should not raise and xcom_push should not be called + mock_context["ti"].xcom_push.assert_not_called() class TestIntegration: @@ -193,22 +121,20 @@ def mock_context(self): mock_ti.run_id = "test_run_integration" return {"ti": mock_ti} - @pytest.mark.skipif(not debug.PSUTIL_AVAILABLE, reason="psutil not available") def test_full_tracking_lifecycle(self, mock_context): """Test complete memory tracking lifecycle.""" - with patch.object(debug.settings, "enable_debug_mode", True): - # Start - debug.start_memory_tracking(mock_context) - task_key = f"{mock_context['ti'].dag_id}.{mock_context['ti'].task_id}.{mock_context['ti'].run_id}" - assert task_key in debug._memory_trackers + # Start + debug.start_memory_tracking(mock_context) + task_key = f"{mock_context['ti'].dag_id}.{mock_context['ti'].task_id}.{mock_context['ti'].run_id}" + assert task_key in debug._memory_trackers - # Simulate some work - time.sleep(0.2) + # Simulate some work + time.sleep(0.2) - # Stop - debug.stop_memory_tracking(mock_context) - assert task_key not in debug._memory_trackers - mock_context["ti"].xcom_push.assert_called_once() + # Stop + debug.stop_memory_tracking(mock_context) + assert task_key not in debug._memory_trackers + mock_context["ti"].xcom_push.assert_called_once() MINI_DBT_PROJ_DIR = Path(__file__).parent / "sample" / "mini" @@ -222,7 +148,6 @@ def test_full_tracking_lifecycle(self, mock_context): @pytest.mark.integration -@pytest.mark.skipif(not debug.PSUTIL_AVAILABLE, reason="psutil not available") def test_dbt_run_local_operator_stores_memory_in_xcom_when_debug_enabled(): """ Integration test that DbtRunLocalOperator pushes peak memory utilization to XCom From 99e9b65a3c58eab13a5fcb01a3122664331e0ea4 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 6 Feb 2026 13:13:50 +0000 Subject: [PATCH 09/10] Increase test coverage --- cosmos/debug.py | 24 +++-------------- tests/test_debug.py | 64 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 21 deletions(-) diff --git a/cosmos/debug.py b/cosmos/debug.py index 68e606ccff..0aba093b42 100644 --- a/cosmos/debug.py +++ b/cosmos/debug.py @@ -14,10 +14,10 @@ try: import psutil - - PSUTIL_AVAILABLE = True except ImportError: - PSUTIL_AVAILABLE = False + raise RuntimeError( + "psutil is not available. Install `https://pypi.org/project/psutil/` to enable memory tracking." + ) from cosmos import settings from cosmos.log import get_logger @@ -62,9 +62,6 @@ def stop(self) -> None: def _run(self) -> None: """Background thread that polls memory usage.""" - if not PSUTIL_AVAILABLE: - return - try: parent = psutil.Process(self.pid) except psutil.NoSuchProcess: @@ -95,15 +92,6 @@ def start_memory_tracking(context: Context) -> None: :param context: The Airflow task context. """ - if not settings.enable_debug_mode: - return - - if not PSUTIL_AVAILABLE: - logger.warning( - "psutil is not available. Memory tracking is disabled. Install psutil to enable memory tracking." - ) - return - ti = context["ti"] task_key = f"{ti.dag_id}.{ti.task_id}.{ti.run_id}" pid = os.getpid() @@ -122,12 +110,6 @@ def stop_memory_tracking(context: Context) -> None: :param context: The Airflow task context. """ - if not settings.enable_debug_mode: - return - - if not PSUTIL_AVAILABLE: - return - ti = context["ti"] task_key = f"{ti.dag_id}.{ti.task_id}.{ti.run_id}" tracker = _memory_trackers.pop(task_key, None) diff --git a/tests/test_debug.py b/tests/test_debug.py index cd34912050..2809202cd7 100644 --- a/tests/test_debug.py +++ b/tests/test_debug.py @@ -3,6 +3,7 @@ from __future__ import annotations import os +import sys import time from datetime import datetime from pathlib import Path @@ -17,6 +18,28 @@ from tests.utils import test_dag as run_test_dag +class TestPsutilImport: + """Tests for psutil import handling.""" + + def test_import_raises_runtime_error_when_psutil_unavailable(self): + """Test that importing cosmos.debug raises RuntimeError when psutil is not available.""" + # Remove cosmos.debug from sys.modules to allow re-import + modules_to_remove = [key for key in sys.modules if key.startswith("cosmos.debug")] + original_modules = {key: sys.modules.pop(key) for key in modules_to_remove} + + # Mock psutil to simulate it not being installed + with patch.dict(sys.modules, {"psutil": None}): + with pytest.raises(RuntimeError) as exc_info: + from importlib import import_module + + import_module("cosmos.debug") + + assert "psutil is not available" in str(exc_info.value) + + # Restore original modules + sys.modules.update(original_modules) + + class TestMemoryTracker: """Tests for the MemoryTracker class.""" @@ -53,6 +76,47 @@ def test_memory_tracker_with_nonexistent_pid(self): # Should have 0 bytes since process doesn't exist assert tracker.max_rss_bytes == 0 + def test_memory_tracker_handles_child_process_termination(self): + """Test MemoryTracker continues when a child process terminates during memory_info() call.""" + import psutil + + tracker = debug.MemoryTracker(pid=os.getpid(), poll_interval=0.05) + + # Mock a child process that raises NoSuchProcess when memory_info() is called + mock_child = MagicMock() + mock_child.memory_info.side_effect = psutil.NoSuchProcess(pid=12345) + + mock_parent = MagicMock() + mock_parent.memory_info.return_value = MagicMock(rss=1024 * 1024) # 1 MB + mock_parent.children.return_value = [mock_child] + + with patch("psutil.Process", return_value=mock_parent): + tracker.start() + time.sleep(0.15) + tracker.stop() + + # Should have tracked memory from parent only (child raised NoSuchProcess) + assert tracker.max_rss_bytes >= 1024 * 1024 + + def test_memory_tracker_handles_parent_children_call_failure(self): + """Test MemoryTracker breaks loop when parent.children() raises NoSuchProcess.""" + import psutil + + tracker = debug.MemoryTracker(pid=os.getpid(), poll_interval=0.05) + + mock_parent = MagicMock() + # First call succeeds, second call raises NoSuchProcess + mock_parent.memory_info.return_value = MagicMock(rss=1024 * 1024) + mock_parent.children.side_effect = [[], psutil.NoSuchProcess(pid=os.getpid())] + + with patch("psutil.Process", return_value=mock_parent): + tracker.start() + time.sleep(0.15) + tracker.stop() + + # Should have tracked some memory before the exception + assert tracker.max_rss_bytes >= 1024 * 1024 + class TestStartMemoryTracking: """Tests for the start_memory_tracking function.""" From df0c7814db7530e88146b5386f74eb37ee421942 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 6 Feb 2026 14:00:31 +0000 Subject: [PATCH 10/10] Improve test coverage https://app.codecov.io/gh/astronomer/astronomer-cosmos/pull/2327\?src\=pr\&el\=tree\&utm_medium\=referral\&utm_source\=github\&utm_content\=comment\&utm_campaign\=pr+comments\&utm_term\=astronomer --- tests/operators/test_base.py | 48 ++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/tests/operators/test_base.py b/tests/operators/test_base.py index 400a97a06b..d9e605eeb7 100644 --- a/tests/operators/test_base.py +++ b/tests/operators/test_base.py @@ -181,3 +181,51 @@ def test_abstract_dbt_base_init_no_super(): source = inspect.getsource(init_method) assert "super().__init__" not in source + + +@patch("cosmos.operators.base.settings") +@patch("cosmos.operators.base.AbstractDbtBase.build_and_run_cmd") +def test_dbt_base_operator_execute_debug_mode_exception_stops_memory_tracking( + mock_build_and_run_cmd, mock_settings, monkeypatch +): + """Tests that stop_memory_tracking is called when an exception occurs during debug mode execution.""" + mock_settings.enable_debug_mode = True + mock_build_and_run_cmd.side_effect = RuntimeError("Test exception") + + monkeypatch.setattr(AbstractDbtBase, "add_cmd_flags", lambda _: []) + AbstractDbtBase.__abstractmethods__ = set() + + base_operator = AbstractDbtBase(task_id="fake_task", project_dir="fake_dir") + + with patch("cosmos.debug.start_memory_tracking") as mock_start_tracking: + with patch("cosmos.debug.stop_memory_tracking") as mock_stop_tracking: + with pytest.raises(RuntimeError, match="Test exception"): + base_operator.execute(context={}) + + # Verify memory tracking was started + mock_start_tracking.assert_called_once_with({}) + # Verify memory tracking was stopped even though an exception occurred + mock_stop_tracking.assert_called_once_with({}) + + +@patch("cosmos.operators.base.settings") +@patch("cosmos.operators.base.AbstractDbtBase.build_and_run_cmd") +def test_dbt_base_operator_execute_debug_mode_success_stops_memory_tracking( + mock_build_and_run_cmd, mock_settings, monkeypatch +): + """Tests that stop_memory_tracking is called when execution succeeds in debug mode.""" + mock_settings.enable_debug_mode = True + + monkeypatch.setattr(AbstractDbtBase, "add_cmd_flags", lambda _: []) + AbstractDbtBase.__abstractmethods__ = set() + + base_operator = AbstractDbtBase(task_id="fake_task", project_dir="fake_dir") + + with patch("cosmos.debug.start_memory_tracking") as mock_start_tracking: + with patch("cosmos.debug.stop_memory_tracking") as mock_stop_tracking: + base_operator.execute(context={}) + + # Verify memory tracking was started + mock_start_tracking.assert_called_once_with({}) + # Verify memory tracking was stopped after successful execution + mock_stop_tracking.assert_called_once_with({})