From 61120ca25d6b3dbbcb9203804936e61e2b99a5f2 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 25 Sep 2025 19:23:10 +0530 Subject: [PATCH 1/4] Push producer's task completion status to XCOM --- cosmos/operators/watcher.py | 46 +++++++++++++++++++-------------- tests/operators/test_watcher.py | 36 ++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 19 deletions(-) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 5c52784a81..02ba9810e2 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -90,29 +90,37 @@ def _finalize(self, context: Context, startup_events: list[dict[str, Any]]) -> N ti.xcom_push(key="dbt_startup_events", value=startup_events) def execute(self, context: Context, **kwargs: Any) -> Any: - if not self.invocation_mode: - self._discover_invocation_mode() + try: + if not self.invocation_mode: + self._discover_invocation_mode() - use_events = self.invocation_mode == InvocationMode.DBT_RUNNER and EventMsg is not None - self.log.debug("DbtProducerWatcherOperator: use_events=%s", use_events) + use_events = self.invocation_mode == InvocationMode.DBT_RUNNER and EventMsg is not None + self.log.debug("DbtProducerWatcherOperator: use_events=%s", use_events) - startup_events: list[dict[str, Any]] = [] + startup_events: list[dict[str, Any]] = [] - if use_events: + if use_events: - def _callback(ev: EventMsg) -> None: - name = ev.info.name - if name in {"MainReportVersion", "AdapterRegistered"}: - self._handle_startup_event(ev, startup_events) - elif name == "NodeFinished": - self._handle_node_finished(ev, context) + def _callback(ev: EventMsg) -> None: + name = ev.info.name + if name in {"MainReportVersion", "AdapterRegistered"}: + self._handle_startup_event(ev, startup_events) + elif name == "NodeFinished": + self._handle_node_finished(ev, context) - self._dbt_runner_callbacks = [_callback] - result = super().execute(context=context, **kwargs) + self._dbt_runner_callbacks = [_callback] + result = super().execute(context=context, **kwargs) - self._finalize(context, startup_events) - return result + self._finalize(context, startup_events) + return_value = result + else: + # Fallback – push run_results.json via base class helper + kwargs["push_run_results_to_xcom"] = True + return_value = super().execute(context=context, **kwargs) - # Fallback – push run_results.json via base class helper - kwargs["push_run_results_to_xcom"] = True - return super().execute(context=context, **kwargs) + context["ti"].xcom_push(key="task_status", value="completed") + return return_value + + except Exception as e: + context["ti"].xcom_push(key="task_status", value="completed") + raise e diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index d2aeaaafa7..3be0932a99 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -4,6 +4,8 @@ from types import SimpleNamespace from unittest.mock import patch +import pytest + from cosmos.config import InvocationMode from cosmos.operators.watcher import PRODUCER_OPERATOR_DEFAULT_PRIORITY_WEIGHT, DbtProducerWatcherOperator @@ -63,6 +65,40 @@ def test_dbt_producer_watcher_operator_priority_weight_override(): assert op.priority_weight == 100 +def test_dbt_producer_watcher_operator_pushes_completion_status(): + """Test that operator pushes 'completed' status to XCom in both success and failure cases.""" + op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) + mock_ti = _MockTI() + context = {"ti": mock_ti} + + # Test success case + with patch("cosmos.operators.local.DbtLocalBaseOperator.execute") as mock_execute: + op.execute(context=context) + + # Verify status was pushed + assert mock_ti.store.get("task_status") == "completed" + # Verify parent execute was called + mock_execute.assert_called_once() + + # Reset mock and store + mock_ti.store.clear() + + # Test failure case + class TestException(Exception): + pass + + with patch("cosmos.operators.local.DbtLocalBaseOperator.execute") as mock_execute: + mock_execute.side_effect = TestException("test error") + + with pytest.raises(TestException): + op.execute(context=context) + + # Verify completed status was pushed even in failure case + assert mock_ti.store.get("task_status") == "completed" + # Verify parent execute was called + mock_execute.assert_called_once() + + def test_handle_startup_event(): op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) lst: list[dict] = [] From c50160c17db1180f38471f9002f0beea3c62591e Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 25 Sep 2025 19:29:33 +0530 Subject: [PATCH 2/4] Update cosmos/operators/watcher.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- cosmos/operators/watcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 02ba9810e2..38c5c93ce3 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -123,4 +123,4 @@ def _callback(ev: EventMsg) -> None: except Exception as e: context["ti"].xcom_push(key="task_status", value="completed") - raise e + raise From cdda1f3907a60dd8734fe045185cab735424fb64 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 25 Sep 2025 13:59:50 +0000 Subject: [PATCH 3/4] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20for?= =?UTF-8?q?mat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/operators/watcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 38c5c93ce3..555e6a1185 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -121,6 +121,6 @@ def _callback(ev: EventMsg) -> None: context["ti"].xcom_push(key="task_status", value="completed") return return_value - except Exception as e: + except Exception: context["ti"].xcom_push(key="task_status", value="completed") raise From dea4e17661423d922ae851ab42c18fc06c38ee83 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 29 Sep 2025 11:22:12 +0000 Subject: [PATCH 4/4] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20for?= =?UTF-8?q?mat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/operators/test_watcher.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 77a54873a8..0b9da1cb69 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -7,8 +7,6 @@ import pytest from airflow.exceptions import AirflowException -import pytest - from cosmos.config import InvocationMode from cosmos.operators.watcher import ( PRODUCER_OPERATOR_DEFAULT_PRIORITY_WEIGHT,