Add dbt project's hash to dag docs to support dag versioning in Airflow 3#1907
Conversation
✅ Deploy Preview for sunny-pastelito-5ecb04 canceled.
|
43fdf72 to
eaa9fbe
Compare
eaa9fbe to
6d05305
Compare
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #1907 +/- ##
=======================================
Coverage 98.06% 98.06%
=======================================
Files 85 86 +1
Lines 5313 5334 +21
=======================================
+ Hits 5210 5231 +21
Misses 103 103 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
tatiana
left a comment
There was a problem hiding this comment.
Looks great, @pankajkoti , thank you! Very excited to see Cosmos DAGs being versioned in AF3!
There was a problem hiding this comment.
Pull Request Overview
This PR adds support for Airflow 3's automatic DAG versioning by incorporating a hash of the dbt project contents into the DAG documentation. This allows Airflow to detect changes in dbt project files and automatically generate new DAG versions without manual intervention.
Key changes:
- Extracted the
_create_folder_version_hashfunction fromcache.pyto a newversioning.pymodule for reusability - Modified
DbtToAirflowConverterto append the dbt project hash to DAG documentation - Added comprehensive test coverage for hash appending scenarios and error handling
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| cosmos/versioning.py | New module containing the extracted folder hash generation function |
| cosmos/converter.py | Added method to append dbt project hash to DAG documentation for versioning support |
| cosmos/cache.py | Removed _create_folder_version_hash function and imported it from versioning module |
| tests/test_versioning.py | New test file for the versioning module functionality |
| tests/test_converter.py | Added comprehensive tests for DAG documentation hash appending scenarios |
| tests/test_cache.py | Removed test that was moved to the versioning test file |
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
|
BTW: as we discussed, please, let's check on a long-term solution with @jedcunningham and other Airflow commiters - to see if we can have a more robust solution that does not rely on the Airflow DAG docs. Last time we discussed this, #1488, there wasn't a better way - but there may be one moving forward 🤞 |
**Features** * Introduce ``ExecutionMode.WATCHER`` to reduce DAG run time by 1/5 in several PRs. Learn more about it [here](https://astronomer.github.io/astronomer-cosmos/getting_started/watcher-execution-mode.html#watcher-execution-mode). This feature was implemented via multiple PRs, including: * Expose new execution mode by @tatiana @pankajastro @pankajkoti in #1999 * Add ``DbtProducerWatcherOperator`` for the proposed ``ExecutionMode.WATCHER`` by @pankajkoti in #1982 * Add ``DbtConsumerWatcherSensor`` for the proposed ``ExecutionMode.WATCHER`` by @pankajastro in #1998 * Push producer's task completion status to XCOM by @pankajkoti in #2000 * Add default priority_weight for ``DbtProducerWatcherOperator`` by @pankajkoti in #1995 * Add sample dbt events for the dbt watcher execution mode by @pankajkoti in #1952 * Add ``compiled_sql`` as a template fields on ```ExecutionMode.WATCHER``` when using ``run_results.json`` by @pankajastro in #2070 * Set ``push_run_results_to_xcom`` kwargs correctly for invocation mode subprocess and Watcher mode by @pankajastro in #2067 * Store compiled SQL as template field for dbt callback events in ``ExecutionMode.WATCHER`` by @pankajkoti in #2068 * Add initial documentation for ``ExecutionMode.WATCHER`` by @tatiana in #2046 * Support running ``State.UPSTREAM_FAILED`` tasks when WATCHER consumer upstream tasks fail by @tatiana in #2062 * Fail sensor tasks immediately if the ``ExecutionMode.WATCHER`` producer task fails by @pankajastro in #2040 * Add ``WATCHER``` to GitHub issue template by @tatiana in #2056 * Add support for ``TestBehavior.AFTER_ALL`` with ``ExecutionMode.WATCHER`` by @pankajastro in #2049 * Add support for ``TestBehavior.NONE`` with ``ExecutionMode.WATCHER`` by @pankajastro in #2047 * Fix ``ExecutionMode.WATCHER`` behaviour with ``DbtTaskGroup`` by @tatiana in #2044 * Fix Cosmos behaviour when using watcher with ``InvocationMode.DBT_RUNNER`` by @tatiana in #2048 * Add Airflow 3 plugin for dbt docs with multiple dbt projects support by @pankajkoti in #2009, check the [documentation](https://astronomer.github.io/astronomer-cosmos/configuration/hosting-docs.html). * Initial support to ``dbt Fusion`` by @tatiana in #1803. More details [here](https://astronomer.github.io/astronomer-cosmos/configuration/dbt-fusion). * Support to prune sources without downstream references in dbt projects by @corsettigyg in #1988 * Allow to set task display name as a user-defined function by @corsettigyg in #1761 * Add dbt project's hash to dag docs to support dag versioning in Airflow 3 by @pankajkoti in #1907 * feat: Add Jinja templating support for ``dbt_cmd_flags`` by @skillicinski in #1899 * Add Scarf metric to collect the execution mode uses by @pankajastro in #1981 * Support Airflow 3.1 by @tatiana in #1980 * Add MySQL profile mapping by @Lee2532 in #1977 * Add sqlserver profile mapping by @pankajastro in #1737 **Enhancement** * Use XCom to store sql when using ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajastro in #1934 * Refactor ``AIRFLOW_ASYNC`` teardown so it doesn't install the virtualenv by @pankajastro in #1938 * Reuse the virtual env for ``AIRFLOW_ASYNC`` setup task by @pankajastro in #1939 * Improve dataset/asset experience in Cosmos by @tatiana in #2030 * Add ``downstreams`` to ``DbtNode`` by @wornjs in #2028 **Bug fixes** * Fix tags extraction by @ms32035 in #1915 * Fix task flow operator args by @anyapriya in #2024 **Documentation** * Add documentation for Airflow 3 Plugin supporting dbt docs for multiple dbt projects by @pankajkoti in #2063 * Add Cosmos Deferrable Operator Guide by @pankajastro in #1922 * Add dbt Fusion documentation by @tatiana in #1824 #1830 * Update dbt-fusion.rst to explicitly highlight it is in alpha by @tatiana in #1838 * Fix a bunch of docs build errors and warnings by @pankajkoti in #1886 * Add docs note for param virtualenv_dir for async execution mode by @pankajastro in #1969 * Use pepy.tech downloads badge in README by @pankajkoti in #1920 * Correct the default value of ``cache_dir`` by @seokyun.ha in #2027 **Others** * Promote @corsettigyg to committer by @tatiana in #1985 * Add @pankajkoti and @pankajastro to ``contributors.rst`` by @tatiana in #1983 * Update setup script for airflow3 script by @dwreeves in #2023 * Prevent pytest from trying to test classes that aren't actually tests by @anyapriya in #2032 * Fix ``dag.test()`` for Airflow 3.1+ by syncing DAG to database bby @kaxil in #2037 * Disable Scarf in CI by @pankajastro in #2016 * Fix failing dbt Fusion tests when run in parallel in CI by @pankajkoti in #1896 * Fix MyPy issues related to ``ObjectStoragePath`` in main branch by @tatiana in #2012 * Cleanup example dbt event JSON dictionaries kept for XCOM referencby @pankajkoti in #1997 * Bump min hatch version that includes fixes for click>=8.3.0 by @pankajkoti in #1996 * Use official postgres image from Docker hub for kubernetes setup by @pankajkoti in #1986 * Use click<8.3.0 for hatch as click 8.3 breaks hatch by @pankajkoti in #1987 * Pin Airflow version in type check CI job by @pankajastro in #2003 * Improve comments after feedback on #1948 by @tatiana in #1963 * Fix running tests with dbt Fusion 2.0.0 preview versions by @tatiana in #1948 * Test hardening of dbt node having tags as unset or missing by @pankajkoti in #1918 * Fix Sphinx issue in the main branch by @tatiana in #2064 * pre-commit autoupdate in #2065, #2043, #2033, #2019, #1990, #2019, #2008, #1941, #1935, #1924 * GitHub dependabot update in #2051, #2050, #2038, #2022, #1947, #1955, #1946, #1944, #1945, #1928, #1921, #1917 Co-authored-by: Pankaj Koti <pankaj.koti@astronomer.io> Co-authored-by: Pankaj Singh <pankaj.singh@astronomer.io> Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com>
|
Hi Team, this change is impacting parsing performance of cosmos because our IO is slow in k8s. And there is no way to turn this feature off while we are using airflow 2+. Do you have any suggestions? |
hi @pmuntianu, I just logged an issue #2386 to allow disabling this work done here in the PR. We will try to prioritise this, but would like to check with you that, before we get a chance to take a stab at it, is there a chance you could contribute a PR for this? |
|
Hi @pankajkoti , thanks a ton. This is really pormpt response. I appreciate your dedication. For us, the solution is to not pass with dag:
DbtTaskGroup(
# by commenting line below we prevent airflow dag caching in _add_dbt_project_hash_to_dag_docs method
# this is important because our io is slow in k8s. We might negatively impact caching as well.
# dag=dag,
group_id=f"{self.workflow_settings.name}_group",
project_config=self._parse_project(),
...Thanks a lot one more time for taking this work |
This PR leverages the existing method for generating a hash of the dbt project contents and appends that hash to the DAG documentation. This enables Airflow 3’s automatic DAG versioning feature to track changes in dbt project content.
Airflow 3 introduces automatic DAG versioning by detecting changes in a DAG’s structure or metadata. By embedding the dbt content hash into the doc_md field (a part of the serialized DAG), this change allows Airflow to:
• Detect when dbt project files are added, modified, or deleted
• Automatically generate a new DAG version in response to those changes
This ensures accurate and automatic versioning of DAGs tied to evolving dbt projects, without requiring manual intervention.
closes: #1868