Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,26 @@
Changelog
=========

1.12.1 (2026-01-14)
----------------------

Bug Fixes

* Fix ``DbtSourceWatcherOperator.template_fields`` to inherit from ``DbtSourceLocalOperator`` instead of ``DbtConsumerWatcherSensor`` by @pankajkoti in #2226
* Fix TypeError in Watcher mode with subprocess invocation by @pankajkoti in #2227
Comment thread
tatiana marked this conversation as resolved.
* Error when RenderConfig.invocation_mode is incorrectly set by @tatiana in #2267

Docs

* Fix minor documentation typo by @dnskr in #2093
* Fix default values in documentation by @dnskr in #2092
* Remove emit event for ExecutionMode.AIRFLOW_ASYNC limitation in docs by @pankajastro in #2214

Others

* Add test to check profile metrics with non-cosmos operator by @pankajastro in #2215
Comment thread
tatiana marked this conversation as resolved.
* Fix CI main branch Airflow 2.6 tests by @tatiana in #2268

1.12.0 (2025-12-18)
----------------------

Expand Down
2 changes: 1 addition & 1 deletion cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from cosmos import settings

__version__ = "1.12.0"
__version__ = "1.12.1"

if not settings.enable_memory_optimised_imports:
from cosmos.airflow.dag import DbtDag
Expand Down
3 changes: 1 addition & 2 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ class ProjectConfig:
:param copy_dbt_packages: Copy dbt_packages directory, if it exists, instead of creating a symbolic link. If not set, fetches the value from [cosmos]default_copy_dbt_packages (False by default).
:param models_relative_path: The relative path to the dbt models directory within the project. Defaults to models
:param seeds_relative_path: The relative path to the dbt seeds directory within the project. Defaults to seeds
:param snapshots_relative_path: The relative path to the dbt snapshots directory within the project. Defaults to
snapshots
:param snapshots_relative_path: The relative path to the dbt snapshots directory within the project. Defaults to snapshots
:param manifest_path: The absolute path to the dbt manifest file. Defaults to None
:param manifest_conn_id: Name of the Airflow connection used to access the manifest file if it is not stored locally. Defaults to None
:param project_name: Allows the user to define the project name.
Expand Down
13 changes: 12 additions & 1 deletion cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
from cosmos import cache, settings
from cosmos.airflow.graph import build_airflow_graph
from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import ExecutionMode, LoadMode
from cosmos.constants import ExecutionMode, InvocationMode, LoadMode
from cosmos.dbt.executable import get_system_dbt, is_dbt_installed_in_same_environment
from cosmos.dbt.graph import DbtGraph
from cosmos.dbt.project import has_non_empty_dependencies_file
from cosmos.dbt.selector import retrieve_by_label
Expand Down Expand Up @@ -174,6 +175,16 @@ def validate_initial_user_config(
"please use ProjectConfig.env_vars instead."
)

if render_config is not None and render_config.invocation_mode == InvocationMode.DBT_RUNNER:
if not is_dbt_installed_in_same_environment():
raise CosmosValueError(
"RenderConfig.invocation_mode is set to InvocationMode.DBT_RUNNER, but dbt is not installed in the same environment as Airflow. Use InvocationMode.DBT_SUBPROCESS instead."
)
if render_config.dbt_executable_path and render_config.dbt_executable_path != get_system_dbt():
raise CosmosValueError(
"RenderConfig.dbt_executable_path is set, but it is not the same as the system dbt executable path. Do not set render_config.dbt_executable_path when using InvocationMode.DBT_RUNNER."
)


