From 7b59ccc310605d4922ba965a701dc7121d128184 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 19 Nov 2025 17:28:22 +0530 Subject: [PATCH 01/21] Stream logs for invocation mode subprocess --- cosmos/hooks/subprocess.py | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/cosmos/hooks/subprocess.py b/cosmos/hooks/subprocess.py index b9131d342d..4c20978194 100644 --- a/cosmos/hooks/subprocess.py +++ b/cosmos/hooks/subprocess.py @@ -28,7 +28,7 @@ class FullOutputSubprocessHook(BaseHook): # type: ignore[misc] """Hook for running processes with the ``subprocess`` module.""" def __init__(self) -> None: - self.sub_process: Popen[bytes] | None = None + self.sub_process: Popen[str] | None = None super().__init__() # type: ignore[no-untyped-call] def run_command( @@ -79,26 +79,28 @@ def pre_exec() -> None: cwd=cwd, env=env if env or env == {} else os.environ, preexec_fn=pre_exec, + bufsize=1, # line-buffered (works only in text mode) + text=True, + encoding=output_encoding, + errors="backslashreplace", ) - self.log.info("Command output:") - line = "" + last_line: str = "" - if self.sub_process is None: - raise RuntimeError("The subprocess should be created here and is None!") - if self.sub_process.stdout is not None: - for raw_line in iter(self.sub_process.stdout.readline, b""): - line = raw_line.decode(output_encoding, errors="backslashreplace").rstrip() - # storing the warn & error lines to be used later - log_lines.append(line) - self.log.info("%s", line) + # Stream output line-by-line + assert self.sub_process.stdout is not None + for line in self.sub_process.stdout: + line = line.rstrip("\n") + last_line = line + log_lines.append(line) + self.log.info("%s", line) - self.sub_process.wait() + # Wait until process completes + return_code = self.sub_process.wait() - self.log.info("Command exited with return code %s", self.sub_process.returncode) - return_code: int = self.sub_process.returncode + self.log.info("Command exited with return code %s", return_code) - return FullOutputSubprocessResult(exit_code=return_code, output=line, full_output=log_lines) + return FullOutputSubprocessResult(exit_code=return_code, output=last_line, full_output=log_lines) def send_sigterm(self) -> None: """Sends SIGTERM signal to ``self.sub_process`` if one exists.""" From 97d7ab97ef2592645c3f478387084f6e3a1fd855 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Fri, 21 Nov 2025 12:40:22 +0530 Subject: [PATCH 02/21] Parse logs --- cosmos/hooks/subprocess.py | 16 ++++++++++++++++ cosmos/operators/base.py | 7 ++++++- cosmos/operators/watcher.py | 1 + 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/cosmos/hooks/subprocess.py b/cosmos/hooks/subprocess.py index 4c20978194..c45570dce5 100644 --- a/cosmos/hooks/subprocess.py +++ b/cosmos/hooks/subprocess.py @@ -5,6 +5,7 @@ from __future__ import annotations import contextlib +import json import os import signal from subprocess import PIPE, STDOUT, Popen @@ -31,6 +32,20 @@ def __init__(self) -> None: self.sub_process: Popen[str] | None = None super().__init__() # type: ignore[no-untyped-call] + def _parse_log(self, line: str) -> None: + try: + log_line = json.loads(line) + node_status = log_line.get("data", {}).get("node_info", {}).get("node_status") + + unique_id = log_line.get("data", {}).get("node_info", {}).get("unique_id") + + if node_status in ["success" or "failed"]: + # TODO: push {unique_id: node_status} to xcom + self.log.info("%s", unique_id) + pass + except json.JSONDecodeError: + pass + def run_command( self, command: list[str], @@ -94,6 +109,7 @@ def pre_exec() -> None: last_line = line log_lines.append(line) self.log.info("%s", line) + self._parse_log(line) # Wait until process completes return_code = self.sub_process.wait() diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index 26cc17dd7a..a42d4e571c 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -325,8 +325,9 @@ class DbtBuildMixin: template_fields: Sequence[str] = ("full_refresh",) - def __init__(self, full_refresh: bool | str = False, **kwargs: Any) -> None: + def __init__(self, full_refresh: bool | str = False, log_format: str | None = None, **kwargs: Any) -> None: self.full_refresh = full_refresh + self.log_format = log_format super().__init__(**kwargs) def add_cmd_flags(self) -> list[str]: @@ -341,6 +342,10 @@ def add_cmd_flags(self) -> list[str]: if full_refresh is True: flags.append("--full-refresh") + if self.log_format: + flags.append("--log-format") + flags.append(self.log_format) + return flags diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 8d103687f4..717ed40299 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -109,6 +109,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: default_args["retries"] = 0 kwargs["default_args"] = default_args kwargs["retries"] = 0 + kwargs["log_format"] = "json" super().__init__(task_id=task_id, *args, **kwargs) From 7d0ed47ac9827c53b6b50c000a2e14ed434354e9 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Sat, 22 Nov 2025 15:06:23 +0530 Subject: [PATCH 03/21] Monitor xcom value --- cosmos/_triggers/watcher.py | 24 ++++++++++------------ cosmos/hooks/subprocess.py | 40 ++++++++++++++++++++++++++++++++----- cosmos/operators/local.py | 6 +++++- cosmos/operators/watcher.py | 9 ++++++++- 4 files changed, 59 insertions(+), 20 deletions(-) diff --git a/cosmos/_triggers/watcher.py b/cosmos/_triggers/watcher.py index 9596e7a747..9978fca680 100644 --- a/cosmos/_triggers/watcher.py +++ b/cosmos/_triggers/watcher.py @@ -87,23 +87,21 @@ async def get_xcom_val(self, key: str) -> Any | None: return await self.get_xcom_val_af3(key) async def _parse_node_status(self) -> str | None: - key = f"nodefinished_{self.model_unique_id.replace('.', '__')}" if self.use_event else "run_results" - - compressed_xcom_val = await self.get_xcom_val(key) - if not compressed_xcom_val: - return None - - data_json = _parse_compressed_xcom(compressed_xcom_val) + key = ( + f"nodefinished_{self.model_unique_id.replace('.', '__')}" + if self.use_event + else f"{self.model_unique_id.replace('.', '__')}_status" + ) if self.use_event: + compressed_xcom_val = await self.get_xcom_val(key) + if not compressed_xcom_val: + return None + + data_json = _parse_compressed_xcom(compressed_xcom_val) return data_json.get("data", {}).get("run_result", {}).get("status") # type: ignore[no-any-return] - results = data_json.get("results", []) - node_result: dict[str, Any] = next( - (r for r in results if r.get("unique_id") == self.model_unique_id), - {}, - ) - return node_result.get("status") + return await self.get_xcom_val(key) async def _get_producer_task_status(self) -> str | None: """Retrieve the producer task state for both Airflow 2 and Airflow 3.""" diff --git a/cosmos/hooks/subprocess.py b/cosmos/hooks/subprocess.py index c45570dce5..f45ebcfbd1 100644 --- a/cosmos/hooks/subprocess.py +++ b/cosmos/hooks/subprocess.py @@ -10,13 +10,41 @@ import signal from subprocess import PIPE, STDOUT, Popen from tempfile import TemporaryDirectory, gettempdir -from typing import NamedTuple +from typing import TYPE_CHECKING, Any, NamedTuple + +if TYPE_CHECKING: # pragma: no cover + try: + from airflow.sdk.definitions.context import Context + except ImportError: + from airflow.utils.context import Context # type: ignore[attr-defined] try: # Airflow 3.1 onwards from airflow.sdk.bases.hook import BaseHook except ImportError: from airflow.hooks.base import BaseHook +from threading import Lock + +if TYPE_CHECKING: # pragma: no cover + try: + from airflow.sdk.types import RuntimeTaskInstanceProtocol as TaskInstance + except ImportError: + from airflow.models.taskinstance import TaskInstance # type: ignore[assignment] + + +xcom_set_lock = Lock() + + +def safe_xcom_push(task_instance: TaskInstance, key: str, value: Any) -> None: + """ + Safely set an XCom value in a thread-safe manner in Airflow 3.0 and 3.1. + We noticed that the combination of using dbt (multi-threaded) and Airflow 3.0 and 3.1 to set XCom lead to race conditions. + This leads the producer task to get stuck while running the dbt build command. + Unfortunately, since this is non-deterministic, and happens once every five runs, we were not able to have a proper test. + However, we applied this fix and run over 20 times a pipeline that would fail every 5 runs and this allowed us to no longer face the issue. + """ + with xcom_set_lock: + task_instance.xcom_push(key=key, value=value) class FullOutputSubprocessResult(NamedTuple): @@ -32,7 +60,8 @@ def __init__(self) -> None: self.sub_process: Popen[str] | None = None super().__init__() # type: ignore[no-untyped-call] - def _parse_log(self, line: str) -> None: + def _parse_log(self, line: str, context: Context | None = None) -> None: + assert context is not None # Make MyPy happy try: log_line = json.loads(line) node_status = log_line.get("data", {}).get("node_info", {}).get("node_status") @@ -40,9 +69,9 @@ def _parse_log(self, line: str) -> None: unique_id = log_line.get("data", {}).get("node_info", {}).get("unique_id") if node_status in ["success" or "failed"]: - # TODO: push {unique_id: node_status} to xcom self.log.info("%s", unique_id) - pass + modified_unique_id = unique_id.replace(".", "__") + safe_xcom_push(task_instance=context["ti"], key=f"{modified_unique_id}_status", value=node_status) except json.JSONDecodeError: pass @@ -52,6 +81,7 @@ def run_command( env: dict[str, str] | None = None, output_encoding: str = "utf-8", cwd: str | None = None, + context: Context | None = None, ) -> FullOutputSubprocessResult: """ Execute the command. @@ -109,7 +139,7 @@ def pre_exec() -> None: last_line = line log_lines.append(line) self.log.info("%s", line) - self._parse_log(line) + self._parse_log(line, context) # Wait until process completes return_code = self.sub_process.wait() diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 7a69402aa3..5745b54b74 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -457,13 +457,16 @@ def _override_rtif_airflow_2_x(session: Session = NEW_SESSION) -> None: _override_rtif_airflow_2_x() - def run_subprocess(self, command: list[str], env: dict[str, str], cwd: str) -> FullOutputSubprocessResult: + def run_subprocess( + self, command: list[str], env: dict[str, str], cwd: str, context: Context + ) -> FullOutputSubprocessResult: logger.info("Trying to run the command:\n %s\nFrom %s", command, cwd) subprocess_result: FullOutputSubprocessResult = self.subprocess_hook.run_command( command=command, env=env, cwd=cwd, output_encoding=self.output_encoding, + context=context, ) # Logging changed in Airflow 3.1 and we needed to replace the output by the full output: output = "".join(subprocess_result.full_output) @@ -684,6 +687,7 @@ def run_command( # noqa: C901 command=full_cmd, env=env, cwd=tmp_project_dir, + context=context, ) if is_openlineage_common_available: self.calculate_openlineage_events_completes(env, tmp_dir_path) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 717ed40299..78c86bc079 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -410,6 +410,12 @@ def _use_event(self) -> bool: self._discover_invocation_mode() return self.invocation_mode == InvocationMode.DBT_RUNNER and EventMsg is not None + def _get_model_status_from_xcom(self, ti: TaskInstance) -> Any: + modified_unique_id = self.model_unique_id.replace(".", "__") + self.log.info("self.model_unique_id: %s", modified_unique_id) + status = ti.xcom_pull(self.producer_task_id, key=f"{modified_unique_id}_status") + return status + def poke(self, context: Context) -> bool: """ Checks the status of a dbt model run by pulling relevant XComs from the master task. @@ -434,7 +440,8 @@ def poke(self, context: Context) -> bool: if self._use_event(): status = self._get_status_from_events(ti, context) else: - status = self._get_status_from_run_results(ti, context) + status = self._get_model_status_from_xcom(ti) + # status = self._get_status_from_run_results(ti, context) if status is None: From 95e2e91f18a6338d23eba868ea7b5b09c4a5d4f3 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Sat, 22 Nov 2025 16:16:04 +0530 Subject: [PATCH 04/21] cleanup --- cosmos/hooks/subprocess.py | 26 +++----------------------- cosmos/operators/watcher.py | 18 +++--------------- 2 files changed, 6 insertions(+), 38 deletions(-) diff --git a/cosmos/hooks/subprocess.py b/cosmos/hooks/subprocess.py index f45ebcfbd1..c2f47369c8 100644 --- a/cosmos/hooks/subprocess.py +++ b/cosmos/hooks/subprocess.py @@ -10,7 +10,9 @@ import signal from subprocess import PIPE, STDOUT, Popen from tempfile import TemporaryDirectory, gettempdir -from typing import TYPE_CHECKING, Any, NamedTuple +from typing import TYPE_CHECKING, NamedTuple + +from cosmos._utils.common import safe_xcom_push if TYPE_CHECKING: # pragma: no cover try: @@ -23,28 +25,6 @@ from airflow.sdk.bases.hook import BaseHook except ImportError: from airflow.hooks.base import BaseHook -from threading import Lock - -if TYPE_CHECKING: # pragma: no cover - try: - from airflow.sdk.types import RuntimeTaskInstanceProtocol as TaskInstance - except ImportError: - from airflow.models.taskinstance import TaskInstance # type: ignore[assignment] - - -xcom_set_lock = Lock() - - -def safe_xcom_push(task_instance: TaskInstance, key: str, value: Any) -> None: - """ - Safely set an XCom value in a thread-safe manner in Airflow 3.0 and 3.1. - We noticed that the combination of using dbt (multi-threaded) and Airflow 3.0 and 3.1 to set XCom lead to race conditions. - This leads the producer task to get stuck while running the dbt build command. - Unfortunately, since this is non-deterministic, and happens once every five runs, we were not able to have a proper test. - However, we applied this fix and run over 20 times a pipeline that would fail every 5 runs and this allowed us to no longer face the issue. - """ - with xcom_set_lock: - task_instance.xcom_push(key=key, value=value) class FullOutputSubprocessResult(NamedTuple): diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 78c86bc079..ba963804be 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -6,10 +6,11 @@ import zlib from datetime import timedelta from pathlib import Path -from threading import Lock -from typing import TYPE_CHECKING, Any + +from typing import TYPE_CHECKING, Any, Callable, List, Union from cosmos._triggers.watcher import WatcherTrigger, _parse_compressed_xcom +from cosmos._utils.common import safe_xcom_push if TYPE_CHECKING: # pragma: no cover try: @@ -52,25 +53,12 @@ EventMsg = None logger = logging.getLogger(__name__) -xcom_set_lock = Lock() CONSUMER_OPERATOR_DEFAULT_PRIORITY_WEIGHT = 10 PRODUCER_OPERATOR_DEFAULT_PRIORITY_WEIGHT = 9999 WEIGHT_RULE = "absolute" # the default "downstream" does not work with dag.test() -def safe_xcom_push(task_instance: TaskInstance, key: str, value: Any) -> None: - """ - Safely set an XCom value in a thread-safe manner in Airflow 3.0 and 3.1. - We noticed that the combination of using dbt (multi-threaded) and Airflow 3.0 and 3.1 to set XCom lead to race conditions. - This leads the producer task to get stuck while running the dbt build command. - Unfortunately, since this is non-deterministic, and happens once every five runs, we were not able to have a proper test. - However, we applied this fix and run over 20 times a pipeline that would fail every 5 runs and this allowed us to no longer face the issue. - """ - with xcom_set_lock: - task_instance.xcom_push(key=key, value=value) - - class DbtProducerWatcherOperator(DbtBuildMixin, DbtLocalBaseOperator): """Run dbt build and update XCom with the progress of each model, as part of the *WATCHER* execution mode. From ad04979b4558861d4c1e7202ea3210f5ac5c48b4 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 22 Nov 2025 10:48:17 +0000 Subject: [PATCH 05/21] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/operators/watcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index ba963804be..3058680647 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -7,7 +7,7 @@ from datetime import timedelta from pathlib import Path -from typing import TYPE_CHECKING, Any, Callable, List, Union +from typing import TYPE_CHECKING, Any from cosmos._triggers.watcher import WatcherTrigger, _parse_compressed_xcom from cosmos._utils.common import safe_xcom_push From aa63523cdd8819fb467bc71840c5d0cf918954fa Mon Sep 17 00:00:00 2001 From: pankajastro Date: Sat, 22 Nov 2025 16:19:06 +0530 Subject: [PATCH 06/21] Add new version --- cosmos/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 4e62b386bd..00ca576392 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -9,7 +9,7 @@ from cosmos import settings -__version__ = "1.12.0a2" +__version__ = "1.12.0a3" if not settings.enable_memory_optimised_imports: from cosmos.airflow.dag import DbtDag From 11e1865b166aa864a947ec1c8b4d204e35f902a7 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Sat, 22 Nov 2025 16:29:51 +0530 Subject: [PATCH 07/21] Move safe_xcom_push in common --- cosmos/_utils/common.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 cosmos/_utils/common.py diff --git a/cosmos/_utils/common.py b/cosmos/_utils/common.py new file mode 100644 index 0000000000..1476f0b9d8 --- /dev/null +++ b/cosmos/_utils/common.py @@ -0,0 +1,23 @@ +from threading import Lock +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: # pragma: no cover + try: + from airflow.sdk.types import RuntimeTaskInstanceProtocol as TaskInstance + except ImportError: + from airflow.models.taskinstance import TaskInstance # type: ignore[assignment] + + +xcom_set_lock = Lock() + + +def safe_xcom_push(task_instance: TaskInstance, key: str, value: Any) -> None: + """ + Safely set an XCom value in a thread-safe manner in Airflow 3.0 and 3.1. + We noticed that the combination of using dbt (multi-threaded) and Airflow 3.0 and 3.1 to set XCom lead to race conditions. + This leads the producer task to get stuck while running the dbt build command. + Unfortunately, since this is non-deterministic, and happens once every five runs, we were not able to have a proper test. + However, we applied this fix and run over 20 times a pipeline that would fail every 5 runs and this allowed us to no longer face the issue. + """ + with xcom_set_lock: + task_instance.xcom_push(key=key, value=value) From 76cfac992b3342a26cb89ec132b6286df0879503 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Sat, 22 Nov 2025 18:28:10 +0530 Subject: [PATCH 08/21] Fix for runner --- cosmos/_utils/common.py | 11 +++++------ cosmos/dbt/runner.py | 8 +++++++- cosmos/operators/local.py | 6 ++++-- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/cosmos/_utils/common.py b/cosmos/_utils/common.py index 1476f0b9d8..812bdd959a 100644 --- a/cosmos/_utils/common.py +++ b/cosmos/_utils/common.py @@ -1,11 +1,10 @@ from threading import Lock -from typing import TYPE_CHECKING, Any +from typing import Any -if TYPE_CHECKING: # pragma: no cover - try: - from airflow.sdk.types import RuntimeTaskInstanceProtocol as TaskInstance - except ImportError: - from airflow.models.taskinstance import TaskInstance # type: ignore[assignment] +try: + from airflow.sdk.types import RuntimeTaskInstanceProtocol as TaskInstance +except ImportError: + from airflow.models.taskinstance import TaskInstance # type: ignore[assignment] xcom_set_lock = Lock() diff --git a/cosmos/dbt/runner.py b/cosmos/dbt/runner.py index 5324e41d76..b54d50cd6b 100644 --- a/cosmos/dbt/runner.py +++ b/cosmos/dbt/runner.py @@ -8,6 +8,12 @@ from cosmos.exceptions import CosmosDbtRunError from cosmos.log import get_logger +if TYPE_CHECKING: # pragma: no cover + try: + from airflow.sdk.definitions.context import Context + except ImportError: + from airflow.utils.context import Context # type: ignore[attr-defined] + if "pytest" in sys.modules: # We set the cache limit to 0, so nothing gets cached by default when # running tests @@ -61,7 +67,7 @@ def get_runner(callbacks: list[Callable] | None = None) -> dbtRunner: # type: i def run_command( - command: list[str], env: dict[str, str], cwd: str, callbacks: list[Callable] | None = None # type: ignore[type-arg] + command: list[str], env: dict[str, str], cwd: str, callbacks: list[Callable] | None = None, context: Context | None = None # type: ignore[type-arg] ) -> dbtRunnerResult: """ Invokes the dbt command programmatically. diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 5745b54b74..7a3d292ffd 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -473,14 +473,16 @@ def run_subprocess( logger.info(output) return subprocess_result - def run_dbt_runner(self, command: list[str], env: dict[str, str], cwd: str) -> dbtRunnerResult: + def run_dbt_runner( + self, command: list[str], env: dict[str, str], cwd: str, context: Context | None = None + ) -> dbtRunnerResult: """Invokes the dbt command programmatically.""" if not dbt_runner.is_available(): raise CosmosDbtRunError( "Could not import dbt core. Ensure that dbt-core >= v1.5 is installed and available in the environment where the operator is running." ) - return dbt_runner.run_command(command, env, cwd, callbacks=self._dbt_runner_callbacks) + return dbt_runner.run_command(command, env, cwd, callbacks=self._dbt_runner_callbacks, context=context) def _cache_package_lockfile(self, tmp_project_dir: Path) -> None: project_dir = Path(self.project_dir) From 5b6496f70a631ca20e350452ee194c6e55f78c70 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Sat, 22 Nov 2025 18:44:48 +0530 Subject: [PATCH 09/21] cleanup --- cosmos/_utils/common.py | 4 ++++ cosmos/operators/watcher.py | 13 ++----------- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/cosmos/_utils/common.py b/cosmos/_utils/common.py index 812bdd959a..fb6fc9b5b5 100644 --- a/cosmos/_utils/common.py +++ b/cosmos/_utils/common.py @@ -20,3 +20,7 @@ def safe_xcom_push(task_instance: TaskInstance, key: str, value: Any) -> None: """ with xcom_set_lock: task_instance.xcom_push(key=key, value=value) + + +def get_xcom_val(task_instance: TaskInstance, task_ids: str | list[str], key: str) -> Any: + return task_instance.xcom_pull(task_ids, key=key) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 3058680647..1ca955f4ff 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -10,14 +10,12 @@ from typing import TYPE_CHECKING, Any from cosmos._triggers.watcher import WatcherTrigger, _parse_compressed_xcom -from cosmos._utils.common import safe_xcom_push +from cosmos._utils.common import get_xcom_val, safe_xcom_push if TYPE_CHECKING: # pragma: no cover try: from airflow.sdk.definitions.context import Context - from airflow.sdk.types import RuntimeTaskInstanceProtocol as TaskInstance except ImportError: - from airflow.models.taskinstance import TaskInstance # type: ignore[assignment] from airflow.utils.context import Context # type: ignore[attr-defined] try: @@ -398,12 +396,6 @@ def _use_event(self) -> bool: self._discover_invocation_mode() return self.invocation_mode == InvocationMode.DBT_RUNNER and EventMsg is not None - def _get_model_status_from_xcom(self, ti: TaskInstance) -> Any: - modified_unique_id = self.model_unique_id.replace(".", "__") - self.log.info("self.model_unique_id: %s", modified_unique_id) - status = ti.xcom_pull(self.producer_task_id, key=f"{modified_unique_id}_status") - return status - def poke(self, context: Context) -> bool: """ Checks the status of a dbt model run by pulling relevant XComs from the master task. @@ -428,8 +420,7 @@ def poke(self, context: Context) -> bool: if self._use_event(): status = self._get_status_from_events(ti, context) else: - status = self._get_model_status_from_xcom(ti) - # status = self._get_status_from_run_results(ti, context) + status = get_xcom_val(ti, self.producer_task_id, f"{self.model_unique_id.replace('.', '__')}_status") if status is None: From fdc8e3ad6face7402ffb4f87bcf33a9556042a0d Mon Sep 17 00:00:00 2001 From: pankajastro Date: Sat, 22 Nov 2025 18:48:47 +0530 Subject: [PATCH 10/21] restore some checks --- cosmos/hooks/subprocess.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/cosmos/hooks/subprocess.py b/cosmos/hooks/subprocess.py index c2f47369c8..e5c7d97414 100644 --- a/cosmos/hooks/subprocess.py +++ b/cosmos/hooks/subprocess.py @@ -110,9 +110,12 @@ def pre_exec() -> None: errors="backslashreplace", ) - last_line: str = "" + if self.sub_process is None: + raise RuntimeError("The subprocess should be created here and is None!") + + self.log.info("Command output:") - # Stream output line-by-line + last_line: str = "" assert self.sub_process.stdout is not None for line in self.sub_process.stdout: line = line.rstrip("\n") From 013dfc40a706c077679a130059af306387894b04 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Sat, 22 Nov 2025 19:02:27 +0530 Subject: [PATCH 11/21] cleanup dbt log parsing method --- cosmos/hooks/subprocess.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/cosmos/hooks/subprocess.py b/cosmos/hooks/subprocess.py index e5c7d97414..334334607d 100644 --- a/cosmos/hooks/subprocess.py +++ b/cosmos/hooks/subprocess.py @@ -40,20 +40,21 @@ def __init__(self) -> None: self.sub_process: Popen[str] | None = None super().__init__() # type: ignore[no-untyped-call] - def _parse_log(self, line: str, context: Context | None = None) -> None: + def _store_dbt_resource_status_from_log(self, line: str, context: Context | None = None) -> None: assert context is not None # Make MyPy happy try: log_line = json.loads(line) node_status = log_line.get("data", {}).get("node_info", {}).get("node_status") - unique_id = log_line.get("data", {}).get("node_info", {}).get("unique_id") + self.log.debug("Model: %s is in {node_status}", unique_id, node_status) + if node_status in ["success" or "failed"]: - self.log.info("%s", unique_id) - modified_unique_id = unique_id.replace(".", "__") - safe_xcom_push(task_instance=context["ti"], key=f"{modified_unique_id}_status", value=node_status) + safe_xcom_push( + task_instance=context["ti"], key=f"{unique_id.replace('.', '__')}_status", value=node_status + ) except json.JSONDecodeError: - pass + self.log.debug("Failed to parse log: %s", line) def run_command( self, @@ -122,7 +123,7 @@ def pre_exec() -> None: last_line = line log_lines.append(line) self.log.info("%s", line) - self._parse_log(line, context) + self._store_dbt_resource_status_from_log(line, context) # Wait until process completes return_code = self.sub_process.wait() From 0aded86fc427f7d4335be5e93d27443628dfee9b Mon Sep 17 00:00:00 2001 From: pankajastro Date: Sat, 22 Nov 2025 19:07:08 +0530 Subject: [PATCH 12/21] fix log stmt --- cosmos/hooks/subprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/hooks/subprocess.py b/cosmos/hooks/subprocess.py index 334334607d..e8bf8928a3 100644 --- a/cosmos/hooks/subprocess.py +++ b/cosmos/hooks/subprocess.py @@ -47,7 +47,7 @@ def _store_dbt_resource_status_from_log(self, line: str, context: Context | None node_status = log_line.get("data", {}).get("node_info", {}).get("node_status") unique_id = log_line.get("data", {}).get("node_info", {}).get("unique_id") - self.log.debug("Model: %s is in {node_status}", unique_id, node_status) + self.log.debug("Model: %s is in %s state", unique_id, node_status) if node_status in ["success" or "failed"]: safe_xcom_push( From 9c3429149f60ddb4599e7d940ac6bbaa16159b7a Mon Sep 17 00:00:00 2001 From: pankajastro Date: Sat, 22 Nov 2025 19:34:29 +0530 Subject: [PATCH 13/21] Add kwargs at possible public interface --- cosmos/dbt/runner.py | 10 ++-------- cosmos/hooks/subprocess.py | 21 ++++++++------------- cosmos/operators/local.py | 10 ++++------ 3 files changed, 14 insertions(+), 27 deletions(-) diff --git a/cosmos/dbt/runner.py b/cosmos/dbt/runner.py index b54d50cd6b..00bc7c24e9 100644 --- a/cosmos/dbt/runner.py +++ b/cosmos/dbt/runner.py @@ -2,18 +2,12 @@ import sys from functools import lru_cache -from typing import TYPE_CHECKING, Callable +from typing import TYPE_CHECKING, Any, Callable from cosmos.dbt.project import change_working_directory, environ from cosmos.exceptions import CosmosDbtRunError from cosmos.log import get_logger -if TYPE_CHECKING: # pragma: no cover - try: - from airflow.sdk.definitions.context import Context - except ImportError: - from airflow.utils.context import Context # type: ignore[attr-defined] - if "pytest" in sys.modules: # We set the cache limit to 0, so nothing gets cached by default when # running tests @@ -67,7 +61,7 @@ def get_runner(callbacks: list[Callable] | None = None) -> dbtRunner: # type: i def run_command( - command: list[str], env: dict[str, str], cwd: str, callbacks: list[Callable] | None = None, context: Context | None = None # type: ignore[type-arg] + command: list[str], env: dict[str, str], cwd: str, callbacks: list[Callable] | None = None, **kwargs: Any # type: ignore[type-arg] ) -> dbtRunnerResult: """ Invokes the dbt command programmatically. diff --git a/cosmos/hooks/subprocess.py b/cosmos/hooks/subprocess.py index e8bf8928a3..b26dcae057 100644 --- a/cosmos/hooks/subprocess.py +++ b/cosmos/hooks/subprocess.py @@ -10,15 +10,7 @@ import signal from subprocess import PIPE, STDOUT, Popen from tempfile import TemporaryDirectory, gettempdir -from typing import TYPE_CHECKING, NamedTuple - -from cosmos._utils.common import safe_xcom_push - -if TYPE_CHECKING: # pragma: no cover - try: - from airflow.sdk.definitions.context import Context - except ImportError: - from airflow.utils.context import Context # type: ignore[attr-defined] +from typing import Any, NamedTuple try: # Airflow 3.1 onwards @@ -26,6 +18,8 @@ except ImportError: from airflow.hooks.base import BaseHook +from cosmos._utils.common import safe_xcom_push + class FullOutputSubprocessResult(NamedTuple): exit_code: int @@ -40,8 +34,7 @@ def __init__(self) -> None: self.sub_process: Popen[str] | None = None super().__init__() # type: ignore[no-untyped-call] - def _store_dbt_resource_status_from_log(self, line: str, context: Context | None = None) -> None: - assert context is not None # Make MyPy happy + def _store_dbt_resource_status_from_log(self, line: str, **kwargs: Any) -> None: try: log_line = json.loads(line) node_status = log_line.get("data", {}).get("node_info", {}).get("node_status") @@ -50,6 +43,8 @@ def _store_dbt_resource_status_from_log(self, line: str, context: Context | None self.log.debug("Model: %s is in %s state", unique_id, node_status) if node_status in ["success" or "failed"]: + context = kwargs.get("context") + assert context is not None # Make MyPy happy safe_xcom_push( task_instance=context["ti"], key=f"{unique_id.replace('.', '__')}_status", value=node_status ) @@ -62,7 +57,7 @@ def run_command( env: dict[str, str] | None = None, output_encoding: str = "utf-8", cwd: str | None = None, - context: Context | None = None, + **kwargs: Any, ) -> FullOutputSubprocessResult: """ Execute the command. @@ -123,7 +118,7 @@ def pre_exec() -> None: last_line = line log_lines.append(line) self.log.info("%s", line) - self._store_dbt_resource_status_from_log(line, context) + self._store_dbt_resource_status_from_log(line, **kwargs) # Wait until process completes return_code = self.sub_process.wait() diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 7a3d292ffd..9e558f95a2 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -458,7 +458,7 @@ def _override_rtif_airflow_2_x(session: Session = NEW_SESSION) -> None: _override_rtif_airflow_2_x() def run_subprocess( - self, command: list[str], env: dict[str, str], cwd: str, context: Context + self, command: list[str], env: dict[str, str], cwd: str, **kwargs: Any ) -> FullOutputSubprocessResult: logger.info("Trying to run the command:\n %s\nFrom %s", command, cwd) subprocess_result: FullOutputSubprocessResult = self.subprocess_hook.run_command( @@ -466,23 +466,21 @@ def run_subprocess( env=env, cwd=cwd, output_encoding=self.output_encoding, - context=context, + **kwargs, ) # Logging changed in Airflow 3.1 and we needed to replace the output by the full output: output = "".join(subprocess_result.full_output) logger.info(output) return subprocess_result - def run_dbt_runner( - self, command: list[str], env: dict[str, str], cwd: str, context: Context | None = None - ) -> dbtRunnerResult: + def run_dbt_runner(self, command: list[str], env: dict[str, str], cwd: str, **kwargs: Any) -> dbtRunnerResult: """Invokes the dbt command programmatically.""" if not dbt_runner.is_available(): raise CosmosDbtRunError( "Could not import dbt core. Ensure that dbt-core >= v1.5 is installed and available in the environment where the operator is running." ) - return dbt_runner.run_command(command, env, cwd, callbacks=self._dbt_runner_callbacks, context=context) + return dbt_runner.run_command(command, env, cwd, callbacks=self._dbt_runner_callbacks, **kwargs) def _cache_package_lockfile(self, tmp_project_dir: Path) -> None: project_dir = Path(self.project_dir) From d941f67965e563d73d5f58fa2bd9d740d23ea2f9 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 24 Nov 2025 20:12:50 +0530 Subject: [PATCH 14/21] Add kwargs at missing places --- cosmos/operators/_asynchronous/__init__.py | 6 ++++-- cosmos/operators/virtualenv.py | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/cosmos/operators/_asynchronous/__init__.py b/cosmos/operators/_asynchronous/__init__.py index 20c2a028d4..77d3c75ae9 100644 --- a/cosmos/operators/_asynchronous/__init__.py +++ b/cosmos/operators/_asynchronous/__init__.py @@ -31,7 +31,9 @@ def __init__(self, *args: Any, **kwargs: Any): kwargs["emit_datasets"] = False super().__init__(*args, **kwargs) - def run_subprocess(self, command: list[str], env: dict[str, str], cwd: str) -> FullOutputSubprocessResult: + def run_subprocess( + self, command: list[str], env: dict[str, str], cwd: str, **kwargs: Any + ) -> FullOutputSubprocessResult: profile_type = self.profile_config.get_profile_type() if not self._py_bin: raise AttributeError("_py_bin attribute not set for VirtualEnv operator") @@ -49,7 +51,7 @@ def run_subprocess(self, command: list[str], env: dict[str, str], cwd: str) -> F with open(dbt_executable_path, "w") as f: f.writelines(dbt_entrypoint_script) - return super().run_subprocess(command, env, cwd) + return super().run_subprocess(command, env, cwd, **kwargs) def execute(self, context: Context, **kwargs: Any) -> None: async_context = {"profile_type": self.profile_config.get_profile_type(), "run_id": context["run_id"]} diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index ec5c78acb3..6b0e3785dd 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -94,11 +94,13 @@ def __init__( if not self.py_requirements: self.log.error("Cosmos virtualenv operators require the `py_requirements` parameter") - def run_subprocess(self, command: list[str], env: dict[str, str], cwd: str) -> FullOutputSubprocessResult: + def run_subprocess( + self, command: list[str], env: dict[str, str], cwd: str, **kwargs: Any + ) -> FullOutputSubprocessResult: if self._py_bin is not None: self.log.info(f"Using Python binary from virtualenv: {self._py_bin}") command[0] = str(Path(self._py_bin).parent / "dbt") - return super().run_subprocess(command, env, cwd) + return super().run_subprocess(command, env, cwd, **kwargs) def run_command( self, From 192f899984a777debce70ceb60cb81c16ce4f26b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 24 Nov 2025 14:47:34 +0000 Subject: [PATCH 15/21] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/operators/watcher.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 1ca955f4ff..7cefd28466 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -6,7 +6,6 @@ import zlib from datetime import timedelta from pathlib import Path - from typing import TYPE_CHECKING, Any from cosmos._triggers.watcher import WatcherTrigger, _parse_compressed_xcom From 3c148c346fb8ea7f9fa0f90ddb4dac82059bed28 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 24 Nov 2025 21:45:41 +0530 Subject: [PATCH 16/21] Fix unit tests --- tests/_triggers/test_watcher.py | 4 ++-- tests/operators/test_watcher.py | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/_triggers/test_watcher.py b/tests/_triggers/test_watcher.py index be37d088f6..439b259625 100644 --- a/tests/_triggers/test_watcher.py +++ b/tests/_triggers/test_watcher.py @@ -83,14 +83,14 @@ async def runner(*args, **kwargs): "use_event, xcom_val, expected_status", [ (True, {"data": {"run_result": {"status": "success"}}}, "success"), - (False, {"results": [{"unique_id": "model.test", "status": "failed"}]}, "failed"), + (False, "failed", "failed"), ], ) async def test_parse_node_status(self, use_event, xcom_val, expected_status): self.trigger.use_event = use_event with ( patch("cosmos._triggers.watcher._parse_compressed_xcom", return_value=xcom_val), - patch.object(self.trigger, "get_xcom_val", AsyncMock(return_value="compressed_data")), + patch.object(self.trigger, "get_xcom_val", AsyncMock(return_value="failed")), ): status = await self.trigger._parse_node_status() assert status == expected_status diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 19482d8b34..49d89ed332 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -612,7 +612,7 @@ def test_poke_success_from_run_results(self): ti = MagicMock() ti.try_number = 1 - ti.xcom_pull.return_value = ENCODED_RUN_RESULTS + ti.xcom_pull.return_value = "success" context = self.make_context(ti) result = sensor.poke(context) @@ -734,14 +734,14 @@ def test_get_status_from_events_sets_compiled_sql(self): assert result == "success" assert sensor.compiled_sql == "select 42" - @patch("cosmos.operators.watcher.DbtConsumerWatcherSensor._get_status_from_run_results") - def test_producer_state_failed(self, mock_run_result): + @patch("cosmos.operators.watcher.get_xcom_val") + def test_producer_state_failed(self, mock_get_xcom_val): sensor = self.make_sensor() sensor._get_producer_task_status.return_value = "failed" ti = MagicMock() ti.try_number = 1 sensor.poke_retry_number = 1 - mock_run_result.return_value = None + mock_get_xcom_val.return_value = None ti.xcom_pull.return_value = "failed" context = self.make_context(ti) @@ -753,9 +753,9 @@ def test_producer_state_failed(self, mock_run_result): sensor.poke(context) @patch("cosmos.operators.watcher.DbtConsumerWatcherSensor._fallback_to_local_run") - @patch("cosmos.operators.watcher.DbtConsumerWatcherSensor._get_status_from_run_results") + @patch("cosmos.operators.watcher.get_xcom_val") def test_producer_state_does_not_fail_if_previously_upstream_failed( - self, mock_run_result, mock_fallback_to_local_run + self, mock_get_xcom_val, mock_fallback_to_local_run ): """ Attempt to run the task using ExecutionMode.LOCAL if State.UPSTREAM_FAILED happens. @@ -766,7 +766,7 @@ def test_producer_state_does_not_fail_if_previously_upstream_failed( ti = MagicMock() ti.try_number = 1 sensor.poke_retry_number = 0 - mock_run_result.return_value = None + mock_get_xcom_val.return_value = None ti.xcom_pull.return_value = "failed" context = self.make_context(ti) From 53965f9cb222198144835c2809e7a810dc1c6483 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 24 Nov 2025 22:38:41 +0530 Subject: [PATCH 17/21] Add more unit tests --- cosmos/hooks/subprocess.py | 2 +- tests/_triggers/test_watcher.py | 3 +- tests/hooks/test_subprocess.py | 53 +++++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 2 deletions(-) diff --git a/cosmos/hooks/subprocess.py b/cosmos/hooks/subprocess.py index b26dcae057..5eeb5f571a 100644 --- a/cosmos/hooks/subprocess.py +++ b/cosmos/hooks/subprocess.py @@ -42,7 +42,7 @@ def _store_dbt_resource_status_from_log(self, line: str, **kwargs: Any) -> None: self.log.debug("Model: %s is in %s state", unique_id, node_status) - if node_status in ["success" or "failed"]: + if node_status in ["success", "failed"]: context = kwargs.get("context") assert context is not None # Make MyPy happy safe_xcom_push( diff --git a/tests/_triggers/test_watcher.py b/tests/_triggers/test_watcher.py index 439b259625..be9684682d 100644 --- a/tests/_triggers/test_watcher.py +++ b/tests/_triggers/test_watcher.py @@ -83,6 +83,7 @@ async def runner(*args, **kwargs): "use_event, xcom_val, expected_status", [ (True, {"data": {"run_result": {"status": "success"}}}, "success"), + (True, None, None), (False, "failed", "failed"), ], ) @@ -90,7 +91,7 @@ async def test_parse_node_status(self, use_event, xcom_val, expected_status): self.trigger.use_event = use_event with ( patch("cosmos._triggers.watcher._parse_compressed_xcom", return_value=xcom_val), - patch.object(self.trigger, "get_xcom_val", AsyncMock(return_value="failed")), + patch.object(self.trigger, "get_xcom_val", AsyncMock(return_value=xcom_val)), ): status = await self.trigger._parse_node_status() assert status == expected_status diff --git a/tests/hooks/test_subprocess.py b/tests/hooks/test_subprocess.py index e8b16d387b..4ec4682ac1 100644 --- a/tests/hooks/test_subprocess.py +++ b/tests/hooks/test_subprocess.py @@ -52,6 +52,14 @@ def test_subprocess_hook(): assert result.full_output == ["foo"] +def test_run_command_runtime_error(): + hook = FullOutputSubprocessHook() + + with patch("cosmos.hooks.subprocess.Popen", return_value=None): + with pytest.raises(RuntimeError, match="The subprocess should be created here and is None!"): + hook.run_command(["echo", "hello"]) + + @patch("os.getpgid", return_value=123) @patch("os.killpg") def test_send_sigint(mock_killpg, mock_getpgid): @@ -68,3 +76,48 @@ def test_send_sigterm(mock_killpg, mock_getpgid): hook.sub_process = MagicMock() hook.send_sigterm() mock_killpg.assert_called_with(123, signal.SIGTERM) + + +import json + + +@pytest.mark.parametrize( + "status,context,should_push,expect_assert", + [ + ("success", {"ti": MagicMock()}, True, False), + ("failed", {"ti": MagicMock()}, True, False), + ("running", {"ti": MagicMock()}, False, False), + (None, {"ti": MagicMock()}, False, False), + ("success", None, False, True), + ("failed", None, False, True), + ], +) +def test_store_dbt_resource_status_from_log_param(status, context, should_push, expect_assert): + trigger = FullOutputSubprocessHook() + + # Prepare log line + log_line = {"data": {"node_info": {"node_status": status, "unique_id": "model.jaffle_shop.stg_orders"}}} + line = json.dumps(log_line) + + with patch("cosmos.hooks.subprocess.safe_xcom_push") as mock_push: + if expect_assert: + with pytest.raises(AssertionError): + trigger._store_dbt_resource_status_from_log(line, context=context) + else: + trigger._store_dbt_resource_status_from_log(line, context=context) + if should_push: + mock_push.assert_called_once_with( + task_instance=context["ti"], key="model__jaffle_shop__stg_orders_status", value=status + ) + else: + mock_push.assert_not_called() + + +def test_store_dbt_resource_status_from_log_invalid_json(): + trigger = FullOutputSubprocessHook() + + invalid_line = "{not a valid json}" + + with patch("cosmos.hooks.subprocess.safe_xcom_push") as mock_push: + trigger._store_dbt_resource_status_from_log(invalid_line, context={"ti": MagicMock()}) + mock_push.assert_not_called() From 014640f1ed69c9bfd54c2f9ec691c7d0264de8b4 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 24 Nov 2025 22:42:52 +0530 Subject: [PATCH 18/21] cleanup --- cosmos/hooks/subprocess.py | 1 + cosmos/operators/base.py | 7 ++++++- tests/hooks/test_subprocess.py | 4 +--- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/cosmos/hooks/subprocess.py b/cosmos/hooks/subprocess.py index 5eeb5f571a..a4b781124e 100644 --- a/cosmos/hooks/subprocess.py +++ b/cosmos/hooks/subprocess.py @@ -35,6 +35,7 @@ def __init__(self) -> None: super().__init__() # type: ignore[no-untyped-call] def _store_dbt_resource_status_from_log(self, line: str, **kwargs: Any) -> None: + try: log_line = json.loads(line) node_status = log_line.get("data", {}).get("node_info", {}).get("node_status") diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index a42d4e571c..cb409482bf 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -318,7 +318,12 @@ def execute(self, context: Context, **kwargs) -> Any | None: # type: ignore class DbtBuildMixin: - """Mixin for dbt build command.""" + """ + Mixin for dbt build command. + + :param full_refresh: whether to add the flag --full-refresh to the dbt build command + :param log_format: format for dbt logs (e.g., 'json', 'text'). If provided, adds --log-format flag + """ base_cmd = ["build"] ui_color = "#8194E0" diff --git a/tests/hooks/test_subprocess.py b/tests/hooks/test_subprocess.py index 4ec4682ac1..c06d9c0b7a 100644 --- a/tests/hooks/test_subprocess.py +++ b/tests/hooks/test_subprocess.py @@ -1,3 +1,4 @@ +import json import signal from pathlib import Path from tempfile import TemporaryDirectory @@ -78,9 +79,6 @@ def test_send_sigterm(mock_killpg, mock_getpgid): mock_killpg.assert_called_with(123, signal.SIGTERM) -import json - - @pytest.mark.parametrize( "status,context,should_push,expect_assert", [ From 37d46745705513e76370436e74128b3d019ee866 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 24 Nov 2025 23:21:04 +0530 Subject: [PATCH 19/21] Add docs string --- cosmos/hooks/subprocess.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cosmos/hooks/subprocess.py b/cosmos/hooks/subprocess.py index a4b781124e..22954a4d8a 100644 --- a/cosmos/hooks/subprocess.py +++ b/cosmos/hooks/subprocess.py @@ -35,7 +35,13 @@ def __init__(self) -> None: super().__init__() # type: ignore[no-untyped-call] def _store_dbt_resource_status_from_log(self, line: str, **kwargs: Any) -> None: + """ + Parses a single line from dbt JSON logs and stores node status to Airflow XCom. + This method parses each log line from dbt when --log-format json is used, + extracts node status information, and pushes it to XCom for consumption + by downstream watcher sensors. + """ try: log_line = json.loads(line) node_status = log_line.get("data", {}).get("node_info", {}).get("node_status") From bac37f658efbbef8dde03a1f66bde572a75ed5f2 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 24 Nov 2025 23:27:00 +0530 Subject: [PATCH 20/21] Add some comments --- cosmos/_utils/common.py | 1 + cosmos/hooks/subprocess.py | 1 + 2 files changed, 2 insertions(+) diff --git a/cosmos/_utils/common.py b/cosmos/_utils/common.py index fb6fc9b5b5..0c76898177 100644 --- a/cosmos/_utils/common.py +++ b/cosmos/_utils/common.py @@ -22,5 +22,6 @@ def safe_xcom_push(task_instance: TaskInstance, key: str, value: Any) -> None: task_instance.xcom_push(key=key, value=value) +# TODO: Unify the Airflow call from cosmos/_triggers/watcher.py and cosmos/operators/watcher.py def get_xcom_val(task_instance: TaskInstance, task_ids: str | list[str], key: str) -> Any: return task_instance.xcom_pull(task_ids, key=key) diff --git a/cosmos/hooks/subprocess.py b/cosmos/hooks/subprocess.py index 22954a4d8a..52fcfaeac0 100644 --- a/cosmos/hooks/subprocess.py +++ b/cosmos/hooks/subprocess.py @@ -49,6 +49,7 @@ def _store_dbt_resource_status_from_log(self, line: str, **kwargs: Any) -> None: self.log.debug("Model: %s is in %s state", unique_id, node_status) + # TODO: Handle and store all possible node statuses, not just the current success and failed if node_status in ["success", "failed"]: context = kwargs.get("context") assert context is not None # Make MyPy happy From 39d3609c49b4c77f195b197b021f84c8f16d8f99 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 3 Dec 2025 19:14:16 +0530 Subject: [PATCH 21/21] Address review feedbak --- cosmos/_utils/common.py | 27 --------------------------- cosmos/_utils/watcher_state.py | 26 ++++++++++++++++++++++++++ cosmos/hooks/subprocess.py | 26 +++++++++++++------------- cosmos/operators/watcher.py | 2 +- 4 files changed, 40 insertions(+), 41 deletions(-) delete mode 100644 cosmos/_utils/common.py diff --git a/cosmos/_utils/common.py b/cosmos/_utils/common.py deleted file mode 100644 index 0c76898177..0000000000 --- a/cosmos/_utils/common.py +++ /dev/null @@ -1,27 +0,0 @@ -from threading import Lock -from typing import Any - -try: - from airflow.sdk.types import RuntimeTaskInstanceProtocol as TaskInstance -except ImportError: - from airflow.models.taskinstance import TaskInstance # type: ignore[assignment] - - -xcom_set_lock = Lock() - - -def safe_xcom_push(task_instance: TaskInstance, key: str, value: Any) -> None: - """ - Safely set an XCom value in a thread-safe manner in Airflow 3.0 and 3.1. - We noticed that the combination of using dbt (multi-threaded) and Airflow 3.0 and 3.1 to set XCom lead to race conditions. - This leads the producer task to get stuck while running the dbt build command. - Unfortunately, since this is non-deterministic, and happens once every five runs, we were not able to have a proper test. - However, we applied this fix and run over 20 times a pipeline that would fail every 5 runs and this allowed us to no longer face the issue. - """ - with xcom_set_lock: - task_instance.xcom_push(key=key, value=value) - - -# TODO: Unify the Airflow call from cosmos/_triggers/watcher.py and cosmos/operators/watcher.py -def get_xcom_val(task_instance: TaskInstance, task_ids: str | list[str], key: str) -> Any: - return task_instance.xcom_pull(task_ids, key=key) diff --git a/cosmos/_utils/watcher_state.py b/cosmos/_utils/watcher_state.py index 9fd6d128ee..534b81aa36 100644 --- a/cosmos/_utils/watcher_state.py +++ b/cosmos/_utils/watcher_state.py @@ -1,13 +1,39 @@ from __future__ import annotations import logging +from threading import Lock from typing import Any, Callable +try: + from airflow.sdk.types import RuntimeTaskInstanceProtocol as TaskInstance +except ImportError: + from airflow.models.taskinstance import TaskInstance # type: ignore[assignment] + from packaging.version import Version ProducerStateFetcher = Callable[[], str | None] +xcom_set_lock = Lock() + + +def safe_xcom_push(task_instance: TaskInstance, key: str, value: Any) -> None: + """ + Safely set an XCom value in a thread-safe manner in Airflow 3.0 and 3.1. + We noticed that the combination of using dbt (multi-threaded) and Airflow 3.0 and 3.1 to set XCom lead to race conditions. + This leads the producer task to get stuck while running the dbt build command. + Unfortunately, since this is non-deterministic, and happens once every five runs, we were not able to have a proper test. + However, we applied this fix and run over 20 times a pipeline that would fail every 5 runs and this allowed us to no longer face the issue. + """ + with xcom_set_lock: + task_instance.xcom_push(key=key, value=value) + + +# TODO: Unify the Airflow call from cosmos/_triggers/watcher.py and cosmos/operators/watcher.py +def get_xcom_val(task_instance: TaskInstance, task_ids: str | list[str], key: str) -> Any: + return task_instance.xcom_pull(task_ids, key=key) + + def _load_airflow2_dependencies() -> tuple[Any, Callable[[], Any]]: from airflow.models import TaskInstance from airflow.utils.session import create_session diff --git a/cosmos/hooks/subprocess.py b/cosmos/hooks/subprocess.py index 52fcfaeac0..db8037f601 100644 --- a/cosmos/hooks/subprocess.py +++ b/cosmos/hooks/subprocess.py @@ -18,7 +18,7 @@ except ImportError: from airflow.hooks.base import BaseHook -from cosmos._utils.common import safe_xcom_push +from cosmos._utils.watcher_state import safe_xcom_push class FullOutputSubprocessResult(NamedTuple): @@ -44,20 +44,20 @@ def _store_dbt_resource_status_from_log(self, line: str, **kwargs: Any) -> None: """ try: log_line = json.loads(line) - node_status = log_line.get("data", {}).get("node_info", {}).get("node_status") - unique_id = log_line.get("data", {}).get("node_info", {}).get("unique_id") - - self.log.debug("Model: %s is in %s state", unique_id, node_status) - - # TODO: Handle and store all possible node statuses, not just the current success and failed - if node_status in ["success", "failed"]: - context = kwargs.get("context") - assert context is not None # Make MyPy happy - safe_xcom_push( - task_instance=context["ti"], key=f"{unique_id.replace('.', '__')}_status", value=node_status - ) except json.JSONDecodeError: self.log.debug("Failed to parse log: %s", line) + log_line = {} + + node_status = log_line.get("data", {}).get("node_info", {}).get("node_status") + unique_id = log_line.get("data", {}).get("node_info", {}).get("unique_id") + + self.log.debug("Model: %s is in %s state", unique_id, node_status) + + # TODO: Handle and store all possible node statuses, not just the current success and failed + if node_status in ["success", "failed"]: + context = kwargs.get("context") + assert context is not None # Make MyPy happy + safe_xcom_push(task_instance=context["ti"], key=f"{unique_id.replace('.', '__')}_status", value=node_status) def run_command( self, diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 7cefd28466..beed7b9446 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -9,7 +9,7 @@ from typing import TYPE_CHECKING, Any from cosmos._triggers.watcher import WatcherTrigger, _parse_compressed_xcom -from cosmos._utils.common import get_xcom_val, safe_xcom_push +from cosmos._utils.watcher_state import get_xcom_val, safe_xcom_push if TYPE_CHECKING: # pragma: no cover try: