Skip to content

Fix running empty models or ephemeral nodes in ExecutionMode.WATCHER#2279

Merged
tatiana merged 16 commits into
mainfrom
fix-empty-model-issue
Jan 26, 2026
Merged

Fix running empty models or ephemeral nodes in ExecutionMode.WATCHER#2279
tatiana merged 16 commits into
mainfrom
fix-empty-model-issue

Conversation

@tatiana
Copy link
Copy Markdown
Collaborator

@tatiana tatiana commented Jan 22, 2026

Avoid consumer tasks that hang indefinitely when using ExecutionMode.WATCHER when the associated dbt models are either ephemeral or consist of empty SQL models that are not run by dbt.

Context

There are circumstances when there is a discrepant number of nodes in the output when we run dbt ls and dbt build, using the same selectors.

In the following example (tests/sample/dbt_project_with_empty_model), we can observe that dbt ls returned two models, while the dbt build returned a single one:

$ dbt ls
10:48:32  Running with dbt=1.11.2
10:48:32  Registered adapter: postgres=1.10.0
10:48:32  Unable to do partial parsing because saved manifest not found. Starting full parse.
10:48:32  Found 2 models, 464 macros
micro_dbt_project.add_row
micro_dbt_project.empty_model

$ dbt build
10:50:21  Running with dbt=1.11.2
10:50:21  Registered adapter: postgres=1.10.0
10:50:21  Found 2 models, 464 macros
10:50:21  
10:50:21  Concurrency: 4 threads (target='dev')
10:50:21  
10:50:21  1 of 1 START sql view model public.add_row ..................................... [RUN]
10:50:21  1 of 1 OK created sql view model public.add_row ................................ [CREATE VIEW in 0.06s]
10:50:21  
10:50:21  Finished running 1 view model in 0 hours 0 minutes and 0.20 seconds (0.20s).
10:50:21  
10:50:21  Completed successfully
10:50:21  
10:50:21  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 NO-OP=0 TOTAL=1

