Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
b05f621
Fix unit tests `CosmosRichLogger` crash on `None` msg and test pollut…
tatiana Apr 8, 2026
6491f36
Improve stability of cache hash unit tests (#2539)
tatiana Apr 8, 2026
1d48b67
Solve circular import error in Astro runtime while debugging (#2538)
tatiana Apr 8, 2026
27504d7
Fix mypy 1.20.0 type check failures (#2546)
pankajkoti Apr 9, 2026
d21e9c6
Add redirect for moved partial-parsing docs page (#2550)
tatiana Apr 10, 2026
e3b33db
Update watcher test behavior docs for Cosmos 1.14.0 (#2549)
tatiana Apr 10, 2026
1a33c8c
Fix duplicate logs in dbt build when source freshness is enabled (#2564)
pankajastro Apr 16, 2026
11efefb
Fix incorrectly skipped source downstream tasks in the watcher (#2563)
pankajastro Apr 17, 2026
cc001c9
Enable inlets and outlets using DBT Fusion on Airflow 3 (#2561)
ichirotakami Apr 17, 2026
e5e5e64
Warn and normalize when source_rendering_behavior=None is passed (#2570)
pankajastro Apr 17, 2026
6b9c092
Fix CI failures caused by docs build memory exhaustion (#2580)
pankajkoti Apr 20, 2026
ce2935c
Fix dbt Fusion broken integration tests (#2581)
tatiana Apr 21, 2026
d7cba19
YAML selector parsing should skip malformed selectors instead of fail…
YourRoyalLinus Apr 21, 2026
ed35214
Gracefully handle `Variable.set()` failures on Astro Remote Execution…
hkc-8010 Apr 21, 2026
e05627a
Fix watcher producer retries behaviour (#2559)
tatiana Apr 21, 2026
25d4afb
Fix flaky cosmos_manifest_selectors_example DAG in CI (#2593)
pankajkoti Apr 23, 2026
e427655
Bump version and draft Changelog
pankajkoti Apr 23, 2026
0843821
Reduce pre-commit autoupdate frequency PRs (#2544)
tatiana Apr 8, 2026
90ef922
Update changelog after cherry-picking PR #2544 to resolve precommit f…
pankajkoti Apr 23, 2026
f7b66eb
Include #2592 in Changelog
pankajkoti Apr 23, 2026
bba4a76
Include #2597 to Changelog
pankajkoti Apr 23, 2026
d4b3606
Merge branch 'release-1.14' into release-1.14.1
pankajkoti Apr 23, 2026
8887212
Update CHANGELOG.rst
pankajkoti Apr 23, 2026
1870a57
Apply suggestion from @pankajkoti
pankajkoti Apr 23, 2026
bcb52cb
Prevent watcher producer skip propagating to downstream tasks via gat…
tatiana Apr 23, 2026
3e08a0b
Keep watcher sensor polling when producer is still running (#2592)
pankajkoti Apr 23, 2026
081a349
Bump reviewdog/action-actionlint from 1.71.0 to 1.72.0 (#2542)
dependabot[bot] Apr 8, 2026
93b13cc
Restore memory-optimised imports docs for Cosmos < 1.14.0 (#2604)
pankajkoti Apr 23, 2026
306d2d2
Add docs related to `ExecutionMode.WATCHER` and `depends_on_past` lim…
tatiana Apr 23, 2026
c66c860
Update Changelog and alpha bump
pankajkoti Apr 23, 2026
8c24479
Revert "Keep watcher sensor polling when producer is still running (#…
pankajkoti Apr 23, 2026
5e985db
Remove #2592 from 1.14.1 changelog
pankajkoti Apr 23, 2026
343a309
Speed up Airflow 3.1+ integration tests by caching InProcessExecution…
pankajkoti Apr 10, 2026
d0c7f8c
Add PR 2547 to chanelog
pankajkoti Apr 23, 2026
66e96da
Apply suggestion from @pankajkoti
pankajkoti Apr 23, 2026
6485f09
Keep watcher sensor polling when producer is still running (#2592)
pankajkoti Apr 23, 2026
6f77bb2
Skip watcher gateway test on Airflow 3.0 (#2607)
tatiana Apr 23, 2026
949deeb
Add PR 25492 and 2607 to changelog
pankajkoti Apr 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/actionlint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2

- name: Validate workflow files
uses: reviewdog/action-actionlint@0d952c597ef8459f634d7145b0b044a9699e5e43 # v1.71.0
uses: reviewdog/action-actionlint@6fb7acc99f4a1008869fa8a0f09cfca740837d9d # v1.72.0
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
reporter: github-check
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ jobs:
.github/ISSUE_TEMPLATE/*) ;;
.github/dependabot.yml) ;;
.github/workflows/docs.yml) ;;
.github/workflows/docs-build.yml) ;;
.github/workflows/stale.yml) ;;
.github/workflows/actionlint.yml) ;;
.github/workflows/codeql.yml) ;;
Expand Down Expand Up @@ -243,6 +244,7 @@ jobs:
env:
AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: 0
AIRFLOW__COSMOS__ENABLE_CACHE_DBT_YAML_SELECTORS: 0
AIRFLOW__COSMOS__ENABLE_LAX_SELECTOR_PARSING: 0
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 120
AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT: 120
Expand Down Expand Up @@ -575,6 +577,7 @@ jobs:
env:
AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: 0
AIRFLOW__COSMOS__ENABLE_CACHE_DBT_YAML_SELECTORS: 0
AIRFLOW__COSMOS__ENABLE_LAX_SELECTOR_PARSING: 0
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0
PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ actionlint
# dbt_packages is a directory that gets created when you run dbt deps
dev/dags/dbt/jaffle_shop/dbt_packages/

# dbt package lockfile — removed so dbt resolves the version range in packages.yml at runtime
dev/dags/dbt/jaffle_shop/package-lock.yml

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
Expand Down
25 changes: 2 additions & 23 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,29 +100,8 @@ repos:
- id: blacken-docs
alias: black
additional_dependencies: [black>=26.3.1]
- repo: local
hooks:
- id: check-docs-build
name: Ensure the docs build without errors
entry: sphinx-build -b html docs docs/_build --fail-on-warning --fresh-env
language: python
additional_dependencies:
[
"aenum",
"deprecation",
"msgpack",
"apache-airflow",
"pydata-sphinx-theme>=0.16.0",
"sphinx",
"sphinx-autoapi",
"sphinx-autobuild",
"sphinx-reredirects",
"sphinxcontrib.mermaid",
]
pass_filenames: false
files: ^docs/
- repo: https://github.com/pre-commit/mirrors-mypy
rev: "v1.19.1"
rev: "v1.20.0"
hooks:
- id: mypy
name: mypy-python
Expand All @@ -142,7 +121,7 @@ exclude: "dev/dags/dbt/simple/dbt_packages/.*"

ci:
autofix_commit_msg: 🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
autoupdate_commit_msg: ⬆ [pre-commit.ci] pre-commit autoupdate
autoupdate_schedule: quarterly
skip:
- mypy # build of https://github.com/pre-commit/mirrors-mypy:types-PyYAML,types-attrs,attrs,types-requests,
#types-python-dateutil,apache-airflow@v1.5.0 for python@python3 exceeds tier max size 250MiB: 262.6MiB
36 changes: 36 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,42 @@
Changelog
=========

1.14.1 (2026-04-23)
-------------------

Bug Fixes

* Fix ``ExecutionMode.WATCHER`` producer retry behaviour by @tatiana in #2559
* Prevent watcher producer skip propagating to downstream tasks via gateway task by @johnhoran and @tatiana in #2597
* Keep watcher sensor polling when producer is still running by @pankajkoti in #2592
* Fix circular import error in Cosmos plugin discovery under Astro Runtime by @tatiana in #2538
* Fix ``CosmosRichLogger`` crash on ``None`` log message by @tatiana in #2540
* Enable inlets and outlets using dbt Fusion on Airflow 3 by @ichirotakami in #2561
* Fix incorrectly skipped source downstream tasks in ``ExecutionMode.WATCHER`` by @pankajastro in #2563
* Fix duplicate logs in ``dbt build`` when source freshness is enabled by @pankajastro in #2564
* Warn and normalize when ``source_rendering_behavior=None`` is passed by @pankajastro in #2570
* Gracefully handle ``Variable.set()`` failures on Astro Remote Execution by @hkc-8010 in #2573
* Skip malformed YAML selectors instead of failing entirely by @YourRoyalLinus in #2577

Docs

* Update watcher test behavior docs for Cosmos 1.14.0 by @tatiana in #2549
* Add redirect for moved partial-parsing docs page by @tatiana in #2550
* Document ``ExecutionMode.WATCHER`` and ``depends_on_past`` limitation by @tatiana in #2602
* Restore memory-optimised imports docs for Cosmos < 1.14.0 by @pankajkoti in #2604

Others

* Speed up Airflow 3.1+ integration tests by caching InProcessExecutionAPI by @pankajkoti in #2547
* Improve stability of cache hash unit tests by @tatiana in #2539
* Fix mypy 1.20.0 type check failures by @pankajkoti in #2546
* Fix CI failures caused by docs build memory exhaustion by @pankajkoti in #2580
* Fix dbt Fusion broken integration tests by @tatiana in #2581
* Fix flaky ``cosmos_manifest_selectors_example`` DAG in CI by @pankajkoti in #2593
* Reduce pre-commit autoupdate frequency PRs by @tatiana in #2544
* Bump ``reviewdog/action-actionlint`` from 1.71.0 to 1.72.0 by @dependabot in #2542
* Skip watcher gateway test on Airflow 3.0 by @tatiana in #2607

1.14.0 (2026-04-07)
---------------------

Expand Down
2 changes: 1 addition & 1 deletion cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import importlib

__version__ = "1.14.0"
__version__ = "1.14.1"


# Mapping of public names to their module paths for lazy loading via __getattr__.
Expand Down
25 changes: 20 additions & 5 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
DBT_SETUP_ASYNC_TASK_ID,
DBT_TEARDOWN_ASYNC_TASK_ID,
DEFAULT_DBT_RESOURCES,
PRODUCER_WATCHER_DONE_TASK_ID,
PRODUCER_WATCHER_TASK_ID,
SUPPORTED_BUILD_RESOURCES,
TESTABLE_DBT_RESOURCES,
Expand Down Expand Up @@ -664,7 +665,7 @@ def _add_dbt_setup_async_task(
else [task_or_taskgroup]
)
for task in node_tasks:
task.producer_task_id = setup_airflow_task.task_id # type: ignore[attr-defined]
task.producer_task_id = setup_airflow_task.task_id # type: ignore[union-attr]
if not task.upstream_list:
setup_airflow_task >> task

Expand Down Expand Up @@ -716,6 +717,20 @@ def _add_watcher_producer_task(
)
producer_airflow_task = create_airflow_task(producer_task_metadata, dag, task_group=task_group)
tasks_map[PRODUCER_WATCHER_TASK_ID] = producer_airflow_task

# For DbtTaskGroup, add a gate task that absorbs the producer's skip state
# so it does not propagate to tasks downstream of the group.
# Not needed for DbtDag where producer >> consumers with trigger_rule="always" handles this.
if task_group is not None:
from cosmos.operators._watcher.base import create_producer_done_task

producer_done_task = create_producer_done_task(
dag=dag,
task_group=task_group,
task_id=PRODUCER_WATCHER_DONE_TASK_ID,
)
producer_airflow_task >> producer_done_task

return producer_airflow_task


Expand All @@ -732,8 +747,8 @@ def _add_watcher_dependencies(
- make the producer task to be the parent of the root dbt nodes, without blocking them from sensing XCom
"""
for node_id, task_or_taskgroup in tasks_map.items():
# We do not want to set a dependency between the producer task and itself
if node_id == PRODUCER_WATCHER_TASK_ID:
# We do not want to set a dependency between the producer task (or its gate) and itself
if node_id in (PRODUCER_WATCHER_TASK_ID, PRODUCER_WATCHER_DONE_TASK_ID):
continue

node_tasks = (
Expand All @@ -742,7 +757,7 @@ def _add_watcher_dependencies(
else [task_or_taskgroup]
)
for task in node_tasks:
task.producer_task_id = producer_airflow_task.task_id # type: ignore[attr-defined]
task.producer_task_id = producer_airflow_task.task_id # type: ignore[union-attr]

# Make the producer task to be the parent of the root dbt nodes, without blocking them from sensing XCom
# We only managed to do this in the case of DbtDag.
Expand All @@ -761,7 +776,7 @@ def _add_watcher_dependencies(
else:
always_run_tasks = [task_or_taskgroup]
for task in always_run_tasks:
task.trigger_rule = task_args.get("trigger_rule", "always") # type: ignore[attr-defined]
task.trigger_rule = task_args.get("trigger_rule", "always") # type: ignore[union-attr]


def should_create_detached_nodes(render_config: RenderConfig) -> bool:
Expand Down
18 changes: 15 additions & 3 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from collections.abc import Callable, Iterator
from dataclasses import InitVar, dataclass, field
from pathlib import Path
from typing import Any
from typing import TYPE_CHECKING, Any

import yaml

Expand All @@ -34,7 +34,9 @@
from cosmos.dbt.executable import get_system_dbt
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger
from cosmos.profiles import BaseProfileMapping

if TYPE_CHECKING:
from cosmos.profiles import BaseProfileMapping

logger = get_logger(__name__)

Expand Down Expand Up @@ -116,6 +118,14 @@ def __post_init__(self, dbt_project_path: str | Path | None) -> None:
"RenderConfig.dbt_deps is deprecated since Cosmos 1.9 and will be removed in Cosmos 2.0. Use ProjectConfig.install_dbt_deps instead.",
DeprecationWarning,
)
if self.source_rendering_behavior is None:
warnings.warn(
"Passing None for source_rendering_behavior is not supported. "
"Use SourceRenderingBehavior.NONE to disable source rendering. "
"Defaulting to SourceRenderingBehavior.NONE.",
UserWarning,
)
self.source_rendering_behavior = SourceRenderingBehavior.NONE
self.project_path = Path(dbt_project_path) if dbt_project_path else None
# allows us to initiate this attribute from Path objects and str
self.dbt_ls_path = Path(self.dbt_ls_path) if self.dbt_ls_path else None
Expand Down Expand Up @@ -321,7 +331,9 @@ def validate_profiles_yml(self) -> None:
raise CosmosValueError(f"The file {self.profiles_yml_filepath} does not exist.")

def get_profile_type(self) -> str:
if isinstance(self.profile_mapping, BaseProfileMapping):
from cosmos.profiles.base import BaseProfileMapping as _BaseProfileMapping

if isinstance(self.profile_mapping, _BaseProfileMapping):
return str(self.profile_mapping.dbt_profile_type)

profile_path = self._get_profile_path()
Expand Down
1 change: 1 addition & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ def _missing_value_(cls, value): # type: ignore
CONSUMER_WATCHER_DEFAULT_PRIORITY_WEIGHT = 2
PRODUCER_WATCHER_DEFAULT_PRIORITY_WEIGHT = 20
PRODUCER_WATCHER_TASK_ID = "dbt_producer_watcher"
PRODUCER_WATCHER_DONE_TASK_ID = f"{PRODUCER_WATCHER_TASK_ID}_done"

# Historical telemetry endpoints retained for reference:
# • v1 (Cosmos 1.8.0–1.10.x)
Expand Down
51 changes: 44 additions & 7 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,40 @@ def get_dbt_yaml_selectors_cache_key_args(self, impl_version: str) -> list[str]:
logger.debug(f"Value of `dbt_yaml_selectors_cache_key` for <{self.cache_key}>: {cache_args}")
return cache_args

def _save_cache_to_variable(self, cache_dict: dict[str, Any], cache_name: str) -> None:
"""Write cache_dict to an Airflow Variable, warning on AirflowRuntimeError.

This failure can be expected in DAG parsing environments where the scheduler or
DAG processor does not have direct access to a usable Airflow metadata database
(for example, when the ``variable`` table is unavailable).
"""
try:
from airflow.sdk.exceptions import AirflowRuntimeError
except ImportError:
Variable.set(self.cache_key, cache_dict, serialize_json=True)
return

is_yaml_cache = "yaml" in cache_name.lower()
disable_cache_env_var = (
"AIRFLOW__COSMOS__ENABLE_CACHE_DBT_YAML_SELECTORS"
if is_yaml_cache
else "AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS"
)
cache_specific_workaround = "" if is_yaml_cache else ", using LoadMode.DBT_MANIFEST"
try:
Variable.set(self.cache_key, cache_dict, serialize_json=True)
except AirflowRuntimeError as e:
logger.warning(
"Failed to save Cosmos %s cache to Airflow Variable '%s': %s. "
"Consider setting AIRFLOW__COSMOS__REMOTE_CACHE_DIR to use object storage for caching%s, "
"or disabling the cache via %s.",
cache_name,
self.cache_key,
e,
cache_specific_workaround,
disable_cache_env_var,
)

def save_dbt_ls_cache(self, dbt_ls_output: str) -> None:
"""
Store compressed dbt ls output into an Airflow Variable.
Expand Down Expand Up @@ -582,7 +616,7 @@ def save_dbt_ls_cache(self, dbt_ls_output: str) -> None:
with remote_cache_key_path.open("w") as fp:
json.dump(cache_dict, fp)
else:
Variable.set(self.cache_key, cache_dict, serialize_json=True)
self._save_cache_to_variable(cache_dict, "dbt ls")

def _get_dbt_ls_remote_cache(self, remote_cache_dir: Path | ObjectStoragePath) -> dict[str, str]:
"""Loads the remote cache for dbt ls."""
Expand Down Expand Up @@ -1104,7 +1138,7 @@ def save_yaml_selectors_cache(self, yaml_selectors: YamlSelectors) -> None:
with remote_cache_key_path.open("w") as fp:
json.dump(cache_dict, fp)
else:
Variable.set(self.cache_key, cache_dict, serialize_json=True)
self._save_cache_to_variable(cache_dict, "YAML selectors")

def parse_yaml_selectors(self, selector_definitions: dict[str, Any]) -> YamlSelectors:
"""
Expand All @@ -1116,8 +1150,9 @@ def parse_yaml_selectors(self, selector_definitions: dict[str, Any]) -> YamlSele
Returns:
YamlSelectors: A YamlSelectors instance
"""

yaml_selectors = YamlSelectors.parse(selector_definitions)
yaml_selectors = YamlSelectors.parse(
selector_definitions, lax_parsing_enabled=settings.enable_lax_selector_parsing
)

if self.should_use_yaml_selectors_cache():
self.save_yaml_selectors_cache(yaml_selectors)
Expand Down Expand Up @@ -1194,9 +1229,11 @@ def _apply_manifest_node_selection(self, nodes: dict[str, DbtNode], manifest: di
yaml_selectors = self.load_parsed_selectors(selector_definitions)
selections = yaml_selectors.get_parsed(self.render_config.selector)
if not selections:
raise CosmosLoadDbtException(
f"Selector `{self.render_config.selector}` not found in parsed YAML selectors `{selector_definitions}`"
)
error_message = f"Selector `{self.render_config.selector}` not found in parsed YAML selectors"
if settings.enable_lax_selector_parsing:
error_message += ". Check logs - the selector may have parsing errors logged as warnings"
raise CosmosLoadDbtException(error_message)

self.nodes = nodes
self.filtered_nodes = select_nodes(
project_dir=project_dir,
Expand Down
19 changes: 16 additions & 3 deletions cosmos/dbt/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -1349,14 +1349,15 @@ def _parse_from_definition(
return (select, exclude, errors)

@classmethod
def parse(cls, selectors: dict[str, dict[str, Any]]) -> YamlSelectors:
def parse(cls, selectors: dict[str, dict[str, Any]], lax_parsing_enabled: bool) -> YamlSelectors:
"""
Parse selector definitions from a dbt manifest into a YamlSelectors instance.

This is the main entry point for parsing manifest selector definitions. It processes all
selector definitions from the manifest and converts them to Cosmos format.

:param selectors: dict[str, dict[str, Any]] - Dictionary of selector definitions from the manifest, keyed by selector name
:param lax_parsing_enabled: bool - Flag to determine whether parser exceptions should be logged instead of raised
:return: YamlSelectors - A YamlSelectors instance containing both raw and parsed selector definitions

Example Input:
Expand Down Expand Up @@ -1386,12 +1387,24 @@ def parse(cls, selectors: dict[str, dict[str, Any]]) -> YamlSelectors:

for name, definition in selectors.items():
if not isinstance(definition, dict):
raise CosmosValueError(
error_message = (
f"Invalid selector definition for '{name}'. Expected a dict, got {type(definition)}: {definition}"
)
if lax_parsing_enabled:
logger.warning(error_message)
continue
else:
raise CosmosValueError(error_message)

if not definition.get("name") or not definition.get("definition"):
raise CosmosValueError(f"Selector definition for '{name}' must contain 'name' and 'definition' keys.")
error_message = f"Selector definition for '{name}' must contain 'name' and 'definition' keys."
if lax_parsing_enabled:
logger.warning(error_message)
continue
else:
raise CosmosValueError(
f"Selector definition for '{name}' must contain 'name' and 'definition' keys."
)

selector_name = definition["name"]
selector_definition = definition["definition"]
Expand Down
Loading
Loading