Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b9cfc2d
Add source freshness aware execution for ExecutionMode.WATCHER
tatiana Mar 31, 2026
32c637b
Address review feedback on skip status handling
tatiana Mar 31, 2026
4fc12de
Resolve graph nodes from DbtDag/DbtTaskGroup converter
tatiana Mar 31, 2026
b32cb01
Use EventStatus.SKIPPED constant for skipped trigger events
tatiana Mar 31, 2026
8536e61
Sort model names in --exclude for deterministic command output
tatiana Mar 31, 2026
806cb76
Document _check_source_freshness in producer docstring
tatiana Mar 31, 2026
0895c55
Use == instead of is for enum comparison in graph.py
tatiana Mar 31, 2026
a0f745c
Fall through to error handling when sources.json is missing
tatiana Mar 31, 2026
84fb166
Parse resource names correctly for versioned dbt models
tatiana Mar 31, 2026
bb288c0
Use full graph for source freshness dependency traversal
tatiana Mar 31, 2026
e47a8c8
Merge branch 'main' into feature/watcher-source-freshness
tatiana Mar 31, 2026
b2d7baa
Source tasks as consumer sensors, push freshness to XCom
tatiana Apr 1, 2026
42a0635
Remove multibyte model from altered_jaffle_shop
tatiana Apr 1, 2026
0f6a246
Add an example source DAG with watcher
tatiana Apr 1, 2026
914a11a
Run example_source_rendering.py in AF3
tatiana Apr 1, 2026
18083c2
Fix tests for warn status, source operator, and multibyte removal
tatiana Apr 1, 2026
f22d9f7
Reduce source freshness interval to help testing
tatiana Apr 1, 2026
2668382
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 1, 2026
4cabef4
Fix altered_jaffle_shop node count in dbt_ls test (40 to 39)
tatiana Apr 1, 2026
77a1ae2
Run handle_exception before early return in source freshness
tatiana Apr 1, 2026
19aad8a
Override _resource_label for DbtSourceWatcherOperator
tatiana Apr 1, 2026
8e1a997
Override fallback for DbtSourceWatcherOperator
tatiana Apr 1, 2026
9a1c720
Revert trigger poll log to DEBUG to reduce log noise
tatiana Apr 1, 2026
c547eef
Add ordered integration test for watcher source rendering DAG
tatiana Apr 1, 2026
0d94ade
Clear dbt_cmd_flags for source freshness command
tatiana Apr 1, 2026
1cc45c3
Update dev/dags/example_source_rendering.py
tatiana Apr 1, 2026
270ad19
Fix _make_producer to test actual default for _check_source_freshness
tatiana Apr 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,8 @@ def _add_watcher_producer_task(
producer_task_args = task_args.copy()
if tests_per_model is not None:
producer_task_args["tests_per_model"] = tests_per_model
if render_config is not None and render_config.source_rendering_behavior != SourceRenderingBehavior.NONE:
producer_task_args["_check_source_freshness"] = True

if render_config is not None:
producer_task_args["select"] = _convert_list_to_str(render_config.select)
Expand Down
1 change: 1 addition & 0 deletions cosmos/listeners/dag_run_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
class EventStatus:
SUCCESS = "success"
FAILED = "failed"
SKIPPED = "skipped"


DAG_RUN = "dag_run"
Expand Down
54 changes: 43 additions & 11 deletions cosmos/operators/_watcher/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pathlib import Path
from typing import Any

from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowException, AirflowSkipException

from cosmos import settings
from cosmos.config import ProfileConfig
Expand All @@ -16,13 +16,15 @@
PRODUCER_WATCHER_TASK_ID,
WATCHER_TASK_WEIGHT_RULE,
)
from cosmos.listeners.dag_run_listener import EventStatus
from cosmos.log import get_logger
from cosmos.operators._watcher.aggregation import get_tests_status_xcom_key, push_test_result_or_aggregate
from cosmos.operators._watcher.state import (
_iso_to_string,
_log_dbt_event,
build_producer_state_fetcher,
get_xcom_val,
is_dbt_node_status_skipped,
is_dbt_node_status_success,
is_dbt_node_status_terminal,
safe_xcom_push,
Expand Down Expand Up @@ -407,6 +409,17 @@ def _execute_core(self, context: Context) -> None:
if not self.deferrable:
super().execute(context)
elif not self.poke(context):
if self.is_test_sensor:
xcom_key = get_tests_status_xcom_key(self.model_unique_id)
else:
xcom_key = f"{self.model_unique_id.replace('.', '__')}_status"
logger.info(
"Deferring %s '%s'. The trigger will poll XCom key '%s' from producer task '%s'.",
self._resource_label.lower(),
self.model_unique_id,
xcom_key,
self.producer_task_id,
)
self.defer(
trigger=WatcherTrigger(
model_unique_id=self.model_unique_id,
Expand Down Expand Up @@ -440,6 +453,11 @@ def execute_complete(self, context: Context, event: dict[str, str]) -> None:
status = event.get("status")
reason = event.get("reason")

if status == EventStatus.SKIPPED:
raise AirflowSkipException(
f"{self._resource_label} '{self.model_unique_id}' was skipped by the dbt command."
)

if status == "success" and reason == WatcherEventReason.NODE_NOT_RUN:
logger.info(
"%s '%s' was skipped by the dbt command. This may happen if it is an ephemeral model or if the model sql file is empty.",
Expand Down Expand Up @@ -491,8 +509,19 @@ def _get_node_status(self, ti: Any, context: Context) -> Any:
"""
if self.is_test_sensor:
xcom_key = get_tests_status_xcom_key(self.model_unique_id)
return get_xcom_val(ti, self.producer_task_id, xcom_key)
return get_xcom_val(ti, self.producer_task_id, f"{self.model_unique_id.replace('.', '__')}_status")
else:
xcom_key = f"{self.model_unique_id.replace('.', '__')}_status"
return get_xcom_val(ti, self.producer_task_id, xcom_key)

def _cache_compiled_sql(self, ti: Any, context: Context) -> None:
"""Pull compiled_sql from XCom and cache it on the sensor instance."""
compiled_sql = get_xcom_val(
ti, self.producer_task_id, f"{self.model_unique_id.replace('.', '__')}_compiled_sql"
)
if compiled_sql:
self.compiled_sql = compiled_sql
if hasattr(self, "_override_rtif"):
self._override_rtif(context)

def poke(self, context: Context) -> bool:
"""
Expand All @@ -502,11 +531,16 @@ def poke(self, context: Context) -> bool:
ti = context["ti"]
try_number = ti.try_number

if self.is_test_sensor:
xcom_key = get_tests_status_xcom_key(self.model_unique_id)
else:
xcom_key = f"{self.model_unique_id.replace('.', '__')}_status"
logger.info(
"Try number #%s, poke attempt #%s: Pulling status from task_id '%s' for %s '%s'",
"Try number #%s, poke attempt #%s: Pulling status from task_id '%s' via XCom key '%s' for %s '%s'",
try_number,
self.poke_retry_number,
self.producer_task_id,
xcom_key,
self._resource_label.lower(),
self.model_unique_id,
)
Expand All @@ -521,13 +555,7 @@ def poke(self, context: Context) -> bool:

# compiled_sql is always in the canonical per-model XCom key (same for event and subprocess modes)
if status is not None:
compiled_sql = get_xcom_val(
ti, self.producer_task_id, f"{self.model_unique_id.replace('.', '__')}_compiled_sql"
)
if compiled_sql:
self.compiled_sql = compiled_sql
if hasattr(self, "_override_rtif"):
self._override_rtif(context)
self._cache_compiled_sql(ti, context)

dbt_events = get_xcom_val(
task_instance=context["ti"],
Expand All @@ -550,6 +578,10 @@ def poke(self, context: Context) -> bool:
self.poke_retry_number += 1

return False
elif is_dbt_node_status_skipped(status):
raise AirflowSkipException(
f"{self._resource_label} '{self.model_unique_id}' was skipped by the dbt command."
)
elif is_dbt_node_status_success(status):
return True
else:
Expand Down
12 changes: 9 additions & 3 deletions cosmos/operators/_watcher/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
ProducerStateFetcher = Callable[[], str | None]

# dbt uses different status values for different node types (models/tests):"
DBT_SUCCESS_STATUSES = frozenset({"success", "pass"})
DBT_SUCCESS_STATUSES = frozenset({"success", "pass", "warn"})
DBT_FAILED_STATUSES = frozenset({"failed", "fail", "error", "runtime error"})
DBT_SKIPPED_STATUSES = frozenset({"skipped"})


class DbtTestStatus(str, Enum):
Expand All @@ -44,9 +45,14 @@ def is_dbt_node_status_failed(status: str | None) -> bool:
return status in DBT_FAILED_STATUSES


def is_dbt_node_status_skipped(status: str | None) -> bool:
"""Check if the dbt node status indicates it was skipped (e.g. stale upstream source, upstream failure)."""
return status in DBT_SKIPPED_STATUSES


def is_dbt_node_status_terminal(status: str | None) -> bool:
"""Check if the dbt node status is terminal (success or failed)."""
return is_dbt_node_status_success(status) or is_dbt_node_status_failed(status)
"""Check if the dbt node status is terminal (success, failed, or skipped)."""
return is_dbt_node_status_success(status) or is_dbt_node_status_failed(status) or is_dbt_node_status_skipped(status)
Comment thread
tatiana marked this conversation as resolved.


xcom_set_lock = Lock()
Expand Down
12 changes: 11 additions & 1 deletion cosmos/operators/_watcher/triggerer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
_log_dbt_event,
build_producer_state_fetcher,
is_dbt_node_status_failed,
is_dbt_node_status_skipped,
is_dbt_node_status_success,
is_dbt_node_status_terminal,
)
Expand Down Expand Up @@ -217,6 +218,10 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
event_data["compiled_sql"] = compiled_sql
yield TriggerEvent(event_data) # type: ignore[no-untyped-call]
return
elif is_dbt_node_status_skipped(dbt_node_status):
logger.info("dbt node '%s' was skipped", self.model_unique_id)
yield TriggerEvent({"status": EventStatus.SKIPPED}) # type: ignore[no-untyped-call]
return
elif is_dbt_node_status_failed(dbt_node_status):
logger.warning("dbt node '%s' failed", self.model_unique_id)
event_data = {"status": EventStatus.FAILED, "reason": WatcherEventReason.NODE_FAILED}
Expand All @@ -243,4 +248,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]:

# Sleep briefly before re-polling
await asyncio.sleep(self.poke_interval)
logger.debug("Polling again for node '%s' status...", self.model_unique_id)
logger.debug(
"Polling again for node '%s': status=%s, producer_state=%s",
self.model_unique_id,
dbt_node_status,
producer_task_state,
)
Comment thread
tatiana marked this conversation as resolved.
18 changes: 18 additions & 0 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,19 @@
logger = get_logger(__name__)


def _read_target_sources_json(project_root: Path) -> dict[str, Any] | None:
"""Parse ``target/sources.json`` under ``project_root`` if the file exists."""
path = project_root / "target" / "sources.json"
if not path.is_file():
return None
try:
result: dict[str, Any] = json.loads(path.read_text(encoding="utf-8"))
return result
except json.JSONDecodeError:
logger.warning("Could not parse JSON from %s", path)
return None


# The following is related to the ability of Cosmos parsing dbt artifacts and generating OpenLineage URIs
# It is used for emitting Airflow assets and not necessarily OpenLineage events
try:
Expand Down Expand Up @@ -204,6 +217,7 @@ def __init__(
self.callback_args = callback_args or {}
self.compiled_sql = ""
self.freshness = ""
self._sources_json: dict[str, Any] | None = None
self.should_store_compiled_sql = should_store_compiled_sql
self.should_upload_compiled_sql = should_upload_compiled_sql
self.openlineage_events_completes: list[RunEvent] = []
Expand Down Expand Up @@ -674,6 +688,10 @@ def run_command( # noqa: C901
cwd=tmp_project_dir,
context=context,
)
if context.get("_check_source_freshness"):
self._sources_json = _read_target_sources_json(tmp_dir_path)
self.handle_exception(result)
return result
if is_openlineage_common_available:
self.calculate_openlineage_events_completes(env, tmp_dir_path)
if AIRFLOW_VERSION.major < _AIRFLOW3_MAJOR_VERSION:
Expand Down
Loading
Loading