Add DbtModelStatusSensor for the proposed ExecutionMode.WATCHER #1993

Closed
pankajastro wants to merge 19 commits into main from 1971-add-models-sensor

@pankajastro
Contributor

@pankajastro pankajastro commented Sep 23, 2025

This PR adds DbtModelStatusSensor for the proposed ExecutionMode.WATCHER.

Introduced a new DbtModelStatusSensor class that extends BaseSensorOperator and reuses DbtRunLocalOperator functionality to monitor and handle the status of a dbt model execution triggered by a coordinating task.

  • Added DbtModelStatusSensor:
      • Pulls XComs from the upstream master_task_id to check dbt model status.
      • Handles task retries by reconstructing and re-running dbt commands with sanitized flags.
      • Implemented _filter_flags helper to exclude incompatible flags like --select or --exclude.
      • Decodes and decompresses dbt event messages from XComs to inspect model status.
      • Logs detailed information for debugging and visibility during execution and retries.
      • Raises AirflowException if the model finishes with a non-success status.
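To make the flag filtering and event decoding steps above more concrete, here is a minimal sketch. It is not the code from this PR: the set of filtered flags, the function names `filter_flags`, `encode_event` and `decode_event`, and the base64 + zlib + JSON encoding are all assumptions chosen for illustration.

```python
import base64
import json
import zlib

# Assumption: node-selection flags that a single-model retry should not inherit.
SELECTION_FLAGS = {"--select", "-s", "--exclude", "--selector"}


def filter_flags(flags: list[str]) -> list[str]:
    """Drop node-selection flags, and the values that follow them, from a dbt flag list."""
    filtered = []
    skipping_values = False
    for token in flags:
        if token.split("=", 1)[0] in SELECTION_FLAGS:
            skipping_values = True  # non-flag tokens that follow are this flag's values
            continue
        if skipping_values and not token.startswith("-"):
            continue
        skipping_values = False
        filtered.append(token)
    return filtered


def encode_event(event: dict) -> str:
    """Hypothetical producer side: JSON -> zlib -> base64 text that fits in an XCom."""
    raw = json.dumps(event).encode("utf-8")
    return base64.b64encode(zlib.compress(raw)).decode("ascii")


def decode_event(payload: str) -> dict:
    """Hypothetical consumer side: reverse of encode_event."""
    raw = zlib.decompress(base64.b64decode(payload))
    return json.loads(raw.decode("utf-8"))
```

With these helpers, `filter_flags(["--select", "a", "--full-refresh"])` keeps only `--full-refresh`, and `decode_event(encode_event(e))` returns `e` for any JSON-serialisable dict.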

Airflow task to test with jaffle-shop

sensor = DbtModelStatusSensor(
    task_id="jaffle_shop__customers",
    model_unique_id="model.jaffle_shop.customers",
    master_task_id="dbt_build",
    poke_interval=1,
    profile_config=bigquery_db,
    project_dir=DBT_PROFILES_DIR,
)

closes: #1971

@pankajastro pankajastro marked this pull request as ready for review September 23, 2025 13:50
return super().execute(context=context, **kwargs)


class DbtModelStatusSensor(BaseSensorOperator, DbtRunLocalOperator):
Collaborator

For consistency, we probably want to rename it to:

Suggested change
class DbtModelStatusSensor(BaseSensorOperator, DbtRunLocalOperator):
class DbtConsumerWatcherOperator(BaseSensorOperator, DbtRunLocalOperator):

Base automatically changed from 1958-watcher-build-coordinator-task to main September 25, 2025 09:36
@netlify

netlify Bot commented Sep 25, 2025

Deploy Preview for sunny-pastelito-5ecb04 canceled.

Name Link
🔨 Latest commit a66d12e
🔍 Latest deploy log https://app.netlify.com/projects/sunny-pastelito-5ecb04/deploys/68d51be9f81647000873147b

@pankajastro pankajastro marked this pull request as draft September 25, 2025 10:37
@pankajastro
Contributor Author

closing in favour of: #1998

tatiana added a commit that referenced this pull request Oct 6, 2025
Introduce a new high-performance execution mode, named
`ExecutionMode.WATCHER`, following the implementation of the producer
and consumer operators in PRs #1982, #1993 and #1998.

Initial performance analysis indicates that this mode will reduce the
total DAG Run time to execute dbt pipelines in Airflow to 1/5 of the
original time. For example, if a Cosmos `DbtDag` takes 5 minutes to run
with the default `ExecutionMode.LOCAL`, it will now run in 1 minute with
the new `ExecutionMode.WATCHER`.

In the near future, there will also be benefits related to CPU and
memory utilisation, as users will be able to run the producer task on a
more powerful node with increased CPU and memory resources. In
comparison, the consumer nodes can have less CPU and memory. Further
development (#1972 and #1973), testing, and analysis are needed to
evaluate this.

# Context

As of Cosmos 1.10, when users leverage the default
`ExecutionMode.LOCAL`, each dbt model becomes an Airflow run task, and
dbt is invoked in each of those tasks.

We noticed that the cost of running the same pipeline with plain dbt Core
varies significantly between:
- running the whole pipeline with a single dbt command
- running one dbt command per model

For example, for the https://github.com/google/fhir-dbt-analytics
project, these numbers were, on average, 5 minutes and 30 seconds (by
running a single `dbt run` for the whole pipeline) versus 32 minutes
(when using 184 `dbt run` commands as illustrated in
https://gist.github.com/tatiana/c7831173ab09bf05d88839fb0b557920).
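As a rough back-of-the-envelope check on the numbers above, the snippet below computes the slowdown factor and the average extra time per dbt invocation. It assumes the 184 commands ran sequentially and that the difference is entirely invocation overhead, neither of which is stated in the measurements.

```python
# Figures quoted above for the fhir-dbt-analytics project.
single_run_seconds = 5 * 60 + 30   # one `dbt run` for the whole pipeline
per_model_seconds = 32 * 60        # 184 separate `dbt run` commands
invocations = 184

# How many times slower the per-model approach is.
slowdown = per_model_seconds / single_run_seconds

# Assumption: the extra time is spread evenly across sequential invocations.
extra_per_invocation = (per_model_seconds - single_run_seconds) / invocations

print(f"slowdown: {slowdown:.1f}x")
print(f"extra time per invocation: {extra_per_invocation:.1f}s")
```

Under these assumptions, the per-model approach is roughly 5.8 times slower, which is in line with the "1/5 of the original time" estimate for the new mode.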

Similar to the
[`ExecutionMode.AIRFLOW_ASYNC`](https://astronomer.github.io/astronomer-cosmos/getting_started/async-execution-mode.html),
this mode aims to reduce the number of times the dbt command is invoked,
while still allowing users to have observability of the dbt workflow via
the Airflow UI and being able to retry individual tasks.

# Overall solution

* Use existing Cosmos DAG rendering techniques - implemented in this PR
* Have a single Airflow task to run "all the pipeline" (selected by the
user) - implemented in #1982
* Use dbt Core callbacks
https://docs.getdbt.com/reference/programmatic-invocations#registering-callbacks
to track how each model's execution is progressing and update different
XComs (one XCom per model) - implemented in #1982
* All the other tasks, by default, should watch their designated XCom -
implemented in #1998 and used in this PR
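The producer/consumer pattern described in the list above can be sketched without Airflow or dbt. In this illustration a dictionary stands in for XComs, a thread stands in for the producer task, and `on_model_finished`, `poke`, `wait_for`, `demo` and `ModelFailed` are hypothetical names rather than Cosmos APIs.

```python
import threading
import time


class ModelFailed(Exception):
    """Stand-in for the AirflowException a consumer task would raise."""


# In-memory stand-in for Airflow XComs: one key per dbt model.
xcom_store: dict[str, str] = {}
xcom_lock = threading.Lock()


def on_model_finished(unique_id: str, status: str) -> None:
    """Hypothetical callback the producer would register with dbt."""
    with xcom_lock:
        xcom_store[unique_id] = status


def poke(unique_id: str) -> bool:
    """Consumer check: False while pending, True on success, raise otherwise."""
    with xcom_lock:
        status = xcom_store.get(unique_id)
    if status is None:
        return False
    if status != "success":
        raise ModelFailed(f"{unique_id} finished with status {status!r}")
    return True


def producer(models: list[str]) -> None:
    """Pretend to run the whole pipeline once, reporting each model as it ends."""
    for unique_id in models:
        time.sleep(0.01)  # simulated model build time
        on_model_finished(unique_id, "success")


def wait_for(unique_id: str, poke_interval: float = 0.005, timeout: float = 2.0) -> bool:
    """Poll the model's entry until it succeeds or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if poke(unique_id):
            return True
        time.sleep(poke_interval)
    return False


def demo() -> dict[str, bool]:
    """Run one producer thread while the caller senses each model in turn."""
    models = ["model.jaffle_shop.customers", "model.jaffle_shop.orders"]
    xcom_store.clear()
    thread = threading.Thread(target=producer, args=(models,))
    thread.start()
    results = {unique_id: wait_for(unique_id) for unique_id in models}
    thread.join()
    return results
```

Calling `demo()` runs dbt "once" in the producer thread while each consumer only polls its own key, which is the property that removes the per-model dbt invocation cost.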

This proposal follows up on a successful internal PoC
(astronomer/oss-integrations-private#185),
available in the branch
https://github.com/astronomer/astronomer-cosmos/tree/single-run-execution-mode.

# Benefits

An initial performance analysis by @pankajkoti showed promising results:

| Experiment                                                    | Number of threads | Execution time (s) |
|---------------------------------------------------------------|-------------------|--------------------|
| dbt build                                                     | 4                 | 6 - 7              |
| dbt run for each model locally                                |                   | 30                 |
| Cosmos default ExecutionMode.LOCAL in Astro CLI locally       |                   | 10 - 15            |
| Cosmos proposed ExecutionMode.WATCHER in Astro CLI locally    | 1                 | 26                 |
|                                                               | 2                 | 14                 |
|                                                               | 4                 | 7                  |
|                                                               | 8                 | 4                  |
|                                                               | 16                | 2                  |
| The ExecutionMode.WATCHER in Airflow with an Astro deployment | 8                 | 5                  |

# Example of usage

Example of DAG topology, with the producer task preceding the others.
<img width="1624" height="1056" alt="Screenshot 2025-10-06 at 17 34 53"
src="https://github.com/user-attachments/assets/54d3290a-297d-417b-a255-6bb376e7d055"
/>

The dbt root nodes are set with `trigger_rule` `always`, so they start
sensing once the producer begins.
<img width="1624" height="1056" alt="Screenshot 2025-10-06 at 17 44 32"
src="https://github.com/user-attachments/assets/81d8f27c-adb1-47e7-ba99-1a103a32b35e"
/>

Producer task runs dbt Core, as shown on the logs:
<img width="1624" height="1056" alt="Screenshot 2025-10-06 at 17 47 59"
src="https://github.com/user-attachments/assets/10588abf-77c9-4e71-9c6b-1161f18d4bcf"
/>

Consumer task senses XCom, waiting for producer to finish running dbt:
<img width="1624" height="1056" alt="Screenshot 2025-10-06 at 17 48 12"
src="https://github.com/user-attachments/assets/4e5dd334-1e68-4d25-a733-ba4460766eb0"
/>

Evidence that producer is running concurrently to the dbt root nodes
sensing:
<img width="1624" height="1056" alt="Screenshot 2025-10-06 at 17 46 55"
src="https://github.com/user-attachments/assets/1220046a-e686-4bbe-b88b-d020e0e6e2f6"
/>



# Related tickets

Closes #1964
Closes #1959 (*)

(*) I ended up implementing this while trying to enforce the producer
task to run before the consumer tasks when running `airflow dags test`.

Outside of the scope of this PR:
- Documentation (this will be added as part of #245)
- We are not implementing support for the following operators in the
`ExecutionMode.WATCHER` mode:
  - LS
  - Run operation
  - Docs
  - Clone
Since it does not make sense to have them, we can review them later.

There are many other tasks related to this execution mode that can be
tracked by searching issues using `label:execution:watcher`:

https://github.com/astronomer/astronomer-cosmos/issues?q=is%3Aissue%20state%3Aopen%20label%3Aexecution%3Awatcher

# Why is this PR still in draft?

Pending:
- Understand and fix why the watcher task hangs when running [some of
our integration
tests](https://github.com/astronomer/astronomer-cosmos/actions/runs/18220340583/job/51878734681)
- Add more tests

# Credits

The idea for this approach appeared in a discussion with @ashb.

The implementation of this feature is the result of teamwork with
@pankajastro and @pankajkoti, who were involved both directly and
indirectly, via the PoC and previous PRs:

- Co-authored-by: Pankaj Koti <pankaj.koti@astronomer.io>
- Co-authored-by: Pankaj Singh <pankaj.singh@astronomer.io>
tatiana added a commit that referenced this pull request Oct 28, 2025
Introduce the documentation for the recently introduced high-performance
execution mode, named ExecutionMode.WATCHER, following the
implementation in PRs #1982, #1993, #1998 and #1999.

Closes: #1965
Closes: astronomer/oss-integrations-private#245

Development

Successfully merging this pull request may close these issues.

Add Watcher task for Execution Mode Watcher to monitor models status