[Review] Support use of YAML selectors when using LoadMode.DBT_MANIFEST#1
[Review] Support use of YAML selectors when using LoadMode.DBT_MANIFEST#1govambam wants to merge 2 commits into
Conversation
…nomer#2261) Add support for the use of yaml selectors when loading dbt projects using `LoadMode.DBT_MANIFEST`. This works by parsing the YAML selectors found in the manifest file that have already been processed by the dbt parser into corresponding select and exclude selections that are passed onto the `select_nodes()` function. The performance overhead for the initial parse will depend on the user's `selector.yaml` definition, but in testing it has been reasonable. The parsing penalty is only paid once, as future selector to select/exclude mapping access is done from the cache, if enabled (similar to `dbt_ls_cache`). #### Key Changes - Implemented a parser to convert full YAML selector definitions into corresponding `select` and `exclude` selections - Is only used for graphs with `RenderConfig.load_method = DBT_MANIFEST` and a defined `RenderConfig.selector` - Implemented cache behavior to store and retrieve the parsed YAML selector definitions - Added a new `enable_cache_yaml_selectors` cosmos setting to enable/disable caching - Invalidates if `YamlSelector` implementation change - Ensures users do not have to manually clear the cache if the spec is changed - Renamed `dbt_ls_cache` to be more general so it can be used to store parsed selector yaml definitions - Cache should always be mutually exclusive for the `dbt_ls` output or parsed selector yaml #### Limitations - I did not implement parser support for the `indirect_selection` or `default` keywords. - It seems plausible that both of these can be implemented if desired. Omitting them was done to limit scope. - This is not a full YAML selector parser, akin to what dbt implements. - While much of the implementation is borrowed from the latest version of `dbt core`, this expects the selector definitions to be in the state that exists in an unmodified manifest file. - This approach allows the Cosmos parser to only to handle fully structured `method-value` selection definitions. - I kept many high-level parser exceptions in the Cosmos implementation that are technically redundant with the dbt parser - This was done to try and catch user-modified manifest files to fail fast with helpful error messages. - Modifying the selector definitions found in the manifest file is considered undefined behavior. These limitations are reflected in the documentation. ## Related Issue(s) Closes astronomer#2257 ## Breaking Change? - I reused the existing dbt_ls cache cache key and functionality, generalizing where possible - **This is an implementation detail and should not break the public API nor alter any existing behavior** - This requires it to be impossible to have a dbt_ls and yaml_selectors cache at the same time due - This is technically possible by setting the `DbtGraph.cache_identifier` to be the same, but this can't be configured by a user. These are reflected in the documentation.
| intersection_def_parts = cls._get_list_dicts(definition, "intersection") | ||
| include, exclude = cls._parse_include_exclude_subdefs(intersection_def_parts, cache=cache) | ||
|
|
||
| intersection = [",".join(include)] |
There was a problem hiding this comment.
🟡 Medium dbt/selector.py:1068
When include is empty, [",".join(include)] produces [""] instead of []. Consider checking for an empty list before joining.
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file cosmos/dbt/selector.py around line 1068:
When `include` is empty, `[",".join(include)]` produces `[""]` instead of `[]`. Consider checking for an empty list before joining.
Evidence trail:
cosmos/dbt/selector.py lines 1068-1069 show `intersection = [",".join(include)]` at REVIEWED_COMMIT. cosmos/dbt/selector.py lines 969-1003 show `_parse_include_exclude_subdefs` initializes `include: list[str] = []` and can return an empty list. Python behavior: `",".join([])` returns `""`, so `[",".join([])]` produces `[""]`.
| for definition in definitions: | ||
| if isinstance(definition, dict) and "exclude" in definition: | ||
| # Do not allow multiple exclude: defs at the same level | ||
| if exclude: | ||
| yaml_sel_cfg = yaml.dump(definition) |
There was a problem hiding this comment.
🟡 Medium dbt/selector.py:988
Inline exclude on a method definition is parsed as a pure exclude container, so the include is dropped. In _parse_include_exclude_subdefs, handle method first or restrict the exclude branch to dicts that have only the exclude key.
for definition in definitions:
- if isinstance(definition, dict) and "exclude" in definition:
+ if isinstance(definition, dict) and "exclude" in definition and "method" not in definition:
# Do not allow multiple exclude: defs at the same level
if exclude:
yaml_sel_cfg = yaml.dump(definition)🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file cosmos/dbt/selector.py around lines 988-992:
Inline `exclude` on a `method` definition is parsed as a pure `exclude` container, so the include is dropped. In `_parse_include_exclude_subdefs`, handle `method` first or restrict the `exclude` branch to dicts that have only the `exclude` key.
Evidence trail:
cosmos/dbt/selector.py lines 968-1002 (REVIEWED_COMMIT): `_parse_include_exclude_subdefs` method - line 988 condition `if isinstance(definition, dict) and "exclude" in definition:` catches any dict with "exclude" key; lines 997-998 call `_parse_exclusions` which only extracts the "exclude" list, dropping any "method"/"value" keys.
cosmos/dbt/selector.py lines 944-962 (REVIEWED_COMMIT): `_parse_exclusions` only processes the "exclude" key via `cls._get_list_dicts(definition, "exclude")`.
cosmos/dbt/selector.py lines 1116-1128 (REVIEWED_COMMIT): `_parse_method_definition` properly handles inline excludes when it receives a method definition.
cosmos/dbt/selector.py lines 1168-1172 (REVIEWED_COMMIT): `_parse_from_definition` routes to `_parse_method_definition` when `"method" in definition`.
| f"in a root level selector definition; found {keys}." | ||
| ) | ||
|
|
||
| if "union" in definition: |
There was a problem hiding this comment.
🟢 Low dbt/selector.py:1167
The in checks work on strings as substring matches (e.g., "union" in "reunion" is True). Since the error message handles non-dict types, consider adding an explicit isinstance(definition, dict) guard before the in checks, or document that callers must ensure definition is always a dict.
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file cosmos/dbt/selector.py around line 1167:
The `in` checks work on strings as substring matches (e.g., `"union" in "reunion"` is `True`). Since the error message handles non-dict types, consider adding an explicit `isinstance(definition, dict)` guard before the `in` checks, or document that callers must ensure `definition` is always a dict.
Evidence trail:
cosmos/dbt/selector.py:1167-1181 (REVIEWED_COMMIT) - shows `if "union" in definition:` without isinstance guard before it, and the else block with type-aware error message. cosmos/dbt/selector.py:1230-1232 (REVIEWED_COMMIT) - shows `selector_definition = definition["definition"]` passed to `_parse_from_definition` without type check. cosmos/dbt/selector.py:780-812 (REVIEWED_COMMIT) - shows `_get_list_dicts` would fail with TypeError when trying `dct[key]` on a string.
| with remote_cache_key_path.open("w") as fp: | ||
| json.dump(cache_dict, fp) | ||
| else: | ||
| Variable.set(self.cache_key, cache_dict, serialize_json=True) |
There was a problem hiding this comment.
🟡 Medium dbt/graph.py:1042
Both save_dbt_ls_cache and save_yaml_selectors_cache write to the same self.cache_key Variable, overwriting each other's data. Consider using distinct keys (e.g., {cache_key}_dbt_ls and {cache_key}_yaml_selectors) to prevent data corruption.
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file cosmos/dbt/graph.py around line 1042:
Both `save_dbt_ls_cache` and `save_yaml_selectors_cache` write to the same `self.cache_key` Variable, overwriting each other's data. Consider using distinct keys (e.g., `{cache_key}_dbt_ls` and `{cache_key}_yaml_selectors`) to prevent data corruption.
Evidence trail:
cosmos/dbt/graph.py lines 519-525 (save_dbt_ls_cache): Variable.set(self.cache_key, ...) at line 525; cosmos/dbt/graph.py lines 1037-1042 (save_yaml_selectors_cache): Variable.set(self.cache_key, ...) at line 1042. Both use identical `self.cache_key` for Airflow Variable storage when remote_cache_dir is not configured.
There was a problem hiding this comment.
I'm working on the fix now. I'll update the code to use distinct Airflow Variable keys ({cache_key}_dbt_ls and {cache_key}_yaml_selectors) to prevent the cache collision while keeping the remote cache behavior unchanged.
There was a problem hiding this comment.
Fixed cache key collision by appending _dbt_ls and _yaml_selectors suffixes to Airflow Variable keys in the respective cache methods. This prevents the dbt ls cache and yaml selectors cache from overwriting each other.
Commit fc04e1f pushed to #2. When checks pass, it will merge automatically into this PR.
| if raw_selectors_compressed and parsed_selectors_compressed: | ||
| encoded_raw = base64.b64decode(raw_selectors_compressed.encode()) | ||
| raw_selectors = json.loads(zlib.decompress(encoded_raw).decode()) | ||
|
|
||
| encoded_parsed = base64.b64decode(parsed_selectors_compressed.encode()) | ||
| parsed_selectors = json.loads(zlib.decompress(encoded_parsed).decode()) | ||
|
|
||
| cache_dict["yaml_selectors"] = YamlSelectors(raw_selectors, parsed_selectors) | ||
|
|
There was a problem hiding this comment.
🟡 Medium dbt/graph.py:991
Cache robustness: please ensure get_yaml_selectors_cache() returns an empty dict whenever yaml_selectors cannot be reconstructed (e.g., missing/corrupt compressed blobs). This treats it as a cache miss and prevents KeyError in load_parsed_selectors().
| if raw_selectors_compressed and parsed_selectors_compressed: | |
| encoded_raw = base64.b64decode(raw_selectors_compressed.encode()) | |
| raw_selectors = json.loads(zlib.decompress(encoded_raw).decode()) | |
| encoded_parsed = base64.b64decode(parsed_selectors_compressed.encode()) | |
| parsed_selectors = json.loads(zlib.decompress(encoded_parsed).decode()) | |
| cache_dict["yaml_selectors"] = YamlSelectors(raw_selectors, parsed_selectors) | |
| if raw_selectors_compressed and parsed_selectors_compressed: | |
| encoded_raw = base64.b64decode(raw_selectors_compressed.encode()) | |
| raw_selectors = json.loads(zlib.decompress(encoded_raw).decode()) | |
| encoded_parsed = base64.b64decode(parsed_selectors_compressed.encode()) | |
| parsed_selectors = json.loads(zlib.decompress(encoded_parsed).decode()) | |
| cache_dict["yaml_selectors"] = YamlSelectors(raw_selectors, parsed_selectors) | |
| else: | |
| return {} |
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file cosmos/dbt/graph.py around lines 991-999:
Cache robustness: please ensure `get_yaml_selectors_cache()` returns an empty dict whenever `yaml_selectors` cannot be reconstructed (e.g., missing/corrupt compressed blobs). This treats it as a cache miss and prevents `KeyError` in `load_parsed_selectors()`.
Evidence trail:
cosmos/dbt/graph.py lines 968-999: `get_yaml_selectors_cache()` only adds `yaml_selectors` key when both `raw_selectors_compressed` and `parsed_selectors_compressed` are truthy (line 990). Otherwise returns dict without `yaml_selectors`.
cosmos/dbt/graph.py lines 1075-1083: `load_parsed_selectors()` checks `if not cache_dict:` which only catches empty dicts, then directly accesses `cache_dict["yaml_selectors"]` which would raise `KeyError` if the key is missing.
Commit: REVIEWED_COMMIT
…iables (#2) Co-authored-by: macroscopeapp[bot] <170038800+macroscopeapp[bot]@users.noreply.github.com>
Recreated from astronomer#2261 for Macroscope review.
Original PR: astronomer#2261
Status: closed (merged)
Recreated using squash merge commit - exact merged state preserved.
Original PR: astronomer#2261
Note
Add YAML selector support for
LoadMode.DBT_MANIFESTand introduce cached parsing viaYamlSelectorsincosmos.dbt.graph.DbtGraphImplement YAML selector parsing and caching using
YamlSelectors, wire selector-based filtering into manifest loads, switch dbt ls cache key usage tocache_key, and document new cache flags and cleanup utilities.📍Where to Start
Start with
load_from_dbt_manifestand selector handling in cosmos/dbt/graph.py, then reviewYamlSelectorsin cosmos/dbt/selector.py and cache versioning in cosmos/cache.py.Macroscope summarized 71e7cdf.