From 094bf9280ce175b33b30d5ab0c168b69e0096ad0 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 7 Jan 2026 09:05:15 +0000 Subject: [PATCH 01/15] Tmp commit with WIP code --- cosmos/hooks/subprocess.py | 3 +- cosmos/operators/_watcher/base.py | 26 +++++++++------- cosmos/operators/local.py | 3 -- cosmos/operators/watcher.py | 16 ++++++++-- dev/dags/example_watcher.py | 7 ++++- scripts/test/integration.sh | 29 +++++++++++------- tests/operators/test_watcher.py | 51 +++++++++++++++++++++++++++++-- 7 files changed, 103 insertions(+), 32 deletions(-) diff --git a/cosmos/hooks/subprocess.py b/cosmos/hooks/subprocess.py index e2a82e19be..e028d57af6 100644 --- a/cosmos/hooks/subprocess.py +++ b/cosmos/hooks/subprocess.py @@ -99,9 +99,10 @@ def pre_exec() -> None: line = line.rstrip("\n") last_line = line log_lines.append(line) - self.log.info("%s", line) if process_log_line: process_log_line(line, kwargs) + else: + self.log.info("%s", line) # Wait until process completes return_code = self.sub_process.wait() diff --git a/cosmos/operators/_watcher/base.py b/cosmos/operators/_watcher/base.py index 389717a41d..61157493b2 100644 --- a/cosmos/operators/_watcher/base.py +++ b/cosmos/operators/_watcher/base.py @@ -39,17 +39,21 @@ def store_dbt_resource_status_from_log(line: str, extra_kwargs: Any) -> None: except json.JSONDecodeError: logger.debug("Failed to parse log: %s", line) log_line = {} - node_info = log_line.get("data", {}).get("node_info", {}) - node_status = node_info.get("node_status") - unique_id = node_info.get("unique_id") - - logger.debug("Model: %s is in %s state", unique_id, node_status) - - # TODO: Handle and store all possible node statuses, not just the current success and failed - if node_status in ["success", "failed"]: - context = extra_kwargs.get("context") - assert context is not None # Make MyPy happy - safe_xcom_push(task_instance=context["ti"], key=f"{unique_id.replace('.', '__')}_status", value=node_status) + else: + logger.debug("Log line: %s", log_line) + if "info" in log_line and "msg" in log_line["info"]: + logger.info(log_line["info"]["msg"]) + node_info = log_line.get("data", {}).get("node_info", {}) + node_status = node_info.get("node_status") + unique_id = node_info.get("unique_id") + + logger.debug("Model: %s is in %s state", unique_id, node_status) + + # TODO: Handle and store all possible node statuses, not just the current success and failed + if node_status in ["success", "failed"]: + context = extra_kwargs.get("context") + assert context is not None # Make MyPy happy + safe_xcom_push(task_instance=context["ti"], key=f"{unique_id.replace('.', '__')}_status", value=node_status) class BaseConsumerSensor(BaseSensorOperator): # type: ignore[misc] diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 2b37d6d821..ae1ec5b46b 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -463,9 +463,6 @@ def run_subprocess( process_log_line=self._process_log_line_callable, **kwargs, ) - # Logging changed in Airflow 3.1 and we needed to replace the output by the full output: - output = "".join(subprocess_result.full_output) - logger.info(output) return subprocess_result def run_dbt_runner(self, command: list[str], env: dict[str, str], cwd: str, **kwargs: Any) -> dbtRunnerResult: diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 29696fadc0..ebf0424298 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -3,7 +3,7 @@ import base64 import json import zlib -from collections.abc import Sequence +from collections.abc import Callable, Sequence from datetime import timedelta from pathlib import Path from typing import TYPE_CHECKING, Any @@ -83,7 +83,7 @@ class DbtProducerWatcherOperator(DbtBuildMixin, DbtLocalBaseOperator): template_fields = DbtLocalBaseOperator.template_fields + DbtBuildMixin.template_fields # type: ignore[operator] # Use staticmethod to prevent Python's descriptor protocol from binding the function to `self` # when accessed via instance, which would incorrectly pass `self` as the first argument - _process_log_line_callable = staticmethod(store_dbt_resource_status_from_log) + _process_log_line_callable: Callable[[str, Any], None] = None def __init__(self, *args: Any, **kwargs: Any) -> None: task_id = kwargs.pop("task_id", PRODUCER_WATCHER_TASK_ID) @@ -96,7 +96,6 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: default_args["retries"] = 0 kwargs["default_args"] = default_args kwargs["retries"] = 0 - kwargs["log_format"] = "json" super().__init__(task_id=task_id, *args, **kwargs) @@ -148,6 +147,17 @@ def _finalize(self, context: Context, startup_events: list[dict[str, Any]]) -> N safe_xcom_push(task_instance=context["ti"], key="dbt_startup_events", value=startup_events) def execute(self, context: Context, **kwargs: Any) -> Any: + if not self.invocation_mode: + logger.info("No invocation mode provided, discovering it") + self._discover_invocation_mode() + + logger.info("Invocation mode: %s", self.invocation_mode) + + if self.invocation_mode == InvocationMode.SUBPROCESS: + logger.info("Setting log_format to json and process_log_line_callable to store_dbt_resource_status_from_log") + self.log_format = "json" + self._process_log_line_callable = store_dbt_resource_status_from_log + task_instance = context.get("ti") if task_instance is None: raise AirflowException("DbtProducerWatcherOperator expects a task instance in the execution context") diff --git a/dev/dags/example_watcher.py b/dev/dags/example_watcher.py index 9de2055287..b009348d7d 100644 --- a/dev/dags/example_watcher.py +++ b/dev/dags/example_watcher.py @@ -14,6 +14,7 @@ DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) DBT_PROJECT_NAME = os.getenv("DBT_PROJECT_NAME", "jaffle_shop") DBT_PROJECT_PATH = DBT_ROOT_PATH / DBT_PROJECT_NAME +DBT_EXECUTABLE_PATH = Path(__file__).parent.parent / "venv-subprocess/bin/dbt" profile_config = ProfileConfig( @@ -42,7 +43,11 @@ # [START example_watcher] example_watcher = DbtDag( # dbt/cosmos-specific parameters - execution_config=ExecutionConfig(execution_mode=ExecutionMode.WATCHER, invocation_mode=InvocationMode.DBT_RUNNER), + execution_config=ExecutionConfig( + execution_mode=ExecutionMode.WATCHER, + invocation_mode=InvocationMode.SUBPROCESS, + dbt_executable_path=DBT_EXECUTABLE_PATH, + ), project_config=ProjectConfig(DBT_PROJECT_PATH), profile_config=profile_config, render_config=RenderConfig(exclude=["raw_payments"]), diff --git a/scripts/test/integration.sh b/scripts/test/integration.sh index f988198076..685c6d652e 100644 --- a/scripts/test/integration.sh +++ b/scripts/test/integration.sh @@ -11,16 +11,23 @@ ls $AIRFLOW_HOME airflow db check +python -m venv venv-subprocess +venv-subprocess/bin/pip install -U dbt-postgres + rm -rf dbt/jaffle_shop/dbt_packages; -pytest -vv \ - --cov=cosmos \ - --cov-report=term-missing \ - --cov-report=xml \ - --durations=0 \ - -m 'integration and not dbtfusion' \ - --ignore=tests/perf \ - --ignore=tests/test_async_example_dag.py \ - --ignore=tests/test_example_k8s_dags.py \ - --ignore=tests/operators/test_watcher_kubernetes_integration.py \ - -k 'not (simple_dag_async or example_cosmos_python_models or example_virtualenv or jaffle_shop_kubernetes or jaffle_shop_watcher_kubernetes)' + +# pytest -vv \ +# --cov=cosmos \ +# --cov-report=term-missing \ +# --cov-report=xml \ +# --durations=0 \ +# -m 'integration and not dbtfusion' \ +# --ignore=tests/perf \ +# --ignore=tests/test_async_example_dag.py \ +# --ignore=tests/test_example_k8s_dags.py \ +# --ignore=tests/operators/test_watcher_kubernetes_integration.py \ +# -k 'not (simple_dag_async or example_cosmos_python_models or example_virtualenv or jaffle_shop_kubernetes or jaffle_shop_watcher_kubernetes)' + +pytest -vv tests/operators/test_watcher.py::test_dbt_dag_with_watcher + diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index f631fa0022..976accb6e1 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -33,7 +33,7 @@ DBT_PROJECT_PATH = Path(__file__).parent.parent.parent / "dev/dags/dbt/jaffle_shop" DBT_PROFILES_YAML_FILEPATH = DBT_PROJECT_PATH / "profiles.yml" - +DBT_EXECUTABLE_PATH = Path(__file__).parent.parent.parent / "venv-subprocess/bin/dbt" project_config = ProjectConfig( dbt_project_path=DBT_PROJECT_PATH, @@ -977,7 +977,7 @@ def test_dbt_build_watcher_operator_raises_not_implemented_error(self): @pytest.mark.skipif(AIRFLOW_VERSION < Version("2.7"), reason="Airflow did not have dag.test() until the 2.6 release") @pytest.mark.integration -def test_dbt_dag_with_watcher(): +def test_dbt_dag_with_watcher(caplog): """ Run a DbtDag using `ExecutionMode.WATCHER`. Confirm the right amount of tasks is created and that tasks are in the expected topological order. @@ -990,6 +990,7 @@ def test_dbt_dag_with_watcher(): dag_id="watcher_dag", execution_config=ExecutionConfig( execution_mode=ExecutionMode.WATCHER, + invocation_mode=InvocationMode.DBT_RUNNER, ), render_config=RenderConfig(emit_datasets=False), operator_args={"trigger_rule": "all_success", "execution_timeout": timedelta(seconds=120)}, @@ -1039,7 +1040,53 @@ def test_dbt_dag_with_watcher(): "raw_orders_seed", "raw_customers_seed", } + assert '''"node_status": "success", "resource_type": "seed", "unique_id": "seed.jaffle_shop.raw_orders"''' not in caplog.text + assert "OK loaded seed file public.raw_orders" in caplog.text + + +@pytest.mark.skipif(AIRFLOW_VERSION < Version("2.7"), reason="Airflow did not have dag.test() until the 2.6 release") +@pytest.mark.integration +def test_dbt_dag_with_watcher_and_subprocess(caplog): + """ + Run a DbtDag using `ExecutionMode.WATCHER`. + Confirm the right amount of tasks is created and that tasks are in the expected topological order. + Confirm that the producer watcher task is created and that it is the parent of the root dbt nodes. + """ + watcher_dag = DbtDag( + project_config=project_config, + profile_config=profile_config, + start_date=datetime(2023, 1, 1), + dag_id="watcher_dag", + execution_config=ExecutionConfig( + execution_mode=ExecutionMode.WATCHER, + invocation_mode=InvocationMode.SUBPROCESS, + dbt_executable_path=DBT_EXECUTABLE_PATH + ), + render_config=RenderConfig( + emit_datasets=False, + select=["raw_orders"], + test_behavior=TestBehavior.AFTER_ALL + ), + operator_args={"trigger_rule": "all_success", "execution_timeout": timedelta(seconds=120)}, + ) + dag_run = new_test_dag(watcher_dag) + assert dag_run.state == DagRunState.SUCCESS + assert len(watcher_dag.dbt_graph.filtered_nodes) == 1 + + assert len(watcher_dag.task_dict) == 3 + tasks_names = [task.task_id for task in watcher_dag.topological_sort()] + expected_task_names = [ + "dbt_producer_watcher", + "raw_orders_seed", + "jaffle_shop_test" + ] + assert tasks_names == expected_task_names + # Confirm that the dbt command was successfully run using the given dbt executable path: + assert "venv-subprocess/bin/dbt'), 'build'" in caplog.text + # Confirm that the seed was successfully run and the log output was JSON: + assert '''"node_status": "success", "resource_type": "seed", "unique_id": "seed.jaffle_shop.raw_orders"''' not in caplog.text + assert "OK loaded seed file public.raw_orders" in caplog.text @pytest.mark.skipif(AIRFLOW_VERSION < Version("2.7"), reason="Airflow did not have dag.test() until the 2.6 release") @pytest.mark.integration From e6baa5c1c0e6212196506129418740f635e992a2 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 13 Jan 2026 17:45:35 +0000 Subject: [PATCH 02/15] Fix test --- tests/operators/test_watcher.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 976accb6e1..606acc68eb 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -1040,7 +1040,10 @@ def test_dbt_dag_with_watcher(caplog): "raw_orders_seed", "raw_customers_seed", } - assert '''"node_status": "success", "resource_type": "seed", "unique_id": "seed.jaffle_shop.raw_orders"''' not in caplog.text + assert ( + '''"node_status": "success", "resource_type": "seed", "unique_id": "seed.jaffle_shop.raw_orders"''' + not in caplog.text + ) assert "OK loaded seed file public.raw_orders" in caplog.text @@ -1060,13 +1063,9 @@ def test_dbt_dag_with_watcher_and_subprocess(caplog): execution_config=ExecutionConfig( execution_mode=ExecutionMode.WATCHER, invocation_mode=InvocationMode.SUBPROCESS, - dbt_executable_path=DBT_EXECUTABLE_PATH - ), - render_config=RenderConfig( - emit_datasets=False, - select=["raw_orders"], - test_behavior=TestBehavior.AFTER_ALL + dbt_executable_path=DBT_EXECUTABLE_PATH, ), + render_config=RenderConfig(emit_datasets=False, select=["raw_orders"], test_behavior=TestBehavior.AFTER_ALL), operator_args={"trigger_rule": "all_success", "execution_timeout": timedelta(seconds=120)}, ) dag_run = new_test_dag(watcher_dag) @@ -1076,18 +1075,18 @@ def test_dbt_dag_with_watcher_and_subprocess(caplog): assert len(watcher_dag.task_dict) == 3 tasks_names = [task.task_id for task in watcher_dag.topological_sort()] - expected_task_names = [ - "dbt_producer_watcher", - "raw_orders_seed", - "jaffle_shop_test" - ] + expected_task_names = ["dbt_producer_watcher", "raw_orders_seed", "jaffle_shop_test"] assert tasks_names == expected_task_names # Confirm that the dbt command was successfully run using the given dbt executable path: assert "venv-subprocess/bin/dbt'), 'build'" in caplog.text # Confirm that the seed was successfully run and the log output was JSON: - assert '''"node_status": "success", "resource_type": "seed", "unique_id": "seed.jaffle_shop.raw_orders"''' not in caplog.text + assert ( + '''"node_status": "success", "resource_type": "seed", "unique_id": "seed.jaffle_shop.raw_orders"''' + not in caplog.text + ) assert "OK loaded seed file public.raw_orders" in caplog.text + @pytest.mark.skipif(AIRFLOW_VERSION < Version("2.7"), reason="Airflow did not have dag.test() until the 2.6 release") @pytest.mark.integration def test_dbt_task_group_with_watcher(): From 1cb4117603fef310ee63f61479c79d0fd73a654a Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 27 Jan 2026 12:48:31 +0000 Subject: [PATCH 03/15] Fix test --- cosmos/dbt/executable.py | 13 ++++++++++ tests/operators/test_watcher.py | 44 ++------------------------------- 2 files changed, 15 insertions(+), 42 deletions(-) diff --git a/cosmos/dbt/executable.py b/cosmos/dbt/executable.py index d926e1c832..6a75bc7173 100644 --- a/cosmos/dbt/executable.py +++ b/cosmos/dbt/executable.py @@ -1,4 +1,5 @@ import shutil +from importlib.util import find_spec def get_system_dbt() -> str: @@ -6,3 +7,15 @@ def get_system_dbt() -> str: Tries to identify which is the path to the dbt executable, return "dbt" otherwise. """ return shutil.which("dbt") or "dbt" + + +def is_dbt_installed_in_same_environment() -> bool: + """ + Checks if dbt is installed in the same environment as the current one. + """ + try: + find_spec("dbt") + except ImportError: + return False + else: + return True diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 606acc68eb..d5abda0394 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -487,48 +487,10 @@ def teststore_dbt_resource_status_from_log_handles_missing_node_info(self): # No status should be stored assert len(ti.store) == 0 - def test_process_log_line_callable_is_not_bound_method(self): - """Test that _process_log_line_callable is not bound as a method when accessed through an instance. - - This test verifies the fix for the bug where accessing _process_log_line_callable through - an instance would create a bound method, causing 'self' to be passed as the first argument. - """ - import inspect - - op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) - - # Access the callable through the instance - callable_from_instance = op._process_log_line_callable - - # Verify it's not a bound method (which would have __self__ attribute) - assert not inspect.ismethod( - callable_from_instance - ), "_process_log_line_callable should not be a bound method when accessed through instance" - - # Verify it's the original function - assert callable_from_instance is store_dbt_resource_status_from_log - - def test_process_log_line_callable_accepts_two_arguments(self): - """Test that the callable can be called with exactly 2 arguments (line, kwargs). - - This tests the integration pattern used in subprocess.py where process_log_line(line, kwargs) is called. - """ - op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) - callable_from_instance = op._process_log_line_callable - - ti = _MockTI() - ctx = {"ti": ti} - - log_line = json.dumps({"data": {"node_info": {"node_status": "success", "unique_id": "model.pkg.test_model"}}}) - - # This should NOT raise TypeError about wrong number of arguments - callable_from_instance(log_line, {"context": ctx}) - - assert ti.store.get("model__pkg__test_model_status") == "success" - def test_process_log_line_callable_integration_with_subprocess_pattern(self): """Test the exact pattern used in subprocess.py: process_log_line(line, kwargs).""" op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) + op._process_log_line_callable = store_dbt_resource_status_from_log ti = _MockTI() ctx = {"ti": ti} @@ -543,10 +505,8 @@ def test_process_log_line_callable_integration_with_subprocess_pattern(self): ] # Simulate the subprocess.py pattern - process_log_line = op._process_log_line_callable for line in log_lines: - if process_log_line: - process_log_line(line, kwargs) + op._process_log_line_callable(line, kwargs) assert ti.store.get("model__pkg__model_a_status") == "success" assert ti.store.get("model__pkg__model_b_status") == "failed" From e4234c7434f4cca10e28ab9ee4f974475672ad77 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 27 Jan 2026 12:54:28 +0000 Subject: [PATCH 04/15] Revert integraion test temporary change --- scripts/test/integration.sh | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/scripts/test/integration.sh b/scripts/test/integration.sh index 685c6d652e..a844e37357 100644 --- a/scripts/test/integration.sh +++ b/scripts/test/integration.sh @@ -17,17 +17,14 @@ venv-subprocess/bin/pip install -U dbt-postgres rm -rf dbt/jaffle_shop/dbt_packages; -# pytest -vv \ -# --cov=cosmos \ -# --cov-report=term-missing \ -# --cov-report=xml \ -# --durations=0 \ -# -m 'integration and not dbtfusion' \ -# --ignore=tests/perf \ -# --ignore=tests/test_async_example_dag.py \ -# --ignore=tests/test_example_k8s_dags.py \ -# --ignore=tests/operators/test_watcher_kubernetes_integration.py \ -# -k 'not (simple_dag_async or example_cosmos_python_models or example_virtualenv or jaffle_shop_kubernetes or jaffle_shop_watcher_kubernetes)' - -pytest -vv tests/operators/test_watcher.py::test_dbt_dag_with_watcher - +pytest -vv \ + --cov=cosmos \ + --cov-report=term-missing \ + --cov-report=xml \ + --durations=0 \ + -m 'integration and not dbtfusion' \ + --ignore=tests/perf \ + --ignore=tests/test_async_example_dag.py \ + --ignore=tests/test_example_k8s_dags.py \ + --ignore=tests/operators/test_watcher_kubernetes_integration.py \ + -k 'not (simple_dag_async or example_cosmos_python_models or example_virtualenv or jaffle_shop_kubernetes or jaffle_shop_watcher_kubernetes)' From fdad323d8131a09a0ecfbf8a5a136e653e74abd3 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 27 Jan 2026 13:00:17 +0000 Subject: [PATCH 05/15] Fix MyPy checks --- cosmos/operators/watcher.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index ebf0424298..3b60c63786 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -83,7 +83,7 @@ class DbtProducerWatcherOperator(DbtBuildMixin, DbtLocalBaseOperator): template_fields = DbtLocalBaseOperator.template_fields + DbtBuildMixin.template_fields # type: ignore[operator] # Use staticmethod to prevent Python's descriptor protocol from binding the function to `self` # when accessed via instance, which would incorrectly pass `self` as the first argument - _process_log_line_callable: Callable[[str, Any], None] = None + _process_log_line_callable: Callable[[str, Any], None] | None = None def __init__(self, *args: Any, **kwargs: Any) -> None: task_id = kwargs.pop("task_id", PRODUCER_WATCHER_TASK_ID) @@ -146,18 +146,23 @@ def _finalize(self, context: Context, startup_events: list[dict[str, Any]]) -> N if startup_events: safe_xcom_push(task_instance=context["ti"], key="dbt_startup_events", value=startup_events) - def execute(self, context: Context, **kwargs: Any) -> Any: + def _set_invocation_mode_if_not_set(self) -> None: if not self.invocation_mode: logger.info("No invocation mode provided, discovering it") self._discover_invocation_mode() - logger.info("Invocation mode: %s", self.invocation_mode) - + def _set_process_log_line_callable_if_subprocess(self) -> None: if self.invocation_mode == InvocationMode.SUBPROCESS: - logger.info("Setting log_format to json and process_log_line_callable to store_dbt_resource_status_from_log") + logger.info( + "DbtProducerWatcherOperator: Setting log_format to json and process_log_line_callable to store_dbt_resource_status_from_log" + ) self.log_format = "json" self._process_log_line_callable = store_dbt_resource_status_from_log + def execute(self, context: Context, **kwargs: Any) -> Any: + self._set_invocation_mode_if_not_set() + self._set_process_log_line_callable_if_subprocess() + task_instance = context.get("ti") if task_instance is None: raise AirflowException("DbtProducerWatcherOperator expects a task instance in the execution context") @@ -165,12 +170,12 @@ def execute(self, context: Context, **kwargs: Any) -> Any: try_number = getattr(task_instance, "try_number", 1) if try_number > 1: - retry_message = ( + self.log.info( "Dbt WATCHER producer task does not support Airflow retries. " - f"Detected attempt #{try_number}; failing fast to avoid running a second dbt build." + "Detected attempt #%s; skipping execution to avoid running a second dbt build.", + try_number, ) - self.log.error(retry_message) - raise AirflowException(retry_message) + return None self.log.info( "Dbt WATCHER producer task forces Airflow retries to 0 so the dbt build only runs once; " From be798d9db638ccb204a74abe09cde902df23ae78 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 27 Jan 2026 13:01:29 +0000 Subject: [PATCH 06/15] Add tests --- tests/dbt/test_executable.py | 39 ++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 tests/dbt/test_executable.py diff --git a/tests/dbt/test_executable.py b/tests/dbt/test_executable.py new file mode 100644 index 0000000000..78c8bb50e7 --- /dev/null +++ b/tests/dbt/test_executable.py @@ -0,0 +1,39 @@ +from unittest.mock import MagicMock, patch + +from cosmos.dbt.executable import get_system_dbt, is_dbt_installed_in_same_environment + + +class TestGetSystemDbt: + @patch("shutil.which") + def test_get_system_dbt_returns_path_when_found(self, mock_which): + """Test that get_system_dbt returns the path when dbt is found.""" + mock_which.return_value = "/usr/local/bin/dbt" + result = get_system_dbt() + assert result == "/usr/local/bin/dbt" + mock_which.assert_called_once_with("dbt") + + @patch("shutil.which") + def test_get_system_dbt_returns_dbt_when_not_found(self, mock_which): + """Test that get_system_dbt returns 'dbt' when dbt is not found.""" + mock_which.return_value = None + result = get_system_dbt() + assert result == "dbt" + mock_which.assert_called_once_with("dbt") + + +class TestIsDbtInstalledInSameEnvironment: + @patch("cosmos.dbt.executable.find_spec") + def test_is_dbt_installed_in_same_environment_returns_true_when_dbt_found(self, mock_find_spec): + """Test that is_dbt_installed_in_same_environment returns True when find_spec finds dbt.""" + mock_find_spec.return_value = MagicMock() # Simulates dbt module spec found + result = is_dbt_installed_in_same_environment() + assert result is True + mock_find_spec.assert_called_once_with("dbt") + + @patch("cosmos.dbt.executable.find_spec") + def test_is_dbt_installed_in_same_environment_returns_false_when_import_error(self, mock_find_spec): + """Test that is_dbt_installed_in_same_environment returns False when find_spec raises ImportError.""" + mock_find_spec.side_effect = ImportError("No module named 'dbt'") + result = is_dbt_installed_in_same_environment() + assert result is False + mock_find_spec.assert_called_once_with("dbt") From 7c05492e632d97a977a537f60cb3a5e5f7dc56e3 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 27 Jan 2026 13:07:29 +0000 Subject: [PATCH 07/15] Fix test --- tests/operators/test_watcher.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index d5abda0394..86e915c1ba 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -198,22 +198,6 @@ def test_dbt_producer_watcher_operator_logs_retry_message(caplog): assert any("forces Airflow retries to 0" in message for message in caplog.messages) -def test_dbt_producer_watcher_operator_blocks_retry_attempt(caplog): - op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) - ti = _MockTI() - ti.try_number = 2 - context = {"ti": ti} - - with patch("cosmos.operators.local.DbtLocalBaseOperator.execute") as mock_execute: - with caplog.at_level(logging.ERROR): - with pytest.raises(AirflowException) as excinfo: - op.execute(context=context) - - mock_execute.assert_not_called() - assert "does not support Airflow retries" in str(excinfo.value) - assert any("does not support Airflow retries" in message for message in caplog.messages) - - @pytest.mark.parametrize( "event, expected_message", [ From 0595504cfffb32648ed2db498c037d374788be29 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 27 Jan 2026 13:51:28 +0000 Subject: [PATCH 08/15] Normalise Cosmos logs --- cosmos/hooks/subprocess.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/cosmos/hooks/subprocess.py b/cosmos/hooks/subprocess.py index e028d57af6..2e12876625 100644 --- a/cosmos/hooks/subprocess.py +++ b/cosmos/hooks/subprocess.py @@ -18,6 +18,10 @@ except ImportError: from airflow.hooks.base import BaseHook +from cosmos.log import get_logger + +logger = get_logger(__name__) + class FullOutputSubprocessResult(NamedTuple): exit_code: int @@ -60,7 +64,7 @@ def run_command( ``output``: the last line from stderr or stdout ``full_output``: all lines from stderr or stdout. """ - self.log.info("Tmp dir root location: \n %s", gettempdir()) + logger.info("Tmp dir root location: \n %s", gettempdir()) log_lines = [] with contextlib.ExitStack() as stack: if cwd is None: @@ -73,7 +77,7 @@ def pre_exec() -> None: signal.signal(getattr(signal, sig), signal.SIG_DFL) os.setsid() - self.log.info("Running command: %s", command) + logger.info("Running command: %s", command) self.sub_process = Popen( command, @@ -91,7 +95,7 @@ def pre_exec() -> None: if self.sub_process is None: raise RuntimeError("The subprocess should be created here and is None!") - self.log.info("Command output:") + logger.info("Command output:") last_line: str = "" assert self.sub_process.stdout is not None @@ -102,23 +106,23 @@ def pre_exec() -> None: if process_log_line: process_log_line(line, kwargs) else: - self.log.info("%s", line) + logger.info("%s", line) # Wait until process completes return_code = self.sub_process.wait() - self.log.info("Command exited with return code %s", return_code) + logger.info("Command exited with return code %s", return_code) return FullOutputSubprocessResult(exit_code=return_code, output=last_line, full_output=log_lines) def send_sigterm(self) -> None: """Sends SIGTERM signal to ``self.sub_process`` if one exists.""" - self.log.info("Sending SIGTERM signal to process group") + logger.info("Sending SIGTERM signal to process group") if self.sub_process and hasattr(self.sub_process, "pid"): os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM) def send_sigint(self) -> None: """Sends SIGINT signal to ``self.sub_process`` if one exists.""" - self.log.info("Sending SIGINT signal to process group") + logger.info("Sending SIGINT signal to process group") if self.sub_process and hasattr(self.sub_process, "pid"): os.killpg(os.getpgid(self.sub_process.pid), signal.SIGINT) From 58d31000c9fde7bc612bd5fca325e4b4147c513b Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 27 Jan 2026 13:51:57 +0000 Subject: [PATCH 09/15] Fix integration test that was failing due to capsys/caplog --- tests/operators/test_watcher.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 86e915c1ba..bc78390326 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -921,7 +921,7 @@ def test_dbt_build_watcher_operator_raises_not_implemented_error(self): @pytest.mark.skipif(AIRFLOW_VERSION < Version("2.7"), reason="Airflow did not have dag.test() until the 2.6 release") @pytest.mark.integration -def test_dbt_dag_with_watcher(caplog): +def test_dbt_dag_with_watcher(capsys): """ Run a DbtDag using `ExecutionMode.WATCHER`. Confirm the right amount of tasks is created and that tasks are in the expected topological order. @@ -984,11 +984,17 @@ def test_dbt_dag_with_watcher(caplog): "raw_orders_seed", "raw_customers_seed", } + + # dbt runner logs are not captured by caplog, so we need to capture them using capsys + capsys_output = capsys.readouterr() + stdout = capsys_output.out + assert ( '''"node_status": "success", "resource_type": "seed", "unique_id": "seed.jaffle_shop.raw_orders"''' - not in caplog.text + not in stdout ) - assert "OK loaded seed file public.raw_orders" in caplog.text + + assert "OK loaded seed file public.raw_orders" in stdout @pytest.mark.skipif(AIRFLOW_VERSION < Version("2.7"), reason="Airflow did not have dag.test() until the 2.6 release") From 435a2ee0bd614bb519bee5018c4606188658913c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 27 Jan 2026 13:53:30 +0000 Subject: [PATCH 10/15] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/operators/test_watcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 526e22741d..c05923fab0 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -240,7 +240,7 @@ def test_dbt_producer_watcher_operator_skips_retry_attempt(caplog): assert any("does not support Airflow retries" in message for message in caplog.messages) assert any("skipping execution" in message for message in caplog.messages) - + @pytest.mark.parametrize( "event, expected_message", [ From 4f4e514fdde5bd796c60de2f62277860ceaf95b8 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 27 Jan 2026 14:21:40 +0000 Subject: [PATCH 11/15] Remove no longer needed tests --- tests/operators/test_watcher.py | 39 --------------------------------- 1 file changed, 39 deletions(-) diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 57c31ca954..9f743a72eb 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -550,45 +550,6 @@ def test_store_dbt_resource_status_from_log_outputs_dbt_info(self, caplog, msg, assert msg in caplog.text assert any(record.levelname == logging.getLevelName(dynamic_level) for record in caplog.records) - def test_process_log_line_callable_is_not_bound_method(self): - """Test that _process_log_line_callable is not bound as a method when accessed through an instance. - - This test verifies the fix for the bug where accessing _process_log_line_callable through - an instance would create a bound method, causing 'self' to be passed as the first argument. - """ - import inspect - - op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) - - # Access the callable through the instance - callable_from_instance = op._process_log_line_callable - - # Verify it's not a bound method (which would have __self__ attribute) - assert not inspect.ismethod( - callable_from_instance - ), "_process_log_line_callable should not be a bound method when accessed through instance" - - # Verify it's the original function - assert callable_from_instance is store_dbt_resource_status_from_log - - def test_process_log_line_callable_accepts_two_arguments(self): - """Test that the callable can be called with exactly 2 arguments (line, kwargs). - - This tests the integration pattern used in subprocess.py where process_log_line(line, kwargs) is called. - """ - op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) - callable_from_instance = op._process_log_line_callable - - ti = _MockTI() - ctx = {"ti": ti} - - log_line = json.dumps({"data": {"node_info": {"node_status": "success", "unique_id": "model.pkg.test_model"}}}) - - # This should NOT raise TypeError about wrong number of arguments - callable_from_instance(log_line, {"context": ctx}) - - assert ti.store.get("model__pkg__test_model_status") == "success" - def test_process_log_line_callable_integration_with_subprocess_pattern(self): """Test the exact pattern used in subprocess.py: process_log_line(line, kwargs).""" op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) From e6ba31883458a1311dbcd103d7f9a320eb9fff5f Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 27 Jan 2026 15:18:35 +0000 Subject: [PATCH 12/15] Revert changes to example dag --- dev/dags/example_watcher.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/dev/dags/example_watcher.py b/dev/dags/example_watcher.py index b009348d7d..de2e5e8348 100644 --- a/dev/dags/example_watcher.py +++ b/dev/dags/example_watcher.py @@ -43,11 +43,10 @@ # [START example_watcher] example_watcher = DbtDag( # dbt/cosmos-specific parameters - execution_config=ExecutionConfig( - execution_mode=ExecutionMode.WATCHER, - invocation_mode=InvocationMode.SUBPROCESS, - dbt_executable_path=DBT_EXECUTABLE_PATH, - ), + execution_config=ExecutionConfig( + execution_mode=ExecutionMode.WATCHER, + invocation_mode=InvocationMode.DBT_RUNNER + ), project_config=ProjectConfig(DBT_PROJECT_PATH), profile_config=profile_config, render_config=RenderConfig(exclude=["raw_payments"]), From d4f61ba28da8085c2266855bdf3455d5a71b2882 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 27 Jan 2026 15:18:46 +0000 Subject: [PATCH 13/15] Apply suggestion from @tatiana --- dev/dags/example_watcher.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dev/dags/example_watcher.py b/dev/dags/example_watcher.py index de2e5e8348..fc147d5c60 100644 --- a/dev/dags/example_watcher.py +++ b/dev/dags/example_watcher.py @@ -14,7 +14,6 @@ DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) DBT_PROJECT_NAME = os.getenv("DBT_PROJECT_NAME", "jaffle_shop") DBT_PROJECT_PATH = DBT_ROOT_PATH / DBT_PROJECT_NAME -DBT_EXECUTABLE_PATH = Path(__file__).parent.parent / "venv-subprocess/bin/dbt" profile_config = ProfileConfig( From 822c5c9d7a77b530173cd6f44314c47e5f24c6e1 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 27 Jan 2026 15:18:56 +0000 Subject: [PATCH 14/15] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dev/dags/example_watcher.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/dev/dags/example_watcher.py b/dev/dags/example_watcher.py index fc147d5c60..9de2055287 100644 --- a/dev/dags/example_watcher.py +++ b/dev/dags/example_watcher.py @@ -42,10 +42,7 @@ # [START example_watcher] example_watcher = DbtDag( # dbt/cosmos-specific parameters - execution_config=ExecutionConfig( - execution_mode=ExecutionMode.WATCHER, - invocation_mode=InvocationMode.DBT_RUNNER - ), + execution_config=ExecutionConfig(execution_mode=ExecutionMode.WATCHER, invocation_mode=InvocationMode.DBT_RUNNER), project_config=ProjectConfig(DBT_PROJECT_PATH), profile_config=profile_config, render_config=RenderConfig(exclude=["raw_payments"]), From 441ae2d3afc009b46853848d82a353a5a84a4720 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 29 Jan 2026 08:54:57 +0000 Subject: [PATCH 15/15] Address feedback from copilot on code duplicate --- cosmos/operators/watcher.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index f12b11240c..a999ea54e6 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -185,9 +185,6 @@ def execute(self, context: Context, **kwargs: Any) -> Any: ) try: - if not self.invocation_mode: - self._discover_invocation_mode() - use_events = self.invocation_mode == InvocationMode.DBT_RUNNER and EventMsg is not None logger.debug("DbtProducerWatcherOperator: use_events=%s", use_events)