So far, we observed this happening in two scenarios:

  1. Ephemeral nodes ([Bug] Watcher mode sensor tasks timeout for ephemeral models without --debug #2266)
  2. If the dbt model is not executable (e.g. it is an empty SQL file), both the dbt build and the dbt run will not display it in their info logs.

Until Cosmos 1.12.1, Cosmos assumed these two commands would return the same number of nodes, and we implemented the LoadMode.MANIFEST assuming the same. In the case of ExecutionMode.LOCAL, this was not a big issue, because dbt does not run when we select the particular model it's excluding:

$  dbt build --select empty_model 
10:53:03  Running with dbt=1.11.2
10:53:03  Registered adapter: postgres=1.10.0
10:53:03  Found 2 models, 464 macros
10:53:03  Nothing to do. Try checking your model configs and model specification args

The downside in the case of ExecutionMode.LOCAL is that we waste Airflow resources by potentially parsing a dbt project that wouldn't need to be parsed in those particular tasks. The PR #1625 aims to address this.

However, in the case of ExecutionMode.WATCHER, this became a big problem, as the behaviour caused consumer nodes representing ephemeral nodes or empty models to hang indefinitely after the producer task completed successfully. The producer task was not aware of them and would not populate XCom, whereas the consumer tasks would keep checking for updates.

Closes: #2266
Closes: https://github.com/astronomer/oss-integrations-private/issues/315
Closes: https://astronomer.zendesk.com/agent/tickets/87180

About the solution

Ideally, probably, we would know upfront which nodes dbt build decides to execute, and we would not render them as Airflow tasks. However, I do not believe this is a simple problem, since there may be other circumstances when dbt build skips nodes from being executed - and any custom logic we implement in Cosmos will be affected by changes dbt Core/Fusion implements upstream.

Therefore, it feels - for now - the safest solution is:

  • Continue adding those nodes to Cosmos
  • Mark them as successful, logging a specific message, if they were not actually run by the dbt command. This is identified by checking the run_results.json file.

Copilot AI review requested due to automatic review settings January 22, 2026 11:01
@netlify
Copy link
Copy Markdown

netlify Bot commented Jan 22, 2026

Deploy Preview for astronomer-cosmos canceled.

Name Link
🔨 Latest commit 31300ff
🔍 Latest deploy log https://app.netlify.com/projects/astronomer-cosmos/deploys/69775c43cb03e000082e5496

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes an issue where consumer tasks in ExecutionMode.WATCHER would hang indefinitely when representing ephemeral or empty models that dbt skips during execution. The fix ensures these tasks complete successfully with appropriate logging instead of waiting forever for XCom data that will never arrive.

Changes:

  • Added detection logic to identify when dbt skips a model during execution
  • Enhanced logging to clarify when models are skipped due to being ephemeral or empty
  • Added comprehensive integration test with a dbt project containing an empty model

Reviewed changes

Copilot reviewed 7 out of 8 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
tests/sample/dbt_project_with_empty_model/profiles.yml New test project configuration for PostgreSQL connection
tests/sample/dbt_project_with_empty_model/models/schema.yml Schema definition for test models including an empty model
tests/sample/dbt_project_with_empty_model/models/add_row.sql SQL model that executes successfully to contrast with empty model
tests/sample/dbt_project_with_empty_model/dbt_project.yml dbt project configuration for the test case
tests/operators/test_watcher.py Integration test verifying correct handling of empty models in watcher mode
cosmos/operators/_watcher/triggerer.py Added logic to detect and handle models not executed by dbt
cosmos/operators/_watcher/base.py Enhanced logging and handling when models are skipped by dbt commands

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread tests/operators/test_watcher.py
Comment thread cosmos/operators/_watcher/base.py Outdated
@tatiana tatiana requested review from pankajastro and pankajkoti and removed request for pankajastro January 22, 2026 11:03
Comment thread cosmos/operators/_watcher/base.py Outdated
Copy link
Copy Markdown
Contributor

@pankajkoti pankajkoti left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM if https://github.com/astronomer/astronomer-cosmos/pull/2279/changes#r2717497880 can be considered and CI is happy. Thanks for fixing this!

INFO     airflow.models.taskinstance:taskinstance.py:1138 Marking task as FAILED. dag_id=watcher_dag, task_id=empty_model_run, execution_date=20260123T122719, start_date=, end_date=20260123T122721
ERROR    cosmos.airflow.dag.DbtDag:dag.py:2881 Task failed; ti=<TaskInstance: watcher_dag.empty_model_run manual__2026-01-23T12:27:19.277635+00:00 [failed]>
Traceback (most recent call last):
  File "/Users/tatiana.alchueyr/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4lAAXoPP/tests.py3.10-2.8-1.11/lib/python3.10/site-packages/airflow/models/dag.py", line 2879, in test
    _run_task(ti=ti, inline_trigger=not triggerer_running, session=session)
  File "/Users/tatiana.alchueyr/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4lAAXoPP/tests.py3.10-2.8-1.11/lib/python3.10/site-packages/airflow/models/dag.py", line 4032, in _run_task
    ti._run_raw_task(session=session, raise_on_defer=inline_trigger)
  File "/Users/tatiana.alchueyr/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4lAAXoPP/tests.py3.10-2.8-1.11/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/Users/tatiana.alchueyr/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4lAAXoPP/tests.py3.10-2.8-1.11/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2335, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode, session=session)
  File "/Users/tatiana.alchueyr/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4lAAXoPP/tests.py3.10-2.8-1.11/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2500, in _execute_task_with_callbacks
    result = self._execute_task(context, task_orig)
  File "/Users/tatiana.alchueyr/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4lAAXoPP/tests.py3.10-2.8-1.11/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2517, in _execute_task
    return _execute_task(self, context, task_orig)
  File "/Users/tatiana.alchueyr/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4lAAXoPP/tests.py3.10-2.8-1.11/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 418, in _execute_task
    task_to_execute.execution_timeout - (timezone.utcnow() - task_instance.start_date)
TypeError: unsupported operand type(s) for -: 'datetime.datetime' and 'NoneType'
Comment thread tests/operators/test_watcher.py Outdated
@codecov
Copy link
Copy Markdown

codecov Bot commented Jan 23, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 98.02%. Comparing base (bd015ec) to head (31300ff).
⚠️ Report is 1 commits behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #2279   +/-   ##
=======================================
  Coverage   98.01%   98.02%           
=======================================
  Files         100      100           
  Lines        6412     6420    +8     
=======================================
+ Hits         6285     6293    +8     
  Misses        127      127           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@tatiana
Copy link
Copy Markdown
Collaborator Author

tatiana commented Jan 23, 2026

@tatiana tatiana merged commit 4134018 into main Jan 26, 2026
90 checks passed
@tatiana tatiana deleted the fix-empty-model-issue branch January 26, 2026 14:06
@Miskho
Copy link
Copy Markdown

Miskho commented Jan 27, 2026

Hi, thank you for your great work @tatiana.

However I am still currently encountering problems with this execution mode. I am experimenting on GCP Composer (composer-3-airflow-3.1.0-build.7) where I installed the latest astronomer-cosmos (1.13.0a2) version. My project is very small: 5 models, few MBs of data.

I am still continuously spammed by warning logs of format:

Triggerer's async thread was blocked for a while, likely due to the highly utilized environment....

and the number of blocking tasks keep increasing until the triggerer restarts. Everything is fine, until the DbtDag is scheduled again, until this behaviour keeps repeating.

Before these warnings and during the initial build of the project executed in the dbt_produced_watcher, I get few minutes of these error level logs

textPayload: "XCom not found dag_id=dbt run_id=scheduled__2026-01-27T10:00:00+00:00 task_id=dbt_producer_watcher key=nodefinished_model__redacted map_index=-1 detail={'detail': {'reason': 'not_found', 'message': "XCom with key='nodefinished_model__redacted' map_index=-1 not found for task 'dbt_producer_watcher' in DAG run 'scheduled__2026-01-27T10:00:00+00:00' of 'dbt'"}} status_code=404"

These logs occur only for the parent-less nodes (tasks). All other (dependent) models do not throw this error.
It seems that the XCom is not populated ahead of time for those first tasks, but is correctly afterwards? Is this possible?

Thank you very much.

EDIT: Honestly its seems that ephemeral problems might still be the problem, when i swithced the downstream empemeral materialisations to views. The warning messages are no longer present, although the error ones are still present even in that scenario.

@tatiana tatiana added this to the Cosmos 1.13.0 milestone Jan 29, 2026
@pankajastro pankajastro mentioned this pull request Jan 29, 2026
tatiana added a commit that referenced this pull request Jan 30, 2026
Features

* Support cross-referencing models across dbt projects using dbt-loom by
@pankajkoti in #2271
* Support use of YAML selectors when using ``LoadMode.DBT_MANIFEST`` by
@YourRoyalLinus in #2261
* Introduce ``ExecutionMode.WATCHER_KUBERNETES`` to use the watcher with
``KubernetesPodOperator`` by @tatiana in #2207
* Add support for StarRocks profile mapping by @kurkim0661 in #2256
* Allow pushing URIs as XComs for Cosmos tasks by @corsettigyg in #2275
* Support defining custom callbacks alongside the ``WATCHER_KUBERNETES``
callback by @johnhoran in #2307

Enhancements

* Refactor: remove duplicate ``_construct_dest_file_path`` by @jx2lee in
#2077
* Leverage Airflow ``::group::`` to group logs associated with DAG
parsing by @tatiana in #2235
* Refactor ``DbtConsumerWatcherSensor`` for reusability by @tatiana in
#2245
* Restore plain text output when using ``ExecutionMode.WATCHER`` by
@tiovader in #2241

Bug Fixes

* Fix running empty models or ephemeral nodes in
``ExecutionMode.WATCHER`` by @tatiana in #2279
* Improve watcher producer task priority in scheduling and the UI by
@tatiana in #2237
* Fix typos and formatting issues in documentation by @pankajkoti in
#2259
* Allow watcher producer retries without erroring by @tatiana in #2283
* Fix ``TestBehavior.AFTER_ALL`` is missing project_name information
when loading project using manifest file by @tuantran0910 in #2242
* Fix duplicate log lines in watcher subprocess execution and format
timestamps by @pankajkoti in #2301

Docs

* Add Watcher Kubernetes documentation by @tatiana in #2303
* Document newly added telemetry metrics in the privacy notice by
@pankajkoti in #2249
* Add compatibility policy document by @pankajastro in #2251
* Improve watcher documentation related to dbt threads by @tatiana in
#2273
* Fix link in watcher execution mode documentation by @jedcunningham in
#2277
* Update Apache Airflow minimum compatibility policy by @tatiana in
#2285
* Clarify Cosmos runtime support until "End of Basic Support" by
@jedcunningham in #2286
* Update watcher docs by @tatiana in #2298
* Update watcher kubernetes documentation by @tatiana in #2306

Others

* Add Airflow 3 DAG versioning tests for Cosmos by @michal-mrazek in
#2177
* Add dbt Core 1.11 to the test matrix by @tatiana in #2230
* Add integration tests using InvocationMode.SUBPROCESS and validate
output by @tatiana in #2287
* Fix main branch failing tests by @tatiana in #2296
* Update pre-commit hooks to the latest versions by @jedcunningham in
#2289
* Pre-commit autoupdates by @pre-commit in #2222, #2264, #2274 and #2290
* Dependabot updates by @dependabot in #2218, #2219, #2220, #2280 and
#2284
* Add Scarf metrics to understand Cosmos feature usage patterns
- Add telemetry tracking for dbt docs plugin usage by @pankajkoti in
#2240
- Add DAG run telemetry metrics for load mode, invocation, and
render_config parameters by @pankajkoti in #2223
  - Collect profile metrics for DAG runs by @pankajastro in #2228
- Compress telemetry metadata to reduce serialized DAG size by
@pankajkoti in #2252
- Skip storing telemetry metadata when emission is disabled by
@pankajkoti in #2278
- Hide telemetry metadata parameters from the Airflow trigger UI by
@pankajkoti in #2247

closes:
astronomer/oss-integrations-private#317

---------

Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] Watcher mode sensor tasks timeout for ephemeral models without --debug

4 participants