Refactor AIRFLOW_ASYNC so that the path in the remote object store is specific per DAG run #1741
✅ Deploy Preview for sunny-pastelito-5ecb04 canceled.
Pull Request Overview
This PR refactors the AIRFLOW_ASYNC remote model path generation to include the run_id, ensuring that paths in the remote object store are specific per DAG run.
- Updates file path construction in local and asynchronous operators to incorporate run_id.
- Adds tests to verify that the run_id is correctly embedded in the constructed paths.
- Adjusts async context initialization in asynchronous operators to include run_id.
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| tests/test_async_remote_target_dir_specific_run_id.py | New tests validate that run_id is correctly used in file path building. |
| cosmos/operators/local.py | Updates _construct_dest_file_path to inject run_id into the path. |
| cosmos/operators/_asynchronous/bigquery.py | Modifies remote SQL path construction to include run_id. |
| `cosmos/operators/_asynchronous/__init__.py` | Updates async_context creation to include run_id for async operators. |
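To illustrate the change summarized above, here is a minimal sketch of a run-specific remote path. It follows the segment order given in the PR description (`remote_target_path_str`, `dbt_dag_task_group_identifier`, `run_id`, `run`, `relative_file_path`). The helper name `build_remote_model_path`, the bucket URL, and the run IDs are assumptions made for illustration; this is not the actual `_construct_dest_file_path` implementation in Cosmos.

```python
def build_remote_model_path(
    remote_target_path_str: str,
    dbt_dag_task_group_identifier: str,
    run_id: str,
    relative_file_path: str,
) -> str:
    """Hypothetical helper: join the segments in the order stated in the PR description."""
    # Strip stray slashes so the segments do not produce "//" in the result.
    base = remote_target_path_str.rstrip("/")
    relative = relative_file_path.lstrip("/")
    return f"{base}/{dbt_dag_task_group_identifier}/{run_id}/run/{relative}"


# Illustrative values only: the bucket and the run IDs are made up.
model = "jaffle_shop/models/example/my_first_dbt_model.sql"
first_run = build_remote_model_path(
    "s3://example-bucket/target", "simple_dag_async", "manual__2025-01-01T00:00:00+00:00", model
)
second_run = build_remote_model_path(
    "s3://example-bucket/target", "simple_dag_async", "manual__2025-01-02T00:00:00+00:00", model
)

# Two runs of the same DAG no longer share a destination path.
print(first_run)
print(second_run)
```

Because the `run_id` segment sits above `run/`, everything uploaded by one DAG run lives under a single prefix, which is what makes per-run cleanup possible.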
…piled_sql_should_upload
pankajastro
left a comment
Thanks a lot for the contribution, @tuantran0910! I've triggered the CI for the integration tests.
Codecov Report
All modified and coverable lines are covered by tests ✅
Additional details and impacted files
Coverage Diff
| | main | #1741 | +/- |
|---|---|---|---|
| Coverage | 97.71% | 97.72% | |
| Files | 84 | 84 | |
| Lines | 5252 | 5264 | +12 |
| Hits | 5132 | 5144 | +12 |
| Misses | 120 | 120 | |
Currently, the teardown task deletes SQL files through multiple calls, and the goal was to reduce the risk of deleting unexpected files when the task runs in parallel. Now that we're uploading based on the run-id, would it be worthwhile to delete the entire run-id folder directly, rather than iterating over each file? What are your thoughts?
astronomer-cosmos/cosmos/operators/local.py Line 338 in 9a334a8
This may also apply to the setup task.
Hmm, I think it's a good idea to delete the entire folder.
Hi @pankajastro, could you check out the logic for deleting the run-id specific directory in my new commit 2bf7258? Thanks a lot :D
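The idea discussed above, removing the whole run-id folder in one call instead of deleting files one by one, can be sketched roughly as follows. This is only an illustration under stated assumptions: a local temporary directory stands in for the remote object store, and `delete_run_dir` is a hypothetical helper, not the code from commit 2bf7258.

```python
import shutil
import tempfile
from pathlib import Path


def delete_run_dir(remote_root: Path, dag_task_group_identifier: str, run_id: str) -> None:
    """Hypothetical helper: remove everything uploaded for a single DAG run."""
    if not run_id:
        # An empty run_id would resolve to the parent folder shared by all runs.
        raise ValueError("run_id must not be empty")
    run_dir = remote_root / dag_task_group_identifier / run_id
    # Refuse to delete anything that is not strictly inside the remote root.
    if remote_root.resolve() not in run_dir.resolve().parents:
        raise ValueError(f"{run_dir} is outside {remote_root}")
    shutil.rmtree(run_dir, ignore_errors=True)


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    # Simulate two DAG runs that each uploaded a compiled SQL file.
    for run_id in ("run_a", "run_b"):
        sql_file = root / "simple_dag_async" / run_id / "run" / "my_first_dbt_model.sql"
        sql_file.parent.mkdir(parents=True)
        sql_file.write_text("select 1")

    # Teardown for run_a removes its folder in one call and leaves run_b alone.
    delete_run_dir(root, "simple_dag_async", "run_a")
    remaining = sorted(p.name for p in (root / "simple_dag_async").iterdir())
    print(remaining)
```

Since each run owns its own prefix, a parallel run's files are never touched by another run's teardown, which was the risk raised in the review.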
tatiana
left a comment
This looks great, @tuantran0910 ! Thank you very much for improving this part of Cosmos and making it more reliable.
I've given some feedback in-line. Also, it seems there is an integration test failing. Could you take a look, please?
Sorry @pankajastro, could you enable the integration tests and approve this PR again? I have just pushed a new commit to ensure that all the changes are covered by tests.
tatiana
left a comment
@tuantran0910 Thank you for all the patience on getting this to work. There is a minor request regarding code coverage - if you could address this as part of this PR, it would be great. If you can't do this before the release of 1.10.1, please, could you do a follow-up PR?
Hey @tatiana, I will try to push a quick commit to fix the coverage within an hour. Thank you for pointing that out.
pankajkoti
left a comment
Thanks for improving this, @tuantran0910. This is certainly a very valuable contribution 👏🏽
I tested this with an example async DAG running in the Airflow UI and it works smoothly, as intended. We will include this fix in the upcoming release 1.10.1.
…is specific per DAG run (#1741)
Refactor `AIRFLOW_ASYNC` so that the path in the remote object store is specific per DAG run. The format of remote model path will be:
```python
# test_cosmos/simple_dag_async/run/jaffle_shop/models/example/my_first_dbt_model.sql
remote_model_path = f"{remote_target_path_str}/{dbt_dag_task_group_identifier}/{run_id}/run/{relative_file_path}"
```
Closes #1613
(cherry picked from commit 304e426)
Bug Fixes
* Fix ``full_refresh`` parameter in ``AIRFLOW_ASYNC`` ``ExecutionConfig`` mode by @tuantran0910 in #1738
* Fix dbt ls invocation method log message by @tatiana and @dstandish in #1749
* Ensure remote target directory is created when copying files when using local directory by @tuantran0910 and @corsettigyg in #1740
* Support custom ``packages-install-path`` by @tatiana in #1768
* Disable dbt static parser during Airflow task execution using dbt runner by @pankajkoti and @tatiana in #1760
* Fix ``ExecutionMode.LOCAL`` to leverage ``ProjectConfig.manifest_path`` by @tatiana in #1772
* Refactor ``AIRFLOW_ASYNC`` so that the path in the remote object store is specific per DAG run by @tuantran0910 in #1741
* Optimise memory usage with optional explicit imports by @pankajkoti and @tatiana in #1769

Documentation
* Fix documentation rendering for ``use_dataset_airflow3_uri_standard`` by @pankajastro in #1742
* Correct custom callback example by @walter9388 in #1747

Others
* Re-enable integration tests durations to troubleshoot performance degradation by @tatiana in #1735
* Run listener tests for Airflow 3 by @pankajastro in #1743
* Add Airflow 3 db files to ignore from git tracking by @pankajkoti in #1755
* Log contents of ``packages.yml`` when ``AIRFLOW__LOGGING__LOGGING_LEVEL=DEBUG`` by @tatiana in #1764
* Fix Airflow dependencies in the CI by @tatiana in #1773
* Pre-commit updates: #1744, #1765, #1770

(cherry picked from commit 430be00)
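Description
Refactor `AIRFLOW_ASYNC` so that the path in the remote object store is specific per DAG run. The format of remote model path will be: `f"{remote_target_path_str}/{dbt_dag_task_group_identifier}/{run_id}/run/{relative_file_path}"`
Related Issue(s)
Closes #1613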