Support use of YAML selectors when using LoadMode.DBT_MANIFEST#2261
Conversation
✅ Deploy Preview for astronomer-cosmos canceled.
|
|
I don't think the current state of this PR is ready for merge. I wanted to get feedback on this approach before finalizing. I have unit tests from the implementation I manage that I will port over - along with writing additional cosmos-specific tests - before the PR would moved out of draft. Modifications to existing code, as well as how I composed my changes, were mostly done in a non-intrusive way to appease the linter. I'm fully open to alternative approaches. This is my first OSS PR. If I've committed any faux pas, please let me know |
|
This is a very exciting feature, @YourRoyalLinus. Thanks for working on it! |
|
Hi @YourRoyalLinus, thanks a lot for all the tests and updates on this. We really appreciate it. |
|
@YourRoyalLinus, I had a quick look at the tests and tried to fix them in a separate branch (#2296), which is a draft PR so the CI can trigger the job. I'll close it once the checks in your branch are passing. (1) Some integration tests were failing with: The reason is "Object Storage feature is unavailable in Airflow version 2.7.0. Please upgrade to Airflow 2.8 or later". This is how you can fix it: 424a9a1 (2) dbt Fusion tests stopped working because the default selector did not select any nodes in the There were dbt Fusion tests that referenced the dbt project Since we set the default selector to not match any nodes, this behaviour changed: My advice is that we do not add the selector changes to (3) The DAG I fixed all the issues in #2296. If you get a chance, please add these fixes to this PR - otherwise we may merge your PR as it is, and then merge the fixes. |
tatiana
left a comment
There was a problem hiding this comment.
There are significant changes in this PR, which may lead to issues. That said, given the changes are relatively isolated, since they mostly relate to adding support to selector when using LoadMode.DBT_MANIFEST, we decided to go ahead with merging and releasing this in Cosmos 1.13.0.
Even though the tests are not passing in this branch, we managed to fix them in #2296. We decided to go ahead and merge this PR as-is, since we'll merge the test fixes straight after.
HI @tatiana, this is great! Happy to be able to contribute back. My company will likely be the first user of this feature once its live, so if there's anything that got missed I should hopefully be the first to find (and fix) it. |
Minimal changes on #2261 to see tests passing without changing the original branch I also created a PR on the @YourRoyalLinus PR: YourRoyalLinus#1 This is a summary of the main issues: (1) Some integration tests were failing with: ``` ERROR tests/test_example_dags.py - AssertionError: assert not {'/home/runner/work/astronomer-cosmos/astronomer-cosmos/dev/dags/cosmos_manifest_selectors_example.py': 'Traceback (mo...he required Object Storage feature is unavailable in Airflow version 2.7.0. Please upgrade to Airflow 2.8 or later.\n'} + where {'/home/runner/work/astronomer-cosmos/astronomer-cosmos/dev/dags/cosmos_manifest_selectors_example.py': 'Traceback (mo...he required Object Storage feature is unavailable in Airflow version 2.7.0. Please upgrade to Airflow 2.8 or later.\n'} = <airflow.models.dagbag.DagBag object at 0x7fa3c6e724d0>.import_errors ERROR tests/test_example_dags_no_connections.py - AssertionError: assert not {'/home/runner/work/astronomer-cosmos/astronomer-cosmos/dev/dags/cosmos_manifest_selectors_example.py': 'Traceback (mo...he required Object Storage feature is unavailable in Airflow version 2.7.0. Please upgrade to Airflow 2.8 or later.\n'} + where {'/home/runner/work/astronomer-cosmos/astronomer-cosmos/dev/dags/cosmos_manifest_selectors_example.py': 'Traceback (mo...he required Object Storage feature is unavailable in Airflow version 2.7.0. Please upgrade to Airflow 2.8 or later.\n'} = <airflow.models.dagbag.DagBag object at 0x7fa3be1d89d0>.import_errors ``` The reason is "Object Storage feature is unavailable in Airflow version 2.7.0. Please upgrade to Airflow 2.8 or later". This is how you can fix it: 424a9a1 (2) dbt Fusion tests stopped working because the YAML selector file defined a default selector, which did not select any nodes in the `jaffle_shop` dbt project. There were dbt Fusion tests that referenced the dbt project `jaffle_shop` that were changed and would actually run the DAG and check the DAG topology - including which nodes were rendered. Since we set the default selector to not match any nodes, this behaviour changed: ``` =========================== short test summary info ============================ FAILED tests/test_dbtf.py::test_dbt_dag_with_dbt_fusion - assert 0 == 23 + where 0 = len({}) + where {} = <cosmos.dbt.graph.DbtGraph object at 0x7f0b68bb80d0>.filtered_nodes + where <cosmos.dbt.graph.DbtGraph object at 0x7f0b68bb80d0> = <DAG: snowflake_dbt_fusion_dag>.dbt_graph ``` I moved the changes originally done to `dev/dags/dbt/jaffle_shop` to `dev/dags/dbt/altered_jaffle_shop`. I also removed the default selector definition. (3) The DAG `cosmos_manifest_selectors_examples.py` failed to run in a clean database because it would attempt to run the dbt model `customers`, without having run its upstream tasks. This can be observed in https://github.com/astronomer/astronomer-cosmos/actions/runs/21476188465/job/61860709231?pr=2296 (4) The DAG `example_cosmos_cleanup_dag.py` is not compatible with Airflow 3. I logged a follow-up ticket for us to check this: #2300. This issue was not introduced by your PR. For now, it is skipped in AF3.
|
@YourRoyalLinus that's awesome! @pankajastro just released 1.13.0a4 after we merged your PR - please, feel free to try it out and give us any feedback: We're aiming to release the stable version tomorrow, GMT afternoon. |
Hi @tatiana, I was able to use the RC branch to test changes on our deployment in a dev environment and everything rendered as expected! One thing I did notice during my QA was that one bad selector configuration will cause a dag parse error for all dbt dags using If you have any thoughts or objections, please let me know. |
|
Thanks a lot @YourRoyalLinus - we'll appreciate any further improvements in the feature. Have a great weekend! |
Features * Support cross-referencing models across dbt projects using dbt-loom by @pankajkoti in #2271 * Support use of YAML selectors when using ``LoadMode.DBT_MANIFEST`` by @YourRoyalLinus in #2261 * Introduce ``ExecutionMode.WATCHER_KUBERNETES`` to use the watcher with ``KubernetesPodOperator`` by @tatiana in #2207 * Add support for StarRocks profile mapping by @kurkim0661 in #2256 * Allow pushing URIs as XComs for Cosmos tasks by @corsettigyg in #2275 * Support defining custom callbacks alongside the ``WATCHER_KUBERNETES`` callback by @johnhoran in #2307 Enhancements * Refactor: remove duplicate ``_construct_dest_file_path`` by @jx2lee in #2077 * Leverage Airflow ``::group::`` to group logs associated with DAG parsing by @tatiana in #2235 * Refactor ``DbtConsumerWatcherSensor`` for reusability by @tatiana in #2245 * Restore plain text output when using ``ExecutionMode.WATCHER`` by @tiovader in #2241 Bug Fixes * Fix running empty models or ephemeral nodes in ``ExecutionMode.WATCHER`` by @tatiana in #2279 * Improve watcher producer task priority in scheduling and the UI by @tatiana in #2237 * Fix typos and formatting issues in documentation by @pankajkoti in #2259 * Allow watcher producer retries without erroring by @tatiana in #2283 * Fix ``TestBehavior.AFTER_ALL`` is missing project_name information when loading project using manifest file by @tuantran0910 in #2242 * Fix duplicate log lines in watcher subprocess execution and format timestamps by @pankajkoti in #2301 Docs * Add Watcher Kubernetes documentation by @tatiana in #2303 * Document newly added telemetry metrics in the privacy notice by @pankajkoti in #2249 * Add compatibility policy document by @pankajastro in #2251 * Improve watcher documentation related to dbt threads by @tatiana in #2273 * Fix link in watcher execution mode documentation by @jedcunningham in #2277 * Update Apache Airflow minimum compatibility policy by @tatiana in #2285 * Clarify Cosmos runtime support until "End of Basic Support" by @jedcunningham in #2286 * Update watcher docs by @tatiana in #2298 * Update watcher kubernetes documentation by @tatiana in #2306 Others * Add Airflow 3 DAG versioning tests for Cosmos by @michal-mrazek in #2177 * Add dbt Core 1.11 to the test matrix by @tatiana in #2230 * Add integration tests using InvocationMode.SUBPROCESS and validate output by @tatiana in #2287 * Fix main branch failing tests by @tatiana in #2296 * Update pre-commit hooks to the latest versions by @jedcunningham in #2289 * Pre-commit autoupdates by @pre-commit in #2222, #2264, #2274 and #2290 * Dependabot updates by @dependabot in #2218, #2219, #2220, #2280 and #2284 * Add Scarf metrics to understand Cosmos feature usage patterns - Add telemetry tracking for dbt docs plugin usage by @pankajkoti in #2240 - Add DAG run telemetry metrics for load mode, invocation, and render_config parameters by @pankajkoti in #2223 - Collect profile metrics for DAG runs by @pankajastro in #2228 - Compress telemetry metadata to reduce serialized DAG size by @pankajkoti in #2252 - Skip storing telemetry metadata when emission is disabled by @pankajkoti in #2278 - Hide telemetry metadata parameters from the Airflow trigger UI by @pankajkoti in #2247 closes: astronomer/oss-integrations-private#317 --------- Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>
Minimal changes on astronomer#2261 to see tests passing without changing the original branch I also created a PR on the @YourRoyalLinus PR: YourRoyalLinus#1 This is a summary of the main issues: (1) Some integration tests were failing with: ``` ERROR tests/test_example_dags.py - AssertionError: assert not {'/home/runner/work/astronomer-cosmos/astronomer-cosmos/dev/dags/cosmos_manifest_selectors_example.py': 'Traceback (mo...he required Object Storage feature is unavailable in Airflow version 2.7.0. Please upgrade to Airflow 2.8 or later.\n'} + where {'/home/runner/work/astronomer-cosmos/astronomer-cosmos/dev/dags/cosmos_manifest_selectors_example.py': 'Traceback (mo...he required Object Storage feature is unavailable in Airflow version 2.7.0. Please upgrade to Airflow 2.8 or later.\n'} = <airflow.models.dagbag.DagBag object at 0x7fa3c6e724d0>.import_errors ERROR tests/test_example_dags_no_connections.py - AssertionError: assert not {'/home/runner/work/astronomer-cosmos/astronomer-cosmos/dev/dags/cosmos_manifest_selectors_example.py': 'Traceback (mo...he required Object Storage feature is unavailable in Airflow version 2.7.0. Please upgrade to Airflow 2.8 or later.\n'} + where {'/home/runner/work/astronomer-cosmos/astronomer-cosmos/dev/dags/cosmos_manifest_selectors_example.py': 'Traceback (mo...he required Object Storage feature is unavailable in Airflow version 2.7.0. Please upgrade to Airflow 2.8 or later.\n'} = <airflow.models.dagbag.DagBag object at 0x7fa3be1d89d0>.import_errors ``` The reason is "Object Storage feature is unavailable in Airflow version 2.7.0. Please upgrade to Airflow 2.8 or later". This is how you can fix it: astronomer@424a9a1 (2) dbt Fusion tests stopped working because the YAML selector file defined a default selector, which did not select any nodes in the `jaffle_shop` dbt project. There were dbt Fusion tests that referenced the dbt project `jaffle_shop` that were changed and would actually run the DAG and check the DAG topology - including which nodes were rendered. Since we set the default selector to not match any nodes, this behaviour changed: ``` =========================== short test summary info ============================ FAILED tests/test_dbtf.py::test_dbt_dag_with_dbt_fusion - assert 0 == 23 + where 0 = len({}) + where {} = <cosmos.dbt.graph.DbtGraph object at 0x7f0b68bb80d0>.filtered_nodes + where <cosmos.dbt.graph.DbtGraph object at 0x7f0b68bb80d0> = <DAG: snowflake_dbt_fusion_dag>.dbt_graph ``` I moved the changes originally done to `dev/dags/dbt/jaffle_shop` to `dev/dags/dbt/altered_jaffle_shop`. I also removed the default selector definition. (3) The DAG `cosmos_manifest_selectors_examples.py` failed to run in a clean database because it would attempt to run the dbt model `customers`, without having run its upstream tasks. This can be observed in https://github.com/astronomer/astronomer-cosmos/actions/runs/21476188465/job/61860709231?pr=2296 (4) The DAG `example_cosmos_cleanup_dag.py` is not compatible with Airflow 3. I logged a follow-up ticket for us to check this: astronomer#2300. This issue was not introduced by your PR. For now, it is skipped in AF3.
…nomer#2261) Add support for the use of yaml selectors when loading dbt projects using `LoadMode.DBT_MANIFEST`. This works by parsing the YAML selectors found in the manifest file that have already been processed by the dbt parser into corresponding select and exclude selections that are passed onto the `select_nodes()` function. The performance overhead for the initial parse will depend on the user's `selector.yaml` definition, but in testing it has been reasonable. The parsing penalty is only paid once, as future selector to select/exclude mapping access is done from the cache, if enabled (similar to `dbt_ls_cache`). #### Key Changes - Implemented a parser to convert full YAML selector definitions into corresponding `select` and `exclude` selections - Is only used for graphs with `RenderConfig.load_method = DBT_MANIFEST` and a defined `RenderConfig.selector` - Implemented cache behavior to store and retrieve the parsed YAML selector definitions - Added a new `enable_cache_yaml_selectors` cosmos setting to enable/disable caching - Invalidates if `YamlSelector` implementation change - Ensures users do not have to manually clear the cache if the spec is changed - Renamed `dbt_ls_cache` to be more general so it can be used to store parsed selector yaml definitions - Cache should always be mutually exclusive for the `dbt_ls` output or parsed selector yaml #### Limitations - I did not implement parser support for the `indirect_selection` or `default` keywords. - It seems plausible that both of these can be implemented if desired. Omitting them was done to limit scope. - This is not a full YAML selector parser, akin to what dbt implements. - While much of the implementation is borrowed from the latest version of `dbt core`, this expects the selector definitions to be in the state that exists in an unmodified manifest file. - This approach allows the Cosmos parser to only to handle fully structured `method-value` selection definitions. - I kept many high-level parser exceptions in the Cosmos implementation that are technically redundant with the dbt parser - This was done to try and catch user-modified manifest files to fail fast with helpful error messages. - Modifying the selector definitions found in the manifest file is considered undefined behavior. These limitations are reflected in the documentation. ## Related Issue(s) Closes astronomer#2257 ## Breaking Change? - I reused the existing dbt_ls cache cache key and functionality, generalizing where possible - **This is an implementation detail and should not break the public API nor alter any existing behavior** - This requires it to be impossible to have a dbt_ls and yaml_selectors cache at the same time due - This is technically possible by setting the `DbtGraph.cache_identifier` to be the same, but this can't be configured by a user. These are reflected in the documentation.
…nomer#2261) Add support for the use of yaml selectors when loading dbt projects using `LoadMode.DBT_MANIFEST`. This works by parsing the YAML selectors found in the manifest file that have already been processed by the dbt parser into corresponding select and exclude selections that are passed onto the `select_nodes()` function. The performance overhead for the initial parse will depend on the user's `selector.yaml` definition, but in testing it has been reasonable. The parsing penalty is only paid once, as future selector to select/exclude mapping access is done from the cache, if enabled (similar to `dbt_ls_cache`). #### Key Changes - Implemented a parser to convert full YAML selector definitions into corresponding `select` and `exclude` selections - Is only used for graphs with `RenderConfig.load_method = DBT_MANIFEST` and a defined `RenderConfig.selector` - Implemented cache behavior to store and retrieve the parsed YAML selector definitions - Added a new `enable_cache_yaml_selectors` cosmos setting to enable/disable caching - Invalidates if `YamlSelector` implementation change - Ensures users do not have to manually clear the cache if the spec is changed - Renamed `dbt_ls_cache` to be more general so it can be used to store parsed selector yaml definitions - Cache should always be mutually exclusive for the `dbt_ls` output or parsed selector yaml #### Limitations - I did not implement parser support for the `indirect_selection` or `default` keywords. - It seems plausible that both of these can be implemented if desired. Omitting them was done to limit scope. - This is not a full YAML selector parser, akin to what dbt implements. - While much of the implementation is borrowed from the latest version of `dbt core`, this expects the selector definitions to be in the state that exists in an unmodified manifest file. - This approach allows the Cosmos parser to only to handle fully structured `method-value` selection definitions. - I kept many high-level parser exceptions in the Cosmos implementation that are technically redundant with the dbt parser - This was done to try and catch user-modified manifest files to fail fast with helpful error messages. - Modifying the selector definitions found in the manifest file is considered undefined behavior. These limitations are reflected in the documentation. ## Related Issue(s) Closes astronomer#2257 ## Breaking Change? - I reused the existing dbt_ls cache cache key and functionality, generalizing where possible - **This is an implementation detail and should not break the public API nor alter any existing behavior** - This requires it to be impossible to have a dbt_ls and yaml_selectors cache at the same time due - This is technically possible by setting the `DbtGraph.cache_identifier` to be the same, but this can't be configured by a user. These are reflected in the documentation.
Description
Add support for the use of yaml selectors when loading dbt projects using
LoadMode.DBT_MANIFEST. This works by parsing the YAML selectors found in the manifest file that have already been processed by the dbt parser into corresponding select and exclude selections that are passed onto theselect_nodes()function.The performance overhead for the initial parse will depend on the user's
selector.yamldefinition, but in testing it has been reasonable. The parsing penalty is only paid once, as future selector to select/exclude mapping access is done from the cache, if enabled (similar todbt_ls_cache).Key Changes
selectandexcludeselectionsRenderConfig.load_method = DBT_MANIFESTand a definedRenderConfig.selectorenable_cache_yaml_selectorscosmos setting to enable/disable cachingYamlSelectorimplementation changedbt_ls_cacheto be more general so it can be used to store parsed selector yaml definitionsdbt_lsoutput or parsed selector yamlLimitations
indirect_selectionordefaultkeywords.dbt core, this expects the selector definitions to be in the state that exists in an unmodified manifest file.method-valueselection definitions.These limitations are reflected in the documentation.
Related Issue(s)
Closes #2257
Breaking Change?
DbtGraph.cache_identifierto be the same, but this can't be configured by a user.These are reflected in the documentation.
Checklist