Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,8 +1,22 @@
Changelog
=========

1.13.0a2 (2026-01-22)
---------------------

(Includes more changes)

Features

- Introduce ``ExecutionMode.WATCHER_KUBERNETES`` to use watcher with ``KubernetesPodOperator`` by @tatiana in #2207

Bug Fixes

- Fix running empty models or ephemeral nodes in ``ExecutionMode.WATCHER`` by @tatiana in #2279


1.12.1 (2026-01-14)
----------------------
-------------------

Bug Fixes

Expand Down
2 changes: 1 addition & 1 deletion cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from cosmos import settings

__version__ = "1.13.0a1"
__version__ = "1.13.0a2"


if not settings.enable_memory_optimised_imports:
Expand Down
14 changes: 12 additions & 2 deletions cosmos/operators/_watcher/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,10 @@ def _get_status_from_run_results(self, ti: Any, context: Context) -> Any:
node_result = next((r for r in results if r.get("unique_id") == self.model_unique_id), None)

if not node_result: # pragma: no cover
logger.warning("No matching result found for unique_id '%s'", self.model_unique_id)
logger.warning(
"The dbt node with unique_id '%s' was not executed by the dbt command run in the producer task. This may happen if it is an ephemeral model or if the model sql file is empty.",
self.model_unique_id,
)
return None

logger.info("Node Info: %s", run_results_json)
Expand Down Expand Up @@ -198,10 +201,17 @@ def execute(self, context: Context, **kwargs: Any) -> None:

def execute_complete(self, context: Context, event: dict[str, str]) -> None:
status = event.get("status")
reason = event.get("reason")

if status == "success" and reason == "model_not_run":
logger.info(
"Model '%s' was skipped by the dbt command. This may happen if it is an ephemeral model or if the model sql file is empty.",
self.model_unique_id,
)

if status != "failed":
return

