diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 34fdd2222b..5a19d97d09 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,12 +2,12 @@ name: test on: push: # Run on pushes to the default branch - branches: [main] + branches: [main, drop_old_airflow_versions] # Also run on pull requests originating from forks. Although this is insecure by default, we need it to run # integration tests on forked PRs. As a guardrail, we’ve added an Authorize step to each job, which requires manually # approving the workflow run for each pushed commit. Approval only happens after a careful code review of the changes. pull_request_target: - branches: [main] # zizmor: ignore[dangerous-triggers] + branches: [main, drop_old_airflow_versions] # zizmor: ignore[dangerous-triggers] concurrency: group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} @@ -55,28 +55,10 @@ jobs: fail-fast: false matrix: python-version: ["3.10", "3.11", "3.12", "3.13"] - airflow-version: ["2.6", "2.7", "2.8", "2.9", "2.10", "2.11", "3.0", "3.1"] + airflow-version: ["2.10", "2.11", "3.0", "3.1"] dbt-version: ["1.10"] exclude: - # Apache Airflow versions prior to 2.9.0 have not been tested with Python 3.12. - # Official support for Python 3.12 and the corresponding constraints.txt are available only for Apache Airflow >= 2.9.0. - # See: https://github.com/apache/airflow/tree/2.9.0?tab=readme-ov-file#requirements - # See: https://github.com/apache/airflow/tree/2.8.4?tab=readme-ov-file#requirements - - python-version: "3.12" - airflow-version: "2.6" - - python-version: "3.12" - airflow-version: "2.7" - - python-version: "3.12" - airflow-version: "2.8" # Apache Airflow versions prior to 3.1.0 have not been tested with Python 3.13. - - python-version: "3.13" - airflow-version: "2.6" - - python-version: "3.13" - airflow-version: "2.7" - - python-version: "3.13" - airflow-version: "2.8" - - python-version: "3.13" - airflow-version: "2.9" - python-version: "3.13" airflow-version: "2.10" - python-version: "3.13" @@ -127,28 +109,10 @@ jobs: fail-fast: false matrix: python-version: ["3.10", "3.11", "3.12", "3.13"] - airflow-version: ["2.6", "2.7", "2.8", "2.9", "2.10", "2.11", "3.0", "3.1"] + airflow-version: ["2.10", "2.11", "3.0", "3.1"] dbt-version: [ "1.11" ] exclude: - # Apache Airflow versions prior to 2.9.0 have not been tested with Python 3.12. - # Official support for Python 3.12 and the corresponding constraints.txt are available only for Apache Airflow >= 2.9.0. - # See: https://github.com/apache/airflow/tree/2.9.0?tab=readme-ov-file#requirements - # See: https://github.com/apache/airflow/tree/2.8.4?tab=readme-ov-file#requirements - - python-version: "3.12" - airflow-version: "2.6" - - python-version: "3.12" - airflow-version: "2.7" - - python-version: "3.12" - airflow-version: "2.8" # Apache Airflow versions prior to 3.1.0 have not been tested with Python 3.13. - - python-version: "3.13" - airflow-version: "2.6" - - python-version: "3.13" - airflow-version: "2.7" - - python-version: "3.13" - airflow-version: "2.8" - - python-version: "3.13" - airflow-version: "2.9" - python-version: "3.13" airflow-version: "2.10" - python-version: "3.13" @@ -234,7 +198,7 @@ jobs: strategy: matrix: python-version: ["3.11"] - airflow-version: ["2.9"] + airflow-version: ["2.10"] dbt-version: ["1.9"] services: @@ -321,7 +285,7 @@ jobs: fail-fast: false matrix: python-version: [ "3.11" ] - airflow-version: [ "2.8", "3.0" ] + airflow-version: [ "2.10", "3.0" ] dbt-version: [ "1.5" ] services: postgres: diff --git a/cosmos/cache.py b/cosmos/cache.py index 096a7fa489..506ab73444 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -17,7 +17,6 @@ from airflow.models import DagRun, Variable from airflow.models.dag import DAG from airflow.utils.session import provide_session -from airflow.version import version as airflow_version from sqlalchemy import select from sqlalchemy.orm import Session @@ -43,10 +42,8 @@ PACKAGE_LOCKFILE_YML, ) from cosmos.dbt.project import get_partial_parse_path -from cosmos.exceptions import CosmosValueError from cosmos.log import get_logger from cosmos.settings import ( - AIRFLOW_IO_AVAILABLE, cache_dir, dbt_profile_cache_dir_name, enable_cache, @@ -77,13 +74,6 @@ def _configure_remote_cache_dir() -> Path | ObjectStoragePath | None: if remote_cache_conn_id is None: return _configured_cache_dir - if not AIRFLOW_IO_AVAILABLE: - raise CosmosValueError( - f"You're trying to specify remote cache_dir {cache_dir_str}, but the required " - f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " - "Airflow 2.8 or later." - ) - try: from airflow.sdk import ObjectStoragePath except ImportError: diff --git a/cosmos/config.py b/cosmos/config.py index de07e59250..296b890b3e 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -9,18 +9,16 @@ from collections.abc import Callable, Iterator from dataclasses import InitVar, dataclass, field from pathlib import Path -from typing import TYPE_CHECKING, Any +from typing import Any import yaml -from airflow.version import version as airflow_version from cosmos import settings -if settings.AIRFLOW_IO_AVAILABLE or TYPE_CHECKING: - try: - from airflow.sdk import ObjectStoragePath - except ImportError: - from airflow.io.path import ObjectStoragePath +try: + from airflow.sdk import ObjectStoragePath +except ImportError: + from airflow.io.path import ObjectStoragePath from cosmos.cache import create_cache_profile, get_cached_profile, is_profile_cache_enabled from cosmos.constants import ( @@ -231,17 +229,7 @@ def __init__( # Use the default Airflow connection ID for the scheme if it is not provided. manifest_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(manifest_scheme, lambda: None)() - if manifest_conn_id is not None and not settings.AIRFLOW_IO_AVAILABLE: - raise CosmosValueError( - f"The manifest path {manifest_path_str} uses a remote file scheme, but the required Object " - f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " - f"Airflow 2.8 or later." - ) - - if settings.AIRFLOW_IO_AVAILABLE: - self.manifest_path = ObjectStoragePath(manifest_path_str, conn_id=manifest_conn_id) - else: - self.manifest_path = Path(manifest_path_str) + self.manifest_path = ObjectStoragePath(manifest_path_str, conn_id=manifest_conn_id) self.env_vars = env_vars self.dbt_vars = dbt_vars diff --git a/cosmos/constants.py b/cosmos/constants.py index f768155733..e2ed6aef8c 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -32,10 +32,6 @@ DEFAULT_OPENLINEAGE_NAMESPACE = "cosmos" OPENLINEAGE_PRODUCER = "https://github.com/astronomer/astronomer-cosmos/" -# Cosmos will not emit datasets for the following Airflow versions, due to a breaking change that's fixed in later Airflow 2.x versions -# https://github.com/apache/airflow/issues/39486 -PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS = [Version("2.9.0"), Version("2.9.1")] - AIRFLOW_OBJECT_STORAGE_PATH_URL_SCHEMES = ("s3", "gs", "gcs", "wasb", "abfs", "abfss", "az", "http", "https") diff --git a/cosmos/io.py b/cosmos/io.py index 23a1bef210..2ff293acbf 100644 --- a/cosmos/io.py +++ b/cosmos/io.py @@ -144,7 +144,6 @@ def upload_to_azure_wasb( def _configure_remote_target_path() -> tuple[Path | ObjectStoragePath, str] | tuple[None, None]: """Configure the remote target path if it is provided.""" - from airflow.version import version as airflow_version if not settings.remote_target_path: return None, None @@ -160,12 +159,6 @@ def _configure_remote_target_path() -> tuple[Path | ObjectStoragePath, str] | tu if remote_conn_id is None: return None, None - if not settings.AIRFLOW_IO_AVAILABLE: - raise CosmosValueError( - f"You're trying to specify remote target path {target_path_str}, but the required " - f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " - "Airflow 2.8 or later." - ) _configured_target_path = ObjectStoragePath(target_path_str, conn_id=remote_conn_id) if not _configured_target_path.exists(): # type: ignore[no-untyped-call] diff --git a/cosmos/operators/_asynchronous/bigquery.py b/cosmos/operators/_asynchronous/bigquery.py index a03705bae4..ddb0f3009b 100644 --- a/cosmos/operators/_asynchronous/bigquery.py +++ b/cosmos/operators/_asynchronous/bigquery.py @@ -23,7 +23,6 @@ from airflow.datasets import Dataset as Asset # type: ignore from airflow.utils.context import Context # type: ignore -from packaging.version import Version from cosmos import settings from cosmos.config import ProfileConfig @@ -95,7 +94,7 @@ def __init__( AbstractDbtLocalBase.__init__( self, task_id=task_id, project_dir=project_dir, profile_config=profile_config, **self.dbt_kwargs ) - if kwargs.get("emit_datasets", True) and settings.enable_dataset_alias and AIRFLOW_VERSION >= Version("2.10"): + if kwargs.get("emit_datasets", True) and settings.enable_dataset_alias: from airflow.datasets import DatasetAlias # ignoring the type because older versions of Airflow raise the follow error in mypy @@ -156,8 +155,6 @@ def get_sql_from_xcom(self, context: Context) -> str: def get_remote_sql(self) -> str: start_time = time.time() - if not settings.AIRFLOW_IO_AVAILABLE: # pragma: no cover - raise CosmosValueError(f"Cosmos async support is only available starting in Airflow 2.8 or later.") try: from airflow.sdk import ObjectStoragePath except ImportError: diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index e868c04fd6..1ae89f6989 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -34,11 +34,10 @@ from cosmos import cache, settings -if settings.AIRFLOW_IO_AVAILABLE: - try: - from airflow.sdk import ObjectStoragePath - except ImportError: - from airflow.io.path import ObjectStoragePath +try: + from airflow.sdk import ObjectStoragePath +except ImportError: + from airflow.io.path import ObjectStoragePath from cosmos._utils.importer import load_method_from_module from cosmos.cache import ( _copy_cached_package_lockfile_to_project, @@ -321,13 +320,6 @@ def _configure_remote_target_path() -> tuple[Path | ObjectStoragePath, str] | tu ) return None, None - if not settings.AIRFLOW_IO_AVAILABLE: - raise CosmosValueError( - f"You're trying to specify remote target path {target_path_str}, but the required " - f"Object Storage feature is unavailable in Airflow version {AIRFLOW_VERSION}. Please upgrade to " - "Airflow 2.8 or later." - ) - _configured_target_path = ObjectStoragePath(target_path_str, conn_id=remote_conn_id) if not _configured_target_path.exists(): # type: ignore[no-untyped-call] @@ -813,7 +805,7 @@ def register_dataset(self, new_inlets: list[Asset], new_outlets: list[Asset], co raise AirflowCompatibilityError( "To emit datasets with Airflow 3, the setting `enable_dataset_alias` must be True (default)." ) - elif AIRFLOW_VERSION < Version("2.10") or not settings.enable_dataset_alias: + elif not settings.enable_dataset_alias: from airflow.utils.session import create_session logger.info("Assigning inlets/outlets without DatasetAlias") @@ -952,11 +944,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: AbstractDbtLocalBase.__init__(self, **base_kwargs) if AIRFLOW_VERSION.major < _AIRFLOW3_MAJOR_VERSION: - if ( - kwargs.get("emit_datasets", True) - and settings.enable_dataset_alias - and AIRFLOW_VERSION >= Version("2.10") - ): + if kwargs.get("emit_datasets", True) and settings.enable_dataset_alias: from airflow.datasets import DatasetAlias # ignoring the type because older versions of Airflow raise the follow error in mypy diff --git a/cosmos/settings.py b/cosmos/settings.py index a4f35aa74d..f8e7e24b45 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -6,8 +6,6 @@ import airflow from airflow.configuration import conf -from airflow.version import version as airflow_version -from packaging.version import Version from cosmos.constants import ( DEFAULT_COSMOS_CACHE_DIR_NAME, @@ -56,8 +54,6 @@ enable_setup_async_task = conf.getboolean("cosmos", "enable_setup_async_task", fallback=True) enable_teardown_async_task = conf.getboolean("cosmos", "enable_teardown_async_task", fallback=True) -AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0") - # The following environment variable is populated in Astro Cloud in_astro_cloud = os.getenv("ASTRONOMER_ENVIRONMENT") == "cloud" diff --git a/dev/dags/cosmos_callback_dag.py b/dev/dags/cosmos_callback_dag.py index dad29dd145..a2b559d5cc 100644 --- a/dev/dags/cosmos_callback_dag.py +++ b/dev/dags/cosmos_callback_dag.py @@ -34,7 +34,7 @@ "install_deps": True, # install any necessary dependencies before running any dbt command "full_refresh": True, # used only in dbt commands that support this flag # -------------------------------------------------------------- - # Callback function to upload files using Airflow Object storage and Cosmos remote_target_path setting on Airflow 2.8 and above + # Callback function to upload files using Airflow Object storage and Cosmos remote_target_path setting "callback": upload_to_cloud_storage, # -------------------------------------------------------------- # Callback function to upload files to AWS S3, works for Airflow < 2.8 too diff --git a/dev/dags/example_virtualenv.py b/dev/dags/example_virtualenv.py index 04083b3bcf..a774f3924b 100644 --- a/dev/dags/example_virtualenv.py +++ b/dev/dags/example_virtualenv.py @@ -64,8 +64,7 @@ def example_virtualenv() -> None: # For the sake of avoiding additional latency observed while uploading files for each of the tasks, the # below callback functions to be executed are commented, but you can uncomment them if you'd like to # enable callback execution. - # Callback function to upload files using Airflow Object storage and Cosmos remote_target_path setting on - # Airflow 2.8 and above + # Callback function to upload files using Airflow Object storage and Cosmos remote_target_path setting # "callback": upload_to_cloud_storage, # -------------------------------------------------------------------------- # Callback function if you'd like to upload files from the target directory to remote store e.g. AWS S3 that diff --git a/docs/compatibility-policy.rst b/docs/compatibility-policy.rst index d7d35447e9..6418ab3a74 100644 --- a/docs/compatibility-policy.rst +++ b/docs/compatibility-policy.rst @@ -37,8 +37,8 @@ Python Apache Airflow ~~~~~~~~~~~~~~ -- **Minimum required**: Apache Airflow 2.6.0 -- **Supported versions**: 2.6, 2.7, 2.8, 2.9, 2.10, 2.11, 3.0, 3.1 +- **Minimum required**: Apache Airflow 2.10.0 +- **Supported versions**: 2.10, 2.11, 3.0, 3.1 dbt Core ~~~~~~~~ diff --git a/docs/configuration/callbacks.rst b/docs/configuration/callbacks.rst index 36ac33f92d..c754245525 100644 --- a/docs/configuration/callbacks.rst +++ b/docs/configuration/callbacks.rst @@ -42,7 +42,7 @@ using a single operator in an Airflow DAG: Example: Using DbtDag or DbtTaskGroup ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -If you're using Airflow 2.8 or later, you can leverage the :ref:`remote_target_path` configuration to upload files +You can leverage the :ref:`remote_target_path` configuration to upload files from the target directory to a remote storage. Below is an example of how to define a callback helper function in your ``DbtDag`` that utilizes this configuration: diff --git a/docs/configuration/cosmos-conf.rst b/docs/configuration/cosmos-conf.rst index 8ebdaa8b82..1619ffd588 100644 --- a/docs/configuration/cosmos-conf.rst +++ b/docs/configuration/cosmos-conf.rst @@ -152,7 +152,7 @@ This page lists all available Airflow configurations that affect ``astronomer-co in a remote location (an alternative to the Variable cache approach released previously since Cosmos 1.5.0) using this configuration. The value for the remote cache directory can be any of the schemes that are supported by the `Airflow Object Store `_ - feature introduced in Airflow 2.8.0 (e.g. ``s3://your_s3_bucket/cache_dir/``, ``gs://your_gs_bucket/cache_dir/``, + feature (e.g. ``s3://your_s3_bucket/cache_dir/``, ``gs://your_gs_bucket/cache_dir/``, ``abfs://your_azure_container/cache_dir``, etc.) This is an experimental feature available since Cosmos 1.6 to gather user feedback and will be merged into the @@ -181,7 +181,7 @@ This page lists all available Airflow configurations that affect ``astronomer-co the target directory. The value for the remote target path can be any of the schemes that are supported by the `Airflow Object Store `_ - feature introduced in Airflow 2.8.0 (e.g. ``s3://your_s3_bucket/target_dir/``, ``gs://your_gs_bucket/target_dir/``, + feature (e.g. ``s3://your_s3_bucket/target_dir/``, ``gs://your_gs_bucket/target_dir/``, ``abfs://your_azure_container/cache_dir``, etc.) - Default: ``None`` diff --git a/docs/configuration/lineage.rst b/docs/configuration/lineage.rst index 0dc6eb161c..51203e67cb 100644 --- a/docs/configuration/lineage.rst +++ b/docs/configuration/lineage.rst @@ -38,17 +38,13 @@ Contributions are also welcome in the `OpenLineage project `_ on how to configure OpenLineage. - -Otherwise, follow `these instructions `_. +Follow `the instructions `_ on how to configure OpenLineage. Namespace diff --git a/docs/configuration/parsing-methods.rst b/docs/configuration/parsing-methods.rst index 6dadd07461..ce4c4608cc 100644 --- a/docs/configuration/parsing-methods.rst +++ b/docs/configuration/parsing-methods.rst @@ -44,10 +44,9 @@ If you already have a ``manifest.json`` file created by dbt, Cosmos will parse t You can supply a ``manifest_path`` parameter on the DbtDag / DbtTaskGroup with a path to a ``manifest.json`` file. Before Cosmos 1.6.0, the path to ``manifest.json`` supplied via the ``DbtDag`` / ``DbtTaskGroup`` ``manifest_path`` -argument accepted only local paths. However, starting with Cosmos 1.6.0, if you've Airflow >= 2.8.0, you can supply a -a remote path (e.g., an S3 URL) too. For supporting remote paths, Cosmos leverages the +argument accepted only local paths. However, starting with Cosmos 1.6.0, you can supply a remote path (e.g., an S3 URL) too. For supporting remote paths, Cosmos leverages the `Airflow Object Storage `_ -feature released in Airflow 2.8.0. +feature. For remote paths, you can specify a ``manifest_conn_id``, which is an Airflow connection ID containing the credentials to access the remote path. If you do not specify a ``manifest_conn_id``, Cosmos will use the default connection ID specific to the scheme, identified using the Airflow diff --git a/docs/configuration/render-config.rst b/docs/configuration/render-config.rst index 5ac27e03cb..68bd5d7b31 100644 --- a/docs/configuration/render-config.rst +++ b/docs/configuration/render-config.rst @@ -24,7 +24,7 @@ The ``RenderConfig`` class takes the following arguments: - ``airflow_vars_to_purge_cache``: (new in v1.5) Specify Airflow variables that will affect the ``LoadMode.DBT_LS`` cache. See `Caching <./caching.html>`_ for more information. - ``source_rendering_behavior``: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "NONE" (since Cosmos 1.6). See `Source Nodes Rendering <./source-nodes-rendering.html>`_ for more information. - ``source_pruning``: When set to ``True``, automatically removes (or "prunes") any dbt source nodes from your Airflow DAG that do not have any downstream dependencies within the selected portion of the dbt graph. Defaults to ``False``. See `Source Nodes Rendering <./source-nodes-rendering.html>`_ for more information. -- ``normalize_task_id``: A callable that takes a dbt node as input and returns the task ID. This function allows users to set a custom task_id independently of the model name, which can be specified as the task's display_name. This way, task_id can be modified using a user-defined function, while the model name remains as the task's display name. The display_name parameter is available in Airflow 2.9 and above. See `Task display name <./task-display-name.html>`_ for more information. +- ``normalize_task_id``: A callable that takes a dbt node as input and returns the task ID. This function allows users to set a custom task_id independently of the model name, which can be specified as the task's display_name. This way, task_id can be modified using a user-defined function, while the model name remains as the task's display name. See `Task display name <./task-display-name.html>`_ for more information. - ``normalize_task_display_name``: This function allows users to set a custom user-defined function to alter the display name independently of the model name. This way, the task_id can be preserved while the model display name is modified. - ``should_detach_multiple_parents_tests``: A boolean to control if tests that depend on multiple parents should be run as standalone tasks. See `Testing Behavior `_ for more information. - ``enable_owner_inheritance``: (introduced in 1.10.2) A boolean to control if dbt owners should be imported as part of the airflow DAG owners. Defaults to True. diff --git a/docs/configuration/scheduling.rst b/docs/configuration/scheduling.rst index 2450663ce4..8545367e90 100644 --- a/docs/configuration/scheduling.rst +++ b/docs/configuration/scheduling.rst @@ -27,7 +27,7 @@ Data-Aware Scheduling `Apache Airflow® `_ 2.4 introduced the concept of `scheduling based on Datasets `_. -By default, if using a version between Airflow 2.4 or higher, Cosmos emits `Airflow Datasets `_ when running dbt projects. This allows you to use Airflow's data-aware scheduling capabilities to schedule your dbt projects. Cosmos emits datasets using the OpenLineage URI format, as detailed in the `OpenLineage Naming Convention `_. +By default, Cosmos emits `Airflow Datasets `_ when running dbt projects. This allows you to use Airflow's data-aware scheduling capabilities to schedule your dbt projects. Cosmos emits datasets using the OpenLineage URI format, as detailed in the `OpenLineage Naming Convention `_. .. important:: @@ -83,7 +83,7 @@ This example DAG: :end-before: [END local_example] -Will trigger the following DAG to be run (when using Cosmos 1.1 when using Airflow 2.4 or newer): +Will trigger the following DAG to be run (when using Cosmos 1.1): .. code-block:: python @@ -158,28 +158,6 @@ If you want to use the Airflow 3 URI standard while still using Airflow 2, pleas Remember to update any DAGs that are scheduled using this dataset. -Airflow 2.9 and below -_____________________ - -If using cosmos with an Airflow 2.9 or below, users will experience the following issues: - -- The task inlets and outlets generated by Cosmos will not be seen in the Airflow UI -- The scheduler logs will contain many messages saying "Orphaning unreferenced dataset" - -Example of scheduler logs: - -.. code-block:: - - scheduler | [2023-09-08T10:18:34.252+0100] {scheduler_job_runner.py:1742} INFO - Orphaning unreferenced dataset 'postgres://0.0.0.0:5432/postgres.public.stg_customers' - scheduler | [2023-09-08T10:18:34.252+0100] {scheduler_job_runner.py:1742} INFO - Orphaning unreferenced dataset 'postgres://0.0.0.0:5432/postgres.public.stg_payments' - scheduler | [2023-09-08T10:18:34.252+0100] {scheduler_job_runner.py:1742} INFO - Orphaning unreferenced dataset 'postgres://0.0.0.0:5432/postgres.public.stg_orders' - scheduler | [2023-09-08T10:18:34.252+0100] {scheduler_job_runner.py:1742} INFO - Orphaning unreferenced dataset 'postgres://0.0.0.0:5432/postgres.public.customers' - - -References about the root cause of these issues: - -- https://github.com/astronomer/astronomer-cosmos/issues/522 -- https://github.com/apache/airflow/issues/34206 Airflow 2.10.0 and 2.10.1 diff --git a/docs/getting_started/async-execution-mode.rst b/docs/getting_started/async-execution-mode.rst index 0e9b39a72d..9e7414d679 100644 --- a/docs/getting_started/async-execution-mode.rst +++ b/docs/getting_started/async-execution-mode.rst @@ -230,7 +230,7 @@ Limitations +++++++++++ -1. **Airflow 2.8 or higher required**: This mode relies on Airflow's `Object Storage `__ feature, introduced in Airflow 2.8, to store and retrieve compiled SQLs. +1. **Airflow's Object Storage**: This mode relies on Airflow's `Object Storage `__ feature to store and retrieve compiled SQLs. 2. **Limited to dbt models**: Only dbt resource type models are run asynchronously using Airflow deferrable operators. Other resource types are executed synchronously, similar to the local execution mode. diff --git a/docs/requirements.txt b/docs/requirements.txt index 7d90fbc8ef..469baeeae9 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,7 +1,7 @@ aenum deprecation msgpack -apache-airflow>=2.6.0 +apache-airflow>=2.10.0 pydantic pydata-sphinx-theme sphinx diff --git a/pyproject.toml b/pyproject.toml index 9fdb596d66..a8794d38a5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,7 @@ classifiers = [ dependencies = [ "aenum", "attrs", - "apache-airflow>=2.6.0", + "apache-airflow>=2.10.0", "deprecation", # Python 3.13 exposes a deprecated operator, we can remove this dependency in the future "Jinja2>=3.0.0", "msgpack", @@ -170,7 +170,7 @@ pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} [[tool.hatch.envs.tests.matrix]] python = ["3.10", "3.11", "3.12", "3.13"] -airflow = ["2.6", "2.7", "2.8", "2.9", "2.10", "2.11", "3.0", "3.1"] +airflow = ["2.10", "2.11", "3.0", "3.1"] dbt = ["1.5", "1.6", "1.7", "1.8", "1.9", "1.10", "1.11", "2.0"] [tool.hatch.envs.tests.overrides] diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index 3bf9d1680f..37c86e0a11 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -41,46 +41,10 @@ uv pip install "apache-airflow==$AIRFLOW_VERSION" apache-airflow-providers-docke uv pip install "gcsfs<2025.3.0" -if [ "$AIRFLOW_VERSION" = "2.6" ] ; then - uv pip install "apache-airflow-providers-amazon" "apache-airflow==$AIRFLOW_VERSION" "urllib3<2" - uv pip install "apache-airflow-providers-cncf-kubernetes" "apache-airflow==$AIRFLOW_VERSION" - uv pip install "apache-airflow-providers-google<10.11" "httplib2==0.31.0" "apache-airflow==$AIRFLOW_VERSION" - uv pip install "apache-airflow-providers-microsoft-azure" "apache-airflow==$AIRFLOW_VERSION" - uv pip install "pydantic<2.0" -elif [ "$AIRFLOW_VERSION" = "2.7" ] ; then - uv pip install "apache-airflow-providers-amazon" --constraint /tmp/constraint.txt - uv pip install "apache-airflow-providers-cncf-kubernetes" --constraint /tmp/constraint.txt - uv pip install "apache-airflow-providers-google" --constraint /tmp/constraint.txt - uv pip install apache-airflow-providers-microsoft-azure --constraint /tmp/constraint.txt -elif [ "$AIRFLOW_VERSION" = "2.8" ] ; then - uv pip install "apache-airflow-providers-amazon[s3fs]" --constraint /tmp/constraint.txt - uv pip install "apache-airflow-providers-cncf-kubernetes" --constraint /tmp/constraint.txt - uv pip install "apache-airflow-providers-google<=10.26" "apache-airflow==$AIRFLOW_VERSION" - # The Airflow 2.8 constraints file at - # https://raw.githubusercontent.com/apache/airflow/constraints-2.8.0/constraints-3.11.txt - # specifies apache-airflow-providers-microsoft-azure==8.4.0. However, our Azure connection setup in the CI, - # previously led to authentication issues with this version. This issue got resolved in - # apache-airflow-providers-microsoft-azure==8.5.0. Hence, we are using apache-airflow-providers-microsoft-azure>=8.5.0 - # and skipping installation with constraints, as the specified version does not meet our requirements. - uv pip install "apache-airflow-providers-microsoft-azure>=8.5.0" "apache-airflow==$AIRFLOW_VERSION" -elif [ "$AIRFLOW_VERSION" = "2.9" ] ; then - uv pip install "apache-airflow-providers-amazon[s3fs]" --constraint /tmp/constraint.txt - uv pip install "apache-airflow-providers-cncf-kubernetes" --constraint /tmp/constraint.txt - uv pip install "apache-airflow-providers-microsoft-azure" --constraint /tmp/constraint.txt - # The Airflow 2.9 constraints file at - # https://raw.githubusercontent.com/apache/airflow/constraints-2.9.0/constraints-3.11.txt - # specifies apache-airflow-providers-google==10.16.0. However, our CI setup uses a Google connection without a token, - # which previously led to authentication issues when the token was None. This issue was resolved in PR - # https://github.com/apache/airflow/pull/38102 and fixed in apache-airflow-providers-google==10.17.0. Consequently, - # we are using apache-airflow-providers-google>=10.17.0 and skipping constraints installation, as the specified - # version does not meet our requirements. - uv pip install "apache-airflow-providers-google>=10.17.0" "apache-airflow==$AIRFLOW_VERSION" -else - uv pip install "apache-airflow-providers-amazon[s3fs]" --constraint /tmp/constraint.txt - uv pip install "apache-airflow-providers-cncf-kubernetes" --constraint /tmp/constraint.txt - uv pip install "apache-airflow-providers-google" --constraint /tmp/constraint.txt - uv pip install "apache-airflow-providers-microsoft-azure" --constraint /tmp/constraint.txt -fi +uv pip install "apache-airflow-providers-amazon[s3fs]" --constraint /tmp/constraint.txt +uv pip install "apache-airflow-providers-cncf-kubernetes" --constraint /tmp/constraint.txt +uv pip install "apache-airflow-providers-google" --constraint /tmp/constraint.txt +uv pip install "apache-airflow-providers-microsoft-azure" --constraint /tmp/constraint.txt rm /tmp/constraint.txt diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 2488be63ba..8b3c01f120 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -4,7 +4,6 @@ from unittest.mock import Mock, patch import pytest -from airflow import __version__ as airflow_version from airflow.models import DAG from cosmos.operators.watcher import DbtTestWatcherOperator @@ -19,7 +18,6 @@ from airflow.operators.empty import EmptyOperator from airflow.utils.task_group import TaskGroup -from packaging import version from cosmos.airflow.graph import ( _add_teardown_task, @@ -675,10 +673,6 @@ def _normalize_task_display_name(node: DbtNode) -> str: return f"new_task_display_name_{node.name}_{node.resource_type.value}" -@pytest.mark.skipif( - version.parse(airflow_version) < version.parse("2.9"), - reason="Airflow task did not have display_name until the 2.9 release", -) @pytest.mark.parametrize( "node_type,node_id,normalize_task_id,normalize_task_display_name,use_task_group,test_behavior,expected_node_id,expected_display_name", [ diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 7849bccaf5..1ffd98cfd9 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -37,7 +37,6 @@ run_command, ) from cosmos.profiles import PostgresUserPasswordProfileMapping -from cosmos.settings import AIRFLOW_IO_AVAILABLE DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt" DBT_PROJECT_NAME = "jaffle_shop" @@ -2036,7 +2035,6 @@ def test_should_use_dbt_ls_cache(enable_cache, enable_cache_dbt_ls, cache_id, sh assert graph.should_use_dbt_ls_cache() == should_use -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch(object_storage_path) @patch("cosmos.config.ProjectConfig") @patch("cosmos.dbt.graph._configure_remote_cache_dir") @@ -2060,7 +2058,6 @@ def test_save_dbt_ls_cache_remote_cache_dir( mock_remote_cache_key_path.open.assert_called_once_with("w") -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch(object_storage_path) @patch("cosmos.config.ProjectConfig") @patch("cosmos.dbt.graph._configure_remote_cache_dir") diff --git a/tests/operators/_asynchronous/test_base.py b/tests/operators/_asynchronous/test_base.py index cfd94245c1..896373d5e7 100644 --- a/tests/operators/_asynchronous/test_base.py +++ b/tests/operators/_asynchronous/test_base.py @@ -4,10 +4,8 @@ from unittest.mock import MagicMock, Mock, mock_open, patch import pytest -from packaging.version import Version from cosmos.config import ProfileConfig -from cosmos.constants import AIRFLOW_VERSION from cosmos.hooks.subprocess import FullOutputSubprocessResult from cosmos.operators._asynchronous import SetupAsyncOperator, TeardownAsyncOperator from cosmos.operators._asynchronous.base import DbtRunAirflowAsyncFactoryOperator, _create_async_operator_class @@ -152,7 +150,6 @@ def test_setup_run_subprocess_py_bin_unset( op.run_subprocess(command, env, cwd) -@pytest.mark.skipif(AIRFLOW_VERSION < Version("2.8"), reason="ObjectStoragePath requires Apache Airflow >= 2.8") @patch("cosmos.operators._asynchronous.ObjectStoragePath") def test_execute_removes_existing_path(mock_object_storage_path): mock_path_instance = MagicMock() diff --git a/tests/operators/_asynchronous/test_bigquery.py b/tests/operators/_asynchronous/test_bigquery.py index 700f17f0fb..904927c846 100644 --- a/tests/operators/_asynchronous/test_bigquery.py +++ b/tests/operators/_asynchronous/test_bigquery.py @@ -272,7 +272,6 @@ def test_execute_does_not_call_register_event_when_emit_datasets_false( mock_register_event.assert_not_called() -@pytest.mark.skipif(AIRFLOW_VERSION < Version("2.10.0"), reason="Require Airflow >= 2.10") @patch.object(DbtRunAirflowAsyncBigqueryOperator, "_store_template_fields") @patch.object(DbtRunAirflowAsyncBigqueryOperator, "_register_event") def test_execute_complete_calls_register_event_when_emit_datasets_true( @@ -300,7 +299,6 @@ def test_execute_complete_calls_register_event_when_emit_datasets_true( mock_register_event.assert_called_once_with(mock_context) -@pytest.mark.skipif(AIRFLOW_VERSION < Version("2.10.0"), reason="Require Airflow >= 2.10") @patch.object(DbtRunAirflowAsyncBigqueryOperator, "register_dataset") def test_register_event_with_uri(mock_register_dataset, profile_config_mock): """Test that _register_event correctly extracts table name from complex unique_id.""" diff --git a/tests/operators/_watcher/test_triggerer.py b/tests/operators/_watcher/test_triggerer.py index c1b3a9ad4c..041883792b 100644 --- a/tests/operators/_watcher/test_triggerer.py +++ b/tests/operators/_watcher/test_triggerer.py @@ -142,14 +142,14 @@ async def fake_get_xcom_val(key): @pytest.mark.asyncio async def test_get_producer_task_status_airflow2(self): fetcher = MagicMock(return_value="failed") - with patch("cosmos.operators._watcher.triggerer.AIRFLOW_VERSION", Version("2.9.0")): + with patch("cosmos.operators._watcher.triggerer.AIRFLOW_VERSION", Version("2.10.0")): with patch( "cosmos.operators._watcher.triggerer.build_producer_state_fetcher", return_value=fetcher ) as mock_builder: state = await self.trigger._get_producer_task_status() mock_builder.assert_called_once_with( - airflow_version=Version("2.9.0"), + airflow_version=Version("2.10.0"), dag_id=self.trigger.dag_id, run_id=self.trigger.run_id, producer_task_id=self.trigger.producer_task_id, @@ -162,7 +162,7 @@ async def test_get_producer_task_status_airflow2(self): async def test_get_producer_task_status_airflow2_missing_ti(self): fetcher = MagicMock(return_value=None) - with patch("cosmos.operators._watcher.triggerer.AIRFLOW_VERSION", Version("2.9.0")): + with patch("cosmos.operators._watcher.triggerer.AIRFLOW_VERSION", Version("2.10.0")): with patch("cosmos.operators._watcher.triggerer.build_producer_state_fetcher", return_value=fetcher): state = await self.trigger._get_producer_task_status() diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 18a76e89d9..882f849a09 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -26,7 +26,7 @@ import cosmos.dbt.runner as dbt_runner from cosmos import cache from cosmos.config import ProfileConfig -from cosmos.constants import PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, InvocationMode +from cosmos.constants import InvocationMode from cosmos.dbt.parser.output import ( parse_number_of_warnings_subprocess, ) @@ -51,7 +51,6 @@ DbtTestLocalOperator, ) from cosmos.profiles import PostgresUserPasswordProfileMapping -from cosmos.settings import AIRFLOW_IO_AVAILABLE from tests.utils import new_test_dag from tests.utils import test_dag as run_test_dag @@ -417,58 +416,7 @@ def test_dbt_test_local_operator_invocation_mode_methods(mock_extract_log_issues assert operator.parse_number_of_warnings == dbt_runner.parse_number_of_warnings -@pytest.mark.skipif( - version.parse(airflow_version) >= version.parse("2.10") - or version.parse(airflow_version) in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, - reason="Airflow inlets and outlets do not work by default in Airflow 2.9.0 and 2.9.1. \n" - "From Airflow 2.10 onwards, we started using DatasetAlias, which changed this behaviour.", -) -@pytest.mark.integration -def test_run_operator_dataset_inlets_and_outlets(caplog): - from airflow.datasets import Dataset - - project_dir = Path(__file__).parent.parent.parent / "dev/dags/dbt/altered_jaffle_shop" - - with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag: - seed_operator = DbtSeedLocalOperator( - profile_config=real_profile_config, - project_dir=project_dir, - task_id="seed", - dbt_cmd_flags=["--select", "raw_customers"], - install_deps=True, - append_env=True, - ) - run_operator = DbtRunLocalOperator( - profile_config=real_profile_config, - project_dir=project_dir, - task_id="run", - dbt_cmd_flags=["--models", "stg_customers"], - install_deps=True, - append_env=True, - ) - test_operator = DbtTestLocalOperator( - profile_config=real_profile_config, - project_dir=project_dir, - task_id="test", - dbt_cmd_flags=["--models", "stg_customers"], - install_deps=True, - append_env=True, - ) - seed_operator >> run_operator >> test_operator - - run_test_dag(dag) - - assert run_operator.inlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.raw_customers", extra=None)] - assert run_operator.outlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.stg_customers", extra=None)] - assert test_operator.inlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.stg_customers", extra=None)] - assert test_operator.outlets == [] - - @pytest.mark.skipif(version.parse(airflow_version).major >= 3, reason="This test is specific for Airflow 2.10 and 2.11") -@pytest.mark.skipif( - version.parse(airflow_version) < version.parse("2.10"), - reason="From Airflow 2.10 onwards, we started using DatasetAlias, which changed this behaviour.", -) @pytest.mark.integration def test_run_operator_dataset_inlets_and_outlets_airflow_210(caplog): try: @@ -601,10 +549,6 @@ def test_run_operator_dataset_with_airflow_3_and_enabled_dataset_alias_false_fai @patch("cosmos.settings.enable_dataset_alias", 0) -@pytest.mark.skipif( - version.parse(airflow_version) < version.parse("2.10"), - reason="From Airflow 2.10 onwards, we started using DatasetAlias, which changed this behaviour.", -) @pytest.mark.integration def test_run_operator_dataset_inlets_and_outlets_airflow_210_onwards_disabled_via_envvar(caplog): with DAG("test_id_2", start_date=datetime(2022, 1, 1)) as dag: @@ -620,45 +564,6 @@ def test_run_operator_dataset_inlets_and_outlets_airflow_210_onwards_disabled_vi assert run_operator.outlets == [] -@pytest.mark.skipif( - version.parse(airflow_version) not in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, - reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs", - # https://github.com/apache/airflow/issues/39486 -) -@pytest.mark.integration -def test_run_operator_dataset_emission_is_skipped(caplog): - with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag: - seed_operator = DbtSeedLocalOperator( - profile_config=real_profile_config, - project_dir=DBT_PROJ_DIR, - task_id="seed", - dbt_cmd_flags=["--select", "raw_customers"], - install_deps=True, - append_env=True, - emit_datasets=False, - ) - run_operator = DbtRunLocalOperator( - profile_config=real_profile_config, - project_dir=DBT_PROJ_DIR, - task_id="run", - dbt_cmd_flags=["--models", "stg_customers"], - install_deps=True, - append_env=True, - emit_datasets=False, - ) - - seed_operator >> run_operator - - run_test_dag(dag) - - assert run_operator.inlets == [] - assert run_operator.outlets == [] - - -@pytest.mark.skipif( - version.parse(airflow_version) in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, - reason="Airflow inlets and outlets do not work by default in Airflow 2.9.0 and 2.9.1", -) @pytest.mark.skipif( version.parse(airflow_version) >= version.parse("3"), reason="We do not support emitting assets with Airflow 3.0 without dataset alias.", @@ -692,10 +597,6 @@ def test_run_operator_dataset_url_encoded_names_in_airflow2(caplog): ] -@pytest.mark.skipif( - version.parse(airflow_version) in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, - reason="Airflow inlets and outlets do not work by default in Airflow 2.9.0 and 2.9.1", -) @pytest.mark.skipif( version.parse(airflow_version) >= version.parse("3"), reason="We do not support emitting assets with Airflow 3.0 without dataset alias.", @@ -1513,39 +1414,6 @@ def test_dbt_clone_local_operator_initialisation(): assert "clone" in operator.base_cmd -@patch("cosmos.operators.local.remote_target_path", new="s3://some-bucket/target") -@patch("cosmos.settings.AIRFLOW_IO_AVAILABLE", new=False) -def test_configure_remote_target_path_object_storage_unavailable_on_earlier_airflow_versions(): - operator = DbtCompileLocalOperator( - task_id="fake-task", - profile_config=profile_config, - project_dir="fake-dir", - ) - with pytest.raises(CosmosValueError, match="Object Storage feature is unavailable"): - operator._configure_remote_target_path() - - -@pytest.mark.parametrize( - "rem_target_path, rem_target_path_conn_id", - [ - (None, "aws_s3_conn"), - ("unknown://some-bucket/cache", None), - ], -) -def test_config_remote_target_path_unset_settings(rem_target_path, rem_target_path_conn_id): - with patch("cosmos.operators.local.remote_target_path", new=rem_target_path): - with patch("cosmos.operators.local.remote_target_path_conn_id", new=rem_target_path_conn_id): - operator = DbtCompileLocalOperator( - task_id="fake-task", - profile_config=profile_config, - project_dir="fake-dir", - ) - target_path, target_conn = operator._configure_remote_target_path() - assert target_path is None - assert target_conn is None - - -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.operators.local.remote_target_path", new="s3://some-bucket/target") @patch("cosmos.operators.local.remote_target_path_conn_id", new="aws_s3_conn") @patch("cosmos.operators.local.ObjectStoragePath") @@ -1607,7 +1475,6 @@ def test_upload_sql_files_xcom(tmp_path): mock_context["ti"].xcom_push.assert_called_once_with(key="dest.sql", value=compressed_b64_sql) -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.settings.upload_sql_to_xcom", False) @patch("cosmos.operators.local.ObjectStoragePath.copy") @patch("cosmos.operators.local.ObjectStoragePath") @@ -1810,7 +1677,6 @@ def test_handle_post_execution_with_multiple_callbacks( @pytest.mark.integration -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.operators.local.AbstractDbtLocalBase._configure_remote_target_path") @patch("cosmos.operators.local.ObjectStoragePath") def test_delete_sql_files_directory_not_exists(mock_object_storage_path, mock_configure_remote, caplog): @@ -1864,7 +1730,6 @@ def test_generate_dbt_flags_does_not_append_no_static_parser_in_subprocess(tmp_p @pytest.mark.integration -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.operators.local.AbstractDbtLocalBase._configure_remote_target_path") def test_delete_sql_files_no_remote_target_configured(mock_configure_remote, caplog): """Test that _delete_sql_files exits early with a warning when remote path is not configured.""" diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index f631fa0022..17c26b315f 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -609,7 +609,7 @@ def make_context(self, ti_mock, *, run_id: str = "test-run", map_index: int = 0) } @pytest.mark.skipif(AIRFLOW_VERSION >= Version("3.0.0"), reason="RuntimeTaskInstance path in Airflow >= 3.0") - @patch("cosmos.operators._watcher.base.AIRFLOW_VERSION", new=Version("2.7.0")) + @patch("cosmos.operators._watcher.base.AIRFLOW_VERSION", new=Version("2.10.0")) def test_get_producer_task_status_airflow2(self): sensor = self.make_sensor() sensor._get_producer_task_status = DbtConsumerWatcherSensor._get_producer_task_status.__get__( @@ -625,7 +625,7 @@ def test_get_producer_task_status_airflow2(self): status = sensor._get_producer_task_status(context) mock_builder.assert_called_once_with( - airflow_version=Version("2.7.0"), + airflow_version=Version("2.10.0"), dag_id="example_dag", run_id="run_1", producer_task_id=sensor.producer_task_id, @@ -635,7 +635,7 @@ def test_get_producer_task_status_airflow2(self): assert status == "success" @pytest.mark.skipif(AIRFLOW_VERSION >= Version("3.0.0"), reason="RuntimeTaskInstance path in Airflow >= 3.0") - @patch("cosmos.operators._watcher.base.AIRFLOW_VERSION", new=Version("2.7.0")) + @patch("cosmos.operators._watcher.base.AIRFLOW_VERSION", new=Version("2.10.0")) def test_get_producer_task_status_airflow2_missing_instance(self): sensor = self.make_sensor() sensor._get_producer_task_status = DbtConsumerWatcherSensor._get_producer_task_status.__get__( @@ -975,7 +975,6 @@ def test_dbt_build_watcher_operator_raises_not_implemented_error(self): DbtBuildWatcherOperator() -@pytest.mark.skipif(AIRFLOW_VERSION < Version("2.7"), reason="Airflow did not have dag.test() until the 2.6 release") @pytest.mark.integration def test_dbt_dag_with_watcher(): """ @@ -1041,7 +1040,6 @@ def test_dbt_dag_with_watcher(): } -@pytest.mark.skipif(AIRFLOW_VERSION < Version("2.7"), reason="Airflow did not have dag.test() until the 2.6 release") @pytest.mark.integration def test_dbt_task_group_with_watcher(): """ @@ -1128,7 +1126,6 @@ def test_dbt_task_group_with_watcher(): assert dag_dbt_task_group_watcher.task_dict["dbt_task_group.dbt_producer_watcher"].downstream_task_ids == set() -@pytest.mark.skipif(AIRFLOW_VERSION < Version("2.7"), reason="Airflow did not have dag.test() until the 2.6 release") @pytest.mark.integration def test_dbt_task_group_with_watcher_has_correct_dbt_cmd(): """ @@ -1180,7 +1177,6 @@ def test_dbt_task_group_with_watcher_has_correct_dbt_cmd(): assert "--full-refresh" in full_cmd -@pytest.mark.skipif(AIRFLOW_VERSION < Version("2.7"), reason="Airflow did not have dag.test() until the 2.6 release") @pytest.mark.integration def test_dbt_task_group_with_watcher_has_correct_templated_dbt_cmd(): """ diff --git a/tests/test_cache.py b/tests/test_cache.py index 3cfbf2a470..2f6b48b5e7 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -45,8 +45,7 @@ DEFAULT_PROFILES_FILE_NAME, _default_s3_conn, ) -from cosmos.exceptions import CosmosValueError -from cosmos.settings import AIRFLOW_IO_AVAILABLE, dbt_profile_cache_dir_name +from cosmos.settings import dbt_profile_cache_dir_name START_DATE = datetime(2024, 4, 16) example_dag = DAG("dag", start_date=START_DATE) @@ -404,14 +403,6 @@ def test_remote_cache_path_initialization_no_remote_cache_dir(): assert configured_remote_cache_dir is None -@patch("cosmos.cache.settings_remote_cache_dir", new="s3://some-bucket/cache") -@patch("cosmos.cache.AIRFLOW_IO_AVAILABLE", new=False) -def test_remote_cache_path_initialization_object_storage_unavailable_on_earlier_airflow_versions(): - with pytest.raises(CosmosValueError, match="Object Storage feature is unavailable"): - _configure_remote_cache_dir() - - -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.cache.settings_remote_cache_dir", new="s3://some-bucket/cache") @patch("airflow.io.path.ObjectStoragePath") def test_remote_cache_path_initialization_path_available_default_connection(mock_object_storage_path): @@ -424,7 +415,6 @@ def test_remote_cache_path_initialization_path_available_default_connection(mock assert configured_remote_cache_dir == mock_cache_dir_path -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.cache.settings_remote_cache_dir", new="s3://some-bucket/cache") @patch("airflow.io.path.ObjectStoragePath") def test_remote_cache_dir_initialization_path_not_exist_creates_path(mock_object_storage_path): @@ -436,7 +426,6 @@ def test_remote_cache_dir_initialization_path_not_exist_creates_path(mock_object mock_cache_dir_path.mkdir.assert_called_once_with(parents=True, exist_ok=True) -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.cache.settings_remote_cache_dir", new="s3://some-bucket/cache") @patch("cosmos.cache.remote_cache_dir_conn_id", new="my_conn_id") @patch("airflow.io.path.ObjectStoragePath") diff --git a/tests/test_config.py b/tests/test_config.py index 48127c4972..7797deeac1 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -9,7 +9,6 @@ from cosmos.exceptions import CosmosValueError from cosmos.profiles.athena.access_key import AthenaAccessKeyProfileMapping from cosmos.profiles.postgres.user_pass import PostgresUserPasswordProfileMapping -from cosmos.settings import AIRFLOW_IO_AVAILABLE DBT_PROJECTS_ROOT_DIR = Path(__file__).parent / "sample/" SAMPLE_PROFILE_YML = Path(__file__).parent / "sample/profiles.yml" @@ -306,7 +305,6 @@ def test_execution_config_default_config(execution_mode, expected_invocation_mod assert execution_config.invocation_mode == expected_invocation_mode -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @pytest.mark.parametrize( "manifest_path, given_manifest_conn_id, used_manifest_conn_id", [ @@ -325,28 +323,3 @@ def test_remote_manifest_path(manifest_path, given_manifest_conn_id, used_manife dbt_project_path="/tmp/some-path", manifest_path=manifest_path, manifest_conn_id=given_manifest_conn_id ) assert project_config.manifest_path == ObjectStoragePath(manifest_path, conn_id=used_manifest_conn_id) - - -@pytest.mark.skipif(AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") -@pytest.mark.parametrize( - "manifest_path, given_manifest_conn_id, used_manifest_conn_id", - [ - ("s3://cosmos-manifest-test/manifest.json", None, "aws_default"), - ("s3://cosmos-manifest-test/manifest.json", "aws_s3_conn", "aws_s3_conn"), - ("gs://cosmos-manifest-test/manifest.json", None, "google_cloud_default"), - ("gs://cosmos-manifest-test/manifest.json", "gcp_gs_conn", "gcp_gs_conn"), - ("abfs://cosmos-manifest-test/manifest.json", None, "wasb_default"), - ("abfs://cosmos-manifest-test/manifest.json", "azure_abfs_conn", "azure_abfs_conn"), - ], -) -def test_remote_manifest_path_airflow_io_unavailable(manifest_path, given_manifest_conn_id, used_manifest_conn_id): - from airflow.version import version as airflow_version - - error_msg = ( - f"The manifest path {manifest_path} uses a remote file scheme, but the required Object Storage feature is " - f"unavailable in Airflow version {airflow_version}. Please upgrade to Airflow 2.8 or later." - ) - with pytest.raises(CosmosValueError, match=error_msg): - _ = ProjectConfig( - dbt_project_path="/tmp/some-path", manifest_path=manifest_path, manifest_conn_id=given_manifest_conn_id - ) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 314064b6f6..57af196522 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -12,7 +12,7 @@ from dbt.version import get_installed_version as get_dbt_version from packaging.version import Version -from cosmos.constants import AIRFLOW_VERSION, PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS +from cosmos.constants import AIRFLOW_VERSION from . import utils as test_utils @@ -21,17 +21,8 @@ DBT_VERSION = Version(get_dbt_version().to_version_string()[1:]) KUBERNETES_DAGS = ["jaffle_shop_kubernetes", "jaffle_shop_watcher_kubernetes"] -MIN_VER_DAG_FILE: dict[str, list[str]] = { - "2.8": ["cosmos_manifest_example.py", "simple_dag_async.py", "cosmos_callback_dag.py"], -} - IGNORED_DAG_FILES = ["performance_dag.py", "jaffle_shop_kubernetes.py", "jaffle_shop_watcher_kubernetes.py"] -# Sort descending based on Versions and convert string to an actual version -MIN_VER_DAG_FILE_VER: dict[Version, list[str]] = { - Version(version): MIN_VER_DAG_FILE[version] for version in sorted(MIN_VER_DAG_FILE, key=Version, reverse=True) -} - @provide_session def get_session(session=None): @@ -48,15 +39,7 @@ def session(): def get_dag_bag() -> DagBag: # noqa: C901 """Create a DagBag by adding the files that are not supported to .airflowignore""" - if AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS: - return DagBag(dag_folder=None, include_examples=False) - with open(AIRFLOW_IGNORE_FILE, "w+") as file: - for min_version, files in MIN_VER_DAG_FILE_VER.items(): - if AIRFLOW_VERSION < min_version: - print(f"Adding {files} to .airflowignore") - file.writelines([f"{file}\n" for file in files]) - for dagfile in IGNORED_DAG_FILES: print(f"Adding {dagfile} to .airflowignore") file.writelines([f"{dagfile}\n"]) @@ -68,9 +51,6 @@ def get_dag_bag() -> DagBag: # noqa: C901 if DBT_VERSION < Version("1.5.0"): file.writelines(["example_source_rendering.py\n"]) - if AIRFLOW_VERSION < Version("2.8.0"): - file.writelines("example_cosmos_dbt_build.py\n") - # Disabling these DAGs temporarily due to an Airflow 3 bug on processing DatasetAlias that contain non-ASCII characters: # https://github.com/apache/airflow/issues/51566 # https://github.com/astronomer/astronomer-cosmos/issues/1802 @@ -125,10 +105,6 @@ def run_dag(dag_id: str): test_utils.run_dag(dag) -@pytest.mark.skipif( - AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, - reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs, and Cosmos errors if `emit_datasets` is not False", -) @pytest.mark.integration @pytest.mark.parametrize("dag_id", get_dag_ids()) def test_example_dag(session, dag_id: str): @@ -138,10 +114,8 @@ def test_example_dag(session, dag_id: str): @pytest.mark.skipif( - AIRFLOW_VERSION >= Version("3.1.0") # TODO: Fix https://github.com/astronomer/astronomer-cosmos/issues/2045 - or AIRFLOW_VERSION < Version("2.8") - or AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, - reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs (see PR: https://github.com/apache/airflow/pull/34585), and Cosmos errors if `emit_datasets` is not False", + AIRFLOW_VERSION >= Version("3.1.0"), # TODO: Fix https://github.com/astronomer/astronomer-cosmos/issues/2045 + reason="TODO: Fix https://github.com/astronomer/astronomer-cosmos/issues/2045", ) @patch.dict( os.environ, diff --git a/tests/test_io.py b/tests/test_io.py index 594097065a..c39e9703ab 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -14,7 +14,6 @@ upload_to_cloud_storage, upload_to_gcp_gs, ) -from cosmos.settings import AIRFLOW_IO_AVAILABLE @pytest.fixture @@ -104,7 +103,6 @@ def test_upload_artifacts_to_cloud_storage_no_remote_path(): upload_to_cloud_storage("/project_dir", **{}) -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") def test_upload_artifacts_to_cloud_storage_success(dummy_kwargs): """Test upload_artifacts_to_cloud_storage with valid setup.""" with ( @@ -131,7 +129,6 @@ def test_upload_artifacts_to_cloud_storage_success(dummy_kwargs): assert mock_copy.call_count == 2 -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.io.settings.remote_target_path", "s3://bucket/path/to/file") @patch("cosmos.io.settings.remote_target_path_conn_id", None) @patch("cosmos.io.ObjectStoragePath") @@ -147,7 +144,6 @@ def test_configure_remote_target_path_no_conn_id(mock_urlparse, mock_object_stor assert result == (mock_object_storage.return_value, _default_s3_conn) -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.io.settings.remote_target_path", "abcd://bucket/path/to/file") @patch("cosmos.io.settings.remote_target_path_conn_id", None) @patch("cosmos.io.ObjectStoragePath") @@ -161,22 +157,3 @@ def test_configure_remote_target_path_conn_id_is_none(mock_urlparse, mock_object result = _configure_remote_target_path() assert result == (None, None) - - -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") -@patch("cosmos.settings.AIRFLOW_IO_AVAILABLE", False) -@patch("cosmos.io.settings.remote_target_path", "s3://bucket/path/to/file") -@patch("cosmos.io.ObjectStoragePath") -@patch("cosmos.io.urlparse") -def test_configure_remote_target_path_airflow_io_unavailable(mock_urlparse, mock_object_storage): - """Test when AIRFLOW_IO_AVAILABLE is False.""" - mock_urlparse.return_value.scheme = "s3" - - mock_storage_path = MagicMock() - mock_storage_path.exists.return_value = True - mock_object_storage.return_value = mock_storage_path - - with pytest.raises(CosmosValueError) as exc_info: - _configure_remote_target_path() - - assert "Object Storage feature is unavailable" in str(exc_info.value)