Add DbtProducerWatcherOperator for the proposed ExecutionMode.WATCHER#1982
Conversation
✅ Deploy Preview for sunny-pastelito-5ecb04 canceled.
|
8ef18f8 to
2adcea1
Compare
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #1982 +/- ##
==========================================
+ Coverage 97.67% 97.77% +0.10%
==========================================
Files 87 87
Lines 5371 5443 +72
==========================================
+ Hits 5246 5322 +76
+ Misses 125 121 -4 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
041bcfa to
fb9e7cb
Compare
There was a problem hiding this comment.
Pull Request Overview
This PR introduces a new DbtBuildCoordinatorOperator that enables real-time monitoring of dbt model execution status through XCom for the proposed ExecutionMode.WATCHER. The operator provides two execution paths: streaming mode using dbtRunner events for real-time per-model status updates, and fallback mode that pushes compressed run results after completion.
Key changes:
- New
DbtBuildCoordinatorOperatorclass with streaming and fallback execution modes - Support for pushing compressed run results to XCom via new
push_run_results_to_xcomparameter - Enhanced local execution operators to support XCom-based result sharing
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| cosmos/operators/watcher.py | Implements the main DbtBuildCoordinatorOperator with event streaming and XCom coordination |
| cosmos/operators/local.py | Adds _push_run_results_to_xcom helper method and push_run_results_to_xcom parameter support |
| cosmos/operators/virtualenv.py | Updates run_command method to pass through the new push_run_results_to_xcom parameter |
| tests/operators/test_watcher.py | Comprehensive test suite for the new operator functionality |
Comments suppressed due to low confidence (1)
cosmos/operators/local.py:1
- Remove commented-out code. If this alternative implementation is needed for future reference, document the reasoning or move it to documentation.
from __future__ import annotations
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
tatiana
left a comment
There was a problem hiding this comment.
Looks great, @pankajkoti , thanks a lot for the great work and addressing the feedback.
I'm happy for us to merge this PR once the follow up ticket is logged.
Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>
Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>
Introduce a new high-performance execution mode, named `ExecutionMode.WATCHER`, following the implementation of the producer and consumer operators in PRs #1982, #1993 and #1998. Initial performance analysis indicates that this mode will reduce the total DAG Run time to execute dbt pipelines in Airflow to 1/5 of the original time. For example, if a Cosmos `DbtDag` takes 5 minutes to run with the default `ExecutionMode.LOCAL`, it will now run in 1 minute with the new `ExecutionMode.WATCHER`. In the near future, there will also be benefits related to CPU and memory utilisation, as users will be able to run the producer task on a more powerful node with increased CPU and memory resources. In comparison, the consumer nodes can have less CPU and memory. Further development (#1972 and #1973), testing, and analysis are needed to evaluate this. # Context As of Cosmos 1.10, when users leverage the default `ExecutionMode.LOCAL`, each dbt model becomes an Airflow run task, and dbt is invoked in each of those tasks. We noticed that the cost to run the same pipeline with plain dbt core varies significantly by running: - the whole dbt command using a single command - running one dbt command per model For example, for the https://github.com/google/fhir-dbt-analytics project, these numbers were, on average, 5 minutes and 30 seconds (by running a single `dbt run` for the whole pipeline) versus 32 minutes (when using 184 `dbt run` commands as illustrated in https://gist.github.com/tatiana/c7831173ab09bf05d88839fb0b557920). Similar to the [`ExecutionMode.AIRFLOW_ASYNC`](https://astronomer.github.io/astronomer-cosmos/getting_started/async-execution-mode.html), this mode aims to reduce the number of times the dbt command is invoked, while still allowing users to have observability of the dbt workflow via the Airflow UI and being able to retry individual tasks. # Overall solution * Use existing Cosmos DAG rendering techniques - implemented in this PR * Have a single Airlfow task to run "all the pipeline" (selected by the user) - implemented in #1982 * Use dbt Core callbacks https://docs.getdbt.com/reference/programmatic-invocations#registering-callbacks to track how the model's execution is progressing and update different Xcoms (one Xcom per model) - implemented in #1982 * All the other tasks, by default, should watch their designated Xcom - implemented in #1998 and used in this PR This proposal follows up on a successful internal PoC (astronomer/oss-integrations-private#185), available in the branch https://github.com/astronomer/astronomer-cosmos/tree/single-run-execution-mode. # Benefits An initial performance analysis by @pankajkoti showed promising results: | Experiment | Number of threads | Execution time (s) | |---------------------------------------------------------------|-------------------|--------------------| | dbt build | 4 | 6 - 7 | | dbt run for each of model locally | | 30 | | Cosmos default ExecutionMode.LOCAL in Astro CLI locally | | 10 - 15 | | Cosmos proposed ExecutionMode.WATCHER in Astro CLI locally | 1 | 26 | | | 2 | 14 | | | 4 | 7 | | | 8 | 4 | | | 16 | 2 | | The ExecutionMode.WATCHER in Airflow with an Astro deployment | 8 | 5 | # Example of usage Example of DAG topology, with the producer task preceding the others. <img width="1624" height="1056" alt="Screenshot 2025-10-06 at 17 34 53" src="https://github.com/user-attachments/assets/54d3290a-297d-417b-a255-6bb376e7d055" /> The dbt root nodes are set with `trigger_rule` `always`, so they start sensing once the producer begins. <img width="1624" height="1056" alt="Screenshot 2025-10-06 at 17 44 32" src="https://github.com/user-attachments/assets/81d8f27c-adb1-47e7-ba99-1a103a32b35e" /> Producer task runs dbt Core, as shown on the logs: <img width="1624" height="1056" alt="Screenshot 2025-10-06 at 17 47 59" src="https://github.com/user-attachments/assets/10588abf-77c9-4e71-9c6b-1161f18d4bcf" /> Consumer task senses XCom, waiting for producer to finish running dbt: <img width="1624" height="1056" alt="Screenshot 2025-10-06 at 17 48 12" src="https://github.com/user-attachments/assets/4e5dd334-1e68-4d25-a733-ba4460766eb0" /> Evidence that producer is running concurrently to the dbt root nodes sensing: <img width="1624" height="1056" alt="Screenshot 2025-10-06 at 17 46 55" src="https://github.com/user-attachments/assets/1220046a-e686-4bbe-b88b-d020e0e6e2f6" /> # Related tickets Closes #1964 Closes #1959 (*) (*) I ended up implementing this while trying to enforce the producer task to run before the consumer tasks when running `airflow dags test`. Outside of the scope of this PR: - Documentation (this will be added as part of #245) - We are not implementing support for the following operators in the `ExecutionMode.WATCHER` mode: - LS - Run operation - Docs - Clone Since it does not make sense to have them, we can review them later. There are many other tasks related to this execution mode that can be tracked by searching issues using `label:execution:watcher`: https://github.com/astronomer/astronomer-cosmos/issues?q=is%3Aissue%20state%3Aopen%20label%3Aexecution%3Awatcher # Why is this PR still in draft? Pending: - Understand and fix the watcher task is hanging when running integration tests for [some of our tests](https://github.com/astronomer/astronomer-cosmos/actions/runs/18220340583/job/51878734681) - Add more tests # Credits The idea for this approach appeared in a discussion with @ashb. The implementation of this feature is the result of teamwork with @pankajastro and @pankajkoti, both directly and indirectly involvement via PoC and previous PRs: - Co-authored-by: Pankaj Koti <pankaj.koti@astronomer.io> - Co-authored-by: Pankaj Singh <pankaj.singh@astronomer.io>
**Features** * Introduce ``ExecutionMode.WATCHER`` to reduce DAG run time by 1/5 in several PRs. Learn more about it [here](https://astronomer.github.io/astronomer-cosmos/getting_started/watcher-execution-mode.html#watcher-execution-mode). This feature was implemented via multiple PRs, including: * Expose new execution mode by @tatiana @pankajastro @pankajkoti in #1999 * Add ``DbtProducerWatcherOperator`` for the proposed ``ExecutionMode.WATCHER`` by @pankajkoti in #1982 * Add ``DbtConsumerWatcherSensor`` for the proposed ``ExecutionMode.WATCHER`` by @pankajastro in #1998 * Push producer's task completion status to XCOM by @pankajkoti in #2000 * Add default priority_weight for ``DbtProducerWatcherOperator`` by @pankajkoti in #1995 * Add sample dbt events for the dbt watcher execution mode by @pankajkoti in #1952 * Add ``compiled_sql`` as a template fields on ```ExecutionMode.WATCHER``` when using ``run_results.json`` by @pankajastro in #2070 * Set ``push_run_results_to_xcom`` kwargs correctly for invocation mode subprocess and Watcher mode by @pankajastro in #2067 * Store compiled SQL as template field for dbt callback events in ``ExecutionMode.WATCHER`` by @pankajkoti in #2068 * Add initial documentation for ``ExecutionMode.WATCHER`` by @tatiana in #2046 * Support running ``State.UPSTREAM_FAILED`` tasks when WATCHER consumer upstream tasks fail by @tatiana in #2062 * Fail sensor tasks immediately if the ``ExecutionMode.WATCHER`` producer task fails by @pankajastro in #2040 * Add ``WATCHER``` to GitHub issue template by @tatiana in #2056 * Add support for ``TestBehavior.AFTER_ALL`` with ``ExecutionMode.WATCHER`` by @pankajastro in #2049 * Add support for ``TestBehavior.NONE`` with ``ExecutionMode.WATCHER`` by @pankajastro in #2047 * Fix ``ExecutionMode.WATCHER`` behaviour with ``DbtTaskGroup`` by @tatiana in #2044 * Fix Cosmos behaviour when using watcher with ``InvocationMode.DBT_RUNNER`` by @tatiana in #2048 * Add Airflow 3 plugin for dbt docs with multiple dbt projects support by @pankajkoti in #2009, check the [documentation](https://astronomer.github.io/astronomer-cosmos/configuration/hosting-docs.html). * Initial support to ``dbt Fusion`` by @tatiana in #1803. More details [here](https://astronomer.github.io/astronomer-cosmos/configuration/dbt-fusion). * Support to prune sources without downstream references in dbt projects by @corsettigyg in #1988 * Allow to set task display name as a user-defined function by @corsettigyg in #1761 * Add dbt project's hash to dag docs to support dag versioning in Airflow 3 by @pankajkoti in #1907 * feat: Add Jinja templating support for ``dbt_cmd_flags`` by @skillicinski in #1899 * Add Scarf metric to collect the execution mode uses by @pankajastro in #1981 * Support Airflow 3.1 by @tatiana in #1980 * Add MySQL profile mapping by @Lee2532 in #1977 * Add sqlserver profile mapping by @pankajastro in #1737 **Enhancement** * Use XCom to store sql when using ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajastro in #1934 * Refactor ``AIRFLOW_ASYNC`` teardown so it doesn't install the virtualenv by @pankajastro in #1938 * Reuse the virtual env for ``AIRFLOW_ASYNC`` setup task by @pankajastro in #1939 * Improve dataset/asset experience in Cosmos by @tatiana in #2030 * Add ``downstreams`` to ``DbtNode`` by @wornjs in #2028 **Bug fixes** * Fix tags extraction by @ms32035 in #1915 * Fix task flow operator args by @anyapriya in #2024 **Documentation** * Add documentation for Airflow 3 Plugin supporting dbt docs for multiple dbt projects by @pankajkoti in #2063 * Add Cosmos Deferrable Operator Guide by @pankajastro in #1922 * Add dbt Fusion documentation by @tatiana in #1824 #1830 * Update dbt-fusion.rst to explicitly highlight it is in alpha by @tatiana in #1838 * Fix a bunch of docs build errors and warnings by @pankajkoti in #1886 * Add docs note for param virtualenv_dir for async execution mode by @pankajastro in #1969 * Use pepy.tech downloads badge in README by @pankajkoti in #1920 * Correct the default value of ``cache_dir`` by @seokyun.ha in #2027 **Others** * Promote @corsettigyg to committer by @tatiana in #1985 * Add @pankajkoti and @pankajastro to ``contributors.rst`` by @tatiana in #1983 * Update setup script for airflow3 script by @dwreeves in #2023 * Prevent pytest from trying to test classes that aren't actually tests by @anyapriya in #2032 * Fix ``dag.test()`` for Airflow 3.1+ by syncing DAG to database bby @kaxil in #2037 * Disable Scarf in CI by @pankajastro in #2016 * Fix failing dbt Fusion tests when run in parallel in CI by @pankajkoti in #1896 * Fix MyPy issues related to ``ObjectStoragePath`` in main branch by @tatiana in #2012 * Cleanup example dbt event JSON dictionaries kept for XCOM referencby @pankajkoti in #1997 * Bump min hatch version that includes fixes for click>=8.3.0 by @pankajkoti in #1996 * Use official postgres image from Docker hub for kubernetes setup by @pankajkoti in #1986 * Use click<8.3.0 for hatch as click 8.3 breaks hatch by @pankajkoti in #1987 * Pin Airflow version in type check CI job by @pankajastro in #2003 * Improve comments after feedback on #1948 by @tatiana in #1963 * Fix running tests with dbt Fusion 2.0.0 preview versions by @tatiana in #1948 * Test hardening of dbt node having tags as unset or missing by @pankajkoti in #1918 * Fix Sphinx issue in the main branch by @tatiana in #2064 * pre-commit autoupdate in #2065, #2043, #2033, #2019, #1990, #2019, #2008, #1941, #1935, #1924 * GitHub dependabot update in #2051, #2050, #2038, #2022, #1947, #1955, #1946, #1944, #1945, #1928, #1921, #1917 Co-authored-by: Pankaj Koti <pankaj.koti@astronomer.io> Co-authored-by: Pankaj Singh <pankaj.singh@astronomer.io> Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com>
This PR introduces the
DbtProducerWatcherOperatorincosmos/operators/watcher.pyfor use with the proposedExecutionMode.WATCHER.The operator triggers a single dbt build and streams real-time per-model
run statuses via dbtRunner events, pushing keys such as
nodefinished_<uid>,aggregated
dbt_startup_events, and falling back to pushingrun_resultsfrom the target directory to XCom when dbtRunner is unavailable.
Additionally, local execution has been updated with a
_push_run_results_to_xcomhelper and apush_run_results_to_xcomflag,enabling gzip+base64–compressed run-results to be stored in XCom for
fallback support.
closes: #1958
closes: https://github.com/astronomer/oss-integrations-private/issues/238