From d778719b671b899fcc7f4fc18897434688e1df81 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Mon, 26 Jan 2026 08:59:09 -0700 Subject: [PATCH 1/5] Drop support for Airflow < 2.9 Neither Runtime or Apache Airflow support 2.8 or earlier Airflow versions any longer, so we can drop support per our policy. --- .github/workflows/test.yml | 38 ++----------------- cosmos/cache.py | 20 +--------- cosmos/config.py | 36 ++++++------------ cosmos/io.py | 7 ---- cosmos/operators/_asynchronous/bigquery.py | 2 - cosmos/operators/local.py | 22 +++-------- cosmos/settings.py | 4 -- dev/dags/cosmos_callback_dag.py | 11 +++--- dev/dags/example_virtualenv.py | 6 +-- docs/compatibility-policy.rst | 4 +- docs/configuration/callbacks.rst | 2 +- docs/configuration/cosmos-conf.rst | 4 +- docs/configuration/lineage.rst | 8 +--- docs/configuration/parsing-methods.rst | 7 ++-- docs/configuration/project-config.rst | 3 +- docs/configuration/scheduling.rst | 2 +- docs/getting_started/async-execution-mode.rst | 20 +++++----- docs/requirements.txt | 2 +- pyproject.toml | 5 +-- scripts/test/integration-setup.sh | 9 ----- scripts/test/pre-install-airflow.sh | 24 +----------- tests/airflow/test_graph.py | 7 ---- tests/dbt/test_graph.py | 3 -- tests/operators/_asynchronous/test_base.py | 3 -- tests/operators/test_local.py | 17 --------- tests/operators/test_watcher.py | 15 ++------ tests/test_cache.py | 13 +------ tests/test_config.py | 27 ------------- tests/test_example_dags.py | 18 --------- tests/test_example_dags_no_connections.py | 15 -------- tests/test_io.py | 23 ----------- 31 files changed, 59 insertions(+), 318 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a42473de4f..39ed8fb703 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -55,26 +55,10 @@ jobs: fail-fast: false matrix: python-version: ["3.10", "3.11", "3.12", "3.13"] - airflow-version: ["2.6", "2.7", "2.8", "2.9", "2.10", "2.11", "3.0", "3.1"] + airflow-version: ["2.9", "2.10", "2.11", "3.0", "3.1"] dbt-version: ["1.10"] exclude: - # Apache Airflow versions prior to 2.9.0 have not been tested with Python 3.12. - # Official support for Python 3.12 and the corresponding constraints.txt are available only for Apache Airflow >= 2.9.0. - # See: https://github.com/apache/airflow/tree/2.9.0?tab=readme-ov-file#requirements - # See: https://github.com/apache/airflow/tree/2.8.4?tab=readme-ov-file#requirements - - python-version: "3.12" - airflow-version: "2.6" - - python-version: "3.12" - airflow-version: "2.7" - - python-version: "3.12" - airflow-version: "2.8" # Apache Airflow versions prior to 3.1.0 have not been tested with Python 3.13. - - python-version: "3.13" - airflow-version: "2.6" - - python-version: "3.13" - airflow-version: "2.7" - - python-version: "3.13" - airflow-version: "2.8" - python-version: "3.13" airflow-version: "2.9" - python-version: "3.13" @@ -127,26 +111,10 @@ jobs: fail-fast: false matrix: python-version: ["3.10", "3.11", "3.12", "3.13"] - airflow-version: ["2.6", "2.7", "2.8", "2.9", "2.10", "2.11", "3.0", "3.1"] + airflow-version: ["2.9", "2.10", "2.11", "3.0", "3.1"] dbt-version: [ "1.11" ] exclude: - # Apache Airflow versions prior to 2.9.0 have not been tested with Python 3.12. - # Official support for Python 3.12 and the corresponding constraints.txt are available only for Apache Airflow >= 2.9.0. - # See: https://github.com/apache/airflow/tree/2.9.0?tab=readme-ov-file#requirements - # See: https://github.com/apache/airflow/tree/2.8.4?tab=readme-ov-file#requirements - - python-version: "3.12" - airflow-version: "2.6" - - python-version: "3.12" - airflow-version: "2.7" - - python-version: "3.12" - airflow-version: "2.8" # Apache Airflow versions prior to 3.1.0 have not been tested with Python 3.13. - - python-version: "3.13" - airflow-version: "2.6" - - python-version: "3.13" - airflow-version: "2.7" - - python-version: "3.13" - airflow-version: "2.8" - python-version: "3.13" airflow-version: "2.9" - python-version: "3.13" @@ -321,7 +289,7 @@ jobs: fail-fast: false matrix: python-version: [ "3.11" ] - airflow-version: [ "2.8", "3.0" ] + airflow-version: [ "2.9", "3.0" ] dbt-version: [ "1.5" ] services: postgres: diff --git a/cosmos/cache.py b/cosmos/cache.py index 096a7fa489..b38f8d6d58 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -17,7 +17,6 @@ from airflow.models import DagRun, Variable from airflow.models.dag import DAG from airflow.utils.session import provide_session -from airflow.version import version as airflow_version from sqlalchemy import select from sqlalchemy.orm import Session @@ -29,10 +28,7 @@ from airflow.sdk import ObjectStoragePath from airflow.utils.task_group import TaskGroup except ImportError: - try: - from airflow.io.path import ObjectStoragePath - except ImportError: - pass + from airflow.io.path import ObjectStoragePath from airflow.utils.task_group import TaskGroup from cosmos.constants import ( @@ -43,10 +39,8 @@ PACKAGE_LOCKFILE_YML, ) from cosmos.dbt.project import get_partial_parse_path -from cosmos.exceptions import CosmosValueError from cosmos.log import get_logger from cosmos.settings import ( - AIRFLOW_IO_AVAILABLE, cache_dir, dbt_profile_cache_dir_name, enable_cache, @@ -77,20 +71,10 @@ def _configure_remote_cache_dir() -> Path | ObjectStoragePath | None: if remote_cache_conn_id is None: return _configured_cache_dir - if not AIRFLOW_IO_AVAILABLE: - raise CosmosValueError( - f"You're trying to specify remote cache_dir {cache_dir_str}, but the required " - f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " - "Airflow 2.8 or later." - ) - try: from airflow.sdk import ObjectStoragePath except ImportError: - try: - from airflow.io.path import ObjectStoragePath - except ImportError: - pass + from airflow.io.path import ObjectStoragePath _configured_cache_dir = ObjectStoragePath(cache_dir_str, conn_id=remote_cache_conn_id) diff --git a/cosmos/config.py b/cosmos/config.py index de07e59250..326690fac7 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -9,19 +9,16 @@ from collections.abc import Callable, Iterator from dataclasses import InitVar, dataclass, field from pathlib import Path -from typing import TYPE_CHECKING, Any +from typing import Any import yaml -from airflow.version import version as airflow_version -from cosmos import settings - -if settings.AIRFLOW_IO_AVAILABLE or TYPE_CHECKING: - try: - from airflow.sdk import ObjectStoragePath - except ImportError: - from airflow.io.path import ObjectStoragePath +try: + from airflow.sdk import ObjectStoragePath +except ImportError: + from airflow.io.path import ObjectStoragePath +from cosmos import settings from cosmos.cache import create_cache_profile, get_cached_profile, is_profile_cache_enabled from cosmos.constants import ( DEFAULT_PROFILES_FILE_NAME, @@ -231,17 +228,7 @@ def __init__( # Use the default Airflow connection ID for the scheme if it is not provided. manifest_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(manifest_scheme, lambda: None)() - if manifest_conn_id is not None and not settings.AIRFLOW_IO_AVAILABLE: - raise CosmosValueError( - f"The manifest path {manifest_path_str} uses a remote file scheme, but the required Object " - f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " - f"Airflow 2.8 or later." - ) - - if settings.AIRFLOW_IO_AVAILABLE: - self.manifest_path = ObjectStoragePath(manifest_path_str, conn_id=manifest_conn_id) - else: - self.manifest_path = Path(manifest_path_str) + self.manifest_path = ObjectStoragePath(manifest_path_str, conn_id=manifest_conn_id) self.env_vars = env_vars self.dbt_vars = dbt_vars @@ -261,11 +248,10 @@ def validate_project(self) -> None: mandatory_paths: dict[str, Path | ObjectStoragePath | None] = {} # We validate the existence of paths added to the `mandatory_paths` map by calling the `exists()` method on each - # one. Starting with Cosmos 1.6.0, if the Airflow version is `>= 2.8.0` and a `manifest_path` is provided, we - # cast it to an `airflow.io.path.ObjectStoragePath` instance during `ProjectConfig` initialisation, and it - # includes the `exists()` method. For the remaining paths in the `mandatory_paths` map, we cast them to - # `pathlib.Path` objects to ensure that the subsequent `exists()` call while iterating on the `mandatory_paths` - # map works correctly for all paths, thereby validating the project. + # one. If a `manifest_path` is provided, we cast it to an `airflow.io.path.ObjectStoragePath` instance during + # `ProjectConfig` initialisation, and it includes the `exists()` method. For the remaining paths in the + # `mandatory_paths` map, we cast them to `pathlib.Path` objects to ensure that the subsequent `exists()` call + # while iterating on the `mandatory_paths` map works correctly for all paths, thereby validating the project. if self.dbt_project_path: project_yml_path = self.dbt_project_path / "dbt_project.yml" mandatory_paths.update( diff --git a/cosmos/io.py b/cosmos/io.py index 23a1bef210..2ff293acbf 100644 --- a/cosmos/io.py +++ b/cosmos/io.py @@ -144,7 +144,6 @@ def upload_to_azure_wasb( def _configure_remote_target_path() -> tuple[Path | ObjectStoragePath, str] | tuple[None, None]: """Configure the remote target path if it is provided.""" - from airflow.version import version as airflow_version if not settings.remote_target_path: return None, None @@ -160,12 +159,6 @@ def _configure_remote_target_path() -> tuple[Path | ObjectStoragePath, str] | tu if remote_conn_id is None: return None, None - if not settings.AIRFLOW_IO_AVAILABLE: - raise CosmosValueError( - f"You're trying to specify remote target path {target_path_str}, but the required " - f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " - "Airflow 2.8 or later." - ) _configured_target_path = ObjectStoragePath(target_path_str, conn_id=remote_conn_id) if not _configured_target_path.exists(): # type: ignore[no-untyped-call] diff --git a/cosmos/operators/_asynchronous/bigquery.py b/cosmos/operators/_asynchronous/bigquery.py index a03705bae4..070ac9f57c 100644 --- a/cosmos/operators/_asynchronous/bigquery.py +++ b/cosmos/operators/_asynchronous/bigquery.py @@ -156,8 +156,6 @@ def get_sql_from_xcom(self, context: Context) -> str: def get_remote_sql(self) -> str: start_time = time.time() - if not settings.AIRFLOW_IO_AVAILABLE: # pragma: no cover - raise CosmosValueError(f"Cosmos async support is only available starting in Airflow 2.8 or later.") try: from airflow.sdk import ObjectStoragePath except ImportError: diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index e868c04fd6..26efbfd86e 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -32,13 +32,12 @@ from attrs import define -from cosmos import cache, settings +try: + from airflow.sdk import ObjectStoragePath +except ImportError: + from airflow.io.path import ObjectStoragePath -if settings.AIRFLOW_IO_AVAILABLE: - try: - from airflow.sdk import ObjectStoragePath - except ImportError: - from airflow.io.path import ObjectStoragePath +from cosmos import cache, settings from cosmos._utils.importer import load_method_from_module from cosmos.cache import ( _copy_cached_package_lockfile_to_project, @@ -142,9 +141,7 @@ try: from openlineage.airflow.extractors.base import OperatorLineage except (ImportError, ModuleNotFoundError): - logger.warning( - "To enable emitting Openlineage events, upgrade to Airflow 2.7 or install astronomer-cosmos[openlineage]." - ) + logger.warning("To enable emitting Openlineage events, install apache-airflow-providers-openlineage.") logger.debug( "Further details on lack of Openlineage Airflow provider:", stack_info=True, @@ -321,13 +318,6 @@ def _configure_remote_target_path() -> tuple[Path | ObjectStoragePath, str] | tu ) return None, None - if not settings.AIRFLOW_IO_AVAILABLE: - raise CosmosValueError( - f"You're trying to specify remote target path {target_path_str}, but the required " - f"Object Storage feature is unavailable in Airflow version {AIRFLOW_VERSION}. Please upgrade to " - "Airflow 2.8 or later." - ) - _configured_target_path = ObjectStoragePath(target_path_str, conn_id=remote_conn_id) if not _configured_target_path.exists(): # type: ignore[no-untyped-call] diff --git a/cosmos/settings.py b/cosmos/settings.py index a4f35aa74d..f8e7e24b45 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -6,8 +6,6 @@ import airflow from airflow.configuration import conf -from airflow.version import version as airflow_version -from packaging.version import Version from cosmos.constants import ( DEFAULT_COSMOS_CACHE_DIR_NAME, @@ -56,8 +54,6 @@ enable_setup_async_task = conf.getboolean("cosmos", "enable_setup_async_task", fallback=True) enable_teardown_async_task = conf.getboolean("cosmos", "enable_teardown_async_task", fallback=True) -AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0") - # The following environment variable is populated in Astro Cloud in_astro_cloud = os.getenv("ASTRONOMER_ENVIRONMENT") == "cloud" diff --git a/dev/dags/cosmos_callback_dag.py b/dev/dags/cosmos_callback_dag.py index dad29dd145..058819f53b 100644 --- a/dev/dags/cosmos_callback_dag.py +++ b/dev/dags/cosmos_callback_dag.py @@ -34,18 +34,17 @@ "install_deps": True, # install any necessary dependencies before running any dbt command "full_refresh": True, # used only in dbt commands that support this flag # -------------------------------------------------------------- - # Callback function to upload files using Airflow Object storage and Cosmos remote_target_path setting on Airflow 2.8 and above + # Callback function to upload files using Airflow Object Storage and Cosmos remote_target_path setting "callback": upload_to_cloud_storage, # -------------------------------------------------------------- - # Callback function to upload files to AWS S3, works for Airflow < 2.8 too + # Alternative callback functions to upload files using specific cloud providers: + # AWS S3: # "callback": upload_to_aws_s3, # "callback_args": {"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"}, - # -------------------------------------------------------------- - # Callback function to upload files to GCP GS, works for Airflow < 2.8 too + # GCP GS: # "callback": upload_to_gcp_gs, # "callback_args": {"gcp_conn_id": "gcp_gs_conn", "bucket_name": "cosmos-artifacts-upload"}, - # -------------------------------------------------------------- - # Callback function to upload files to Azure WASB, works for Airflow < 2.8 too + # Azure WASB: # "callback": upload_to_azure_wasb, # "callback_args": {"azure_conn_id": "azure_wasb_conn", "container_name": "cosmos-artifacts-upload"}, # -------------------------------------------------------------- diff --git a/dev/dags/example_virtualenv.py b/dev/dags/example_virtualenv.py index 04083b3bcf..363c1556eb 100644 --- a/dev/dags/example_virtualenv.py +++ b/dev/dags/example_virtualenv.py @@ -64,12 +64,10 @@ def example_virtualenv() -> None: # For the sake of avoiding additional latency observed while uploading files for each of the tasks, the # below callback functions to be executed are commented, but you can uncomment them if you'd like to # enable callback execution. - # Callback function to upload files using Airflow Object storage and Cosmos remote_target_path setting on - # Airflow 2.8 and above + # Callback function to upload files using Airflow Object Storage and Cosmos remote_target_path setting # "callback": upload_to_cloud_storage, # -------------------------------------------------------------------------- - # Callback function if you'd like to upload files from the target directory to remote store e.g. AWS S3 that - # works with Airflow < 2.8 too + # Alternative callback function to upload files from the target directory to remote store e.g. AWS S3 # "callback": upload_to_aws_s3, # "callback_args": {"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"} # -------------------------------------------------------------------------- diff --git a/docs/compatibility-policy.rst b/docs/compatibility-policy.rst index dd998181b2..f85600e196 100644 --- a/docs/compatibility-policy.rst +++ b/docs/compatibility-policy.rst @@ -41,8 +41,8 @@ New minor or major releases of Cosmos may drop support for Apache Airflow versio In some cases, Cosmos may continue to support older Airflow versions, depending on the Cosmos release cycle. -- **Minimum required version**: Apache Airflow 2.6.0 -- **Supported versions**: 2.6, 2.7, 2.8, 2.9, 2.10, 2.11, 3.0, 3.1 +- **Minimum required version**: Apache Airflow 2.9.0 +- **Supported versions**: 2.9, 2.10, 2.11, 3.0, 3.1 dbt Core ~~~~~~~~ diff --git a/docs/configuration/callbacks.rst b/docs/configuration/callbacks.rst index 36ac33f92d..c754245525 100644 --- a/docs/configuration/callbacks.rst +++ b/docs/configuration/callbacks.rst @@ -42,7 +42,7 @@ using a single operator in an Airflow DAG: Example: Using DbtDag or DbtTaskGroup ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -If you're using Airflow 2.8 or later, you can leverage the :ref:`remote_target_path` configuration to upload files +You can leverage the :ref:`remote_target_path` configuration to upload files from the target directory to a remote storage. Below is an example of how to define a callback helper function in your ``DbtDag`` that utilizes this configuration: diff --git a/docs/configuration/cosmos-conf.rst b/docs/configuration/cosmos-conf.rst index 8ebdaa8b82..1619ffd588 100644 --- a/docs/configuration/cosmos-conf.rst +++ b/docs/configuration/cosmos-conf.rst @@ -152,7 +152,7 @@ This page lists all available Airflow configurations that affect ``astronomer-co in a remote location (an alternative to the Variable cache approach released previously since Cosmos 1.5.0) using this configuration. The value for the remote cache directory can be any of the schemes that are supported by the `Airflow Object Store `_ - feature introduced in Airflow 2.8.0 (e.g. ``s3://your_s3_bucket/cache_dir/``, ``gs://your_gs_bucket/cache_dir/``, + feature (e.g. ``s3://your_s3_bucket/cache_dir/``, ``gs://your_gs_bucket/cache_dir/``, ``abfs://your_azure_container/cache_dir``, etc.) This is an experimental feature available since Cosmos 1.6 to gather user feedback and will be merged into the @@ -181,7 +181,7 @@ This page lists all available Airflow configurations that affect ``astronomer-co the target directory. The value for the remote target path can be any of the schemes that are supported by the `Airflow Object Store `_ - feature introduced in Airflow 2.8.0 (e.g. ``s3://your_s3_bucket/target_dir/``, ``gs://your_gs_bucket/target_dir/``, + feature (e.g. ``s3://your_s3_bucket/target_dir/``, ``gs://your_gs_bucket/target_dir/``, ``abfs://your_azure_container/cache_dir``, etc.) - Default: ``None`` diff --git a/docs/configuration/lineage.rst b/docs/configuration/lineage.rst index 0dc6eb161c..51203e67cb 100644 --- a/docs/configuration/lineage.rst +++ b/docs/configuration/lineage.rst @@ -38,17 +38,13 @@ Contributions are also welcome in the `OpenLineage project `_ on how to configure OpenLineage. - -Otherwise, follow `these instructions `_. +Follow `the instructions `_ on how to configure OpenLineage. Namespace diff --git a/docs/configuration/parsing-methods.rst b/docs/configuration/parsing-methods.rst index 6dadd07461..d841bf32be 100644 --- a/docs/configuration/parsing-methods.rst +++ b/docs/configuration/parsing-methods.rst @@ -43,11 +43,10 @@ If you already have a ``manifest.json`` file created by dbt, Cosmos will parse t You can supply a ``manifest_path`` parameter on the DbtDag / DbtTaskGroup with a path to a ``manifest.json`` file. -Before Cosmos 1.6.0, the path to ``manifest.json`` supplied via the ``DbtDag`` / ``DbtTaskGroup`` ``manifest_path`` -argument accepted only local paths. However, starting with Cosmos 1.6.0, if you've Airflow >= 2.8.0, you can supply a -a remote path (e.g., an S3 URL) too. For supporting remote paths, Cosmos leverages the +Starting with Cosmos 1.6.0, you can supply a remote path (e.g., an S3 URL) for ``manifest_path``, +in addition to local paths. For remote paths, Cosmos leverages the `Airflow Object Storage `_ -feature released in Airflow 2.8.0. +feature. For remote paths, you can specify a ``manifest_conn_id``, which is an Airflow connection ID containing the credentials to access the remote path. If you do not specify a ``manifest_conn_id``, Cosmos will use the default connection ID specific to the scheme, identified using the Airflow diff --git a/docs/configuration/project-config.rst b/docs/configuration/project-config.rst index 4435f07873..682472be2c 100644 --- a/docs/configuration/project-config.rst +++ b/docs/configuration/project-config.rst @@ -9,8 +9,7 @@ variables that should be used for rendering and execution. It takes the followin - ``seeds_relative_path``: The path to your seeds directory, relative to the ``dbt_project_path``. This defaults to ``seeds`` - ``snapshots_relative_path``: The path to your snapshots directory, relative to the ``dbt_project_path``. This defaults to ``snapshots`` - ``manifest_path``: The absolute path to your manifests directory. This is only required if you're using Cosmos' manifest - parsing mode. Along with supporting local paths for manifest parsing, starting with Cosmos 1.6.0, if you've - Airflow >= 2.8.0, Cosmos also supports remote paths for manifest parsing(e.g. S3 URL). See :ref:`parsing-methods` for more details. + parsing mode. Cosmos supports both local and remote paths for manifest parsing (e.g. S3 URL). See :ref:`parsing-methods` for more details. - ``project_name`` : The name of the project. If ``dbt_project_path`` is provided, the ``project_name`` defaults to the folder name containing ``dbt_project.yml``. If ``dbt_project_path`` is not provided, and ``manifest_path`` is provided, ``project_name`` is required as the name can not be inferred from ``dbt_project_path`` diff --git a/docs/configuration/scheduling.rst b/docs/configuration/scheduling.rst index 2450663ce4..d618d6dfd5 100644 --- a/docs/configuration/scheduling.rst +++ b/docs/configuration/scheduling.rst @@ -83,7 +83,7 @@ This example DAG: :end-before: [END local_example] -Will trigger the following DAG to be run (when using Cosmos 1.1 when using Airflow 2.4 or newer): +Will trigger the following DAG to be run: .. code-block:: python diff --git a/docs/getting_started/async-execution-mode.rst b/docs/getting_started/async-execution-mode.rst index 0e9b39a72d..6d61bcf22b 100644 --- a/docs/getting_started/async-execution-mode.rst +++ b/docs/getting_started/async-execution-mode.rst @@ -44,7 +44,7 @@ Prerequisites +++++++++++++ - `Astro CLI `_ -- Airflow>=2.8 +- Airflow>=2.9 1. Create Astro-CLI Project +++++++++++++++++++++++++++ @@ -230,22 +230,20 @@ Limitations +++++++++++ -1. **Airflow 2.8 or higher required**: This mode relies on Airflow's `Object Storage `__ feature, introduced in Airflow 2.8, to store and retrieve compiled SQLs. +1. **Limited to dbt models**: Only dbt resource type models are run asynchronously using Airflow deferrable operators. Other resource types are executed synchronously, similar to the local execution mode. -2. **Limited to dbt models**: Only dbt resource type models are run asynchronously using Airflow deferrable operators. Other resource types are executed synchronously, similar to the local execution mode. +2. **BigQuery support only**: This mode only supports BigQuery as the target database. If a different target is specified, Cosmos will throw an error indicating the target database is unsupported in this mode. Adding support for other adapters is on the roadmap. -3. **BigQuery support only**: This mode only supports BigQuery as the target database. If a different target is specified, Cosmos will throw an error indicating the target database is unsupported in this mode. Adding support for other adapters is on the roadmap. +3. **ProfileMapping parameter required**: You need to specify the ``ProfileMapping`` parameter in the ``ProfileConfig`` for your DAG. Refer to the example DAG below for details on setting this parameter. -4. **ProfileMapping parameter required**: You need to specify the ``ProfileMapping`` parameter in the ``ProfileConfig`` for your DAG. Refer to the example DAG below for details on setting this parameter. +4. **Location parameter required**: You must specify the location of the BigQuery dataset in the ``operator_args`` of the ``DbtDag`` or ``DbtTaskGroup``. The example DAG below provides guidance on this. -5. **Location parameter required**: You must specify the location of the BigQuery dataset in the ``operator_args`` of the ``DbtDag`` or ``DbtTaskGroup``. The example DAG below provides guidance on this. +5. **async_py_requirements parameter required**: If you're using the default approach of having a setup task, you must specify the necessary dbt adapter Python requirements based on your profile type for the async execution mode in the ``ExecutionConfig`` of your ``DbtDag`` or ``DbtTaskGroup``. The example DAG below provides guidance on this. -6. **async_py_requirements parameter required**: If you're using the default approach of having a setup task, you must specify the necessary dbt adapter Python requirements based on your profile type for the async execution mode in the ``ExecutionConfig`` of your ``DbtDag`` or ``DbtTaskGroup``. The example DAG below provides guidance on this. +6. **Creation of new isolated virtual environment for each task run**: By default, the ``SetupAsyncOperator`` creates and executes within a new isolated virtual environment for each task run, which can cause performance issues. To reuse an existing virtual environment, use the ``virtualenv_dir`` parameter within the ``operator_args`` of the ``DbtDag``. We have observed that for ``dbt-bigquery``, the ``SetupAsyncOperator`` executes approximately 30% faster when reusing an existing virtual environment, particularly for transformations that take around 10 minutes to complete. -7. **Creation of new isolated virtual environment for each task run**: By default, the ``SetupAsyncOperator`` creates and executes within a new isolated virtual environment for each task run, which can cause performance issues. To reuse an existing virtual environment, use the ``virtualenv_dir`` parameter within the ``operator_args`` of the ``DbtDag``. We have observed that for ``dbt-bigquery``, the ``SetupAsyncOperator`` executes approximately 30% faster when reusing an existing virtual environment, particularly for transformations that take around 10 minutes to complete. +7. **Performance degradation when uploading to remote object location**: Even though it is possible to upload the SQL files to a remote object location by setting environment variables, it is slow. We observed that this introduces a significant overhead in the execution time (500s for 129 models). -8. **Performance degradation when uploading to remote object location**: Even though it is possible to upload the SQL files to a remote object location by setting environment variables, it is slow. We observed that this introduces a significant overhead in the execution time (500s for 129 models). - -9. **TeardownAsyncOperator limitation**: When using a remote object location, in addition to the ``SetupAsyncOperator``, a ``TeardownAsyncOperator`` is also added to the DAG. This task will delete the SQL files from the remote location by the end of the DAG Run. This is can lead to a limitation from a retry perspective, as described in the issue `#2066 `_. This can be avoided by setting the ``enable_teardown_async_task`` configuration to ``False``, as described in the :ref:`enable_teardown_async_task` section. +8. **TeardownAsyncOperator limitation**: When using a remote object location, in addition to the ``SetupAsyncOperator``, a ``TeardownAsyncOperator`` is also added to the DAG. This task will delete the SQL files from the remote location by the end of the DAG Run. This is can lead to a limitation from a retry perspective, as described in the issue `#2066 `_. This can be avoided by setting the ``enable_teardown_async_task`` configuration to ``False``, as described in the :ref:`enable_teardown_async_task` section. For a comparison between different Cosmos execution modes, please, check the :ref:`execution-modes-comparison` section. diff --git a/docs/requirements.txt b/docs/requirements.txt index 7d90fbc8ef..24c50d360e 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,7 +1,7 @@ aenum deprecation msgpack -apache-airflow>=2.6.0 +apache-airflow>=2.9.0 pydantic pydata-sphinx-theme sphinx diff --git a/pyproject.toml b/pyproject.toml index 9fdb596d66..bc1de9f058 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,7 @@ classifiers = [ dependencies = [ "aenum", "attrs", - "apache-airflow>=2.6.0", + "apache-airflow>=2.9.0", "deprecation", # Python 3.13 exposes a deprecated operator, we can remove this dependency in the future "Jinja2>=3.0.0", "msgpack", @@ -170,12 +170,11 @@ pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} [[tool.hatch.envs.tests.matrix]] python = ["3.10", "3.11", "3.12", "3.13"] -airflow = ["2.6", "2.7", "2.8", "2.9", "2.10", "2.11", "3.0", "3.1"] +airflow = ["2.9", "2.10", "2.11", "3.0", "3.1"] dbt = ["1.5", "1.6", "1.7", "1.8", "1.9", "1.10", "1.11", "2.0"] [tool.hatch.envs.tests.overrides] matrix.airflow.dependencies = [ - { value = "typing-extensions<4.6", if = ["2.6"] }, # Known issue: https://github.com/zmievsa/cadwyn/issues/283 { value = "typing-extensions!=4.14.0", if = ["3.0"] } ] diff --git a/scripts/test/integration-setup.sh b/scripts/test/integration-setup.sh index 99c277fe8d..5377d7590c 100644 --- a/scripts/test/integration-setup.sh +++ b/scripts/test/integration-setup.sh @@ -38,15 +38,6 @@ if python3 -c "import sys; print(sys.version_info >= (3, 10))" | grep -q 'True'; pip install 'dbt-duckdb' "airflow-provider-duckdb>=0.2.0" fi -# To overcome CI issues when running Py 3.10 and AF 2.6 with dbt-core 1.9 -# Such as: -# ERROR tests/operators/_asynchronous/test_base.py - pydantic.errors.PydanticUserError: A non-annotated attribute was detected: `dag_id = `. All model fields require a type annotation; if `dag_id` is not meant to be a field, you may be able to resolve this error by annotating it as a `ClassVar` or updating `model_config['ignored_types']`. -if [ "$AIRFLOW_VERSION" = "2.6.0" ] ; then - pip install "pydantic<2" - pip freeze - pip freeze | grep -i pydantic -fi - pip install -U openlineage-airflow apache-airflow==$AIRFLOW_VERSION if [ "$AIRFLOW_VERSION" = "3.1.0" ] ; then diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index 3bf9d1680f..a5590503b3 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -41,29 +41,7 @@ uv pip install "apache-airflow==$AIRFLOW_VERSION" apache-airflow-providers-docke uv pip install "gcsfs<2025.3.0" -if [ "$AIRFLOW_VERSION" = "2.6" ] ; then - uv pip install "apache-airflow-providers-amazon" "apache-airflow==$AIRFLOW_VERSION" "urllib3<2" - uv pip install "apache-airflow-providers-cncf-kubernetes" "apache-airflow==$AIRFLOW_VERSION" - uv pip install "apache-airflow-providers-google<10.11" "httplib2==0.31.0" "apache-airflow==$AIRFLOW_VERSION" - uv pip install "apache-airflow-providers-microsoft-azure" "apache-airflow==$AIRFLOW_VERSION" - uv pip install "pydantic<2.0" -elif [ "$AIRFLOW_VERSION" = "2.7" ] ; then - uv pip install "apache-airflow-providers-amazon" --constraint /tmp/constraint.txt - uv pip install "apache-airflow-providers-cncf-kubernetes" --constraint /tmp/constraint.txt - uv pip install "apache-airflow-providers-google" --constraint /tmp/constraint.txt - uv pip install apache-airflow-providers-microsoft-azure --constraint /tmp/constraint.txt -elif [ "$AIRFLOW_VERSION" = "2.8" ] ; then - uv pip install "apache-airflow-providers-amazon[s3fs]" --constraint /tmp/constraint.txt - uv pip install "apache-airflow-providers-cncf-kubernetes" --constraint /tmp/constraint.txt - uv pip install "apache-airflow-providers-google<=10.26" "apache-airflow==$AIRFLOW_VERSION" - # The Airflow 2.8 constraints file at - # https://raw.githubusercontent.com/apache/airflow/constraints-2.8.0/constraints-3.11.txt - # specifies apache-airflow-providers-microsoft-azure==8.4.0. However, our Azure connection setup in the CI, - # previously led to authentication issues with this version. This issue got resolved in - # apache-airflow-providers-microsoft-azure==8.5.0. Hence, we are using apache-airflow-providers-microsoft-azure>=8.5.0 - # and skipping installation with constraints, as the specified version does not meet our requirements. - uv pip install "apache-airflow-providers-microsoft-azure>=8.5.0" "apache-airflow==$AIRFLOW_VERSION" -elif [ "$AIRFLOW_VERSION" = "2.9" ] ; then +if [ "$AIRFLOW_VERSION" = "2.9" ] ; then uv pip install "apache-airflow-providers-amazon[s3fs]" --constraint /tmp/constraint.txt uv pip install "apache-airflow-providers-cncf-kubernetes" --constraint /tmp/constraint.txt uv pip install "apache-airflow-providers-microsoft-azure" --constraint /tmp/constraint.txt diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 2488be63ba..67752d45bf 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -4,7 +4,6 @@ from unittest.mock import Mock, patch import pytest -from airflow import __version__ as airflow_version from airflow.models import DAG from cosmos.operators.watcher import DbtTestWatcherOperator @@ -19,8 +18,6 @@ from airflow.operators.empty import EmptyOperator from airflow.utils.task_group import TaskGroup -from packaging import version - from cosmos.airflow.graph import ( _add_teardown_task, _snake_case_to_camelcase, @@ -675,10 +672,6 @@ def _normalize_task_display_name(node: DbtNode) -> str: return f"new_task_display_name_{node.name}_{node.resource_type.value}" -@pytest.mark.skipif( - version.parse(airflow_version) < version.parse("2.9"), - reason="Airflow task did not have display_name until the 2.9 release", -) @pytest.mark.parametrize( "node_type,node_id,normalize_task_id,normalize_task_display_name,use_task_group,test_behavior,expected_node_id,expected_display_name", [ diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 7849bccaf5..1ffd98cfd9 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -37,7 +37,6 @@ run_command, ) from cosmos.profiles import PostgresUserPasswordProfileMapping -from cosmos.settings import AIRFLOW_IO_AVAILABLE DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt" DBT_PROJECT_NAME = "jaffle_shop" @@ -2036,7 +2035,6 @@ def test_should_use_dbt_ls_cache(enable_cache, enable_cache_dbt_ls, cache_id, sh assert graph.should_use_dbt_ls_cache() == should_use -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch(object_storage_path) @patch("cosmos.config.ProjectConfig") @patch("cosmos.dbt.graph._configure_remote_cache_dir") @@ -2060,7 +2058,6 @@ def test_save_dbt_ls_cache_remote_cache_dir( mock_remote_cache_key_path.open.assert_called_once_with("w") -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch(object_storage_path) @patch("cosmos.config.ProjectConfig") @patch("cosmos.dbt.graph._configure_remote_cache_dir") diff --git a/tests/operators/_asynchronous/test_base.py b/tests/operators/_asynchronous/test_base.py index cfd94245c1..896373d5e7 100644 --- a/tests/operators/_asynchronous/test_base.py +++ b/tests/operators/_asynchronous/test_base.py @@ -4,10 +4,8 @@ from unittest.mock import MagicMock, Mock, mock_open, patch import pytest -from packaging.version import Version from cosmos.config import ProfileConfig -from cosmos.constants import AIRFLOW_VERSION from cosmos.hooks.subprocess import FullOutputSubprocessResult from cosmos.operators._asynchronous import SetupAsyncOperator, TeardownAsyncOperator from cosmos.operators._asynchronous.base import DbtRunAirflowAsyncFactoryOperator, _create_async_operator_class @@ -152,7 +150,6 @@ def test_setup_run_subprocess_py_bin_unset( op.run_subprocess(command, env, cwd) -@pytest.mark.skipif(AIRFLOW_VERSION < Version("2.8"), reason="ObjectStoragePath requires Apache Airflow >= 2.8") @patch("cosmos.operators._asynchronous.ObjectStoragePath") def test_execute_removes_existing_path(mock_object_storage_path): mock_path_instance = MagicMock() diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 18a76e89d9..9137591201 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -51,7 +51,6 @@ DbtTestLocalOperator, ) from cosmos.profiles import PostgresUserPasswordProfileMapping -from cosmos.settings import AIRFLOW_IO_AVAILABLE from tests.utils import new_test_dag from tests.utils import test_dag as run_test_dag @@ -1513,18 +1512,6 @@ def test_dbt_clone_local_operator_initialisation(): assert "clone" in operator.base_cmd -@patch("cosmos.operators.local.remote_target_path", new="s3://some-bucket/target") -@patch("cosmos.settings.AIRFLOW_IO_AVAILABLE", new=False) -def test_configure_remote_target_path_object_storage_unavailable_on_earlier_airflow_versions(): - operator = DbtCompileLocalOperator( - task_id="fake-task", - profile_config=profile_config, - project_dir="fake-dir", - ) - with pytest.raises(CosmosValueError, match="Object Storage feature is unavailable"): - operator._configure_remote_target_path() - - @pytest.mark.parametrize( "rem_target_path, rem_target_path_conn_id", [ @@ -1545,7 +1532,6 @@ def test_config_remote_target_path_unset_settings(rem_target_path, rem_target_pa assert target_conn is None -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.operators.local.remote_target_path", new="s3://some-bucket/target") @patch("cosmos.operators.local.remote_target_path_conn_id", new="aws_s3_conn") @patch("cosmos.operators.local.ObjectStoragePath") @@ -1607,7 +1593,6 @@ def test_upload_sql_files_xcom(tmp_path): mock_context["ti"].xcom_push.assert_called_once_with(key="dest.sql", value=compressed_b64_sql) -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.settings.upload_sql_to_xcom", False) @patch("cosmos.operators.local.ObjectStoragePath.copy") @patch("cosmos.operators.local.ObjectStoragePath") @@ -1810,7 +1795,6 @@ def test_handle_post_execution_with_multiple_callbacks( @pytest.mark.integration -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.operators.local.AbstractDbtLocalBase._configure_remote_target_path") @patch("cosmos.operators.local.ObjectStoragePath") def test_delete_sql_files_directory_not_exists(mock_object_storage_path, mock_configure_remote, caplog): @@ -1864,7 +1848,6 @@ def test_generate_dbt_flags_does_not_append_no_static_parser_in_subprocess(tmp_p @pytest.mark.integration -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.operators.local.AbstractDbtLocalBase._configure_remote_target_path") def test_delete_sql_files_no_remote_target_configured(mock_configure_remote, caplog): """Test that _delete_sql_files exits early with a warning when remote path is not configured.""" diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index e882efe6e3..5bc1874e4e 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -635,7 +635,7 @@ def make_context(self, ti_mock, *, run_id: str = "test-run", map_index: int = 0) } @pytest.mark.skipif(AIRFLOW_VERSION >= Version("3.0.0"), reason="RuntimeTaskInstance path in Airflow >= 3.0") - @patch("cosmos.operators._watcher.base.AIRFLOW_VERSION", new=Version("2.7.0")) + @patch("cosmos.operators._watcher.base.AIRFLOW_VERSION", new=Version("2.9.0")) def test_get_producer_task_status_airflow2(self): sensor = self.make_sensor() sensor._get_producer_task_status = DbtConsumerWatcherSensor._get_producer_task_status.__get__( @@ -651,7 +651,7 @@ def test_get_producer_task_status_airflow2(self): status = sensor._get_producer_task_status(context) mock_builder.assert_called_once_with( - airflow_version=Version("2.7.0"), + airflow_version=Version("2.9.0"), dag_id="example_dag", run_id="run_1", producer_task_id=sensor.producer_task_id, @@ -661,7 +661,7 @@ def test_get_producer_task_status_airflow2(self): assert status == "success" @pytest.mark.skipif(AIRFLOW_VERSION >= Version("3.0.0"), reason="RuntimeTaskInstance path in Airflow >= 3.0") - @patch("cosmos.operators._watcher.base.AIRFLOW_VERSION", new=Version("2.7.0")) + @patch("cosmos.operators._watcher.base.AIRFLOW_VERSION", new=Version("2.9.0")) def test_get_producer_task_status_airflow2_missing_instance(self): sensor = self.make_sensor() sensor._get_producer_task_status = DbtConsumerWatcherSensor._get_producer_task_status.__get__( @@ -1001,7 +1001,6 @@ def test_dbt_build_watcher_operator_raises_not_implemented_error(self): DbtBuildWatcherOperator() -@pytest.mark.skipif(AIRFLOW_VERSION < Version("2.7"), reason="Airflow did not have dag.test() until the 2.6 release") @pytest.mark.integration def test_dbt_dag_with_watcher(): """ @@ -1069,11 +1068,8 @@ def test_dbt_dag_with_watcher(): # Airflow 3.0.0 hangs indefinitely, while Airflow 3.0.6 fails due to this Airflow bug: # https://github.com/apache/airflow/issues/51816 -conditions_to_skip = (AIRFLOW_VERSION < Version("2.8"), AIRFLOW_VERSION == Version("3.0")) - - @pytest.mark.skipif( - conditions_to_skip, + AIRFLOW_VERSION == Version("3.0"), reason="Airflow hangs in these versions when trying to fetch XCom from the triggerer when using dags.test()", ) @pytest.mark.integration @@ -1155,7 +1151,6 @@ def test_dbt_dag_with_watcher_and_empty_model(caplog): assert "Model 'model.micro_dbt_project.empty_model' was skipped by the dbt command" in caplog.text -@pytest.mark.skipif(AIRFLOW_VERSION < Version("2.7"), reason="Airflow did not have dag.test() until the 2.6 release") @pytest.mark.integration def test_dbt_task_group_with_watcher(): """ @@ -1242,7 +1237,6 @@ def test_dbt_task_group_with_watcher(): assert dag_dbt_task_group_watcher.task_dict["dbt_task_group.dbt_producer_watcher"].downstream_task_ids == set() -@pytest.mark.skipif(AIRFLOW_VERSION < Version("2.7"), reason="Airflow did not have dag.test() until the 2.6 release") @pytest.mark.integration def test_dbt_task_group_with_watcher_has_correct_dbt_cmd(): """ @@ -1294,7 +1288,6 @@ def test_dbt_task_group_with_watcher_has_correct_dbt_cmd(): assert "--full-refresh" in full_cmd -@pytest.mark.skipif(AIRFLOW_VERSION < Version("2.7"), reason="Airflow did not have dag.test() until the 2.6 release") @pytest.mark.integration def test_dbt_task_group_with_watcher_has_correct_templated_dbt_cmd(): """ diff --git a/tests/test_cache.py b/tests/test_cache.py index 3cfbf2a470..2f6b48b5e7 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -45,8 +45,7 @@ DEFAULT_PROFILES_FILE_NAME, _default_s3_conn, ) -from cosmos.exceptions import CosmosValueError -from cosmos.settings import AIRFLOW_IO_AVAILABLE, dbt_profile_cache_dir_name +from cosmos.settings import dbt_profile_cache_dir_name START_DATE = datetime(2024, 4, 16) example_dag = DAG("dag", start_date=START_DATE) @@ -404,14 +403,6 @@ def test_remote_cache_path_initialization_no_remote_cache_dir(): assert configured_remote_cache_dir is None -@patch("cosmos.cache.settings_remote_cache_dir", new="s3://some-bucket/cache") -@patch("cosmos.cache.AIRFLOW_IO_AVAILABLE", new=False) -def test_remote_cache_path_initialization_object_storage_unavailable_on_earlier_airflow_versions(): - with pytest.raises(CosmosValueError, match="Object Storage feature is unavailable"): - _configure_remote_cache_dir() - - -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.cache.settings_remote_cache_dir", new="s3://some-bucket/cache") @patch("airflow.io.path.ObjectStoragePath") def test_remote_cache_path_initialization_path_available_default_connection(mock_object_storage_path): @@ -424,7 +415,6 @@ def test_remote_cache_path_initialization_path_available_default_connection(mock assert configured_remote_cache_dir == mock_cache_dir_path -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.cache.settings_remote_cache_dir", new="s3://some-bucket/cache") @patch("airflow.io.path.ObjectStoragePath") def test_remote_cache_dir_initialization_path_not_exist_creates_path(mock_object_storage_path): @@ -436,7 +426,6 @@ def test_remote_cache_dir_initialization_path_not_exist_creates_path(mock_object mock_cache_dir_path.mkdir.assert_called_once_with(parents=True, exist_ok=True) -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.cache.settings_remote_cache_dir", new="s3://some-bucket/cache") @patch("cosmos.cache.remote_cache_dir_conn_id", new="my_conn_id") @patch("airflow.io.path.ObjectStoragePath") diff --git a/tests/test_config.py b/tests/test_config.py index 48127c4972..7797deeac1 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -9,7 +9,6 @@ from cosmos.exceptions import CosmosValueError from cosmos.profiles.athena.access_key import AthenaAccessKeyProfileMapping from cosmos.profiles.postgres.user_pass import PostgresUserPasswordProfileMapping -from cosmos.settings import AIRFLOW_IO_AVAILABLE DBT_PROJECTS_ROOT_DIR = Path(__file__).parent / "sample/" SAMPLE_PROFILE_YML = Path(__file__).parent / "sample/profiles.yml" @@ -306,7 +305,6 @@ def test_execution_config_default_config(execution_mode, expected_invocation_mod assert execution_config.invocation_mode == expected_invocation_mode -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @pytest.mark.parametrize( "manifest_path, given_manifest_conn_id, used_manifest_conn_id", [ @@ -325,28 +323,3 @@ def test_remote_manifest_path(manifest_path, given_manifest_conn_id, used_manife dbt_project_path="/tmp/some-path", manifest_path=manifest_path, manifest_conn_id=given_manifest_conn_id ) assert project_config.manifest_path == ObjectStoragePath(manifest_path, conn_id=used_manifest_conn_id) - - -@pytest.mark.skipif(AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") -@pytest.mark.parametrize( - "manifest_path, given_manifest_conn_id, used_manifest_conn_id", - [ - ("s3://cosmos-manifest-test/manifest.json", None, "aws_default"), - ("s3://cosmos-manifest-test/manifest.json", "aws_s3_conn", "aws_s3_conn"), - ("gs://cosmos-manifest-test/manifest.json", None, "google_cloud_default"), - ("gs://cosmos-manifest-test/manifest.json", "gcp_gs_conn", "gcp_gs_conn"), - ("abfs://cosmos-manifest-test/manifest.json", None, "wasb_default"), - ("abfs://cosmos-manifest-test/manifest.json", "azure_abfs_conn", "azure_abfs_conn"), - ], -) -def test_remote_manifest_path_airflow_io_unavailable(manifest_path, given_manifest_conn_id, used_manifest_conn_id): - from airflow.version import version as airflow_version - - error_msg = ( - f"The manifest path {manifest_path} uses a remote file scheme, but the required Object Storage feature is " - f"unavailable in Airflow version {airflow_version}. Please upgrade to Airflow 2.8 or later." - ) - with pytest.raises(CosmosValueError, match=error_msg): - _ = ProjectConfig( - dbt_project_path="/tmp/some-path", manifest_path=manifest_path, manifest_conn_id=given_manifest_conn_id - ) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 314064b6f6..a77d4dc839 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -21,17 +21,8 @@ DBT_VERSION = Version(get_dbt_version().to_version_string()[1:]) KUBERNETES_DAGS = ["jaffle_shop_kubernetes", "jaffle_shop_watcher_kubernetes"] -MIN_VER_DAG_FILE: dict[str, list[str]] = { - "2.8": ["cosmos_manifest_example.py", "simple_dag_async.py", "cosmos_callback_dag.py"], -} - IGNORED_DAG_FILES = ["performance_dag.py", "jaffle_shop_kubernetes.py", "jaffle_shop_watcher_kubernetes.py"] -# Sort descending based on Versions and convert string to an actual version -MIN_VER_DAG_FILE_VER: dict[Version, list[str]] = { - Version(version): MIN_VER_DAG_FILE[version] for version in sorted(MIN_VER_DAG_FILE, key=Version, reverse=True) -} - @provide_session def get_session(session=None): @@ -52,11 +43,6 @@ def get_dag_bag() -> DagBag: # noqa: C901 return DagBag(dag_folder=None, include_examples=False) with open(AIRFLOW_IGNORE_FILE, "w+") as file: - for min_version, files in MIN_VER_DAG_FILE_VER.items(): - if AIRFLOW_VERSION < min_version: - print(f"Adding {files} to .airflowignore") - file.writelines([f"{file}\n" for file in files]) - for dagfile in IGNORED_DAG_FILES: print(f"Adding {dagfile} to .airflowignore") file.writelines([f"{dagfile}\n"]) @@ -68,9 +54,6 @@ def get_dag_bag() -> DagBag: # noqa: C901 if DBT_VERSION < Version("1.5.0"): file.writelines(["example_source_rendering.py\n"]) - if AIRFLOW_VERSION < Version("2.8.0"): - file.writelines("example_cosmos_dbt_build.py\n") - # Disabling these DAGs temporarily due to an Airflow 3 bug on processing DatasetAlias that contain non-ASCII characters: # https://github.com/apache/airflow/issues/51566 # https://github.com/astronomer/astronomer-cosmos/issues/1802 @@ -139,7 +122,6 @@ def test_example_dag(session, dag_id: str): @pytest.mark.skipif( AIRFLOW_VERSION >= Version("3.1.0") # TODO: Fix https://github.com/astronomer/astronomer-cosmos/issues/2045 - or AIRFLOW_VERSION < Version("2.8") or AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs (see PR: https://github.com/apache/airflow/pull/34585), and Cosmos errors if `emit_datasets` is not False", ) diff --git a/tests/test_example_dags_no_connections.py b/tests/test_example_dags_no_connections.py index cb7eb68486..fed853158f 100644 --- a/tests/test_example_dags_no_connections.py +++ b/tests/test_example_dags_no_connections.py @@ -3,7 +3,6 @@ from functools import cache from pathlib import Path -import airflow import pytest from airflow.models.dagbag import DagBag from dbt.version import get_installed_version as get_dbt_version @@ -13,27 +12,13 @@ AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore" DBT_VERSION = Version(get_dbt_version().to_version_string()[1:]) -MIN_VER_DAG_FILE: dict[str, list[str]] = { - "2.8": ["cosmos_manifest_example.py"], -} - IGNORED_DAG_FILES = ["performance_dag.py", "jaffle_shop_kubernetes.py", "jaffle_shop_watcher_kubernetes.py"] -# Sort descending based on Versions and convert string to an actual version -MIN_VER_DAG_FILE_VER: dict[Version, list[str]] = { - Version(version): MIN_VER_DAG_FILE[version] for version in sorted(MIN_VER_DAG_FILE, key=Version, reverse=True) -} - @cache def get_dag_bag() -> DagBag: """Create a DagBag by adding the files that are not supported to .airflowignore""" with open(AIRFLOW_IGNORE_FILE, "w+") as file: - for min_version, files in MIN_VER_DAG_FILE_VER.items(): - if Version(airflow.__version__) < min_version: - print(f"Adding {files} to .airflowignore") - file.writelines([f"{file_name}\n" for file_name in files]) - for dagfile in IGNORED_DAG_FILES: print(f"Adding {dagfile} to .airflowignore") file.writelines([f"{dagfile}\n"]) diff --git a/tests/test_io.py b/tests/test_io.py index 594097065a..c39e9703ab 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -14,7 +14,6 @@ upload_to_cloud_storage, upload_to_gcp_gs, ) -from cosmos.settings import AIRFLOW_IO_AVAILABLE @pytest.fixture @@ -104,7 +103,6 @@ def test_upload_artifacts_to_cloud_storage_no_remote_path(): upload_to_cloud_storage("/project_dir", **{}) -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") def test_upload_artifacts_to_cloud_storage_success(dummy_kwargs): """Test upload_artifacts_to_cloud_storage with valid setup.""" with ( @@ -131,7 +129,6 @@ def test_upload_artifacts_to_cloud_storage_success(dummy_kwargs): assert mock_copy.call_count == 2 -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.io.settings.remote_target_path", "s3://bucket/path/to/file") @patch("cosmos.io.settings.remote_target_path_conn_id", None) @patch("cosmos.io.ObjectStoragePath") @@ -147,7 +144,6 @@ def test_configure_remote_target_path_no_conn_id(mock_urlparse, mock_object_stor assert result == (mock_object_storage.return_value, _default_s3_conn) -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.io.settings.remote_target_path", "abcd://bucket/path/to/file") @patch("cosmos.io.settings.remote_target_path_conn_id", None) @patch("cosmos.io.ObjectStoragePath") @@ -161,22 +157,3 @@ def test_configure_remote_target_path_conn_id_is_none(mock_urlparse, mock_object result = _configure_remote_target_path() assert result == (None, None) - - -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") -@patch("cosmos.settings.AIRFLOW_IO_AVAILABLE", False) -@patch("cosmos.io.settings.remote_target_path", "s3://bucket/path/to/file") -@patch("cosmos.io.ObjectStoragePath") -@patch("cosmos.io.urlparse") -def test_configure_remote_target_path_airflow_io_unavailable(mock_urlparse, mock_object_storage): - """Test when AIRFLOW_IO_AVAILABLE is False.""" - mock_urlparse.return_value.scheme = "s3" - - mock_storage_path = MagicMock() - mock_storage_path.exists.return_value = True - mock_object_storage.return_value = mock_storage_path - - with pytest.raises(CosmosValueError) as exc_info: - _configure_remote_target_path() - - assert "Object Storage feature is unavailable" in str(exc_info.value) From aabdf4193f3238c7dfc467b04cdef57a58d11bee Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Mon, 26 Jan 2026 11:45:57 -0700 Subject: [PATCH 2/5] [DROP ME] run on ci? --- .github/workflows/test.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 39ed8fb703..123a180f26 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,12 +2,12 @@ name: test on: push: # Run on pushes to the default branch - branches: [main] + branches: [main, drop_airflow_under_29] # Also run on pull requests originating from forks. Although this is insecure by default, we need it to run # integration tests on forked PRs. As a guardrail, we’ve added an Authorize step to each job, which requires manually # approving the workflow run for each pushed commit. Approval only happens after a careful code review of the changes. pull_request_target: - branches: [main] # zizmor: ignore[dangerous-triggers] + branches: [main, drop_airflow_under_29] # zizmor: ignore[dangerous-triggers] concurrency: group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} From 0075f6a4c72d99650fe6304f4e61636bb2f740bd Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Mon, 2 Feb 2026 12:07:07 +0000 Subject: [PATCH 3/5] Update .github/workflows/test.yml Co-authored-by: Pankaj Koti --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 851e9b00b4..964940a054 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,7 +2,7 @@ name: test on: push: # Run on pushes to the default branch - branches: [main, drop_airflow_under_29] + branches: [main] # Also run on pull requests originating from forks. Although this is insecure by default, we need it to run # integration tests on forked PRs. As a guardrail, we’ve added an Authorize step to each job, which requires manually # approving the workflow run for each pushed commit. Approval only happens after a careful code review of the changes. From f6a7eabbd27ddab5016ca3c44419da8ab265d336 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Mon, 2 Feb 2026 12:07:15 +0000 Subject: [PATCH 4/5] Update .github/workflows/test.yml Co-authored-by: Pankaj Koti --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 964940a054..201c39bb05 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -7,7 +7,7 @@ on: # integration tests on forked PRs. As a guardrail, we’ve added an Authorize step to each job, which requires manually # approving the workflow run for each pushed commit. Approval only happens after a careful code review of the changes. pull_request_target: - branches: [main, drop_airflow_under_29] # zizmor: ignore[dangerous-triggers] + branches: [main] # zizmor: ignore[dangerous-triggers] concurrency: group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} From 56c08c132a0566d2926f04273b4b388fd9bc23e1 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Mon, 2 Feb 2026 12:27:45 +0000 Subject: [PATCH 5/5] Fix issues after rebase on dbt/test_graph.py There is no longer a `AIRFLOW_IO_AVAILABLE` --- tests/dbt/test_graph.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index fa1f16c747..90e495dd37 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -2376,7 +2376,6 @@ def test_should_use_yaml_selectors_cache(enable_cache, enable_cache_yaml_selecto assert graph.should_use_yaml_selectors_cache() == should_use -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch(object_storage_path) @patch("cosmos.config.ProjectConfig") @patch("cosmos.dbt.graph._configure_remote_cache_dir") @@ -2400,7 +2399,6 @@ def test_save_dbt_ls_cache_remote_cache_dir( mock_remote_cache_key_path.open.assert_called_once_with("w") -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch(object_storage_path) @patch("cosmos.config.ProjectConfig") @patch("cosmos.dbt.graph._configure_remote_cache_dir") @@ -2464,7 +2462,6 @@ def test_get_dbt_ls_cache_remote_cache_dir( assert result == expected_result -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch(object_storage_path) @patch("cosmos.config.ProjectConfig") @patch("cosmos.dbt.graph._configure_remote_cache_dir")