Introduce ExecutionMode.WATCHER to reduce DAG run time by 1/5#1999
Conversation
✅ Deploy Preview for sunny-pastelito-5ecb04 canceled.
|
pankajkoti
left a comment
There was a problem hiding this comment.
It's looking great to me. I don't have any additional comments at this point, apart from the render_config question for the producer task. Hope we can merge this soon! 🤞🏽
Commit squased from previous version of the implementation, before merging to main
- Managed to run ExecutionMode.WATCHER for modified basic_cosmos_dag DAG - Add Test nodes as EmptyOperators
fb9036a to
b7bcd12
Compare
465fbc1 to
b7bcd12
Compare
30bbba7 to
1f8950e
Compare
ExecutionMode.WATCHER to reduce DAG run time by 1//5ExecutionMode.WATCHER to reduce DAG run time by 1/5
pankajastro
left a comment
There was a problem hiding this comment.
Looks great, @tatiana!!
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #1999 +/- ##
==========================================
+ Coverage 97.81% 97.83% +0.01%
==========================================
Files 87 87
Lines 5542 5591 +49
==========================================
+ Hits 5421 5470 +49
Misses 121 121 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull Request Overview
This PR introduces ExecutionMode.WATCHER, a new high-performance execution mode for Cosmos that aims to reduce DAG run time by 80% (from 5 minutes to 1 minute) by using a producer-consumer pattern instead of running separate dbt commands for each model.
Key changes:
- Adds
ExecutionMode.WATCHERenum value andPRODUCER_WATCHER_TASK_IDconstant - Implements watcher operators (
DbtProducerWatcherOperator,DbtConsumerWatcherSensor, and specialized operators for different dbt commands) - Integrates watcher mode into the Airflow graph building process with producer task creation and dependency management
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| cosmos/constants.py | Adds WATCHER execution mode enum and producer task ID constant |
| cosmos/operators/watcher.py | Implements all watcher operators including producer, consumer sensor, and command-specific operators |
| cosmos/airflow/graph.py | Integrates watcher mode into graph building with producer task creation and dependency setup |
| dev/dags/example_watcher.py | Provides example DAG demonstrating watcher execution mode usage |
| tests/operators/test_watcher.py | Adds comprehensive integration test for watcher DAG functionality |
| tests/dbt/test_graph.py | Updates hash value for macOS compatibility in dbt cache testing |
Comments suppressed due to low confidence (1)
cosmos/operators/watcher.py:1
- Using raw string literal
r\"source\"is unnecessary here since there are no escape sequences. Should be just\"source\"."
from __future__ import annotations
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
|
I'm so excited to see this PR! Well done @tatiana @pankajkoti @pankajastro! |
**Features** * Introduce ``ExecutionMode.WATCHER`` to reduce DAG run time by 1/5 in several PRs. Learn more about it [here](https://astronomer.github.io/astronomer-cosmos/getting_started/watcher-execution-mode.html#watcher-execution-mode). This feature was implemented via multiple PRs, including: * Expose new execution mode by @tatiana @pankajastro @pankajkoti in #1999 * Add ``DbtProducerWatcherOperator`` for the proposed ``ExecutionMode.WATCHER`` by @pankajkoti in #1982 * Add ``DbtConsumerWatcherSensor`` for the proposed ``ExecutionMode.WATCHER`` by @pankajastro in #1998 * Push producer's task completion status to XCOM by @pankajkoti in #2000 * Add default priority_weight for ``DbtProducerWatcherOperator`` by @pankajkoti in #1995 * Add sample dbt events for the dbt watcher execution mode by @pankajkoti in #1952 * Add ``compiled_sql`` as a template fields on ```ExecutionMode.WATCHER``` when using ``run_results.json`` by @pankajastro in #2070 * Set ``push_run_results_to_xcom`` kwargs correctly for invocation mode subprocess and Watcher mode by @pankajastro in #2067 * Store compiled SQL as template field for dbt callback events in ``ExecutionMode.WATCHER`` by @pankajkoti in #2068 * Add initial documentation for ``ExecutionMode.WATCHER`` by @tatiana in #2046 * Support running ``State.UPSTREAM_FAILED`` tasks when WATCHER consumer upstream tasks fail by @tatiana in #2062 * Fail sensor tasks immediately if the ``ExecutionMode.WATCHER`` producer task fails by @pankajastro in #2040 * Add ``WATCHER``` to GitHub issue template by @tatiana in #2056 * Add support for ``TestBehavior.AFTER_ALL`` with ``ExecutionMode.WATCHER`` by @pankajastro in #2049 * Add support for ``TestBehavior.NONE`` with ``ExecutionMode.WATCHER`` by @pankajastro in #2047 * Fix ``ExecutionMode.WATCHER`` behaviour with ``DbtTaskGroup`` by @tatiana in #2044 * Fix Cosmos behaviour when using watcher with ``InvocationMode.DBT_RUNNER`` by @tatiana in #2048 * Add Airflow 3 plugin for dbt docs with multiple dbt projects support by @pankajkoti in #2009, check the [documentation](https://astronomer.github.io/astronomer-cosmos/configuration/hosting-docs.html). * Initial support to ``dbt Fusion`` by @tatiana in #1803. More details [here](https://astronomer.github.io/astronomer-cosmos/configuration/dbt-fusion). * Support to prune sources without downstream references in dbt projects by @corsettigyg in #1988 * Allow to set task display name as a user-defined function by @corsettigyg in #1761 * Add dbt project's hash to dag docs to support dag versioning in Airflow 3 by @pankajkoti in #1907 * feat: Add Jinja templating support for ``dbt_cmd_flags`` by @skillicinski in #1899 * Add Scarf metric to collect the execution mode uses by @pankajastro in #1981 * Support Airflow 3.1 by @tatiana in #1980 * Add MySQL profile mapping by @Lee2532 in #1977 * Add sqlserver profile mapping by @pankajastro in #1737 **Enhancement** * Use XCom to store sql when using ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajastro in #1934 * Refactor ``AIRFLOW_ASYNC`` teardown so it doesn't install the virtualenv by @pankajastro in #1938 * Reuse the virtual env for ``AIRFLOW_ASYNC`` setup task by @pankajastro in #1939 * Improve dataset/asset experience in Cosmos by @tatiana in #2030 * Add ``downstreams`` to ``DbtNode`` by @wornjs in #2028 **Bug fixes** * Fix tags extraction by @ms32035 in #1915 * Fix task flow operator args by @anyapriya in #2024 **Documentation** * Add documentation for Airflow 3 Plugin supporting dbt docs for multiple dbt projects by @pankajkoti in #2063 * Add Cosmos Deferrable Operator Guide by @pankajastro in #1922 * Add dbt Fusion documentation by @tatiana in #1824 #1830 * Update dbt-fusion.rst to explicitly highlight it is in alpha by @tatiana in #1838 * Fix a bunch of docs build errors and warnings by @pankajkoti in #1886 * Add docs note for param virtualenv_dir for async execution mode by @pankajastro in #1969 * Use pepy.tech downloads badge in README by @pankajkoti in #1920 * Correct the default value of ``cache_dir`` by @seokyun.ha in #2027 **Others** * Promote @corsettigyg to committer by @tatiana in #1985 * Add @pankajkoti and @pankajastro to ``contributors.rst`` by @tatiana in #1983 * Update setup script for airflow3 script by @dwreeves in #2023 * Prevent pytest from trying to test classes that aren't actually tests by @anyapriya in #2032 * Fix ``dag.test()`` for Airflow 3.1+ by syncing DAG to database bby @kaxil in #2037 * Disable Scarf in CI by @pankajastro in #2016 * Fix failing dbt Fusion tests when run in parallel in CI by @pankajkoti in #1896 * Fix MyPy issues related to ``ObjectStoragePath`` in main branch by @tatiana in #2012 * Cleanup example dbt event JSON dictionaries kept for XCOM referencby @pankajkoti in #1997 * Bump min hatch version that includes fixes for click>=8.3.0 by @pankajkoti in #1996 * Use official postgres image from Docker hub for kubernetes setup by @pankajkoti in #1986 * Use click<8.3.0 for hatch as click 8.3 breaks hatch by @pankajkoti in #1987 * Pin Airflow version in type check CI job by @pankajastro in #2003 * Improve comments after feedback on #1948 by @tatiana in #1963 * Fix running tests with dbt Fusion 2.0.0 preview versions by @tatiana in #1948 * Test hardening of dbt node having tags as unset or missing by @pankajkoti in #1918 * Fix Sphinx issue in the main branch by @tatiana in #2064 * pre-commit autoupdate in #2065, #2043, #2033, #2019, #1990, #2019, #2008, #1941, #1935, #1924 * GitHub dependabot update in #2051, #2050, #2038, #2022, #1947, #1955, #1946, #1944, #1945, #1928, #1921, #1917 Co-authored-by: Pankaj Koti <pankaj.koti@astronomer.io> Co-authored-by: Pankaj Singh <pankaj.singh@astronomer.io> Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com>
Introduce a new high-performance execution mode, named
ExecutionMode.WATCHER, following the implementation of the producer and consumer operators in PRs #1982, #1993 and #1998.Initial performance analysis indicates that this mode will reduce the total DAG Run time to execute some dbt pipelines in Airflow to 1/5 of the original time. For example, if a Cosmos
DbtDagtakes 5 minutes to run with the defaultExecutionMode.LOCAL, it will now run in 1 minute with the newExecutionMode.WATCHER. The performance gain will be comparable to running the dbt CLI locally.In the near future, there will also be benefits related to CPU and memory utilisation, as users will be able to run the producer task on a more powerful node with increased CPU and memory resources. In comparison, the consumer nodes can have less CPU and memory. Further development (#1972 and #1973), testing, and analysis are needed to evaluate this.
Context
As of Cosmos 1.10, when users leverage the default
ExecutionMode.LOCAL, each dbt model becomes an Airflow run task, and dbt is invoked in each of those tasks.We noticed that the cost to run the same pipeline with plain dbt core varies significantly by running:
For example, for the https://github.com/google/fhir-dbt-analytics project, these numbers were, on average, 5 minutes and 30 seconds (by running a single
dbt runfor the whole pipeline) versus 32 minutes (when using 184dbt runcommands as illustrated in https://gist.github.com/tatiana/c7831173ab09bf05d88839fb0b557920).Similar to the
ExecutionMode.AIRFLOW_ASYNC, this mode aims to reduce the number of times the dbt command is invoked, while still allowing users to have observability of the dbt workflow via the Airflow UI and being able to retry individual tasks.Overall solution
DbtProducerWatcherOperatorfor the proposedExecutionMode.WATCHER#1982DbtProducerWatcherOperatorfor the proposedExecutionMode.WATCHER#1982DbtConsumerWatcherSensorfor the proposedExecutionMode.WATCHER#1998 and used in this PRThis proposal follows up on a successful internal PoC (https://github.com/astronomer/oss-integrations-private/issues/185), available in the branch https://github.com/astronomer/astronomer-cosmos/tree/single-run-execution-mode.
Benefits
An initial performance analysis by @pankajkoti showed promising results:
Example of usage
Example of DAG topology, with the producer task preceding the others.

The dbt root nodes are set with

trigger_rulealways, so they start sensing once the producer begins.Producer task runs dbt Core, as shown on the logs:

Consumer task senses XCom, waiting for producer to finish running dbt:

Evidence that producer is running concurrently to the dbt root nodes sensing:

Related tickets
Closes #1964
Closes #1959 (*)
(*) I ended up implementing this while trying to enforce the producer task to run before the consumer tasks when running
airflow dags test.Outside of the scope of this PR:
ExecutionMode.WATCHERmode:Since it does not make sense to have them, we can review them later.
There are many other tasks related to this execution mode that can be tracked by searching issues using
label:execution:watcher:https://github.com/astronomer/astronomer-cosmos/issues?q=is%3Aissue%20state%3Aopen%20label%3Aexecution%3Awatcher
Credits
The idea for this approach appeared in a discussion with @ashb.
The implementation of this feature is the result of teamwork with @pankajastro and @pankajkoti, both directly and indirectly involvement via PoC and previous PRs: