Release 1.13.1#2388
Merged
Merged
Conversation
…task` in `calculate_tasks_map` (#2309) Fixed a potential scope issue where `producer_task` could be accessed before initialization in the `calculate_tasks_map` function. While I was digging through the watcher implementation, it became clear that the variable was only defined within a specific branch but referenced later in a separate block. Initializing it at the top and simplifying the downstream check makes the code much more robust against accidental crashes. Updating the logic to verify the presence of the task object itself instead of re-checking the execution mode keeps the flow consistent. This ensures the dependency setup only triggers when the producer is actually available. (cherry picked from commit c2731af)
Add a list of known limitations related to the `ExecutionMode.KUBERNETES`. (cherry picked from commit 43f616f)
…2335) dbt adapters maintain an internal state that holds onto semaphores. When running with Airflow 3, this causes "leaked semaphore objects" warnings from Python's resource tracker at task shutdown. This fix resets dbt adapters and triggers garbage collection after each dbt command execution to properly release these resources. Closes: #2334 (cherry picked from commit 2128e94)
Updated the get cache methods to return an empty cache if they do not find the keys they were expecting. This will lead to a cache miss in the calling function and the recreation of the cache, which is the intended behavior. This will only occur the first time the `LoadMode` of the graph has been changed with caching is enabled. I've added unit tests and completed local QA to confirm behavior. Closes #2330 (cherry picked from commit 4f9f787)
While trying to run some DAGs locally with Airflow 3.1.7, I observed this behaviour intermitently: <img width="825" height="755" alt="Screenshot 2026-02-13 at 09 45 06" src="https://github.com/user-attachments/assets/b5305a77-e07b-4dcd-84d3-9668f300013c" /> Although I was unable to create a reproducible sequence of steps to reproduce it, I believe we should handle this exception so it is not disruptive to end users.
closes: #1093 Before Changes <img width="1685" height="978" alt="Screenshot 2026-02-05 at 9 34 35 PM" src="https://github.com/user-attachments/assets/ab924a8c-e81c-4b39-b945-888fbb75fdfc" /> After Changes <img width="1698" height="978" alt="Screenshot 2026-02-05 at 9 29 41 PM" src="https://github.com/user-attachments/assets/37f1837c-5791-4bc9-850a-4489538801f1" /> ---------
…WATCHER (#2319) Add support for populating the compiled_sql rendered template field when using watcher subprocess mode, for both successful and failed models. Changes: - Centralise `_extract_compiled_sql` to read compiled SQL from `target/compiled` directory for both node event in case of dbt Runner and logs processing in case of suprocess - Push `compiled_sql` to XCom per-model as each model completes - Extract `compiled_sql` in `poke()` for non-deferred execution - Include `compiled_sql` in `WatcherTrigger` events for deferred execution - Handle `compiled_sql` in `execute_complete()` when returning from defer closes: #2233 related: #2260
…fig.selector` (#2316) Ensure that DAG parsing only fails for DAGs that are attempting to render using a misconfigured yaml selector. This was done by collecting any errors during yaml selector parsing and raising an exception when the selector is accessed rather than when the issue is encountered. The exception to this is for invalid yaml structures. In those cases, we can't be certain whether we have a valid selector name or definition. When the graph attempts to grab the parsed selector, it would always get a selector not found exception, hiding the error messages related to the invalid structure. As a result, if there is an invalid yaml structure, all DAGs using `LoadMode.DBT_MANIFEST` and `RenderConfig.selector` will fail during dag parse. As long as a user never directly modifies the `manifest.json`, this should not occur. When doing local QA, I created separate example DAGs, however keeping them did not seem valuable as the unit tests cover the expected behavior. Follow up from #2257 Closes #2312 (cherry picked from commit 81b1310)
The [`ExecutionMode.WATCHER`](https://astronomer.github.io/astronomer-cosmos/getting_started/watcher-execution-mode.html) performance improvements are highly dependent on the number of threads the dbt command is run with. While dbt Core default number of [threads is 4](https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles\#understanding-threads): <img width="1490" height="407" alt="Screenshot 2026-02-17 at 12 36 01" src="https://github.com/user-attachments/assets/b53f873d-92c6-4125-b1b4-a5f4e7a3d54b" /> The [default number of threads set by the Snowflake adapter is 1:](https://docs.getdbt.com/docs/core/connect-data-platform/snowflake-setup) <img width="1427" height="342" alt="Screenshot 2026-02-17 at 12 36 44" src="https://github.com/user-attachments/assets/305c6b4a-c1e8-4fa6-a318-0de900733374" /> With `ExecutionMode.WATCHER`, setting `threads=1` results in models running sequentially, one a time, leading to worse performance than with `ExecutionMode.LOCAL`. While Cosmos aims to be compatible with dbt Core and dbt adapters' default values, we decided to change this for Snowflake to align with dbt Core's default. Users can still override customising the number of threads, for instance, via: ``` profile_mapping=SnowflakeUserPasswordProfileMapping( conn_id="snowflake_conn", profile_args={"threads": 8}, ), ```
Store back `extra_context` on `BaseConsumerSensor` after popping from `kwargs`, so subclasses and user customisations can use it at runtime (`BaseSensorOperator` does not accept `extra_context` and hence, it is popped from `kwargs` before calling `super.init`). closes: #2250
1038830 to
ef5c447
Compare
Contributor
There was a problem hiding this comment.
Pull request overview
Release 1.13.1 roll-up that updates Cosmos’ dbt watcher behavior, selector parsing error handling, caching robustness, and Snowflake defaults, plus accompanying tests and docs.
Changes:
- Default Snowflake profile mappings to 4 dbt threads and document profile-mapping default values.
- Refactor watcher compiled SQL handling to use a canonical per-model XCom key across event/subprocess strategies; preserve
extra_contextand respectdeferrable=False. - Improve YAML selector error handling (defer definition errors until access), harden cache swap behavior, and add dbt adapter cleanup to reduce Airflow 3 semaphore warnings.
Reviewed changes
Copilot reviewed 35 out of 35 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
cosmos/profiles/snowflake/base.py |
Adds profile_defaults (threads=4) and merges defaults into generated profiles. |
cosmos/converter.py |
Catches ParamValidationError when storing telemetry metadata in dag.params. |
cosmos/dbt/graph.py |
Adds fqn to nodes, includes it in dbt ls, and hardens cache miss behavior on swapped caches. |
cosmos/airflow/graph.py |
Uses fqn: selectors when available; refactors watcher dependency setup to avoid UnboundLocalError. |
cosmos/dbt/selector.py |
Collects selector definition errors during parse and raises them when accessed via get_parsed(). |
cosmos/dbt/runner.py |
Adds adapter reset + gc.collect() cleanup in run_command() finally. |
cosmos/operators/watcher.py |
Centralizes compiled SQL extraction/push; fixes consumer sensor init passthrough for producer_task_id/deferrable. |
cosmos/operators/_watcher/base.py |
Adds canonical compiled SQL extraction/push helpers; stores extra_context; reads compiled SQL from XCom in poke(). |
cosmos/operators/_watcher/triggerer.py |
Parses status + compiled SQL and emits compiled SQL in trigger events when available. |
cosmos/hooks/subprocess.py |
Provides project_dir to log-processing callbacks. |
tests/test_converter.py |
Adds test for handling ParamValidationError during telemetry storage. |
tests/dbt/test_graph.py |
Updates hashing expectation; adds cache swap regression tests; updates expected args count. |
tests/dbt/test_selector.py |
Updates tests to reflect deferred selector-definition error raising and aggregated error messages. |
tests/dbt/test_runner.py |
Adds unit tests for adapter cleanup and run_command() cleanup behavior. |
tests/operators/test_watcher.py |
Updates watcher expectations and adds compiled SQL XCom/trigger coverage + deferrable=False coverage. |
tests/operators/_watcher/test_triggerer.py |
Updates triggerer tests for compiled SQL extraction behavior. |
tests/operators/_watcher/test_watcher_base.py |
Adds tests ensuring extra_context is preserved on consumer sensors. |
tests/operators/test_watcher_kubernetes_unit.py |
Adds tests for callback normalization/appending behavior in watcher-kubernetes producer. |
tests/profiles/snowflake/*.py |
Updates Snowflake mapping expectations for default threads and override behavior. |
docs/templates/profile_mapping.rst.jinja2 |
Renders a “Default Values” section when profile_defaults are present. |
docs/generate_mappings.py |
Passes profile_defaults into mapping docs template context. |
docs/getting_started/kubernetes.rst |
Adds a “Known Limitations” section and anchor for cross-references. |
docs/getting_started/watcher-kubernetes-execution-mode.rst |
Cross-references Kubernetes limitations for watcher-kubernetes. |
docs/configuration/selecting-excluding.rst |
Documents YAML selector error-handling semantics. |
docs/configuration/caching.rst |
Clarifies cache key wording for yaml selectors cache. |
docs/configuration/*.rst |
Removes stray blank lines in code blocks for formatting consistency. |
README.rst |
Adds Apache license badge. |
CHANGELOG.rst |
Adds 1.13.1 release notes entry. |
.pre-commit-config.yaml / .codespell-ignore-words |
Configures codespell ignore-words file and adds rin to ignores. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
#2389) Fixes a `KeyError` when using the `+` (precursor) graph selector on a project that uses dbt-loom for cross-project references. cc @pankajkoti @tatiana — This is a follow-up to your dbt-loom support in #2271. The external node skipping works great for basic rendering, but we hit a `KeyError` when combining it with the `+` graph selector. The `+` operator triggers `select_node_precursors` which traverses `depends_on` entries — and those can point to external nodes that were already filtered out during manifest loading. This code path wasn't exercised by the tests in #2271 since the example DAGs don't use graph selectors. ## Problem The dbt-loom support added in #2271 correctly skips external nodes (those without `original_file_path`) during manifest loading in `load_from_dbt_manifest`. However, local nodes still have `depends_on` entries pointing to these filtered-out external nodes. When the `+` graph operator traverses upstream dependencies via `select_node_precursors`, it does `nodes[node_id]` on these external node IDs and raises a `KeyError`: File "cosmos/dbt/selector.py", line 172, in select_node_precursors new_generation.update(set(nodes[node_id].depends_on)) ~~~~~^^^^^^^^^ KeyError: 'model.upstream_project.external_model' **Reproduction:** Use `select: ["+downstream_model"]` in `RenderConfig` with `LoadMode.DBT_MANIFEST` on a project that uses dbt-loom with cross-project `{{ ref('upstream_project', 'model_name') }}` references. ## Fix Adds bounds checks in two locations in `cosmos/dbt/selector.py`: 1. **`GraphSelector.select_node_precursors`** (line 172): Skip node IDs not present in the `nodes` dict during upstream traversal 2. **`NodeSelector.select_nodes_ids_by_intersection`** (line 552): Skip external node IDs that were collected during graph traversal but don't exist in the `nodes` dict This allows the `+` traversal to gracefully stop at project boundaries — the correct behavior for cross-project setups where external dependencies are managed by their own DAGs/task groups. This is consistent with how `select_node_descendants` already handles missing parents via `defaultdict(set)`. ## Test plan - [x] Added `test_select_nodes_by_precursors_with_external_dependency` — creates a graph where a local node's `depends_on` includes an external node ID not in the `nodes` dict, verifies `+` selector returns local nodes without `KeyError` - [x] All 166 existing selector tests pass - [x] All existing dbt-loom tests in `test_graph.py` pass Co-authored-by: Alex Ward <award@Mac.lan> Co-authored-by: Cursor <cursoragent@cursor.com> (cherry picked from commit 4d86173)
e36b511 to
8558067
Compare
8ff2cae to
52745bb
Compare
pankajkoti
approved these changes
Feb 24, 2026
Contributor
pankajkoti
left a comment
There was a problem hiding this comment.
Thanks for working on the release!
tatiana
commented
Feb 25, 2026
tatiana
commented
Feb 25, 2026
tatiana
commented
Feb 25, 2026
Declares Cosmos modules (operators and others) so the Apache Airflow Provider Registry can discover them automatically instead of relying on a hardcoded listing in the airflow repo. Each module entry includes a description for display in the registry. Both bare string and object formats are supported — descriptions and docs-url are optional per-module overrides. --------- Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com> (cherry picked from commit 8d23559)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Enhancements
UnboundLocalErrorforproducer_taskincalculate_tasks_mapby @rin in Refactor to avoid potential futureUnboundLocalErrorforproducer_taskincalculate_tasks_map#2309Bug Fixes
dbt-loomby @award1230 in Fix KeyError in graph selector when using + operator with dbt-loom ex… #2389compiled_sqlforInvocationMode.SUBPROCESSinExecutionMode.WATCHERby @pankajkoti in Populate compiled_sql for InvocationMode.SUBPROCESS in ExecutionMode.WATCHER #2319extra_contextfor watcher consumer task instances by @pankajkoti in Preserve extra_context for watcher consumer task instances #2381deferrable=Falsefromoperator_argson consumer sensor by @pankajkoti in fix(watcher): Respect deferrable=False from operator_args on consumer sensor #2384LoadMode.DBT_MANIFESTandRenderConfig.selectorby @YourRoyalLinus in Error handle invalid YAML withLoadMode.DBT_MANIFESTandRenderConfig.selector#2316Docs
ExecutionMode.KUBERNETESlimitations by @tatiana in DocumentExecutionMode.KUBERNETESlimitations #2326Others
WATCHER_KUBERNETEScallback #2307 by @tatiana in Add tests for PR #2307 #2308Closes: https://github.com/astronomer/oss-integrations-private/issues/333