def validate_changed_config_paths(
execution_config: ExecutionConfig | None, project_config: ProjectConfig, render_config: RenderConfig | None
Expand Down
13 changes: 13 additions & 0 deletions cosmos/dbt/executable.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
import shutil
from importlib.util import find_spec


def get_system_dbt() -> str:
    """
    Return the path of the ``dbt`` executable found on the system ``PATH``.

    :returns: The location reported by ``shutil.which("dbt")`` or, when no
        executable is found, the bare command name ``"dbt"``.
    """
    resolved_path = shutil.which("dbt")
    if resolved_path:
        return resolved_path
    # Nothing on PATH: fall back to the plain command name.
    return "dbt"


def is_dbt_installed_in_same_environment() -> bool:
    """
    Check whether the ``dbt`` package is importable from the current Python environment.

    :returns: ``True`` when ``importlib.util.find_spec("dbt")`` locates the package,
        ``False`` when it does not or when the lookup raises ``ImportError``.
    """
    try:
        dbt_spec = find_spec("dbt")
    except ImportError:
        return False
    # find_spec() signals a missing top-level module by returning None rather than
    # raising, so the returned spec must be inspected explicitly.
    return dbt_spec is not None
8 changes: 5 additions & 3 deletions cosmos/operators/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import json
import logging
import zlib
from collections.abc import Callable
from collections.abc import Sequence
from datetime import timedelta
from pathlib import Path
from typing import TYPE_CHECKING, Any
Expand Down Expand Up @@ -109,7 +109,9 @@ class DbtProducerWatcherOperator(DbtBuildMixin, DbtLocalBaseOperator):
"""

template_fields = DbtLocalBaseOperator.template_fields + DbtBuildMixin.template_fields # type: ignore[operator]
_process_log_line_callable: Callable[[str, dict[str, Any]], None] | None = _store_dbt_resource_status_from_log
# Use staticmethod to prevent Python's descriptor protocol from binding the function to `self`
# when accessed via instance, which would incorrectly pass `self` as the first argument
_process_log_line_callable = staticmethod(_store_dbt_resource_status_from_log)

def __init__(self, *args: Any, **kwargs: Any) -> None:
task_id = kwargs.pop("task_id", "dbt_producer_watcher_operator")
Expand Down Expand Up @@ -502,7 +504,7 @@ class DbtSourceWatcherOperator(DbtSourceLocalOperator):
Executes a dbt source freshness command, synchronously, as ExecutionMode.LOCAL.
"""

template_fields: tuple[str, ...] = DbtConsumerWatcherSensor.template_fields
template_fields: Sequence[str] = DbtSourceLocalOperator.template_fields # type: ignore[assignment]


class DbtRunWatcherOperator(DbtConsumerWatcherSensor):
Expand Down
4 changes: 2 additions & 2 deletions docs/configuration/generating-docs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ You can use the :class:`~cosmos.operators.DbtDocsGCSOperator` to generate and up
from cosmos.operators import DbtDocsGCSOperator

# then, in your DAG code:
generate_dbt_docs_aws = DbtDocsGCSOperator(
generate_dbt_docs_gcs = DbtDocsGCSOperator(
task_id="generate_dbt_docs_gcs",
project_dir="path/to/jaffle_shop",
profile_config=profile_config,
Expand Down Expand Up @@ -113,7 +113,7 @@ The following code snippet shows how to provide this flag with the default jaffl
from cosmos.operators import DbtDocsGCSOperator

# then, in your DAG code:
generate_dbt_docs_aws = DbtDocsGCSOperator(
generate_dbt_docs_gcs = DbtDocsGCSOperator(
task_id="generate_dbt_docs_gcs",
project_dir="path/to/jaffle_shop",
profile_config=profile_config,
Expand Down
15 changes: 6 additions & 9 deletions docs/configuration/project-config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@ The ``cosmos.config.ProjectConfig`` allows you to specify information about wher
variables that should be used for rendering and execution. It takes the following arguments:

- ``dbt_project_path``: The full path to your dbt project. This directory should have a ``dbt_project.yml`` file
- ``models_relative_path``: The path to your models directory, relative to the ``dbt_project_path``. This defaults to
``models/``
- ``seeds_relative_path``: The path to your seeds directory, relative to the ``dbt_project_path``. This defaults to
``data/``
- ``snapshots_relative_path``: The path to your snapshots directory, relative to the ``dbt_project_path``. This defaults
to ``snapshots/``
- ``models_relative_path``: The path to your models directory, relative to the ``dbt_project_path``. This defaults to ``models``
- ``seeds_relative_path``: The path to your seeds directory, relative to the ``dbt_project_path``. This defaults to ``seeds``
- ``snapshots_relative_path``: The path to your snapshots directory, relative to the ``dbt_project_path``. This defaults to ``snapshots``
- ``manifest_path``: The absolute path to your manifests directory. This is only required if you're using Cosmos' manifest
parsing mode. In addition to local paths for manifest parsing, starting with Cosmos 1.6.0, if you are running
Airflow >= 2.8.0, Cosmos also supports remote paths for manifest parsing (e.g. an S3 URL). See :ref:`parsing-methods` for more details.
Expand Down Expand Up @@ -39,9 +36,9 @@ Project Config Example

config = ProjectConfig(
dbt_project_path="/path/to/dbt/project",
models_relative_path="models",
seeds_relative_path="data",
snapshots_relative_path="snapshots",
models_relative_path="custom_models_folder",
seeds_relative_path="custom_seeds_folder",
snapshots_relative_path="custom_snapshots_folder",
manifest_path="/path/to/manifests",
env_vars={"MY_ENV_VAR": "my_env_value"},
dbt_vars={
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/render-config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ It does this by exposing a ``cosmos.config.RenderConfig`` class that you can use

The ``RenderConfig`` class takes the following arguments:

- ``emit_datasets``: whether or not to emit Airflow datasets to be used for data-aware scheduling. Defaults to True. This feature is only available for ``ExecutionMode.LOCAL``, ``ExecutionMode.VIRTUALENV`` and ``ExecutionMode.WATCHER``.
- ``emit_datasets``: whether or not to emit Airflow datasets to be used for data-aware scheduling. Defaults to True. This feature is only available for ``ExecutionMode.LOCAL``, ``ExecutionMode.VIRTUALENV``, ``ExecutionMode.WATCHER`` and ``ExecutionMode.AIRFLOW_ASYNC``.
- ``test_behavior``: how to run tests. Defaults to running a model's tests immediately after the model is run. For more information, see the `Testing Behavior <testing-behavior.html>`_ section.
- ``load_method``: how to load your dbt project. See `Parsing Methods <parsing-methods.html>`_ for more information.
- ``invocation_mode``: (new in v1.9) how to run ``dbt ls``, when using ``LoadMode.DBT_LS``. Learn more about this below.
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/scheduling.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ By default, if using a version between Airflow 2.4 or higher is used, Cosmos emi

.. important::

This feature is only available for ``ExecutionMode.LOCAL``, ``ExecutionMode.VIRTUALENV`` and ``ExecutionMode.WATCHER``.
This feature is only available for ``ExecutionMode.LOCAL``, ``ExecutionMode.VIRTUALENV``, ``ExecutionMode.WATCHER`` and ``ExecutionMode.AIRFLOW_ASYNC``.

Cosmos calculates these URIs during the task execution, by using the library `OpenLineage Integration Common <https://pypi.org/project/openlineage-integration-common/>`_.

Expand Down
3 changes: 0 additions & 3 deletions docs/getting_started/async-execution-mode.rst
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,4 @@ Limitations

9. **TeardownAsyncOperator limitation**: When using a remote object location, in addition to the ``SetupAsyncOperator``, a ``TeardownAsyncOperator`` is also added to the DAG. This task will delete the SQL files from the remote location by the end of the DAG Run. This can lead to a limitation from a retry perspective, as described in the issue `#2066 <https://github.com/astronomer/astronomer-cosmos/issues/2066>`_. This can be avoided by setting the ``enable_teardown_async_task`` configuration to ``False``, as described in the :ref:`enable_teardown_async_task` section.

10. **Dataset events not emitted**: Dataset events are not currently emitted after dbt models complete when using ``ExecutionMode.AIRFLOW_ASYNC``. This means downstream DAGs scheduled with ``Dataset`` or ``DatasetAlias`` will not trigger automatically. This behaviour is present in ``ExecutionMode.LOCAL`` but is currently missing in async mode. This issue is being tracked in `#2141 <https://github.com/astronomer/astronomer-cosmos/issues/2141>`_.


For a comparison between different Cosmos execution modes, please check the :ref:`execution-modes-comparison` section.
2 changes: 1 addition & 1 deletion scripts/test/pre-install-airflow.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ uv pip install "gcsfs<2025.3.0"
if [ "$AIRFLOW_VERSION" = "2.6" ] ; then
uv pip install "apache-airflow-providers-amazon" "apache-airflow==$AIRFLOW_VERSION" "urllib3<2"
uv pip install "apache-airflow-providers-cncf-kubernetes" "apache-airflow==$AIRFLOW_VERSION"
uv pip install "apache-airflow-providers-google<10.11" "apache-airflow==$AIRFLOW_VERSION"
uv pip install "apache-airflow-providers-google<10.11" "httplib2==0.31.0" "apache-airflow==$AIRFLOW_VERSION"
uv pip install "apache-airflow-providers-microsoft-azure" "apache-airflow==$AIRFLOW_VERSION"
uv pip install "pydantic<2.0"
elif [ "$AIRFLOW_VERSION" = "2.7" ] ; then
Expand Down
29 changes: 29 additions & 0 deletions tests/listeners/test_task_instance_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,29 @@

from pathlib import Path
from types import SimpleNamespace
from typing import Any
from unittest.mock import patch

import pytest
from airflow.models.connection import Connection

try: # Airflow 3
from airflow.sdk import Context
except ImportError: # Airflow 2
from airflow.utils.context import Context


from cosmos import ProfileConfig
from cosmos.constants import InvocationMode
from cosmos.listeners import task_instance_listener
from cosmos.operators.base import AbstractDbtBase
from cosmos.profiles import get_automatic_profile_mapping

try:
from airflow.sdk.bases.operator import BaseOperator # Airflow 3
except ImportError:
from airflow.models import BaseOperator # Airflow 2

DBT_PROJECT_PROFILE = Path(__file__).parent.parent / "sample/mini/profiles.yml"


Expand All @@ -35,6 +47,11 @@ def mock_postgres_conn(): # type: ignore
yield conn


class DummyOperator(BaseOperator):
    """Minimal ``BaseOperator`` subclass whose ``execute`` does nothing."""

    def execute(self, context: Context) -> Any:
        """Perform no work and return ``None``."""
        return None


class DummyDbtOperator(AbstractDbtBase):
base_cmd = ["run"]

Expand Down Expand Up @@ -155,6 +172,18 @@ def test_profile_file_metrics():
assert metrics["database"] == "postgres"


def test_profile_metrics_with_non_cosmos_operator():
    """Metrics built for a ``DummyOperator`` task carry no profile information."""
    task_instance = _make_task_instance(DummyOperator(task_id="test"))
    metrics = task_instance_listener._build_task_metrics(task_instance, status="success")

    assert metrics["operator_name"] == "DummyOperator"
    assert metrics["is_cosmos_operator_subclass"] is False
    # Every profile-related field is expected to be None for this operator.
    for profile_key in ("profile_strategy", "profile_mapping_class", "database"):
        assert metrics[profile_key] is None


def test_build_task_metrics_records_core_fields():
operator = DummyDbtOperator()
ti = _make_task_instance(operator)
Expand Down
Loading
Loading