From 4fdfa68ae81b0195f3e7d2d9599aef04ecdfb59e Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Fri, 19 Dec 2025 16:07:48 +0530 Subject: [PATCH 01/14] Add test to check profile metrics with non-cosmos operator (#2215) Address: https://github.com/astronomer/astronomer-cosmos/pull/2198#pullrequestreview-3593551753 This pull request adds a test to verify that profile metrics are correctly handled when dealing with non-Cosmos operators. The test ensures that when `_build_task_metrics` is called on a task instance with a regular Airflow operator (not a dbt/Cosmos operator), the profile-related metrics return appropriate `None` values. **Key changes:** - Added a `DummyOperator` class that extends `BaseOperator` without Cosmos-specific functionality - Added `test_profile_metrics_with_non_cosmos_operator()` to validate metrics behavior with non-Cosmos operators --- .../listeners/test_task_instance_listener.py | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/tests/listeners/test_task_instance_listener.py b/tests/listeners/test_task_instance_listener.py index 2afce2809c..1ac1c5a395 100644 --- a/tests/listeners/test_task_instance_listener.py +++ b/tests/listeners/test_task_instance_listener.py @@ -2,17 +2,29 @@ from pathlib import Path from types import SimpleNamespace +from typing import Any from unittest.mock import patch import pytest from airflow.models.connection import Connection +try: # Airflow 3 + from airflow.sdk import Context +except ImportError: # Airflow 2 + from airflow.utils.context import Context + + from cosmos import ProfileConfig from cosmos.constants import InvocationMode from cosmos.listeners import task_instance_listener from cosmos.operators.base import AbstractDbtBase from cosmos.profiles import get_automatic_profile_mapping +try: + from airflow.sdk.bases.operator import BaseOperator # Airflow 3 +except ImportError: + from airflow.models import BaseOperator # Airflow 2 + DBT_PROJECT_PROFILE = Path(__file__).parent.parent / "sample/mini/profiles.yml" @@ -35,6 +47,11 @@ def mock_postgres_conn(): # type: ignore yield conn +class DummyOperator(BaseOperator): + def execute(self, context: Context) -> Any: + pass + + class DummyDbtOperator(AbstractDbtBase): base_cmd = ["run"] @@ -155,6 +172,18 @@ def test_profile_file_metrics(): assert metrics["database"] == "postgres" +def test_profile_metrics_with_non_cosmos_operator(): + operator = DummyOperator(task_id="test") + ti = _make_task_instance(operator) + metrics = task_instance_listener._build_task_metrics(ti, status="success") + + assert metrics["operator_name"] == "DummyOperator" + assert metrics["is_cosmos_operator_subclass"] is False + assert metrics["profile_strategy"] is None + assert metrics["profile_mapping_class"] is None + assert metrics["database"] is None + + def test_build_task_metrics_records_core_fields(): operator = DummyDbtOperator() ti = _make_task_instance(operator) From b81a27909c96451ed6813887059af46957037ffb Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Fri, 19 Dec 2025 16:08:15 +0530 Subject: [PATCH 02/14] Remove emit event for ExecutionMode.AIRFLOW_ASYNC limitation in docs (#2214) Related to: https://github.com/astronomer/astronomer-cosmos/issues/2141 This PR updates the documentation to reflect that dataset emission is now supported in `ExecutionMode.AIRFLOW_ASYNC` since Cosmos 1.12.0. The limitation that prevented dataset events from being emitted in async mode has been resolved. - Removes the documented limitation about dataset events not being emitted in AIRFLOW_ASYNC mode - Updates documentation to include AIRFLOW_ASYNC in the list of execution modes supporting the `emit_datasets` feature --- docs/configuration/render-config.rst | 2 +- docs/configuration/scheduling.rst | 2 +- docs/getting_started/async-execution-mode.rst | 3 --- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/docs/configuration/render-config.rst b/docs/configuration/render-config.rst index 95687ac4db..5ac27e03cb 100644 --- a/docs/configuration/render-config.rst +++ b/docs/configuration/render-config.rst @@ -7,7 +7,7 @@ It does this by exposing a ``cosmos.config.RenderConfig`` class that you can use The ``RenderConfig`` class takes the following arguments: -- ``emit_datasets``: whether or not to emit Airflow datasets to be used for data-aware scheduling. Defaults to True. This feature is only available for ``ExecutionMode.LOCAL``, ``ExecutionMode.VIRTUALENV`` and ``ExecutionMode.WATCHER``. +- ``emit_datasets``: whether or not to emit Airflow datasets to be used for data-aware scheduling. Defaults to True. This feature is only available for ``ExecutionMode.LOCAL``, ``ExecutionMode.VIRTUALENV``, ``ExecutionMode.WATCHER`` and ``ExecutionMode.AIRFLOW_ASYNC``. - ``test_behavior``: how to run tests. Defaults to running a model's tests immediately after the model is run. For more information, see the `Testing Behavior `_ section. - ``load_method``: how to load your dbt project. See `Parsing Methods `_ for more information. - ``invocation_mode``: (new in v1.9) how to run ``dbt ls``, when using ``LoadMode.DBT_LS``. Learn more about this below. diff --git a/docs/configuration/scheduling.rst b/docs/configuration/scheduling.rst index b6e812a136..d9e860ec2a 100644 --- a/docs/configuration/scheduling.rst +++ b/docs/configuration/scheduling.rst @@ -31,7 +31,7 @@ By default, if using a version between Airflow 2.4 or higher is used, Cosmos emi .. important:: - This feature is only available for ``ExecutionMode.LOCAL``, ``ExecutionMode.VIRTUALENV`` and ``ExecutionMode.WATCHER``. + This feature is only available for ``ExecutionMode.LOCAL``, ``ExecutionMode.VIRTUALENV``, ``ExecutionMode.WATCHER`` and ``ExecutionMode.AIRFLOW_ASYNC``. Cosmos calculates these URIs during the task execution, by using the library `OpenLineage Integration Common `_. diff --git a/docs/getting_started/async-execution-mode.rst b/docs/getting_started/async-execution-mode.rst index aa6e96b0b3..b502abd865 100644 --- a/docs/getting_started/async-execution-mode.rst +++ b/docs/getting_started/async-execution-mode.rst @@ -248,7 +248,4 @@ Limitations 9. **TeardownAsyncOperator limitation**: When using a remote object location, in addition to the ``SetupAsyncOperator``, a ``TeardownAsyncOperator`` is also added to the DAG. This task will delete the SQL files from the remote location by the end of the DAG Run. This is can lead to a limitation from a retry perspective, as described in the issue `#2066 `_. This can be avoided by setting the ``enable_teardown_async_task`` configuration to ``False``, as described in the :ref:`enable_teardown_async_task` section. -10. **Dataset events not emitted**: Dataset events are not currently emitted after dbt models complete when using ``ExecutionMode.AIRFLOW_ASYNC``. This means downstream DAGs scheduled with ``Dataset`` or ``DatasetAlias`` will not trigger automatically. This behaviour is present in ``ExecutionMode.LOCAL`` but is currently missing in async mode. This issue is being tracked in `#2141 `_. - - For a comparison between different Cosmos execution modes, please, check the :ref:`execution-modes-comparison` section. From b56236c5b99f8eb5253a901877dd356de0413d24 Mon Sep 17 00:00:00 2001 From: Denis Krivenko Date: Mon, 22 Dec 2025 11:56:27 +0100 Subject: [PATCH 03/14] docs: Fix default values in documentation (#2092) The PR fixes default values in the documentation for `models_relative_path`, `seeds_relative_path` and `snapshots_relative_path`. Current values are taken from: https://github.com/astronomer/astronomer-cosmos/blob/975ebae2b4c6ef75dd442678fb538771bc717829/cosmos/config.py#L166-L168 Before changes: image After changes: image --- cosmos/config.py | 3 +-- docs/configuration/project-config.rst | 15 ++++++--------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/cosmos/config.py b/cosmos/config.py index 6ae2a527a8..de07e59250 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -166,8 +166,7 @@ class ProjectConfig: :param copy_dbt_packages: Copy dbt_packages directory, if it exists, instead of creating a symbolic link. If not set, fetches the value from [cosmos]default_copy_dbt_packages (False by default). :param models_relative_path: The relative path to the dbt models directory within the project. Defaults to models :param seeds_relative_path: The relative path to the dbt seeds directory within the project. Defaults to seeds - :param snapshots_relative_path: The relative path to the dbt snapshots directory within the project. Defaults to - snapshots + :param snapshots_relative_path: The relative path to the dbt snapshots directory within the project. Defaults to snapshots :param manifest_path: The absolute path to the dbt manifest file. Defaults to None :param manifest_conn_id: Name of the Airflow connection used to access the manifest file if it is not stored locally. Defaults to None :param project_name: Allows the user to define the project name. diff --git a/docs/configuration/project-config.rst b/docs/configuration/project-config.rst index 7eb771f1f3..4435f07873 100644 --- a/docs/configuration/project-config.rst +++ b/docs/configuration/project-config.rst @@ -5,12 +5,9 @@ The ``cosmos.config.ProjectConfig`` allows you to specify information about wher variables that should be used for rendering and execution. It takes the following arguments: - ``dbt_project_path``: The full path to your dbt project. This directory should have a ``dbt_project.yml`` file -- ``models_relative_path``: The path to your models directory, relative to the ``dbt_project_path``. This defaults to - ``models/`` -- ``seeds_relative_path``: The path to your seeds directory, relative to the ``dbt_project_path``. This defaults to - ``data/`` -- ``snapshots_relative_path``: The path to your snapshots directory, relative to the ``dbt_project_path``. This defaults - to ``snapshots/`` +- ``models_relative_path``: The path to your models directory, relative to the ``dbt_project_path``. This defaults to ``models`` +- ``seeds_relative_path``: The path to your seeds directory, relative to the ``dbt_project_path``. This defaults to ``seeds`` +- ``snapshots_relative_path``: The path to your snapshots directory, relative to the ``dbt_project_path``. This defaults to ``snapshots`` - ``manifest_path``: The absolute path to your manifests directory. This is only required if you're using Cosmos' manifest parsing mode. Along with supporting local paths for manifest parsing, starting with Cosmos 1.6.0, if you've Airflow >= 2.8.0, Cosmos also supports remote paths for manifest parsing(e.g. S3 URL). See :ref:`parsing-methods` for more details. @@ -39,9 +36,9 @@ Project Config Example config = ProjectConfig( dbt_project_path="/path/to/dbt/project", - models_relative_path="models", - seeds_relative_path="data", - snapshots_relative_path="snapshots", + models_relative_path="custom_models_folder", + seeds_relative_path="custom_seeds_folder", + snapshots_relative_path="custom_snapshots_folder", manifest_path="/path/to/manifests", env_vars={"MY_ENV_VAR": "my_env_value"}, dbt_vars={ From 2f6e1d3f1b17b0d97e4a66ec2afe5f60e64a3244 Mon Sep 17 00:00:00 2001 From: Denis Krivenko Date: Mon, 22 Dec 2025 20:41:42 +0100 Subject: [PATCH 04/14] Fix minor documentation typo (#2093) ## Description Fix minor typo in the documentation. --- docs/configuration/generating-docs.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/configuration/generating-docs.rst b/docs/configuration/generating-docs.rst index ad9b926438..a0c4eb91db 100644 --- a/docs/configuration/generating-docs.rst +++ b/docs/configuration/generating-docs.rst @@ -80,7 +80,7 @@ You can use the :class:`~cosmos.operators.DbtDocsGCSOperator` to generate and up from cosmos.operators import DbtDocsGCSOperator # then, in your DAG code: - generate_dbt_docs_aws = DbtDocsGCSOperator( + generate_dbt_docs_gcs = DbtDocsGCSOperator( task_id="generate_dbt_docs_gcs", project_dir="path/to/jaffle_shop", profile_config=profile_config, @@ -113,7 +113,7 @@ The following code snippet shows how to provide this flag with the default jaffl from cosmos.operators import DbtDocsGCSOperator # then, in your DAG code: - generate_dbt_docs_aws = DbtDocsGCSOperator( + generate_dbt_docs_gcs = DbtDocsGCSOperator( task_id="generate_dbt_docs_gcs", project_dir="path/to/jaffle_shop", profile_config=profile_config, From 098892e0b8f77cec2f0a2b1dae0f194af9ad1de8 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 26 Dec 2025 14:42:19 +0530 Subject: [PATCH 05/14] Fix TypeError in Watcher mode with subprocess invocation (#2227) When using ExecutionMode.WATCHER with InvocationMode.SUBPROCESS, the _process_log_line_callable was being incorrectly bound as a method when accessed through an instance. This caused Python's descriptor protocol to pass 'self' as the first argument, resulting in: TypeError: _store_dbt_resource_status_from_log() takes 2 positional arguments but 3 were given The fix wraps the function with staticmethod() to prevent binding when accessed via self._process_log_line_callable. Also adds comprehensive tests for the _store_dbt_resource_status_from_log function and verifies the callable is not bound as a method. closes: #2224 --- cosmos/operators/watcher.py | 5 +- tests/operators/test_watcher.py | 129 ++++++++++++++++++++++++++++++++ 2 files changed, 132 insertions(+), 2 deletions(-) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 3318e25540..1c44191df8 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -4,7 +4,6 @@ import json import logging import zlib -from collections.abc import Callable from datetime import timedelta from pathlib import Path from typing import TYPE_CHECKING, Any @@ -109,7 +108,9 @@ class DbtProducerWatcherOperator(DbtBuildMixin, DbtLocalBaseOperator): """ template_fields = DbtLocalBaseOperator.template_fields + DbtBuildMixin.template_fields # type: ignore[operator] - _process_log_line_callable: Callable[[str, dict[str, Any]], None] | None = _store_dbt_resource_status_from_log + # Use staticmethod to prevent Python's descriptor protocol from binding the function to `self` + # when accessed via instance, which would incorrectly pass `self` as the first argument + _process_log_line_callable = staticmethod(_store_dbt_resource_status_from_log) def __init__(self, *args: Any, **kwargs: Any) -> None: task_id = kwargs.pop("task_id", "dbt_producer_watcher_operator") diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index cc506495c7..eb545bb879 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -27,6 +27,7 @@ DbtRunWatcherOperator, DbtSeedWatcherOperator, DbtTestWatcherOperator, + _store_dbt_resource_status_from_log, ) from cosmos.profiles import PostgresUserPasswordProfileMapping, get_automatic_profile_mapping from tests.utils import AIRFLOW_VERSION, new_test_dag @@ -425,6 +426,134 @@ def fake_build_run(self, context, **kw): assert data["results"][0]["status"] == "success" +class TestStoreDbStatusFromLog: + """Tests for _store_dbt_resource_status_from_log and _process_log_line_callable.""" + + def test_store_dbt_resource_status_from_log_success(self): + """Test that success status is correctly parsed and stored in XCom.""" + ti = _MockTI() + ctx = {"ti": ti} + + log_line = json.dumps({"data": {"node_info": {"node_status": "success", "unique_id": "model.pkg.my_model"}}}) + + _store_dbt_resource_status_from_log(log_line, {"context": ctx}) + + assert ti.store.get("model__pkg__my_model_status") == "success" + + def test_store_dbt_resource_status_from_log_failed(self): + """Test that failed status is correctly parsed and stored in XCom.""" + ti = _MockTI() + ctx = {"ti": ti} + + log_line = json.dumps({"data": {"node_info": {"node_status": "failed", "unique_id": "model.pkg.failed_model"}}}) + + _store_dbt_resource_status_from_log(log_line, {"context": ctx}) + + assert ti.store.get("model__pkg__failed_model_status") == "failed" + + def test_store_dbt_resource_status_from_log_ignores_other_statuses(self): + """Test that statuses other than success/failed are ignored.""" + ti = _MockTI() + ctx = {"ti": ti} + + log_line = json.dumps( + {"data": {"node_info": {"node_status": "running", "unique_id": "model.pkg.running_model"}}} + ) + + _store_dbt_resource_status_from_log(log_line, {"context": ctx}) + + assert "model__pkg__running_model_status" not in ti.store + + def test_store_dbt_resource_status_from_log_handles_invalid_json(self, caplog): + """Test that invalid JSON doesn't raise an exception.""" + ti = _MockTI() + ctx = {"ti": ti} + + # Should not raise an exception + _store_dbt_resource_status_from_log("not valid json {{{", {"context": ctx}) + + # No status should be stored + assert len(ti.store) == 0 + + def test_store_dbt_resource_status_from_log_handles_missing_node_info(self): + """Test that missing node_info doesn't raise an exception.""" + ti = _MockTI() + ctx = {"ti": ti} + + log_line = json.dumps({"data": {"other_key": "value"}}) + + # Should not raise an exception + _store_dbt_resource_status_from_log(log_line, {"context": ctx}) + + # No status should be stored + assert len(ti.store) == 0 + + def test_process_log_line_callable_is_not_bound_method(self): + """Test that _process_log_line_callable is not bound as a method when accessed through an instance. + + This test verifies the fix for the bug where accessing _process_log_line_callable through + an instance would create a bound method, causing 'self' to be passed as the first argument. + """ + import inspect + + op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) + + # Access the callable through the instance + callable_from_instance = op._process_log_line_callable + + # Verify it's not a bound method (which would have __self__ attribute) + assert not inspect.ismethod( + callable_from_instance + ), "_process_log_line_callable should not be a bound method when accessed through instance" + + # Verify it's the original function + assert callable_from_instance is _store_dbt_resource_status_from_log + + def test_process_log_line_callable_accepts_two_arguments(self): + """Test that the callable can be called with exactly 2 arguments (line, kwargs). + + This tests the integration pattern used in subprocess.py where process_log_line(line, kwargs) is called. + """ + op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) + callable_from_instance = op._process_log_line_callable + + ti = _MockTI() + ctx = {"ti": ti} + + log_line = json.dumps({"data": {"node_info": {"node_status": "success", "unique_id": "model.pkg.test_model"}}}) + + # This should NOT raise TypeError about wrong number of arguments + callable_from_instance(log_line, {"context": ctx}) + + assert ti.store.get("model__pkg__test_model_status") == "success" + + def test_process_log_line_callable_integration_with_subprocess_pattern(self): + """Test the exact pattern used in subprocess.py: process_log_line(line, kwargs).""" + op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) + + ti = _MockTI() + ctx = {"ti": ti} + + # Simulate the kwargs dict that subprocess.py passes + kwargs = {"context": ctx, "other_param": "value"} + + log_lines = [ + json.dumps({"data": {"node_info": {"node_status": "success", "unique_id": "model.pkg.model_a"}}}), + json.dumps({"data": {"node_info": {"node_status": "failed", "unique_id": "model.pkg.model_b"}}}), + json.dumps({"info": {"msg": "Running with dbt=1.10.11"}}), # Non-node log line + ] + + # Simulate the subprocess.py pattern + process_log_line = op._process_log_line_callable + for line in log_lines: + if process_log_line: + process_log_line(line, kwargs) + + assert ti.store.get("model__pkg__model_a_status") == "success" + assert ti.store.get("model__pkg__model_b_status") == "failed" + assert len(ti.store) == 2 # Only success and failed statuses are stored + + @patch("cosmos.dbt.runner.is_available", return_value=False) @patch("cosmos.operators.watcher.DbtLocalBaseOperator.execute", return_value="done") def test_execute_discovers_invocation_mode(_mock_execute, _mock_is_available): From b2a4dca864006c90b9e45c1bd217f471d8645658 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 26 Dec 2025 15:38:55 +0530 Subject: [PATCH 06/14] Fix DbtSourceWatcherOperator template_fields inheritance (#2226) `DbtSourceWatcherOperator` should inherit `template_fields` from its parent `DbtSourceLocalOperator` instead of `DbtConsumerWatcherSensor`. The previous assignment incorrectly included fields like `model_unique_id` that do not exist on source operators, causing `AttributeError` when using sources with tests in `ExecutionMode.WATCHER` with `SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS`. closes: #2203 --- CHANGELOG.rst | 7 +++++++ cosmos/__init__.py | 2 +- cosmos/operators/watcher.py | 3 ++- tests/operators/test_watcher.py | 13 +++++++++++++ 4 files changed, 23 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 18824b4fc7..9e6ecefd77 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,13 @@ Changelog ========= +1.12.1a1 (2025-12-26) +---------------------- + +Bug Fixes + +* Fix ``DbtSourceWatcherOperator.template_fields`` to inherit from ``DbtSourceLocalOperator`` instead of ``DbtConsumerWatcherSensor`` by @pankajkoti in #2214 + 1.12.0 (2025-12-18) ---------------------- diff --git a/cosmos/__init__.py b/cosmos/__init__.py index d113ba240b..be4f762632 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -9,7 +9,7 @@ from cosmos import settings -__version__ = "1.12.0" +__version__ = "1.12.1a1" if not settings.enable_memory_optimised_imports: from cosmos.airflow.dag import DbtDag diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 1c44191df8..282e8706b5 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -4,6 +4,7 @@ import json import logging import zlib +from collections.abc import Sequence from datetime import timedelta from pathlib import Path from typing import TYPE_CHECKING, Any @@ -503,7 +504,7 @@ class DbtSourceWatcherOperator(DbtSourceLocalOperator): Executes a dbt source freshness command, synchronously, as ExecutionMode.LOCAL. """ - template_fields: tuple[str, ...] = DbtConsumerWatcherSensor.template_fields + template_fields: Sequence[str] = DbtSourceLocalOperator.template_fields # type: ignore[assignment] class DbtRunWatcherOperator(DbtConsumerWatcherSensor): diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index eb545bb879..d8d34886ed 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -1287,3 +1287,16 @@ def test_sensor_and_producer_different_param_values(mock_bigquery_conn): assert task.execution_timeout == timedelta(seconds=2) else: assert task.execution_timeout == timedelta(seconds=1) + + +def test_dbt_source_watcher_operator_template_fields(): + """Test that DbtSourceWatcherOperator doesn't include model_unique_id in template_fields.""" + from cosmos.operators.local import DbtSourceLocalOperator + from cosmos.operators.watcher import DbtSourceWatcherOperator + + # DbtSourceWatcherOperator should NOT have model_unique_id in template_fields + # because it runs locally and doesn't watch models, it executes source freshness + assert "model_unique_id" not in DbtSourceWatcherOperator.template_fields + + # DbtSourceWatcherOperator should inherit template_fields from DbtSourceLocalOperator + assert DbtSourceWatcherOperator.template_fields == DbtSourceLocalOperator.template_fields From 99b60e0b9a3007b5f519f6d5264623e1b576440f Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 29 Dec 2025 09:27:06 +0000 Subject: [PATCH 07/14] Update CHANGELOG for 1.12.1a1 release Add complete list of changes included in the 1.12.1a1 alpha release: - Bug fixes for DbtSourceWatcherOperator and Watcher mode - Documentation fixes and improvements - Additional test coverage for profile metrics --- CHANGELOG.rst | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 9e6ecefd77..18454c1509 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,12 +1,23 @@ Changelog ========= -1.12.1a1 (2025-12-26) +1.12.1a1 (2025-12-27) ---------------------- Bug Fixes -* Fix ``DbtSourceWatcherOperator.template_fields`` to inherit from ``DbtSourceLocalOperator`` instead of ``DbtConsumerWatcherSensor`` by @pankajkoti in #2214 +* Fix ``DbtSourceWatcherOperator.template_fields`` to inherit from ``DbtSourceLocalOperator`` instead of ``DbtConsumerWatcherSensor`` by @pankajkoti in #2226 +* Fix TypeError in Watcher mode with subprocess invocation by @pankajkoti in #2227 + +Docs + +* Fix minor documentation typo by @dnskr in #2093 +* Fix default values in documentation by @dnskr in #2092 +* Remove emit event for ExecutionMode.AIRFLOW_ASYNC limitation in docs by @pankajastro in #2214 + +Others + +* Add test to check profile metrics with non-cosmos operator by @pankajastro in #2215 1.12.0 (2025-12-18) ---------------------- From 2f5f89b3d6493a4e7db25003145e4ed244b5e4ea Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 29 Dec 2025 15:43:44 +0530 Subject: [PATCH 08/14] Apply suggestion from @pankajkoti --- CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 18454c1509..92521d8022 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,7 +1,7 @@ Changelog ========= -1.12.1a1 (2025-12-27) +1.12.1a1 (2025-12-29) ---------------------- Bug Fixes From 1633c69ddbee3438aca6e85ef638e22dbe00a6ed Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 14 Jan 2026 17:26:10 +0000 Subject: [PATCH 09/14] Apply suggestion from @pankajkoti Co-authored-by: Pankaj Koti --- CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 92521d8022..622ad31c0b 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,7 +1,7 @@ Changelog ========= -1.12.1a1 (2025-12-29) +1.12.1 (2026-01-14) ---------------------- Bug Fixes From ce07295c59ee394f425bbf7cd5431382035504eb Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 14 Jan 2026 17:28:19 +0000 Subject: [PATCH 10/14] Apply suggestion from @tatiana --- CHANGELOG.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 622ad31c0b..62e8193b56 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -18,6 +18,7 @@ Docs Others * Add test to check profile metrics with non-cosmos operator by @pankajastro in #2215 +* Fix CI main branch Airflow 2.6 tests by @tatiana in #2268 1.12.0 (2025-12-18) ---------------------- From daa013381038a380a984a2bf0c51dc70e4c5c34b Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 14 Jan 2026 17:28:28 +0000 Subject: [PATCH 11/14] Apply suggestion from @tatiana --- CHANGELOG.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 62e8193b56..945aee94ac 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -8,6 +8,7 @@ Bug Fixes * Fix ``DbtSourceWatcherOperator.template_fields`` to inherit from ``DbtSourceLocalOperator`` instead of ``DbtConsumerWatcherSensor`` by @pankajkoti in #2226 * Fix TypeError in Watcher mode with subprocess invocation by @pankajkoti in #2227 +* Error when RenderConfig.invocation_mode is incorrectly set by @tatiana in #2267 Docs From 669ff65dc97e82db91cf0449fad51beaba5390dc Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 14 Jan 2026 17:28:41 +0000 Subject: [PATCH 12/14] Apply suggestions from code review Co-authored-by: Pankaj Koti --- cosmos/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/__init__.py b/cosmos/__init__.py index be4f762632..bbfc2b5ff6 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -9,7 +9,7 @@ from cosmos import settings -__version__ = "1.12.1a1" +__version__ = "1.12.1" if not settings.enable_memory_optimised_imports: from cosmos.airflow.dag import DbtDag From 7282e829c609647f50cddb42336d9bba77770156 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 14 Jan 2026 17:22:13 +0000 Subject: [PATCH 13/14] Fix CI main branch Airflow 2.6 tests (#2268) Cosmos main branch Python 2.6 tests started failing, as observed while testing Python 3.10, AF 2.6 and dbt 1.11: ``` =========================== short test summary info ============================ ERROR tests/operators/_asynchronous/test_base.py - AttributeError: module 'pyparsing' has no attribute 'DelimitedList'. Did you mean: 'delimitedList'? ERROR tests/operators/_asynchronous/test_bigquery.py - AttributeError: module 'pyparsing' has no attribute 'DelimitedList'. Did you mean: 'delimitedList'? ERROR tests/test_example_dags_no_connections.py - AssertionError: assert not {'/home/runner/work/astronomer-cosmos/astronomer-cosmos/dev/dags/simple_dag_async.py': 'Traceback (most recent call la...s.operators._asynchronous.bigquery.DbtRunAirflowAsyncBigqueryOperator. Unable to find the specified operator class.\n'} + where {'/home/runner/work/astronomer-cosmos/astronomer-cosmos/dev/dags/simple_dag_async.py': 'Traceback (most recent call la...s.operators._asynchronous.bigquery.DbtRunAirflowAsyncBigqueryOperator. Unable to find the specified operator class.\n'} = .import_errors ``` https://github.com/astronomer/astronomer-cosmos/actions/runs/20999444973/job/60365156803 The issue seemed to be conflicting dependencies, particularly the installation of "httplib2==0.30.0" (failure) instead of "httplib2==0.31.0" (success) Example of success after this change: https://github.com/astronomer/astronomer-cosmos/actions/runs/21002268536/job/60375248518 (cherry picked from commit dbaef2c56087cd0b44c6dc3639db07084d239131) --- scripts/test/pre-install-airflow.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index 36bf121ac7..3bf9d1680f 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -44,7 +44,7 @@ uv pip install "gcsfs<2025.3.0" if [ "$AIRFLOW_VERSION" = "2.6" ] ; then uv pip install "apache-airflow-providers-amazon" "apache-airflow==$AIRFLOW_VERSION" "urllib3<2" uv pip install "apache-airflow-providers-cncf-kubernetes" "apache-airflow==$AIRFLOW_VERSION" - uv pip install "apache-airflow-providers-google<10.11" "apache-airflow==$AIRFLOW_VERSION" + uv pip install "apache-airflow-providers-google<10.11" "httplib2==0.31.0" "apache-airflow==$AIRFLOW_VERSION" uv pip install "apache-airflow-providers-microsoft-azure" "apache-airflow==$AIRFLOW_VERSION" uv pip install "pydantic<2.0" elif [ "$AIRFLOW_VERSION" = "2.7" ] ; then From 8511b6b86f027ceda2809c0eae32494c2f275bdc Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 14 Jan 2026 17:43:33 +0000 Subject: [PATCH 14/14] Error when `RenderConfig.invocation_mode` is incorrectly set (#2267) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Make our configuration validation more robust, to guide users when they misconfigure Cosmos `RenderConfig.invocation_mode`. This configuration should only be used if dbt is installed in the same Python virtualenv as Cosmos. Recently, I observed a customer incorrectly configuring Cosmos as follows: ``` ExecutionConfig( dbt_executable_path=airflowHome + '/dbt_venv/bin/dbt', execution_mode=ExecutionMode.WATCHER, invocation_mode=InvocationMode.DBT_RUNNER ) ``` Which led to the error: ``` ModuleNotFoundError: No module named 'dbt' ``` This was raised in Cosmos 1.12, when trying to resolve `dbt.version` ``` cosmos/operators/watcher.py:_fallback_to_local_run → build_and_run_cmd → _generate_dbt_flags ``` (cherry picked from commit 4cfac6944a1377715f43f0433d39f5bedf5cc9e3) --- cosmos/converter.py | 13 ++++++++- cosmos/dbt/executable.py | 13 +++++++++ tests/test_converter.py | 62 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 87 insertions(+), 1 deletion(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index 854d4f2710..1a204c199b 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -23,7 +23,8 @@ from cosmos import cache, settings from cosmos.airflow.graph import build_airflow_graph from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig -from cosmos.constants import ExecutionMode, LoadMode +from cosmos.constants import ExecutionMode, InvocationMode, LoadMode +from cosmos.dbt.executable import get_system_dbt, is_dbt_installed_in_same_environment from cosmos.dbt.graph import DbtGraph from cosmos.dbt.project import has_non_empty_dependencies_file from cosmos.dbt.selector import retrieve_by_label @@ -174,6 +175,16 @@ def validate_initial_user_config( "please use ProjectConfig.env_vars instead." ) + if render_config is not None and render_config.invocation_mode == InvocationMode.DBT_RUNNER: + if not is_dbt_installed_in_same_environment(): + raise CosmosValueError( + "RenderConfig.invocation_mode is set to InvocationMode.DBT_RUNNER, but dbt is not installed in the same environment as Airflow. Use InvocationMode.DBT_SUBPROCESS instead." + ) + if render_config.dbt_executable_path and render_config.dbt_executable_path != get_system_dbt(): + raise CosmosValueError( + "RenderConfig.dbt_executable_path is set, but it is not the same as the system dbt executable path. Do not set render_config.dbt_executable_path when using InvocationMode.DBT_RUNNER." + ) + def validate_changed_config_paths( execution_config: ExecutionConfig | None, project_config: ProjectConfig, render_config: RenderConfig | None diff --git a/cosmos/dbt/executable.py b/cosmos/dbt/executable.py index d926e1c832..6a75bc7173 100644 --- a/cosmos/dbt/executable.py +++ b/cosmos/dbt/executable.py @@ -1,4 +1,5 @@ import shutil +from importlib.util import find_spec def get_system_dbt() -> str: @@ -6,3 +7,15 @@ def get_system_dbt() -> str: Tries to identify which is the path to the dbt executable, return "dbt" otherwise. """ return shutil.which("dbt") or "dbt" + + +def is_dbt_installed_in_same_environment() -> bool: + """ + Checks if dbt is installed in the same environment as the current one. + """ + try: + find_spec("dbt") + except ImportError: + return False + else: + return True diff --git a/tests/test_converter.py b/tests/test_converter.py index 8cf22834ae..12050c8e74 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -143,6 +143,68 @@ def test_validate_user_config_fails_project_config_render_config_env_vars(): validate_initial_user_config(execution_config, profile_config, project_config, render_config, operator_args) +@patch("cosmos.converter.is_dbt_installed_in_same_environment", return_value=False) +def test_validate_initial_user_config_dbt_runner_without_dbt_installed(mock_is_dbt_installed): + """Test that validation fails when using DBT_RUNNER but dbt is not installed in the same environment.""" + project_config = ProjectConfig() + execution_config = ExecutionConfig() + render_config = RenderConfig(invocation_mode=InvocationMode.DBT_RUNNER) + profile_config = MagicMock() + operator_args = {} + + expected_error_match = "RenderConfig.invocation_mode is set to InvocationMode.DBT_RUNNER, but dbt is not installed in the same environment as Airflow.*" + with pytest.raises(CosmosValueError, match=expected_error_match): + validate_initial_user_config(execution_config, profile_config, project_config, render_config, operator_args) + + +@patch("cosmos.converter.get_system_dbt", return_value="/usr/local/bin/dbt") +@patch("cosmos.converter.is_dbt_installed_in_same_environment", return_value=True) +def test_validate_initial_user_config_dbt_runner_with_different_dbt_executable_path( + mock_is_dbt_installed, mock_get_system_dbt +): + """Test that validation fails when using DBT_RUNNER with a custom dbt_executable_path that differs from system dbt.""" + project_config = ProjectConfig() + execution_config = ExecutionConfig() + render_config = RenderConfig(invocation_mode=InvocationMode.DBT_RUNNER, dbt_executable_path="/custom/path/to/dbt") + profile_config = MagicMock() + operator_args = {} + + expected_error_match = ( + "RenderConfig.dbt_executable_path is set, but it is not the same as the system dbt executable path.*" + ) + with pytest.raises(CosmosValueError, match=expected_error_match): + validate_initial_user_config(execution_config, profile_config, project_config, render_config, operator_args) + + +@patch("cosmos.converter.get_system_dbt", return_value="/usr/local/bin/dbt") +@patch("cosmos.converter.is_dbt_installed_in_same_environment", return_value=True) +def test_validate_initial_user_config_dbt_runner_with_matching_dbt_executable_path( + mock_is_dbt_installed, mock_get_system_dbt +): + """Test that validation passes when using DBT_RUNNER with a dbt_executable_path matching system dbt.""" + project_config = ProjectConfig() + execution_config = ExecutionConfig() + render_config = RenderConfig(invocation_mode=InvocationMode.DBT_RUNNER, dbt_executable_path="/usr/local/bin/dbt") + profile_config = MagicMock() + operator_args = {} + + # Should not raise any exception + validate_initial_user_config(execution_config, profile_config, project_config, render_config, operator_args) + + +@patch("cosmos.converter.is_dbt_installed_in_same_environment", return_value=True) +def test_validate_initial_user_config_dbt_runner_without_dbt_executable_path(mock_is_dbt_installed): + """Test that validation passes when using DBT_RUNNER without setting dbt_executable_path.""" + project_config = ProjectConfig() + execution_config = ExecutionConfig() + render_config = RenderConfig(invocation_mode=InvocationMode.DBT_RUNNER) + profile_config = MagicMock() + operator_args = {} + + # Should not raise any exception + validate_initial_user_config(execution_config, profile_config, project_config, render_config, operator_args) + + def test_validate_arguments_schema_in_task_args(): execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL, dbt_project_path="/tmp/project-dir") render_config = RenderConfig()