reason = event.get("reason")
if reason == "model_failed":
raise AirflowException(
f"dbt model '{self.model_unique_id}' failed. Review the producer task '{self.producer_task_id}' logs for details."
Expand Down
26 changes: 19 additions & 7 deletions cosmos/operators/_watcher/triggerer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
from packaging.version import Version

from cosmos.constants import AIRFLOW_VERSION
from cosmos.log import get_logger
from cosmos.operators._watcher.state import build_producer_state_fetcher

logger = get_logger(__name__)


class WatcherTrigger(BaseTrigger):

Expand Down Expand Up @@ -82,7 +85,7 @@ def _get_xcom_val() -> Any | None:
return await sync_to_async(_get_xcom_val)()

async def get_xcom_val(self, key: str) -> Any | None:
self.log.info(
logger.info(
"Trying to retrieve value using XCom key <%s> by task_id <%s>, dag_id <%s>, run_id <%s> and map_index <%s>",
key,
self.producer_task_id,
Expand Down Expand Up @@ -120,38 +123,47 @@ async def _get_producer_task_status(self) -> str | None:
dag_id=self.dag_id,
run_id=self.run_id,
producer_task_id=self.producer_task_id,
logger=self.log,
logger=logger,
)
if fetch_state is None:
return None

return await sync_to_async(fetch_state)()

async def run(self) -> AsyncIterator[TriggerEvent]:
self.log.info("Starting WatcherTrigger for model: %s", self.model_unique_id)
logger.info("Starting WatcherTrigger for model: %s", self.model_unique_id)

while True:
producer_task_state = await self._get_producer_task_status()
node_status = await self._parse_node_status()
if node_status == "success":
self.log.info("Model '%s' succeeded", self.model_unique_id)
logger.info("Model '%s' succeeded", self.model_unique_id)
yield TriggerEvent({"status": "success"}) # type: ignore[no-untyped-call]
return
elif node_status == "failed":
self.log.warning("Model '%s' failed", self.model_unique_id)
logger.warning("Model '%s' failed", self.model_unique_id)
yield TriggerEvent({"status": "failed", "reason": "model_failed"}) # type: ignore[no-untyped-call]
return
elif producer_task_state == "failed":
self.log.error(
logger.error(
"Watcher producer task '%s' failed before delivering results for model '%s'",
self.producer_task_id,
self.model_unique_id,
)
yield TriggerEvent({"status": "failed", "reason": "producer_failed"}) # type: ignore[no-untyped-call]
return
elif producer_task_state == "success" and node_status is None:
logger.info(
"The producer task '%s' succeeded. There is no information about the model '%s' execution.",
self.producer_task_id,
self.model_unique_id,
)
yield TriggerEvent({"status": "success", "reason": "model_not_run"}) # type: ignore[no-untyped-call]
return

# Sleep briefly before re-polling
await asyncio.sleep(self.poke_interval)
self.log.debug("Polling again for model '%s' status...", self.model_unique_id)
logger.debug("Polling again for model '%s' status...", self.model_unique_id)


def _parse_compressed_xcom(compressed_b64_event_msg: str) -> Any:
Expand Down
22 changes: 22 additions & 0 deletions tests/operators/_watcher/test_triggerer.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ async def test_get_xcom_val_branches(self, airflow_version, expected_val):
("success", "running", {"status": "success"}),
("failed", "running", {"status": "failed", "reason": "model_failed"}),
(None, "failed", {"status": "failed", "reason": "producer_failed"}),
(None, "success", {"status": "success", "reason": "model_not_run"}),
],
)
async def test_run_various_outcomes(self, node_status, producer_state, expected):
Expand Down Expand Up @@ -205,6 +206,27 @@ def _import_side_effect(name: str, *args, **kwargs):

assert state is None

@pytest.mark.asyncio
async def test_run_producer_success_model_not_run(self, caplog):
"""Test that when producer succeeds but model has no status, trigger yields success with model_not_run reason."""
get_producer_status_mock = AsyncMock(return_value="success")
parse_node_status_mock = AsyncMock(return_value=None)

caplog.set_level("INFO")

with (
patch.object(self.trigger, "_get_producer_task_status", get_producer_status_mock),
patch.object(self.trigger, "_parse_node_status", parse_node_status_mock),
):
events = []
async for event in self.trigger.run():
events.append(event)

assert len(events) == 1
assert events[0].payload == {"status": "success", "reason": "model_not_run"}
assert "The producer task 'task_1' succeeded" in caplog.text
assert "There is no information about the model 'model.test' execution" in caplog.text

@pytest.mark.asyncio
async def test_run_poke_interval_and_debug_log(self, caplog):
get_xcom_val_mock = AsyncMock(side_effect=["compressed_data"])
Expand Down
116 changes: 115 additions & 1 deletion tests/operators/test_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

DBT_PROJECT_PATH = Path(__file__).parent.parent.parent / "dev/dags/dbt/jaffle_shop"
DBT_PROFILES_YAML_FILEPATH = DBT_PROJECT_PATH / "profiles.yml"

DBT_PROJECT_WITH_EMPTY_MODEL_PATH = Path(__file__).parent.parent / "sample/dbt_project_with_empty_model"

project_config = ProjectConfig(
dbt_project_path=DBT_PROJECT_PATH,
Expand Down Expand Up @@ -184,6 +184,31 @@ def test_handle_startup_event():
assert lst and lst[0]["name"] == "MainReportVersion"


def test_dbt_consumer_watcher_sensor_execute_complete_model_not_run_logs_message(caplog):
"""Test that execute_complete logs an info message when model was skipped (model_not_run)."""
sensor = DbtConsumerWatcherSensor(
project_dir=".",
profiles_dir=".",
profile_config=profile_config,
model_unique_id="model.pkg.skipped_model",
poke_interval=1,
producer_task_id="dbt_producer_watcher_operator",
task_id="consumer_sensor",
)
sensor.model_unique_id = "model.pkg.skipped_model"

context = {"dag_run": MagicMock()}
event = {"status": "success", "reason": "model_not_run"}

with caplog.at_level(logging.INFO):
sensor.execute_complete(context, event)

assert any(
"Model 'model.pkg.skipped_model' was skipped by the dbt command" in message for message in caplog.messages
)
assert any("ephemeral model or if the model sql file is empty" in message for message in caplog.messages)


def test_dbt_producer_watcher_operator_logs_retry_message(caplog):
op = DbtProducerWatcherOperator(project_dir=".", profile_config=None)
ti = _MockTI()
Expand Down Expand Up @@ -218,6 +243,7 @@ def test_dbt_producer_watcher_operator_skips_retry_attempt(caplog):
"event, expected_message",
[
({"status": "success"}, None),
({"status": "success", "reason": "model_not_run"}, None),
(
{"status": "failed", "reason": "model_failed"},
"dbt model 'model.pkg.m' failed. Review the producer task 'dbt_producer_watcher' logs for details.",
Expand Down Expand Up @@ -1041,6 +1067,94 @@ def test_dbt_dag_with_watcher():
}


# Airflow 3.0.0 hangs indefinitely, while Airflow 3.0.6 fails due to this Airflow bug:
# https://github.com/apache/airflow/issues/51816
conditions_to_skip = (AIRFLOW_VERSION < Version("2.8"), AIRFLOW_VERSION == Version("3.0"))


@pytest.mark.skipif(
conditions_to_skip,
reason="Airflow hangs in these versions when trying to fetch XCom from the triggerer when using dags.test()",
)
@pytest.mark.integration
def test_dbt_dag_with_watcher_and_empty_model(caplog):
"""
Run a DbtDag using `ExecutionMode.WATCHER` and a dbt project with an empty model. This was a situation observed by an Astronomer customer.
Confirm the right amount of tasks is created and that tasks are in the expected topological order.
Confirm that the producer watcher task is created and that it is the parent of the root dbt nodes.
"""
project_config = ProjectConfig(
dbt_project_path=DBT_PROJECT_WITH_EMPTY_MODEL_PATH,
)
# There are two dbt projects defined in this folder.
# When we run `dbt ls`, we can see this:
#
# 10:32:30 Found 2 models, 464 macros
# micro_dbt_project.add_row
# micro_dbt_project.empty_model
#
# However, during `dbt build`, dbt skips running the empty model, and only runs the add_row model:
#
# 10:29:03 Running with dbt=1.11.2
# 10:29:03 Registered adapter: postgres=1.10.0
# 10:29:03 Found 2 models, 464 macros
# 10:29:03
# 10:29:03 Concurrency: 4 threads (target='dev')
# 10:29:03
# 10:29:03 1 of 1 START sql view model public.add_row ..................................... [RUN]
# 10:29:03 1 of 1 OK created sql view model public.add_row ................................ [CREATE VIEW in 0.06s]
# 10:29:03
# 10:29:03 Finished running 1 view model in 0 hours 0 minutes and 0.19 seconds (0.19s).
# 10:29:03
# 10:29:03 Completed successfully
# 10:29:03
# 10:29:03 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 NO-OP=0 TOTAL=1

watcher_dag = DbtDag(
project_config=project_config,
profile_config=profile_config,
start_date=datetime(2023, 1, 1),
dag_id="watcher_dag",
execution_config=ExecutionConfig(
execution_mode=ExecutionMode.WATCHER,
invocation_mode=InvocationMode.DBT_RUNNER,
),
render_config=RenderConfig(emit_datasets=False, test_behavior=TestBehavior.NONE),
operator_args={
"trigger_rule": "all_success",
"execution_timeout": timedelta(seconds=10),
},
dagrun_timeout=timedelta(seconds=30),
)
outcome = new_test_dag(watcher_dag)
assert outcome.state == DagRunState.SUCCESS

assert len(watcher_dag.dbt_graph.filtered_nodes) == 2

assert len(watcher_dag.task_dict) == 3
tasks_names = [task.task_id for task in watcher_dag.topological_sort()]
expected_task_names = [
"dbt_producer_watcher",
"add_row_run",
"empty_model_run",
]
assert tasks_names == expected_task_names

assert isinstance(watcher_dag.task_dict["dbt_producer_watcher"], DbtProducerWatcherOperator)
assert isinstance(watcher_dag.task_dict["add_row_run"], DbtRunWatcherOperator)
assert isinstance(watcher_dag.task_dict["empty_model_run"], DbtRunWatcherOperator)

assert watcher_dag.task_dict["dbt_producer_watcher"].downstream_task_ids == {
"add_row_run",
"empty_model_run",
}

assert "Total filtered nodes: 2" in caplog.text
assert "Finished running node model.micro_dbt_project.add_row" in caplog.text
assert "Finished running node model.micro_dbt_project.empty_model_run" not in caplog.text
Comment thread
tatiana marked this conversation as resolved.
assert "Model 'model.micro_dbt_project.empty_model' was skipped by the dbt command" in caplog.text


@pytest.mark.skipif(AIRFLOW_VERSION < Version("2.7"), reason="Airflow did not have dag.test() until the 2.6 release")
@pytest.mark.integration
def test_dbt_task_group_with_watcher():
Expand Down
7 changes: 7 additions & 0 deletions tests/sample/dbt_project_with_empty_model/dbt_project.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
name: micro_dbt_project
version: '1.0'
config-version: 2

profile: default

model-paths: ["models"]
12 changes: 12 additions & 0 deletions tests/sample/dbt_project_with_empty_model/models/add_row.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
with base as (
select
1 as id,
'original_row' as description

)

select * from base
union all
select
2 as id,
'added_row' as description
Empty file.
8 changes: 8 additions & 0 deletions tests/sample/dbt_project_with_empty_model/models/schema.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
version: 2

models:
- name: add_row
description: "Model that selects data and adds an extra row"

- name: empty_model
description: "Empty model definition"
12 changes: 12 additions & 0 deletions tests/sample/dbt_project_with_empty_model/profiles.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
default:
target: dev
outputs:
dev:
type: postgres
host: "{{ env_var('POSTGRES_HOST') }}"
user: "{{ env_var('POSTGRES_USER') }}"
password: "{{ env_var('POSTGRES_PASSWORD') }}"
port: "{{ env_var('POSTGRES_PORT') | int }}"
dbname: "{{ env_var('POSTGRES_DB') }}"
schema: "{{ env_var('POSTGRES_SCHEMA') }}"
threads: 4