From b9cfc2d08642362e1b5dd493b79dd967b6de9cdf Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 31 Mar 2026 14:12:43 -0300 Subject: [PATCH 01/26] Add source freshness aware execution for ExecutionMode.WATCHER When SourceRenderingBehavior is not NONE, the watcher producer now runs `dbt source freshness` before the main `dbt build`. For every stale source (status "error" or "warn"), all transitive dependents are marked as "skipped" via XCom and added to the build's --exclude list so they are not executed against stale data. Consumer sensors handle the new "skipped" status by raising AirflowSkipException (in both poke and deferred/trigger paths), giving clear visual feedback in the Airflow UI. Co-Authored-By: Claude Opus 4.6 (1M context) --- cosmos/airflow/graph.py | 2 + cosmos/operators/_watcher/base.py | 30 +++- cosmos/operators/_watcher/state.py | 10 +- cosmos/operators/_watcher/triggerer.py | 5 + cosmos/operators/local.py | 17 ++ cosmos/operators/watcher.py | 114 +++++++++++++ tests/airflow/test_graph.py | 34 +++- tests/operators/_watcher/test_state.py | 13 +- tests/operators/_watcher/test_triggerer.py | 1 + tests/operators/_watcher/test_watcher_base.py | 43 +++++ tests/operators/test_local.py | 26 +++ tests/operators/test_watcher.py | 161 ++++++++++++++++++ 12 files changed, 443 insertions(+), 13 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 54721b04d9..b1b023fcf6 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -687,6 +687,8 @@ def _add_watcher_producer_task( producer_task_args = task_args.copy() if tests_per_model is not None: producer_task_args["tests_per_model"] = tests_per_model + if render_config is not None and render_config.source_rendering_behavior is not SourceRenderingBehavior.NONE: + producer_task_args["_check_source_freshness"] = True if render_config is not None: producer_task_args["select"] = _convert_list_to_str(render_config.select) diff --git a/cosmos/operators/_watcher/base.py 
b/cosmos/operators/_watcher/base.py index d435013ca0..df0c33aa21 100644 --- a/cosmos/operators/_watcher/base.py +++ b/cosmos/operators/_watcher/base.py @@ -5,7 +5,7 @@ from pathlib import Path from typing import Any -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, AirflowSkipException from cosmos import settings from cosmos.config import ProfileConfig @@ -23,6 +23,7 @@ _log_dbt_event, build_producer_state_fetcher, get_xcom_val, + is_dbt_node_status_skipped, is_dbt_node_status_success, is_dbt_node_status_terminal, safe_xcom_push, @@ -440,6 +441,11 @@ def execute_complete(self, context: Context, event: dict[str, str]) -> None: status = event.get("status") reason = event.get("reason") + if status == "skipped": + raise AirflowSkipException( + f"{self._resource_label} '{self.model_unique_id}' was skipped because an upstream source is not fresh." + ) + if status == "success" and reason == WatcherEventReason.NODE_NOT_RUN: logger.info( "%s '%s' was skipped by the dbt command. This may happen if it is an ephemeral model or if the model sql file is empty.", @@ -494,6 +500,16 @@ def _get_node_status(self, ti: Any, context: Context) -> Any: return get_xcom_val(ti, self.producer_task_id, xcom_key) return get_xcom_val(ti, self.producer_task_id, f"{self.model_unique_id.replace('.', '__')}_status") + def _cache_compiled_sql(self, ti: Any, context: Context) -> None: + """Pull compiled_sql from XCom and cache it on the sensor instance.""" + compiled_sql = get_xcom_val( + ti, self.producer_task_id, f"{self.model_unique_id.replace('.', '__')}_compiled_sql" + ) + if compiled_sql: + self.compiled_sql = compiled_sql + if hasattr(self, "_override_rtif"): + self._override_rtif(context) + def poke(self, context: Context) -> bool: """ Checks the status of a dbt node (model or aggregated tests) by pulling relevant XComs from the producer task. 
@@ -521,13 +537,7 @@ def poke(self, context: Context) -> bool: # compiled_sql is always in the canonical per-model XCom key (same for event and subprocess modes) if status is not None: - compiled_sql = get_xcom_val( - ti, self.producer_task_id, f"{self.model_unique_id.replace('.', '__')}_compiled_sql" - ) - if compiled_sql: - self.compiled_sql = compiled_sql - if hasattr(self, "_override_rtif"): - self._override_rtif(context) + self._cache_compiled_sql(ti, context) dbt_events = get_xcom_val( task_instance=context["ti"], @@ -550,6 +560,10 @@ def poke(self, context: Context) -> bool: self.poke_retry_number += 1 return False + elif is_dbt_node_status_skipped(status): + raise AirflowSkipException( + f"{self._resource_label} '{self.model_unique_id}' was skipped because an upstream source is not fresh." + ) elif is_dbt_node_status_success(status): return True else: diff --git a/cosmos/operators/_watcher/state.py b/cosmos/operators/_watcher/state.py index 48ada8c1de..f1475b1e62 100644 --- a/cosmos/operators/_watcher/state.py +++ b/cosmos/operators/_watcher/state.py @@ -23,6 +23,7 @@ # dbt uses different status values for different node types (models/tests):" DBT_SUCCESS_STATUSES = frozenset({"success", "pass"}) DBT_FAILED_STATUSES = frozenset({"failed", "fail", "error", "runtime error"}) +DBT_SKIPPED_STATUSES = frozenset({"skipped"}) class DbtTestStatus(str, Enum): @@ -44,9 +45,14 @@ def is_dbt_node_status_failed(status: str | None) -> bool: return status in DBT_FAILED_STATUSES +def is_dbt_node_status_skipped(status: str | None) -> bool: + """Check if the dbt node status indicates it was skipped due to a stale upstream source.""" + return status in DBT_SKIPPED_STATUSES + + def is_dbt_node_status_terminal(status: str | None) -> bool: - """Check if the dbt node status is terminal (success or failed).""" - return is_dbt_node_status_success(status) or is_dbt_node_status_failed(status) + """Check if the dbt node status is terminal (success, failed, or skipped).""" + return 
is_dbt_node_status_success(status) or is_dbt_node_status_failed(status) or is_dbt_node_status_skipped(status) xcom_set_lock = Lock() diff --git a/cosmos/operators/_watcher/triggerer.py b/cosmos/operators/_watcher/triggerer.py index 0b19c465b7..709fd56bc1 100644 --- a/cosmos/operators/_watcher/triggerer.py +++ b/cosmos/operators/_watcher/triggerer.py @@ -16,6 +16,7 @@ _log_dbt_event, build_producer_state_fetcher, is_dbt_node_status_failed, + is_dbt_node_status_skipped, is_dbt_node_status_success, is_dbt_node_status_terminal, ) @@ -217,6 +218,10 @@ async def run(self) -> AsyncIterator[TriggerEvent]: event_data["compiled_sql"] = compiled_sql yield TriggerEvent(event_data) # type: ignore[no-untyped-call] return + elif is_dbt_node_status_skipped(dbt_node_status): + logger.info("dbt node '%s' skipped: upstream source is not fresh", self.model_unique_id) + yield TriggerEvent({"status": "skipped", "reason": "source_not_fresh"}) # type: ignore[no-untyped-call] + return elif is_dbt_node_status_failed(dbt_node_status): logger.warning("dbt node '%s' failed", self.model_unique_id) event_data = {"status": EventStatus.FAILED, "reason": WatcherEventReason.NODE_FAILED} diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 0745be3808..fac88b45ff 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -120,6 +120,19 @@ logger = get_logger(__name__) +def _read_target_sources_json(project_root: Path) -> dict[str, Any] | None: + """Parse ``target/sources.json`` under ``project_root`` if the file exists.""" + path = project_root / "target" / "sources.json" + if not path.is_file(): + return None + try: + result: dict[str, Any] = json.loads(path.read_text(encoding="utf-8")) + return result + except json.JSONDecodeError: + logger.warning("Could not parse JSON from %s", path) + return None + + # The following is related to the ability of Cosmos parsing dbt artifacts and generating OpenLineage URIs # It is used for emitting Airflow assets and not 
necessarily OpenLineage events try: @@ -204,6 +217,7 @@ def __init__( self.callback_args = callback_args or {} self.compiled_sql = "" self.freshness = "" + self._sources_json: dict[str, Any] | None = None self.should_store_compiled_sql = should_store_compiled_sql self.should_upload_compiled_sql = should_upload_compiled_sql self.openlineage_events_completes: list[RunEvent] = [] @@ -674,6 +688,9 @@ def run_command( # noqa: C901 cwd=tmp_project_dir, context=context, ) + if context.get("_check_source_freshness"): + self._sources_json = _read_target_sources_json(tmp_dir_path) + return result if is_openlineage_common_available: self.calculate_openlineage_events_completes(env, tmp_dir_path) if AIRFLOW_VERSION.major < _AIRFLOW3_MAJOR_VERSION: diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 55662f1499..12626a48af 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -7,12 +7,20 @@ from airflow.exceptions import AirflowException +try: + # Airflow 3.1 onwards + from airflow.sdk import TaskGroup +except ImportError: + from airflow.utils.task_group import TaskGroup + from cosmos.config import ProfileConfig from cosmos.constants import ( PRODUCER_WATCHER_DEFAULT_PRIORITY_WEIGHT, PRODUCER_WATCHER_TASK_ID, WATCHER_TASK_WEIGHT_RULE, + DbtResourceType, ) +from cosmos.dbt.graph import DbtNode from cosmos.log import get_logger from cosmos.operators._watcher import safe_xcom_push from cosmos.operators._watcher.base import ( @@ -46,6 +54,49 @@ logger = get_logger(__name__) +def _default_freshness_callback( + context: Context, + dag: Any, + task_group: TaskGroup | None, + nodes: dict[str, DbtNode] | None, + sources_json: dict[str, Any] | None, +) -> tuple[list[str], str]: + """Return unique_ids of all nodes that transitively depend on a stale source, plus the status ``"skip"``. + + Stale sources are those with ``status`` of ``"error"`` or ``"warn"`` in ``sources_json["results"]``. 
+ Traversal is BFS over the reverse-dependency graph built from ``nodes``. + """ + if not nodes or not sources_json: + return [], "skip" + + stale_source_ids = {r["unique_id"] for r in sources_json.get("results", []) if r.get("status") in ("error", "warn")} + if not stale_source_ids: + return [], "skip" + + # Build reverse map: dep_id -> set of node_ids that directly depend on it + dependents: dict[str, set[str]] = {} + for node_id, node in nodes.items(): + for dep_id in node.depends_on: + dependents.setdefault(dep_id, set()).add(node_id) + + # BFS from each stale source to collect all transitive dependents + _excludable_resource_types = {DbtResourceType.MODEL, DbtResourceType.SEED, DbtResourceType.SNAPSHOT} + visited: set[str] = set() + queue = list(stale_source_ids) + while queue: + current = queue.pop() + for dependent_id in dependents.get(current, set()): + if dependent_id not in visited: + visited.add(dependent_id) + queue.append(dependent_id) + + # Only return model/seed/snapshot nodes — tests are skipped automatically when their parent is excluded, + # and test hash-suffixed unique_ids are not valid dbt --exclude selectors. + excludable = [uid for uid in visited if nodes.get(uid) and nodes[uid].resource_type in _excludable_resource_types] + logger.info("Nodes to skip due to stale sources: %s", excludable) + return excludable, "skip" + + class _NullWriter: """Write-only sink that discards all data; used to suppress dbt stdout in DBT_RUNNER mode. 
@@ -92,6 +143,11 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: task_id = kwargs.pop("task_id", PRODUCER_WATCHER_TASK_ID) self.tests_per_model: dict[str, list[str]] = kwargs.pop("tests_per_model", {}) self.test_results_per_model: dict[str, list[str]] = {} + self._check_source_freshness: bool = kwargs.pop("_check_source_freshness", False) + self._freshness_callback: Callable[ + [Context, Any, TaskGroup | None, dict[str, DbtNode] | None, dict[str, Any] | None], + tuple[list[str], str], + ] = _default_freshness_callback # Do not publish compiled_sql to the producer's rendered_template: it would contain SQL for # all models run by the producer, is often truncated in the UI due to size, and is of no use # there; individual sensor tasks show the corresponding rendered_template per model. @@ -172,6 +228,61 @@ def _event_callback(event: Any) -> None: return result return super().run_dbt_runner(command, env, cwd, **kwargs) + def _push_skipped_xcom_for_model(self, ti: Any, unique_id: str) -> None: + """Push a synthetic ``"skipped"`` status XCom for a model excluded due to a stale upstream source. + + Uses the unified ``*_status`` XCom key that consumer sensors already poll. 
+ """ + uid_key = unique_id.replace(".", "__") + safe_xcom_push(task_instance=ti, key=f"{uid_key}_status", value="skipped") + + def _run_source_freshness(self, context: Context) -> None: + """Run ``dbt source freshness`` via ``build_cmd`` and ``run_command``.""" + original_base_cmd = self.base_cmd + original_indirect_selection = getattr(self, "indirect_selection", None) + try: + self.base_cmd = ["source", "freshness"] + self.indirect_selection = None # ``dbt source freshness`` does not support --indirect-selection + full_cmd, env = self.build_cmd(context=context, cmd_flags=self.add_cmd_flags()) + context["_check_source_freshness"] = True # type: ignore[typeddict-unknown-key] + self.run_command(cmd=full_cmd, env=env, context=context) + finally: + self.base_cmd = original_base_cmd + self.indirect_selection = original_indirect_selection + context.pop("_check_source_freshness", None) # type: ignore[typeddict-item] + + def _skipped_node_token(self, context: Context, node_unique_ids: list[str]) -> None: + if not node_unique_ids: + return + + ti = context["ti"] + + for unique_id in node_unique_ids: + logger.info( + "Marking resource '%s' as skipped (stale upstream source)", + unique_id, + ) + self._push_skipped_xcom_for_model(ti, unique_id) + + model_names = {uid.rsplit(".", 1)[-1] for uid in node_unique_ids} + + current_exclude = getattr(self, "exclude", None) + exclude_str = " ".join(model_names) + if current_exclude: + self.exclude = f"{current_exclude} {exclude_str}" + else: + self.exclude = exclude_str + + def _apply_source_freshness(self, context: Context) -> None: + """Run source freshness, invoke the callback, and mark affected nodes as skipped.""" + self._run_source_freshness(context) + dag = context.get("dag") + task_group = getattr(context.get("task_instance"), "task", None) + task_group = getattr(task_group, "task_group", None) + nodes = getattr(dag, "_cosmos_nodes", None) if dag else None + node_ids_to_skip, _ = self._freshness_callback(context, dag, 
task_group, nodes, self._sources_json) + self._skipped_node_token(context, node_ids_to_skip) + def execute(self, context: Context, **kwargs: Any) -> Any: task_instance = context.get("ti") if task_instance is None: @@ -187,6 +298,9 @@ def execute(self, context: Context, **kwargs: Any) -> Any: ) return None + if self._check_source_freshness: + self._apply_source_freshness(context) + try: return_value = super().execute(context=context, **kwargs) safe_xcom_push(task_instance=context["ti"], key="task_status", value="completed") diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 9e6863164f..81a9883f85 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -1,7 +1,7 @@ import os from datetime import datetime from pathlib import Path -from unittest.mock import Mock, patch +from unittest.mock import MagicMock, Mock, patch import pytest from airflow.models import DAG @@ -20,6 +20,7 @@ from cosmos.airflow.graph import ( _add_teardown_task, + _add_watcher_producer_task, _convert_list_to_str, _snake_case_to_camelcase, build_airflow_graph, @@ -2016,3 +2017,34 @@ def test_create_test_task_metadata_after_all_empty_lists(self): assert metadata.arguments["select"] is None assert metadata.arguments["exclude"] is None assert metadata.arguments["selector"] is None + + +@pytest.mark.parametrize( + "source_rendering_behavior, expected_flag", + [ + (SourceRenderingBehavior.ALL, True), + (SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS, True), + (SourceRenderingBehavior.NONE, False), + ], +) +def test_add_watcher_producer_task_sets_check_source_freshness_flag(source_rendering_behavior, expected_flag): + """_add_watcher_producer_task passes _check_source_freshness based on source_rendering_behavior.""" + render_config = RenderConfig(source_rendering_behavior=source_rendering_behavior) + task_args = {"project_dir": "/tmp/sample_project", "profile_config": None} + + with patch("cosmos.airflow.graph.create_airflow_task") as mock_create_task: + 
mock_create_task.return_value = MagicMock() + + _add_watcher_producer_task( + dag=MagicMock(), + task_group=None, + tasks_map={}, + render_config=render_config, + task_args=task_args, + ) + + task_metadata = mock_create_task.call_args[0][0] + if expected_flag: + assert task_metadata.arguments["_check_source_freshness"] is True + else: + assert "_check_source_freshness" not in task_metadata.arguments diff --git a/tests/operators/_watcher/test_state.py b/tests/operators/_watcher/test_state.py index 3e4c9c276a..4bc45cbfef 100644 --- a/tests/operators/_watcher/test_state.py +++ b/tests/operators/_watcher/test_state.py @@ -9,6 +9,7 @@ from cosmos.operators._watcher.state import ( _log_dbt_event, is_dbt_node_status_failed, + is_dbt_node_status_skipped, is_dbt_node_status_success, is_dbt_node_status_terminal, ) @@ -33,11 +34,19 @@ def test_is_dbt_node_status_failed_true(self, status: str): def test_is_dbt_node_status_failed_false(self, status: str | None): assert is_dbt_node_status_failed(status) is False - @pytest.mark.parametrize("status", ["success", "pass", "failed", "fail", "error"]) + @pytest.mark.parametrize("status", ["skipped"]) + def test_is_dbt_node_status_skipped_true(self, status: str): + assert is_dbt_node_status_skipped(status) is True + + @pytest.mark.parametrize("status", ["success", "pass", "failed", "fail", "error", "warn", None, ""]) + def test_is_dbt_node_status_skipped_false(self, status: str | None): + assert is_dbt_node_status_skipped(status) is False + + @pytest.mark.parametrize("status", ["success", "pass", "failed", "fail", "error", "skipped"]) def test_is_dbt_node_status_terminal_true(self, status: str): assert is_dbt_node_status_terminal(status) is True - @pytest.mark.parametrize("status", ["skipped", "warn", "running", None, ""]) + @pytest.mark.parametrize("status", ["warn", "running", None, ""]) def test_is_dbt_node_status_terminal_false(self, status: str | None): assert is_dbt_node_status_terminal(status) is False diff --git 
a/tests/operators/_watcher/test_triggerer.py b/tests/operators/_watcher/test_triggerer.py index c390a8de65..acbef402b3 100644 --- a/tests/operators/_watcher/test_triggerer.py +++ b/tests/operators/_watcher/test_triggerer.py @@ -138,6 +138,7 @@ async def test_get_xcom_val_branches(self, airflow_version, expected_val): "dbt_node_status, producer_state, expected", [ ("success", "running", {"status": "success"}), + ("skipped", "running", {"status": "skipped", "reason": "source_not_fresh"}), ("failed", "running", {"status": "failed", "reason": WatcherEventReason.NODE_FAILED}), (None, "failed", {"status": "failed", "reason": WatcherEventReason.PRODUCER_FAILED}), (None, "success", {"status": "success", "reason": WatcherEventReason.NODE_NOT_RUN}), diff --git a/tests/operators/_watcher/test_watcher_base.py b/tests/operators/_watcher/test_watcher_base.py index a409497888..d94c895a3d 100644 --- a/tests/operators/_watcher/test_watcher_base.py +++ b/tests/operators/_watcher/test_watcher_base.py @@ -1,6 +1,7 @@ from unittest.mock import Mock, patch import pytest +from airflow.exceptions import AirflowSkipException from cosmos.operators._watcher.base import BaseConsumerSensor, _process_dbt_log_event from cosmos.operators.local import DbtRunLocalOperator @@ -89,3 +90,45 @@ def test_process_dbt_log_event_skips_when_no_unique_id(self): with patch("cosmos.operators._watcher.base.safe_xcom_push") as mock_push: _process_dbt_log_event(task_instance, dbt_log) mock_push.assert_not_called() + + def test_execute_complete_raises_airflow_skip_exception_when_status_is_skipped(self): + """execute_complete raises AirflowSkipException when the trigger sends status='skipped'.""" + + class SubclassBaseConsumerSensor(BaseConsumerSensor, DbtRunLocalOperator): + something_to_be_implemented = True + + sensor = SubclassBaseConsumerSensor( + task_id="test_sensor", + producer_task_id="dbt_run_local", + profile_config=None, + project_dir="/tmp/sample_project", + extra_context={"dbt_node_config": 
{"unique_id": "model.pkg.my_model"}}, + ) + context = Mock() + with pytest.raises(AirflowSkipException, match="upstream source is not fresh"): + sensor.execute_complete(context, {"status": "skipped", "reason": "source_not_fresh"}) + + def test_poke_raises_airflow_skip_exception_when_status_is_skipped(self): + """poke raises AirflowSkipException when node status is 'skipped'.""" + + class SubclassBaseConsumerSensor(BaseConsumerSensor, DbtRunLocalOperator): + something_to_be_implemented = True + + sensor = SubclassBaseConsumerSensor( + task_id="test_sensor", + producer_task_id="dbt_run_local", + profile_config=None, + project_dir="/tmp/sample_project", + extra_context={"dbt_node_config": {"unique_id": "model.pkg.my_model"}}, + ) + mock_ti = Mock() + mock_ti.try_number = 1 + context = {"ti": mock_ti, "run_id": "run_123"} + + with ( + patch.object(sensor, "_get_producer_task_status", return_value="running"), + patch.object(sensor, "_get_node_status", return_value="skipped"), + patch.object(sensor, "_log_startup_events"), + ): + with pytest.raises(AirflowSkipException, match="upstream source is not fresh"): + sensor.poke(context) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 14ac2f76c8..3f2291d026 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -50,6 +50,7 @@ DbtSnapshotLocalOperator, DbtSourceLocalOperator, DbtTestLocalOperator, + _read_target_sources_json, ) from cosmos.profiles import PostgresUserPasswordProfileMapping from tests.utils import new_test_dag @@ -2413,3 +2414,28 @@ def test_handle_datasets_does_not_push_xcom_when_no_outlets(): # Verify xcom_push was NOT called (no outlets to push) uri_xcom_calls = [call for call in mock_ti.xcom_push.call_args_list if call[1].get("key") == "uri"] assert len(uri_xcom_calls) == 0, "URI XCom should not be pushed when there are no outlets" + + +class TestReadTargetSourcesJson: + def test_returns_dict_when_file_exists(self, tmp_path): + target = tmp_path / 
"target" + target.mkdir() + sources_file = target / "sources.json" + sources_file.write_text(json.dumps({"results": [{"unique_id": "source.pkg.src", "status": "pass"}]})) + + result = _read_target_sources_json(tmp_path) + assert result is not None + assert result["results"][0]["unique_id"] == "source.pkg.src" + + def test_returns_none_when_file_missing(self, tmp_path): + result = _read_target_sources_json(tmp_path) + assert result is None + + def test_returns_none_on_invalid_json(self, tmp_path): + target = tmp_path / "target" + target.mkdir() + sources_file = target / "sources.json" + sources_file.write_text("not valid json {{{") + + result = _read_target_sources_json(tmp_path) + assert result is None diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 1b727093ea..ad791ffd9f 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -29,6 +29,7 @@ DbtRunWatcherOperator, DbtSeedWatcherOperator, DbtTestWatcherOperator, + _default_freshness_callback, store_dbt_resource_status_from_log, ) from cosmos.profiles import PostgresUserPasswordProfileMapping, get_automatic_profile_mapping @@ -1922,3 +1923,163 @@ def test_fallback_raises_on_retry(self): with pytest.raises(AirflowException, match="Test re-execution is not yet supported"): sensor.poke(context) + + +class TestDefaultFreshnessCallback: + """Tests for the _default_freshness_callback function.""" + + def test_returns_empty_when_no_nodes(self): + node_ids, status = _default_freshness_callback( + context=MagicMock(), dag=None, task_group=None, nodes=None, sources_json=None + ) + assert node_ids == [] + assert status == "skip" + + def test_returns_empty_when_no_stale_sources(self): + from cosmos.constants import DbtResourceType + from cosmos.dbt.graph import DbtNode + + nodes = { + "model.pkg.m1": DbtNode( + unique_id="model.pkg.m1", + resource_type=DbtResourceType.MODEL, + depends_on=["source.pkg.src1"], + path_base=Path("/tmp"), + 
original_file_path=Path("models/m.sql"), + ), + } + sources_json = {"results": [{"unique_id": "source.pkg.src1", "status": "pass"}]} + node_ids, status = _default_freshness_callback( + context=MagicMock(), dag=None, task_group=None, nodes=nodes, sources_json=sources_json + ) + assert node_ids == [] + assert status == "skip" + + def test_returns_transitive_dependents_of_stale_source(self): + from cosmos.constants import DbtResourceType + from cosmos.dbt.graph import DbtNode + + nodes = { + "source.pkg.src1": DbtNode( + unique_id="source.pkg.src1", + resource_type=DbtResourceType.SOURCE, + depends_on=[], + path_base=Path("/tmp"), + original_file_path=Path("models/m.sql"), + ), + "model.pkg.m1": DbtNode( + unique_id="model.pkg.m1", + resource_type=DbtResourceType.MODEL, + depends_on=["source.pkg.src1"], + path_base=Path("/tmp"), + original_file_path=Path("models/m.sql"), + ), + "model.pkg.m2": DbtNode( + unique_id="model.pkg.m2", + resource_type=DbtResourceType.MODEL, + depends_on=["model.pkg.m1"], + path_base=Path("/tmp"), + original_file_path=Path("models/m.sql"), + ), + } + sources_json = {"results": [{"unique_id": "source.pkg.src1", "status": "error"}]} + node_ids, status = _default_freshness_callback( + context=MagicMock(), dag=None, task_group=None, nodes=nodes, sources_json=sources_json + ) + assert set(node_ids) == {"model.pkg.m1", "model.pkg.m2"} + assert status == "skip" + + def test_excludes_test_nodes(self): + from cosmos.constants import DbtResourceType + from cosmos.dbt.graph import DbtNode + + nodes = { + "source.pkg.src1": DbtNode( + unique_id="source.pkg.src1", + resource_type=DbtResourceType.SOURCE, + depends_on=[], + path_base=Path("/tmp"), + original_file_path=Path("models/m.sql"), + ), + "model.pkg.m1": DbtNode( + unique_id="model.pkg.m1", + resource_type=DbtResourceType.MODEL, + depends_on=["source.pkg.src1"], + path_base=Path("/tmp"), + original_file_path=Path("models/m.sql"), + ), + "test.pkg.t1": DbtNode( + unique_id="test.pkg.t1", + 
resource_type=DbtResourceType.TEST, + depends_on=["model.pkg.m1"], + path_base=Path("/tmp"), + original_file_path=Path("models/m.sql"), + ), + } + sources_json = {"results": [{"unique_id": "source.pkg.src1", "status": "warn"}]} + node_ids, status = _default_freshness_callback( + context=MagicMock(), dag=None, task_group=None, nodes=nodes, sources_json=sources_json + ) + # Only model nodes, not test nodes + assert node_ids == ["model.pkg.m1"] + assert status == "skip" + + +class TestProducerSourceFreshness: + """Tests for source freshness methods on DbtProducerWatcherOperator.""" + + def _make_producer(self, check_source_freshness=True, **kwargs): + from airflow import DAG + + with DAG(dag_id="test_freshness_dag", start_date=datetime(2023, 1, 1)): + producer = DbtProducerWatcherOperator( + project_dir=str(DBT_PROJECT_PATH), + profile_config=profile_config, + _check_source_freshness=check_source_freshness, + **kwargs, + ) + return producer + + def test_init_stores_check_source_freshness_flag(self): + producer = self._make_producer(check_source_freshness=True) + assert producer._check_source_freshness is True + + def test_init_default_check_source_freshness_is_false(self): + producer = self._make_producer(check_source_freshness=False) + assert producer._check_source_freshness is False + + def test_push_skipped_xcom_for_model(self): + producer = self._make_producer() + ti = MagicMock() + producer._push_skipped_xcom_for_model(ti, "model.pkg.my_model") + ti.xcom_push.assert_called_once_with(key="model__pkg__my_model_status", value="skipped") + + def test_skipped_node_token_updates_exclude(self): + producer = self._make_producer() + producer.exclude = None + ti = MagicMock() + context = {"ti": ti} + producer._skipped_node_token(context, ["model.pkg.m1", "model.pkg.m2"]) + # Both models should be pushed as skipped + assert ti.xcom_push.call_count == 2 + # Exclude should contain the model short names + assert "m1" in producer.exclude + assert "m2" in producer.exclude + + 
def test_skipped_node_token_appends_to_existing_exclude(self): + producer = self._make_producer() + producer.exclude = "existing_model" + ti = MagicMock() + context = {"ti": ti} + producer._skipped_node_token(context, ["model.pkg.m1"]) + assert "existing_model" in producer.exclude + assert "m1" in producer.exclude + + def test_skipped_node_token_noop_when_empty(self): + producer = self._make_producer() + producer.exclude = None + ti = MagicMock() + context = {"ti": ti} + producer._skipped_node_token(context, []) + ti.xcom_push.assert_not_called() + assert producer.exclude is None From 32c637bf26ee208303363df0f8127a3db8b02973 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 31 Mar 2026 14:38:23 -0300 Subject: [PATCH 02/26] Address review feedback on skip status handling Make skip-related messages generic instead of hard-coding "upstream source not fresh" as the only reason. The skipped status may also be emitted by dbt for other causes (e.g. upstream failure). Update docstrings, log messages, and trigger event payloads accordingly. Fix inline comment that incorrectly said BFS (uses DFS). Co-Authored-By: Claude Opus 4.6 (1M context) --- cosmos/operators/_watcher/base.py | 4 ++-- cosmos/operators/_watcher/state.py | 2 +- cosmos/operators/_watcher/triggerer.py | 4 ++-- cosmos/operators/watcher.py | 2 +- tests/operators/_watcher/test_triggerer.py | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cosmos/operators/_watcher/base.py b/cosmos/operators/_watcher/base.py index df0c33aa21..0481740819 100644 --- a/cosmos/operators/_watcher/base.py +++ b/cosmos/operators/_watcher/base.py @@ -443,7 +443,7 @@ def execute_complete(self, context: Context, event: dict[str, str]) -> None: if status == "skipped": raise AirflowSkipException( - f"{self._resource_label} '{self.model_unique_id}' was skipped because an upstream source is not fresh." + f"{self._resource_label} '{self.model_unique_id}' was skipped by the dbt command." 
) if status == "success" and reason == WatcherEventReason.NODE_NOT_RUN: @@ -562,7 +562,7 @@ def poke(self, context: Context) -> bool: return False elif is_dbt_node_status_skipped(status): raise AirflowSkipException( - f"{self._resource_label} '{self.model_unique_id}' was skipped because an upstream source is not fresh." + f"{self._resource_label} '{self.model_unique_id}' was skipped by the dbt command." ) elif is_dbt_node_status_success(status): return True diff --git a/cosmos/operators/_watcher/state.py b/cosmos/operators/_watcher/state.py index f1475b1e62..88dc679141 100644 --- a/cosmos/operators/_watcher/state.py +++ b/cosmos/operators/_watcher/state.py @@ -46,7 +46,7 @@ def is_dbt_node_status_failed(status: str | None) -> bool: def is_dbt_node_status_skipped(status: str | None) -> bool: - """Check if the dbt node status indicates it was skipped due to a stale upstream source.""" + """Check if the dbt node status indicates it was skipped (e.g. stale upstream source, upstream failure).""" return status in DBT_SKIPPED_STATUSES diff --git a/cosmos/operators/_watcher/triggerer.py b/cosmos/operators/_watcher/triggerer.py index 709fd56bc1..2882359caa 100644 --- a/cosmos/operators/_watcher/triggerer.py +++ b/cosmos/operators/_watcher/triggerer.py @@ -219,8 +219,8 @@ async def run(self) -> AsyncIterator[TriggerEvent]: yield TriggerEvent(event_data) # type: ignore[no-untyped-call] return elif is_dbt_node_status_skipped(dbt_node_status): - logger.info("dbt node '%s' skipped: upstream source is not fresh", self.model_unique_id) - yield TriggerEvent({"status": "skipped", "reason": "source_not_fresh"}) # type: ignore[no-untyped-call] + logger.info("dbt node '%s' was skipped", self.model_unique_id) + yield TriggerEvent({"status": "skipped"}) # type: ignore[no-untyped-call] return elif is_dbt_node_status_failed(dbt_node_status): logger.warning("dbt node '%s' failed", self.model_unique_id) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 
12626a48af..6918a8a9fb 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -79,7 +79,7 @@ def _default_freshness_callback( for dep_id in node.depends_on: dependents.setdefault(dep_id, set()).add(node_id) - # BFS from each stale source to collect all transitive dependents + # DFS from each stale source to collect all transitive dependents _excludable_resource_types = {DbtResourceType.MODEL, DbtResourceType.SEED, DbtResourceType.SNAPSHOT} visited: set[str] = set() queue = list(stale_source_ids) diff --git a/tests/operators/_watcher/test_triggerer.py b/tests/operators/_watcher/test_triggerer.py index acbef402b3..65276115a1 100644 --- a/tests/operators/_watcher/test_triggerer.py +++ b/tests/operators/_watcher/test_triggerer.py @@ -138,7 +138,7 @@ async def test_get_xcom_val_branches(self, airflow_version, expected_val): "dbt_node_status, producer_state, expected", [ ("success", "running", {"status": "success"}), - ("skipped", "running", {"status": "skipped", "reason": "source_not_fresh"}), + ("skipped", "running", {"status": "skipped"}), ("failed", "running", {"status": "failed", "reason": WatcherEventReason.NODE_FAILED}), (None, "failed", {"status": "failed", "reason": WatcherEventReason.PRODUCER_FAILED}), (None, "success", {"status": "success", "reason": WatcherEventReason.NODE_NOT_RUN}), From 4fc12de9748dc8b8e7fc0c65f3262f9a4cd965d8 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 31 Mar 2026 14:46:35 -0300 Subject: [PATCH 03/26] Resolve graph nodes from DbtDag/DbtTaskGroup converter _apply_source_freshness() was reading nodes from a non-existent dag._cosmos_nodes attribute, so nodes would always be None and no downstream models would ever be skipped even when sources are stale. Now reads dbt_graph.filtered_nodes from the DbtDag or DbtTaskGroup converter (both inherit DbtToAirflowConverter which sets dbt_graph), falling back gracefully when neither is available. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- cosmos/operators/watcher.py | 11 ++++++++++- tests/operators/_watcher/test_watcher_base.py | 4 ++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 6918a8a9fb..2ddb71acba 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -279,7 +279,16 @@ def _apply_source_freshness(self, context: Context) -> None: dag = context.get("dag") task_group = getattr(context.get("task_instance"), "task", None) task_group = getattr(task_group, "task_group", None) - nodes = getattr(dag, "_cosmos_nodes", None) if dag else None + + # Resolve graph nodes from the DbtDag or DbtTaskGroup converter, falling back gracefully. + nodes = None + if dag is not None: + dbt_graph = getattr(dag, "dbt_graph", None) + nodes = getattr(dbt_graph, "filtered_nodes", None) + if nodes is None and task_group is not None: + tg_dbt_graph = getattr(task_group, "dbt_graph", None) + nodes = getattr(tg_dbt_graph, "filtered_nodes", None) + node_ids_to_skip, _ = self._freshness_callback(context, dag, task_group, nodes, self._sources_json) self._skipped_node_token(context, node_ids_to_skip) diff --git a/tests/operators/_watcher/test_watcher_base.py b/tests/operators/_watcher/test_watcher_base.py index d94c895a3d..eac6a70e2a 100644 --- a/tests/operators/_watcher/test_watcher_base.py +++ b/tests/operators/_watcher/test_watcher_base.py @@ -105,7 +105,7 @@ class SubclassBaseConsumerSensor(BaseConsumerSensor, DbtRunLocalOperator): extra_context={"dbt_node_config": {"unique_id": "model.pkg.my_model"}}, ) context = Mock() - with pytest.raises(AirflowSkipException, match="upstream source is not fresh"): + with pytest.raises(AirflowSkipException, match="was skipped by the dbt command"): sensor.execute_complete(context, {"status": "skipped", "reason": "source_not_fresh"}) def test_poke_raises_airflow_skip_exception_when_status_is_skipped(self): @@ -130,5 +130,5 @@ class 
SubclassBaseConsumerSensor(BaseConsumerSensor, DbtRunLocalOperator): patch.object(sensor, "_get_node_status", return_value="skipped"), patch.object(sensor, "_log_startup_events"), ): - with pytest.raises(AirflowSkipException, match="upstream source is not fresh"): + with pytest.raises(AirflowSkipException, match="was skipped by the dbt command"): sensor.poke(context) From b32cb01fb6c17d6770a607aae26152b1fce26b97 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 31 Mar 2026 14:55:57 -0300 Subject: [PATCH 04/26] Use EventStatus.SKIPPED constant for skipped trigger events Add SKIPPED to EventStatus enum for consistency with SUCCESS and FAILED. Use it in the trigger yield and execute_complete comparison instead of raw string literals. Also fix docstring that still said BFS (traversal uses DFS). Co-Authored-By: Claude Opus 4.6 (1M context) --- cosmos/listeners/dag_run_listener.py | 1 + cosmos/operators/_watcher/base.py | 3 ++- cosmos/operators/_watcher/triggerer.py | 2 +- cosmos/operators/watcher.py | 2 +- 4 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index 0cab34d098..db723045be 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -25,6 +25,7 @@ class EventStatus: SUCCESS = "success" FAILED = "failed" + SKIPPED = "skipped" DAG_RUN = "dag_run" diff --git a/cosmos/operators/_watcher/base.py b/cosmos/operators/_watcher/base.py index 0481740819..fe9328c0a4 100644 --- a/cosmos/operators/_watcher/base.py +++ b/cosmos/operators/_watcher/base.py @@ -16,6 +16,7 @@ PRODUCER_WATCHER_TASK_ID, WATCHER_TASK_WEIGHT_RULE, ) +from cosmos.listeners.dag_run_listener import EventStatus from cosmos.log import get_logger from cosmos.operators._watcher.aggregation import get_tests_status_xcom_key, push_test_result_or_aggregate from cosmos.operators._watcher.state import ( @@ -441,7 +442,7 @@ def execute_complete(self, context: Context, event: 
dict[str, str]) -> None: status = event.get("status") reason = event.get("reason") - if status == "skipped": + if status == EventStatus.SKIPPED: raise AirflowSkipException( f"{self._resource_label} '{self.model_unique_id}' was skipped by the dbt command." ) diff --git a/cosmos/operators/_watcher/triggerer.py b/cosmos/operators/_watcher/triggerer.py index 2882359caa..2d3ba0334d 100644 --- a/cosmos/operators/_watcher/triggerer.py +++ b/cosmos/operators/_watcher/triggerer.py @@ -220,7 +220,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: return elif is_dbt_node_status_skipped(dbt_node_status): logger.info("dbt node '%s' was skipped", self.model_unique_id) - yield TriggerEvent({"status": "skipped"}) # type: ignore[no-untyped-call] + yield TriggerEvent({"status": EventStatus.SKIPPED}) # type: ignore[no-untyped-call] return elif is_dbt_node_status_failed(dbt_node_status): logger.warning("dbt node '%s' failed", self.model_unique_id) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 2ddb71acba..46ed0a97f3 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -64,7 +64,7 @@ def _default_freshness_callback( """Return unique_ids of all nodes that transitively depend on a stale source, plus the status ``"skip"``. Stale sources are those with ``status`` of ``"error"`` or ``"warn"`` in ``sources_json["results"]``. - Traversal is BFS over the reverse-dependency graph built from ``nodes``. + Traversal is DFS over the reverse-dependency graph built from ``nodes``. """ if not nodes or not sources_json: return [], "skip" From 8536e61e69f097c7ecf836ad3f6f722a3e3fceff Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 31 Mar 2026 14:59:32 -0300 Subject: [PATCH 05/26] Sort model names in --exclude for deterministic command output _skipped_node_token used a set for model_names, making the resulting --exclude value non-deterministic across runs. 
Sort the set before joining to ensure stable command strings and predictable logging. Co-Authored-By: Claude Opus 4.6 (1M context) --- cosmos/operators/watcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 46ed0a97f3..539131b484 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -264,7 +264,7 @@ def _skipped_node_token(self, context: Context, node_unique_ids: list[str]) -> N ) self._push_skipped_xcom_for_model(ti, unique_id) - model_names = {uid.rsplit(".", 1)[-1] for uid in node_unique_ids} + model_names = sorted({uid.rsplit(".", 1)[-1] for uid in node_unique_ids}) current_exclude = getattr(self, "exclude", None) exclude_str = " ".join(model_names) From 806cb764b3afade3fb7fc607a63540a9a00abc17 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 31 Mar 2026 15:01:27 -0300 Subject: [PATCH 06/26] Document _check_source_freshness in producer docstring Add documentation for the source freshness feature to the DbtProducerWatcherOperator class docstring, referencing the actual private kwarg name _check_source_freshness and how it is set automatically by _add_watcher_producer_task. Co-Authored-By: Claude Opus 4.6 (1M context) --- cosmos/operators/watcher.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 539131b484..0a2a0c9756 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -132,6 +132,12 @@ class DbtProducerWatcherOperator(DbtBuildMixin, DbtLocalBaseOperator): As each ``NodeFinished`` event arrives the operator pushes the per-model status to XCom under key ``_status`` so downstream sensors can react without waiting for the full build to complete. 
+ + When the private kwarg ``_check_source_freshness`` is ``True`` (set automatically by + ``_add_watcher_producer_task`` when ``SourceRenderingBehavior`` is not ``NONE``), the + producer first runs ``dbt source freshness``, identifies stale sources, marks all + transitive dependents as ``"skipped"`` via XCom, and adds them to ``--exclude`` before + running the main ``dbt build``. """ template_fields = DbtLocalBaseOperator.template_fields + DbtBuildMixin.template_fields # type: ignore[operator] From 0895c55739c57a724e925c9f0ae9521df20f67c0 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 31 Mar 2026 15:05:06 -0300 Subject: [PATCH 07/26] Use != instead of 'is not' for enum comparison in graph.py Use != instead of 'is not' for SourceRenderingBehavior enum comparison, consistent with existing checks in the same file. Co-Authored-By: Claude Opus 4.6 (1M context) --- cosmos/airflow/graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index b1b023fcf6..458ebbe485 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -687,7 +687,7 @@ def _add_watcher_producer_task( producer_task_args = task_args.copy() if tests_per_model is not None: producer_task_args["tests_per_model"] = tests_per_model - if render_config is not None and render_config.source_rendering_behavior is not SourceRenderingBehavior.NONE: + if render_config is not None and render_config.source_rendering_behavior != SourceRenderingBehavior.NONE: producer_task_args["_check_source_freshness"] = True if render_config is not None: From a0f745cfd78f5a0e6807cf6a1c76a5795f552dcc Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 31 Mar 2026 15:08:09 -0300 Subject: [PATCH 08/26] Fall through to error handling when sources.json is missing When _check_source_freshness is set but dbt failed to write target/sources.json, fall through to the normal error-handling path (handle_exception) instead of silently returning with no
freshness data. Co-Authored-By: Claude Opus 4.6 (1M context) --- cosmos/operators/local.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index fac88b45ff..ccf6938ddf 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -690,7 +690,8 @@ def run_command( # noqa: C901 ) if context.get("_check_source_freshness"): self._sources_json = _read_target_sources_json(tmp_dir_path) - return result + if self._sources_json is not None: + return result if is_openlineage_common_available: self.calculate_openlineage_events_completes(env, tmp_dir_path) if AIRFLOW_VERSION.major < _AIRFLOW3_MAJOR_VERSION: From 84fb1667cbfa9952995d34db63b02e691b1e7bc1 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 31 Mar 2026 15:09:38 -0300 Subject: [PATCH 09/26] Parse resource names correctly for versioned dbt models Use unique_id.split('.', 2)[2] (matching DbtNode.resource_name) instead of rsplit('.', 1)[-1] when building the --exclude list. The old logic would extract only 'v1' from 'model.pkg.name.v1', causing the exclude selector to miss the actual model. Co-Authored-By: Claude Opus 4.6 (1M context) --- cosmos/operators/watcher.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 0a2a0c9756..c69532f7b9 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -270,7 +270,9 @@ def _skipped_node_token(self, context: Context, node_unique_ids: list[str]) -> N ) self._push_skipped_xcom_for_model(ti, unique_id) - model_names = sorted({uid.rsplit(".", 1)[-1] for uid in node_unique_ids}) + # Use the same parsing as DbtNode.resource_name: unique_id.split(".", 2)[2] + # This preserves version suffixes (e.g. 
model.pkg.my_model.v1 -> my_model.v1) + model_names = sorted({uid.split(".", 2)[2] for uid in node_unique_ids if len(uid.split(".", 2)) == 3}) current_exclude = getattr(self, "exclude", None) exclude_str = " ".join(model_names) From bb288c091e807d83e78dd93a1e601df600ba7b6a Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 31 Mar 2026 15:22:09 -0300 Subject: [PATCH 10/26] Use full graph for source freshness dependency traversal Use dbt_graph.nodes instead of dbt_graph.filtered_nodes for the dependency traversal in _apply_source_freshness(). filtered_nodes may exclude intermediate upstream nodes, causing the DFS to miss transitive relationships when a selected model depends on an unselected upstream node that depends on a stale source. The callback already filters the final skip list to actionable resource types (model/seed/snapshot), so using the full graph for traversal is safe. Co-Authored-By: Claude Opus 4.6 (1M context) --- cosmos/operators/watcher.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index c69532f7b9..13f43ec330 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -288,14 +288,16 @@ def _apply_source_freshness(self, context: Context) -> None: task_group = getattr(context.get("task_instance"), "task", None) task_group = getattr(task_group, "task_group", None) - # Resolve graph nodes from the DbtDag or DbtTaskGroup converter, falling back gracefully. + # Use the full graph (nodes) for dependency traversal so intermediate unselected + # nodes don't break transitive relationships. The callback intersects the result + # with rendered resource types so only actionable nodes are returned. 
nodes = None if dag is not None: dbt_graph = getattr(dag, "dbt_graph", None) - nodes = getattr(dbt_graph, "filtered_nodes", None) + nodes = getattr(dbt_graph, "nodes", None) if nodes is None and task_group is not None: tg_dbt_graph = getattr(task_group, "dbt_graph", None) - nodes = getattr(tg_dbt_graph, "filtered_nodes", None) + nodes = getattr(tg_dbt_graph, "nodes", None) node_ids_to_skip, _ = self._freshness_callback(context, dag, task_group, nodes, self._sources_json) self._skipped_node_token(context, node_ids_to_skip) From b2d7baa2b3b20b9c7e6dbcb31c2e859b7ce1a11a Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 31 Mar 2026 21:59:05 -0300 Subject: [PATCH 11/26] Source tasks as consumer sensors, push freshness to XCom - Producer pushes per-source freshness status to XCom after running dbt source freshness, so source consumer sensors can read results instead of re-running freshness independently. - DbtSourceWatcherOperator now extends BaseConsumerSensor, following the same pattern as model/seed/snapshot watcher operators. - Clear --full-refresh and --indirect-selection for source freshness command (unsupported flags), preserve user select/exclude. - Add "warn" to DBT_SUCCESS_STATUSES (source freshness warning is non-fatal). - Improve poke/trigger logging: show actual XCom key being polled. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- cosmos/operators/_watcher/base.py | 23 +++++++++++++-- cosmos/operators/_watcher/state.py | 2 +- cosmos/operators/_watcher/triggerer.py | 7 ++++- cosmos/operators/watcher.py | 39 ++++++++++++++++++++++---- 4 files changed, 61 insertions(+), 10 deletions(-) diff --git a/cosmos/operators/_watcher/base.py b/cosmos/operators/_watcher/base.py index fe9328c0a4..7a7ebc3cce 100644 --- a/cosmos/operators/_watcher/base.py +++ b/cosmos/operators/_watcher/base.py @@ -409,6 +409,17 @@ def _execute_core(self, context: Context) -> None: if not self.deferrable: super().execute(context) elif not self.poke(context): + if self.is_test_sensor: + xcom_key = get_tests_status_xcom_key(self.model_unique_id) + else: + xcom_key = f"{self.model_unique_id.replace('.', '__')}_status" + logger.info( + "Deferring %s '%s'. The trigger will poll XCom key '%s' from producer task '%s'.", + self._resource_label.lower(), + self.model_unique_id, + xcom_key, + self.producer_task_id, + ) self.defer( trigger=WatcherTrigger( model_unique_id=self.model_unique_id, @@ -498,8 +509,9 @@ def _get_node_status(self, ti: Any, context: Context) -> Any: """ if self.is_test_sensor: xcom_key = get_tests_status_xcom_key(self.model_unique_id) - return get_xcom_val(ti, self.producer_task_id, xcom_key) - return get_xcom_val(ti, self.producer_task_id, f"{self.model_unique_id.replace('.', '__')}_status") + else: + xcom_key = f"{self.model_unique_id.replace('.', '__')}_status" + return get_xcom_val(ti, self.producer_task_id, xcom_key) def _cache_compiled_sql(self, ti: Any, context: Context) -> None: """Pull compiled_sql from XCom and cache it on the sensor instance.""" @@ -519,11 +531,16 @@ def poke(self, context: Context) -> bool: ti = context["ti"] try_number = ti.try_number + if self.is_test_sensor: + xcom_key = get_tests_status_xcom_key(self.model_unique_id) + else: + xcom_key = f"{self.model_unique_id.replace('.', '__')}_status" logger.info( - "Try number #%s, poke attempt 
#%s: Pulling status from task_id '%s' for %s '%s'", + "Try number #%s, poke attempt #%s: Pulling status from task_id '%s' via XCom key '%s' for %s '%s'", try_number, self.poke_retry_number, self.producer_task_id, + xcom_key, self._resource_label.lower(), self.model_unique_id, ) diff --git a/cosmos/operators/_watcher/state.py b/cosmos/operators/_watcher/state.py index 88dc679141..27d3b488b0 100644 --- a/cosmos/operators/_watcher/state.py +++ b/cosmos/operators/_watcher/state.py @@ -21,7 +21,7 @@ ProducerStateFetcher = Callable[[], str | None] # dbt uses different status values for different node types (models/tests):" -DBT_SUCCESS_STATUSES = frozenset({"success", "pass"}) +DBT_SUCCESS_STATUSES = frozenset({"success", "pass", "warn"}) DBT_FAILED_STATUSES = frozenset({"failed", "fail", "error", "runtime error"}) DBT_SKIPPED_STATUSES = frozenset({"skipped"}) diff --git a/cosmos/operators/_watcher/triggerer.py b/cosmos/operators/_watcher/triggerer.py index 2d3ba0334d..f1fa21fe79 100644 --- a/cosmos/operators/_watcher/triggerer.py +++ b/cosmos/operators/_watcher/triggerer.py @@ -248,4 +248,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # Sleep briefly before re-polling await asyncio.sleep(self.poke_interval) - logger.debug("Polling again for node '%s' status...", self.model_unique_id) + logger.info( + "Polling again for node '%s': status=%s, producer_state=%s", + self.model_unique_id, + dbt_node_status, + producer_task_state, + ) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 13f43ec330..dd01c9cc0f 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -243,18 +243,26 @@ def _push_skipped_xcom_for_model(self, ti: Any, unique_id: str) -> None: safe_xcom_push(task_instance=ti, key=f"{uid_key}_status", value="skipped") def _run_source_freshness(self, context: Context) -> None: - """Run ``dbt source freshness`` via ``build_cmd`` and ``run_command``.""" + """Run ``dbt source freshness`` via ``build_cmd`` and 
``run_command``. + + Temporarily overrides operator attributes that carry flags unsupported by + ``dbt source freshness`` (``--full-refresh``, ``--indirect-selection``). + User-supplied ``--select``/``--exclude`` are preserved. + """ original_base_cmd = self.base_cmd original_indirect_selection = getattr(self, "indirect_selection", None) + original_full_refresh = getattr(self, "full_refresh", None) try: self.base_cmd = ["source", "freshness"] self.indirect_selection = None # ``dbt source freshness`` does not support --indirect-selection + self.full_refresh = False # type: ignore[assignment] # ``dbt source freshness`` does not support --full-refresh full_cmd, env = self.build_cmd(context=context, cmd_flags=self.add_cmd_flags()) context["_check_source_freshness"] = True # type: ignore[typeddict-unknown-key] self.run_command(cmd=full_cmd, env=env, context=context) finally: self.base_cmd = original_base_cmd self.indirect_selection = original_indirect_selection + self.full_refresh = original_full_refresh # type: ignore[assignment] context.pop("_check_source_freshness", None) # type: ignore[typeddict-item] def _skipped_node_token(self, context: Context, node_unique_ids: list[str]) -> None: @@ -281,9 +289,25 @@ def _skipped_node_token(self, context: Context, node_unique_ids: list[str]) -> N else: self.exclude = exclude_str + def _push_source_freshness_results(self, context: Context) -> None: + """Push per-source freshness status to XCom so source consumer sensors can read it.""" + if not self._sources_json: + return + ti = context["ti"] + for result in self._sources_json.get("results", []): + unique_id = result.get("unique_id") + status = result.get("status") + if unique_id and status: + uid_key = unique_id.replace(".", "__") + safe_xcom_push(task_instance=ti, key=f"{uid_key}_status", value=status) + def _apply_source_freshness(self, context: Context) -> None: """Run source freshness, invoke the callback, and mark affected nodes as skipped.""" 
self._run_source_freshness(context) + + # Push per-source freshness results so source consumer sensors can read them + self._push_source_freshness_results(context) + dag = context.get("dag") task_group = getattr(context.get("task_instance"), "task", None) task_group = getattr(task_group, "task_group", None) @@ -383,12 +407,17 @@ class DbtSnapshotWatcherOperator(DbtSnapshotMixin, DbtConsumerWatcherSensor): # template_fields: tuple[str, ...] = DbtConsumerWatcherSensor.template_fields -class DbtSourceWatcherOperator(DbtSourceLocalOperator): - """ - Executes a dbt source freshness command, synchronously, as ExecutionMode.LOCAL. +class DbtSourceWatcherOperator(BaseConsumerSensor, DbtSourceLocalOperator): # type: ignore[misc] + """Watches for source freshness results from the producer task. + + When the producer has ``_check_source_freshness`` enabled it runs + ``dbt source freshness`` and pushes per-source status to XCom. + This sensor reads that status. On retry (or when the producer did + not provide a result) it falls back to running ``dbt source freshness`` + locally for its specific source. """ - template_fields: Sequence[str] = DbtSourceLocalOperator.template_fields # type: ignore[assignment] + template_fields: tuple[str, ...] = BaseConsumerSensor.template_fields + DbtSourceLocalOperator.template_fields # type: ignore[operator] class DbtRunWatcherOperator(DbtConsumerWatcherSensor): From 42a0635e87b36dc82ecb9ab97066b8e543de7699 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 31 Mar 2026 22:34:38 -0300 Subject: [PATCH 12/26] Remove multibyte model from altered_jaffle_shop Airflow 3 SDK cannot handle non-ASCII characters in XCom key paths (UnicodeEncodeError in the WSGI/ASGI bridge). Remove the multibyte model to unblock the source rendering DAG. A dedicated dbt project for multibyte character tests will be created later. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- ...275\211\357\275\202\357\275\231\357\275\224\357\275\205.sql" | 2 -- 1 file changed, 2 deletions(-) delete mode 100644 "dev/dags/dbt/altered_jaffle_shop/models/\357\275\215\357\275\225\357\275\214\357\275\224\357\275\211\357\275\202\357\275\231\357\275\224\357\275\205.sql" diff --git "a/dev/dags/dbt/altered_jaffle_shop/models/\357\275\215\357\275\225\357\275\214\357\275\224\357\275\211\357\275\202\357\275\231\357\275\224\357\275\205.sql" "b/dev/dags/dbt/altered_jaffle_shop/models/\357\275\215\357\275\225\357\275\214\357\275\224\357\275\211\357\275\202\357\275\231\357\275\224\357\275\205.sql" deleted file mode 100644 index b017c8d0c0..0000000000 --- "a/dev/dags/dbt/altered_jaffle_shop/models/\357\275\215\357\275\225\357\275\214\357\275\224\357\275\211\357\275\202\357\275\231\357\275\224\357\275\205.sql" +++ /dev/null @@ -1,2 +0,0 @@ -select - 'TEST_FOR_MULTIBYTE_CHARCTERS' From 0f6a246ab226918b88463d6ab1efaafe52af3396 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 31 Mar 2026 22:35:36 -0300 Subject: [PATCH 13/26] Add an example source DAG with watcher --- dev/dags/example_source_rendering.py | 38 +++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/dev/dags/example_source_rendering.py b/dev/dags/example_source_rendering.py index c080e0f948..c6f950bd95 100644 --- a/dev/dags/example_source_rendering.py +++ b/dev/dags/example_source_rendering.py @@ -6,8 +6,8 @@ from datetime import datetime from pathlib import Path -from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig -from cosmos.constants import SourceRenderingBehavior +from cosmos import DbtDag, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig +from cosmos.constants import ExecutionMode, SourceRenderingBehavior, TestBehavior from cosmos.profiles import PostgresUserPasswordProfileMapping DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" @@ -25,16 +25,19 @@ # [START 
cosmos_source_node_example] + +operator_args = { + "install_deps": True, # install any necessary dependencies before running any dbt command + "full_refresh": True, # used only in dbt commands that support this flag +} + source_rendering_dag = DbtDag( # dbt/cosmos-specific parameters project_config=ProjectConfig( DBT_ROOT_PATH / "altered_jaffle_shop", ), profile_config=profile_config, - operator_args={ - "install_deps": True, # install any necessary dependencies before running any dbt command - "full_refresh": True, # used only in dbt commands that support this flag - }, + operator_args=operator_args, render_config=RenderConfig(source_rendering_behavior=SourceRenderingBehavior.ALL), # normal dag parameters schedule="@daily", @@ -45,3 +48,26 @@ on_warning_callback=lambda context: print(context), ) # [END cosmos_source_node_example] + + +# Currently airflow dags test ignores priority_weight and weight_rule, for this reason, we're setting the following in the CI only: +if os.getenv("CI"): + operator_args["trigger_rule"] = "all_success" + +watcher_source_rendering_dag = DbtDag( + # dbt/cosmos-specific parameters + project_config=ProjectConfig( + DBT_ROOT_PATH / "altered_jaffle_shop", + ), + execution_config=ExecutionConfig(execution_mode=ExecutionMode.WATCHER), + profile_config=profile_config, + operator_args=operator_args, + render_config=RenderConfig(test_behavior=TestBehavior.NONE, source_rendering_behavior=SourceRenderingBehavior.ALL), + # normal dag parameters + schedule="@daily", + start_date=datetime(2024, 1, 1), + catchup=False, + dag_id="watcher_source_rendering_dag", + default_args={"retries": 0}, + on_warning_callback=lambda context: print(context), +) From 914a11ae684316d836ffb7599850c7d562ab2880 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 31 Mar 2026 22:36:02 -0300 Subject: [PATCH 14/26] Run example_source_rendering.py in AF3 --- tests/test_example_dags.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/tests/test_example_dags.py 
b/tests/test_example_dags.py index 16ebcbbd2c..76f596c374 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -67,13 +67,6 @@ def get_dag_bag() -> DagBag: # noqa: C901 # This DAG is taking too long to run int the CI (https://github.com/astronomer/astronomer-cosmos/actions/runs/21902660682/job/63234728594) file.writelines("example_cosmos_python_models.py\n") - # Disabling these DAGs temporarily due to an Airflow 3 bug on processing DatasetAlias that contain non-ASCII characters: - # https://github.com/apache/airflow/issues/51566 - # https://github.com/astronomer/astronomer-cosmos/issues/1802 - if AIRFLOW_VERSION >= Version("3.0.0"): - file.writelines("example_source_rendering.py\n") - file.writelines("basic_cosmos_task_group_different_owners.py\n") - print(".airflowignore contents: ") print(AIRFLOW_IGNORE_FILE.read_text()) db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False) From 18083c2f18629653297b8bf6bba731e2d4f349e9 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 31 Mar 2026 22:41:30 -0300 Subject: [PATCH 15/26] Fix tests for warn status, source operator, and multibyte removal - Move "warn" from non-success to success test params (now in DBT_SUCCESS_STATUSES for source freshness). - Update source watcher template_fields test to reflect it is now a consumer sensor with model_unique_id. - Update altered_jaffle_shop node count from 29 to 28 after multibyte model removal. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/dbt/test_graph.py | 2 +- tests/operators/_watcher/test_state.py | 8 ++++---- tests/operators/test_watcher.py | 15 +++++++++------ 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index db78a83510..0ed16a0acc 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1326,7 +1326,7 @@ def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate, po mock_popen_communicate.assert_called_once() -@pytest.mark.parametrize("project_name,nodes_count", [("altered_jaffle_shop", 29), ("jaffle_shop_python", 28)]) +@pytest.mark.parametrize("project_name,nodes_count", [("altered_jaffle_shop", 28), ("jaffle_shop_python", 28)]) def test_load_via_load_via_custom_parser(project_name, nodes_count): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name) diff --git a/tests/operators/_watcher/test_state.py b/tests/operators/_watcher/test_state.py index 4bc45cbfef..e027ae3744 100644 --- a/tests/operators/_watcher/test_state.py +++ b/tests/operators/_watcher/test_state.py @@ -18,11 +18,11 @@ class TestNodeStatusHelpers: """Tests for node status helper functions.""" - @pytest.mark.parametrize("status", ["success", "pass"]) + @pytest.mark.parametrize("status", ["success", "pass", "warn"]) def test_is_dbt_node_status_success_true(self, status: str): assert is_dbt_node_status_success(status) is True - @pytest.mark.parametrize("status", ["failed", "fail", "error", "skipped", "warn", None, ""]) + @pytest.mark.parametrize("status", ["failed", "fail", "error", "skipped", None, ""]) def test_is_dbt_node_status_success_false(self, status: str | None): assert is_dbt_node_status_success(status) is False @@ -42,11 +42,11 @@ def test_is_dbt_node_status_skipped_true(self, status: str): def test_is_dbt_node_status_skipped_false(self, 
status: str | None): assert is_dbt_node_status_skipped(status) is False - @pytest.mark.parametrize("status", ["success", "pass", "failed", "fail", "error", "skipped"]) + @pytest.mark.parametrize("status", ["success", "pass", "warn", "failed", "fail", "error", "skipped"]) def test_is_dbt_node_status_terminal_true(self, status: str): assert is_dbt_node_status_terminal(status) is True - @pytest.mark.parametrize("status", ["warn", "running", None, ""]) + @pytest.mark.parametrize("status", ["running", None, ""]) def test_is_dbt_node_status_terminal_false(self, status: str | None): assert is_dbt_node_status_terminal(status) is False diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index ad791ffd9f..fc4e4186ef 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -1826,16 +1826,19 @@ def test_sensor_and_producer_different_param_values(mock_bigquery_conn): def test_dbt_source_watcher_operator_template_fields(): - """Test that DbtSourceWatcherOperator doesn't include model_unique_id in template_fields.""" + """Test that DbtSourceWatcherOperator includes model_unique_id as a consumer sensor.""" + from cosmos.operators._watcher.base import BaseConsumerSensor from cosmos.operators.local import DbtSourceLocalOperator from cosmos.operators.watcher import DbtSourceWatcherOperator - # DbtSourceWatcherOperator should NOT have model_unique_id in template_fields - # because it runs locally and doesn't watch models, it executes source freshness - assert "model_unique_id" not in DbtSourceWatcherOperator.template_fields + # DbtSourceWatcherOperator is now a consumer sensor, so it should have model_unique_id + assert "model_unique_id" in DbtSourceWatcherOperator.template_fields - # DbtSourceWatcherOperator should inherit template_fields from DbtSourceLocalOperator - assert DbtSourceWatcherOperator.template_fields == DbtSourceLocalOperator.template_fields + # It should combine template_fields from both BaseConsumerSensor and 
DbtSourceLocalOperator + for field in BaseConsumerSensor.template_fields: + assert field in DbtSourceWatcherOperator.template_fields + for field in DbtSourceLocalOperator.template_fields: + assert field in DbtSourceWatcherOperator.template_fields class TestDbtTestWatcherOperator: From f22d9f7d4e013108f4c72bf9ac4b75e0998e99c4 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 31 Mar 2026 22:43:49 -0300 Subject: [PATCH 16/26] Reduce source freshness interval to help testing --- dev/dags/dbt/altered_jaffle_shop/models/staging/sources.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev/dags/dbt/altered_jaffle_shop/models/staging/sources.yml b/dev/dags/dbt/altered_jaffle_shop/models/staging/sources.yml index 4c3a834262..8a9493ece7 100644 --- a/dev/dags/dbt/altered_jaffle_shop/models/staging/sources.yml +++ b/dev/dags/dbt/altered_jaffle_shop/models/staging/sources.yml @@ -27,6 +27,6 @@ sources: - not_null freshness: warn_after: - count: 3650 - period: day + count: 1 + period: minute loaded_at_field: CAST(order_date AS TIMESTAMP) From 26683820381fb3e06173913831b89fb83b956873 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 1 Apr 2026 01:45:05 +0000 Subject: [PATCH 17/26] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/operators/watcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index dd01c9cc0f..4d568784b5 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -2,7 +2,7 @@ import contextlib import functools -from collections.abc import Callable, Sequence +from collections.abc import Callable from typing import TYPE_CHECKING, Any from airflow.exceptions import AirflowException From 
4cabef4f6ebba581989d9e8922a0aa61f298fbb2 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 31 Mar 2026 23:00:47 -0300 Subject: [PATCH 18/26] Fix altered_jaffle_shop node count in dbt_ls test (40 to 39) Multibyte model was removed, reducing the node count by one. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/dbt/test_graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 0ed16a0acc..8e0c31ff63 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -965,7 +965,7 @@ def test_load_via_dbt_ls_with_exclude(postgres_profile_config): @pytest.mark.integration @pytest.mark.parametrize( "project_dir,node_count", - [(DBT_PROJECTS_ROOT_DIR / ALTERED_DBT_PROJECT_NAME, 40), (DBT_PROJECTS_ROOT_DIR / "jaffle_shop_python", 28)], + [(DBT_PROJECTS_ROOT_DIR / ALTERED_DBT_PROJECT_NAME, 39), (DBT_PROJECTS_ROOT_DIR / "jaffle_shop_python", 28)], ) def test_load_via_dbt_ls_without_exclude(project_dir, node_count, postgres_profile_config): project_config = ProjectConfig(dbt_project_path=project_dir) From 77a1ae2049a3b9388cd3e436995bff4d71c0e0ff Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 31 Mar 2026 23:02:30 -0300 Subject: [PATCH 19/26] Run handle_exception before early return in source freshness The early return for _check_source_freshness was bypassing handle_exception(result), silently ignoring dbt failures. Now check for errors before returning. Other post-exec steps (OpenLineage, datasets, partial parse) are intentionally skipped as they don't apply to source freshness. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- cosmos/operators/local.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index ccf6938ddf..c0a4f2bcda 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -690,8 +690,8 @@ def run_command( # noqa: C901 ) if context.get("_check_source_freshness"): self._sources_json = _read_target_sources_json(tmp_dir_path) - if self._sources_json is not None: - return result + self.handle_exception(result) + return result if is_openlineage_common_available: self.calculate_openlineage_events_completes(env, tmp_dir_path) if AIRFLOW_VERSION.major < _AIRFLOW3_MAJOR_VERSION: From 19aad8ab5bb0034a9142d400a39934f04c08a661 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 31 Mar 2026 23:04:40 -0300 Subject: [PATCH 20/26] Override _resource_label for DbtSourceWatcherOperator Without this, skip/failure messages would say "Model" instead of "Source" for source freshness sensors. Co-Authored-By: Claude Opus 4.6 (1M context) --- cosmos/operators/watcher.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 4d568784b5..14bb5963e9 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -419,6 +419,11 @@ class DbtSourceWatcherOperator(BaseConsumerSensor, DbtSourceLocalOperator): # t template_fields: tuple[str, ...] 
= BaseConsumerSensor.template_fields + DbtSourceLocalOperator.template_fields # type: ignore[operator] + @property + def _resource_label(self) -> str: + """Human-readable label for this sensor's dbt resource type.""" + return "Source" + class DbtRunWatcherOperator(DbtConsumerWatcherSensor): """ From 8e1a997f122150ece3a25a30e5b22510cf5388aa Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 31 Mar 2026 23:06:08 -0300 Subject: [PATCH 21/26] Override fallback for DbtSourceWatcherOperator The base _fallback_to_non_watcher_run runs dbt run, which is wrong for source sensors. Override to run dbt source freshness with the correct source selector syntax on retry. Co-Authored-By: Claude Opus 4.6 (1M context) --- cosmos/operators/watcher.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 14bb5963e9..da15f342f7 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -424,6 +424,20 @@ def _resource_label(self) -> str: """Human-readable label for this sensor's dbt resource type.""" return "Source" + def _fallback_to_non_watcher_run(self, try_number: int, context: Context) -> bool: + """Run ``dbt source freshness`` locally for this specific source on retry.""" + logger.info( + "Retry attempt #%s – Running source freshness for '%s' from project '%s'", + try_number - 1, + self.model_unique_id, + self.project_dir, + ) + resource_name = self.model_unique_id.split(".", 2)[2] + cmd_flags = ["--select", f"source:{resource_name}"] + self.build_and_run_cmd(context, cmd_flags=cmd_flags) # type: ignore[attr-defined] + logger.info("dbt source freshness completed successfully on retry for source '%s'", self.model_unique_id) + return True + class DbtRunWatcherOperator(DbtConsumerWatcherSensor): """ From 9a1c720ec89852302d0cf930ebb960147d9142cd Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 31 Mar 2026 23:07:04 -0300 Subject: [PATCH 22/26] Revert trigger poll log to 
DEBUG to reduce log noise INFO on every poll iteration is too noisy with many sensors. Revert to DEBUG; terminal state transitions still log at INFO/WARNING level. Co-Authored-By: Claude Opus 4.6 (1M context) --- cosmos/operators/_watcher/triggerer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/operators/_watcher/triggerer.py b/cosmos/operators/_watcher/triggerer.py index f1fa21fe79..0d71191491 100644 --- a/cosmos/operators/_watcher/triggerer.py +++ b/cosmos/operators/_watcher/triggerer.py @@ -248,7 +248,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # Sleep briefly before re-polling await asyncio.sleep(self.poke_interval) - logger.info( + logger.debug( "Polling again for node '%s': status=%s, producer_state=%s", self.model_unique_id, dbt_node_status, From c547eef15242f8a5d89540969e6c90e4a623f89f Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 31 Mar 2026 23:50:39 -0300 Subject: [PATCH 23/26] Add ordered integration test for watcher source rendering DAG The watcher source rendering DAG runs dbt source freshness before dbt build, so the source tables must already exist. Add a dedicated test that runs source_rendering_dag (LOCAL mode) first to load seeds, then watcher_source_rendering_dag. Skip multibyte dataset tests (model removed from altered_jaffle_shop). 
Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/operators/test_local.py | 2 ++ tests/test_example_dags.py | 15 ++++++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 3f2291d026..60b013942d 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -748,6 +748,7 @@ def test_run_operator_dataset_emission_is_skipped(caplog): reason="We do not support emitting assets with Airflow 3.0 without dataset alias.", ) @pytest.mark.integration +@pytest.mark.skip(reason="Multibyte model removed from altered_jaffle_shop; will be restored in a dedicated project") @patch("cosmos.settings.enable_dataset_alias", 0) def test_run_operator_dataset_url_encoded_names_in_airflow2(caplog): try: @@ -785,6 +786,7 @@ def test_run_operator_dataset_url_encoded_names_in_airflow2(caplog): reason="We do not support emitting assets with Airflow 3.0 without dataset alias.", ) @pytest.mark.integration +@pytest.mark.skip(reason="Multibyte model removed from altered_jaffle_shop; will be restored in a dedicated project") @patch("cosmos.settings.use_dataset_airflow3_uri_standard", 1) @patch("cosmos.settings.enable_dataset_alias", 0) def test_run_operator_dataset_url_encoded_names_in_airflow2_with_airflow3_uri(caplog): diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 76f596c374..664e3ddbfd 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -20,6 +20,8 @@ AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore" DBT_VERSION = Version(get_dbt_version().to_version_string()[1:]) KUBERNETES_DAGS = ["jaffle_shop_kubernetes", "jaffle_shop_watcher_kubernetes"] +# DAGs that require seeds to be loaded first (run via dedicated ordered tests below) +DAGS_WITH_SEED_DEPENDENCY = ["watcher_source_rendering_dag"] IGNORED_DAG_FILES = [ "performance_dag.py", "jaffle_shop_kubernetes.py", @@ -121,11 +123,22 @@ def run_dag(dag_id: str): 
@pytest.mark.integration @pytest.mark.parametrize("dag_id", get_dag_ids()) def test_example_dag(session, dag_id: str): - if dag_id in KUBERNETES_DAGS: + if dag_id in KUBERNETES_DAGS or dag_id in DAGS_WITH_SEED_DEPENDENCY: return run_dag(dag_id) +@pytest.mark.skipif( + AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, + reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs", +) +@pytest.mark.integration +def test_watcher_source_rendering_dag(session): + """Run source_rendering_dag first to load seeds, then watcher_source_rendering_dag.""" + run_dag("source_rendering_dag") + run_dag("watcher_source_rendering_dag") + + @pytest.mark.skipif( AIRFLOW_VERSION >= Version("3.1.0") # TODO: Fix https://github.com/astronomer/astronomer-cosmos/issues/2045 or AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, From 0d94adef1f1fec14d59cd181510fa0f584999e0a Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 1 Apr 2026 05:51:09 -0300 Subject: [PATCH 24/26] Clear dbt_cmd_flags for source freshness command dbt source freshness does not support --resource-type flags that are added to dbt_cmd_flags when TestBehavior is NONE. Save and clear dbt_cmd_flags before running source freshness, restore them afterward. --select/--exclude are unaffected as they come from add_global_flags via separate attributes. Co-Authored-By: Claude Opus 4.6 (1M context) --- cosmos/operators/watcher.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index da15f342f7..30540a6591 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -246,16 +246,19 @@ def _run_source_freshness(self, context: Context) -> None: """Run ``dbt source freshness`` via ``build_cmd`` and ``run_command``. Temporarily overrides operator attributes that carry flags unsupported by - ``dbt source freshness`` (``--full-refresh``, ``--indirect-selection``). 
- User-supplied ``--select``/``--exclude`` are preserved. + ``dbt source freshness`` (``--full-refresh``, ``--indirect-selection``, + and build-specific ``dbt_cmd_flags`` such as ``--resource-type``). + ``--select``/``--exclude`` are unaffected (they come from ``add_global_flags``). """ original_base_cmd = self.base_cmd original_indirect_selection = getattr(self, "indirect_selection", None) original_full_refresh = getattr(self, "full_refresh", None) + original_dbt_cmd_flags = self.dbt_cmd_flags try: self.base_cmd = ["source", "freshness"] self.indirect_selection = None # ``dbt source freshness`` does not support --indirect-selection self.full_refresh = False # type: ignore[assignment] # ``dbt source freshness`` does not support --full-refresh + self.dbt_cmd_flags = [] # clear build-specific flags (e.g. --resource-type) full_cmd, env = self.build_cmd(context=context, cmd_flags=self.add_cmd_flags()) context["_check_source_freshness"] = True # type: ignore[typeddict-unknown-key] self.run_command(cmd=full_cmd, env=env, context=context) @@ -263,6 +266,7 @@ def _run_source_freshness(self, context: Context) -> None: self.base_cmd = original_base_cmd self.indirect_selection = original_indirect_selection self.full_refresh = original_full_refresh # type: ignore[assignment] + self.dbt_cmd_flags = original_dbt_cmd_flags context.pop("_check_source_freshness", None) # type: ignore[typeddict-item] def _skipped_node_token(self, context: Context, node_unique_ids: list[str]) -> None: From 1cc45c3da51e2621464e934d534dce46bcd52cf9 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 1 Apr 2026 07:47:10 -0300 Subject: [PATCH 25/26] Update dev/dags/example_source_rendering.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- dev/dags/example_source_rendering.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/dags/example_source_rendering.py b/dev/dags/example_source_rendering.py index c6f950bd95..e41e635957 100644 --- 
a/dev/dags/example_source_rendering.py +++ b/dev/dags/example_source_rendering.py @@ -50,7 +50,7 @@ # [END cosmos_source_node_example] -# Currently airflow dags test ignores priority_weight and weight_rule, for this reason, we're setting the following in the CI only: +# Airflow DAG tests currently ignore priority_weight and weight_rule, so in CI we explicitly set trigger_rule: if os.getenv("CI"): operator_args["trigger_rule"] = "all_success" From 270ad194511d69b6ac800ef56b37487b2558fbbd Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 1 Apr 2026 07:48:46 -0300 Subject: [PATCH 26/26] Fix _make_producer to test actual default for _check_source_freshness Remove the default=True from the helper so tests that call _make_producer() without arguments validate the operator's real default (False). Add explicit test for the default value. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/operators/test_watcher.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index fc4e4186ef..09ef0f774b 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -2031,26 +2031,25 @@ def test_excludes_test_nodes(self): class TestProducerSourceFreshness: """Tests for source freshness methods on DbtProducerWatcherOperator.""" - def _make_producer(self, check_source_freshness=True, **kwargs): + def _make_producer(self, **kwargs): from airflow import DAG with DAG(dag_id="test_freshness_dag", start_date=datetime(2023, 1, 1)): producer = DbtProducerWatcherOperator( project_dir=str(DBT_PROJECT_PATH), profile_config=profile_config, - _check_source_freshness=check_source_freshness, **kwargs, ) return producer + def test_init_check_source_freshness_defaults_to_false(self): + producer = self._make_producer() + assert producer._check_source_freshness is False + def test_init_stores_check_source_freshness_flag(self): - producer = 
self._make_producer(check_source_freshness=True) + producer = self._make_producer(_check_source_freshness=True) assert producer._check_source_freshness is True - def test_init_default_check_source_freshness_is_false(self): - producer = self._make_producer(check_source_freshness=False) - assert producer._check_source_freshness is False - def test_push_skipped_xcom_for_model(self): producer = self._make_producer() ti = MagicMock()