Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 6 additions & 42 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ name: test

on:
push: # Run on pushes to the default branch
branches: [main]
branches: [main, drop_old_airflow_versions]
# Also run on pull requests originating from forks. Although this is insecure by default, we need it to run
# integration tests on forked PRs. As a guardrail, we’ve added an Authorize step to each job, which requires manually
# approving the workflow run for each pushed commit. Approval only happens after a careful code review of the changes.
pull_request_target:
branches: [main] # zizmor: ignore[dangerous-triggers]
branches: [main, drop_old_airflow_versions] # zizmor: ignore[dangerous-triggers]

concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
Expand Down Expand Up @@ -55,28 +55,10 @@ jobs:
fail-fast: false
matrix:
python-version: ["3.10", "3.11", "3.12", "3.13"]
airflow-version: ["2.6", "2.7", "2.8", "2.9", "2.10", "2.11", "3.0", "3.1"]
airflow-version: ["2.10", "2.11", "3.0", "3.1"]
dbt-version: ["1.10"]
exclude:
# Apache Airflow versions prior to 2.9.0 have not been tested with Python 3.12.
# Official support for Python 3.12 and the corresponding constraints.txt are available only for Apache Airflow >= 2.9.0.
# See: https://github.com/apache/airflow/tree/2.9.0?tab=readme-ov-file#requirements
# See: https://github.com/apache/airflow/tree/2.8.4?tab=readme-ov-file#requirements
- python-version: "3.12"
airflow-version: "2.6"
- python-version: "3.12"
airflow-version: "2.7"
- python-version: "3.12"
airflow-version: "2.8"
# Apache Airflow versions prior to 3.1.0 have not been tested with Python 3.13.
- python-version: "3.13"
airflow-version: "2.6"
- python-version: "3.13"
airflow-version: "2.7"
- python-version: "3.13"
airflow-version: "2.8"
- python-version: "3.13"
airflow-version: "2.9"
- python-version: "3.13"
airflow-version: "2.10"
- python-version: "3.13"
Expand Down Expand Up @@ -127,28 +109,10 @@ jobs:
fail-fast: false
matrix:
python-version: ["3.10", "3.11", "3.12", "3.13"]
airflow-version: ["2.6", "2.7", "2.8", "2.9", "2.10", "2.11", "3.0", "3.1"]
airflow-version: ["2.10", "2.11", "3.0", "3.1"]
dbt-version: [ "1.11" ]
exclude:
# Apache Airflow versions prior to 2.9.0 have not been tested with Python 3.12.
# Official support for Python 3.12 and the corresponding constraints.txt are available only for Apache Airflow >= 2.9.0.
# See: https://github.com/apache/airflow/tree/2.9.0?tab=readme-ov-file#requirements
# See: https://github.com/apache/airflow/tree/2.8.4?tab=readme-ov-file#requirements
- python-version: "3.12"
airflow-version: "2.6"
- python-version: "3.12"
airflow-version: "2.7"
- python-version: "3.12"
airflow-version: "2.8"
# Apache Airflow versions prior to 3.1.0 have not been tested with Python 3.13.
- python-version: "3.13"
airflow-version: "2.6"
- python-version: "3.13"
airflow-version: "2.7"
- python-version: "3.13"
airflow-version: "2.8"
- python-version: "3.13"
airflow-version: "2.9"
- python-version: "3.13"
airflow-version: "2.10"
- python-version: "3.13"
Expand Down Expand Up @@ -234,7 +198,7 @@ jobs:
strategy:
matrix:
python-version: ["3.11"]
airflow-version: ["2.9"]
airflow-version: ["2.10"]
dbt-version: ["1.9"]

services:
Expand Down Expand Up @@ -321,7 +285,7 @@ jobs:
fail-fast: false
matrix:
python-version: [ "3.11" ]
airflow-version: [ "2.8", "3.0" ]
airflow-version: [ "2.10", "3.0" ]
dbt-version: [ "1.5" ]
services:
postgres:
Expand Down
10 changes: 0 additions & 10 deletions cosmos/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from airflow.models import DagRun, Variable
from airflow.models.dag import DAG
from airflow.utils.session import provide_session
from airflow.version import version as airflow_version
from sqlalchemy import select
from sqlalchemy.orm import Session

Expand All @@ -43,10 +42,8 @@
PACKAGE_LOCKFILE_YML,
)
from cosmos.dbt.project import get_partial_parse_path
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger
from cosmos.settings import (
AIRFLOW_IO_AVAILABLE,
cache_dir,
dbt_profile_cache_dir_name,
enable_cache,
Expand Down Expand Up @@ -77,13 +74,6 @@ def _configure_remote_cache_dir() -> Path | ObjectStoragePath | None:
if remote_cache_conn_id is None:
return _configured_cache_dir

if not AIRFLOW_IO_AVAILABLE:
raise CosmosValueError(
f"You're trying to specify remote cache_dir {cache_dir_str}, but the required "
f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to "
"Airflow 2.8 or later."
)

try:
from airflow.sdk import ObjectStoragePath
except ImportError:
Expand Down
24 changes: 6 additions & 18 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,16 @@
from collections.abc import Callable, Iterator
from dataclasses import InitVar, dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any
from typing import Any

import yaml
from airflow.version import version as airflow_version

from cosmos import settings

if settings.AIRFLOW_IO_AVAILABLE or TYPE_CHECKING:
try:
from airflow.sdk import ObjectStoragePath
except ImportError:
from airflow.io.path import ObjectStoragePath
try:
from airflow.sdk import ObjectStoragePath
except ImportError:
from airflow.io.path import ObjectStoragePath

from cosmos.cache import create_cache_profile, get_cached_profile, is_profile_cache_enabled
from cosmos.constants import (
Expand Down Expand Up @@ -231,17 +229,7 @@ def __init__(
# Use the default Airflow connection ID for the scheme if it is not provided.
manifest_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(manifest_scheme, lambda: None)()

if manifest_conn_id is not None and not settings.AIRFLOW_IO_AVAILABLE:
raise CosmosValueError(
f"The manifest path {manifest_path_str} uses a remote file scheme, but the required Object "
f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to "
f"Airflow 2.8 or later."
)

if settings.AIRFLOW_IO_AVAILABLE:
self.manifest_path = ObjectStoragePath(manifest_path_str, conn_id=manifest_conn_id)
else:
self.manifest_path = Path(manifest_path_str)
self.manifest_path = ObjectStoragePath(manifest_path_str, conn_id=manifest_conn_id)

self.env_vars = env_vars
self.dbt_vars = dbt_vars
Expand Down
4 changes: 0 additions & 4 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@
DEFAULT_OPENLINEAGE_NAMESPACE = "cosmos"
OPENLINEAGE_PRODUCER = "https://github.com/astronomer/astronomer-cosmos/"

# Cosmos will not emit datasets for the following Airflow versions, due to a breaking change that's fixed in later Airflow 2.x versions
# https://github.com/apache/airflow/issues/39486
PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS = [Version("2.9.0"), Version("2.9.1")]


AIRFLOW_OBJECT_STORAGE_PATH_URL_SCHEMES = ("s3", "gs", "gcs", "wasb", "abfs", "abfss", "az", "http", "https")

Expand Down
7 changes: 0 additions & 7 deletions cosmos/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ def upload_to_azure_wasb(

def _configure_remote_target_path() -> tuple[Path | ObjectStoragePath, str] | tuple[None, None]:
"""Configure the remote target path if it is provided."""
from airflow.version import version as airflow_version

if not settings.remote_target_path:
return None, None
Expand All @@ -160,12 +159,6 @@ def _configure_remote_target_path() -> tuple[Path | ObjectStoragePath, str] | tu
if remote_conn_id is None:
return None, None

if not settings.AIRFLOW_IO_AVAILABLE:
raise CosmosValueError(
f"You're trying to specify remote target path {target_path_str}, but the required "
f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to "
"Airflow 2.8 or later."
)
_configured_target_path = ObjectStoragePath(target_path_str, conn_id=remote_conn_id)

if not _configured_target_path.exists(): # type: ignore[no-untyped-call]
Expand Down
5 changes: 1 addition & 4 deletions cosmos/operators/_asynchronous/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from airflow.datasets import Dataset as Asset # type: ignore

from airflow.utils.context import Context # type: ignore
from packaging.version import Version

from cosmos import settings
from cosmos.config import ProfileConfig
Expand Down Expand Up @@ -95,7 +94,7 @@ def __init__(
AbstractDbtLocalBase.__init__(
self, task_id=task_id, project_dir=project_dir, profile_config=profile_config, **self.dbt_kwargs
)
if kwargs.get("emit_datasets", True) and settings.enable_dataset_alias and AIRFLOW_VERSION >= Version("2.10"):
if kwargs.get("emit_datasets", True) and settings.enable_dataset_alias:
from airflow.datasets import DatasetAlias

# ignoring the type because older versions of Airflow raise the following error in mypy
Expand Down Expand Up @@ -156,8 +155,6 @@ def get_sql_from_xcom(self, context: Context) -> str:
def get_remote_sql(self) -> str:
start_time = time.time()

if not settings.AIRFLOW_IO_AVAILABLE: # pragma: no cover
raise CosmosValueError(f"Cosmos async support is only available starting in Airflow 2.8 or later.")
try:
from airflow.sdk import ObjectStoragePath
except ImportError:
Expand Down
24 changes: 6 additions & 18 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,10 @@

from cosmos import cache, settings

if settings.AIRFLOW_IO_AVAILABLE:
try:
from airflow.sdk import ObjectStoragePath
except ImportError:
from airflow.io.path import ObjectStoragePath
try:
from airflow.sdk import ObjectStoragePath
except ImportError:
from airflow.io.path import ObjectStoragePath
from cosmos._utils.importer import load_method_from_module
from cosmos.cache import (
_copy_cached_package_lockfile_to_project,
Expand Down Expand Up @@ -321,13 +320,6 @@ def _configure_remote_target_path() -> tuple[Path | ObjectStoragePath, str] | tu
)
return None, None

if not settings.AIRFLOW_IO_AVAILABLE:
raise CosmosValueError(
f"You're trying to specify remote target path {target_path_str}, but the required "
f"Object Storage feature is unavailable in Airflow version {AIRFLOW_VERSION}. Please upgrade to "
"Airflow 2.8 or later."
)

_configured_target_path = ObjectStoragePath(target_path_str, conn_id=remote_conn_id)

if not _configured_target_path.exists(): # type: ignore[no-untyped-call]
Expand Down Expand Up @@ -813,7 +805,7 @@ def register_dataset(self, new_inlets: list[Asset], new_outlets: list[Asset], co
raise AirflowCompatibilityError(
"To emit datasets with Airflow 3, the setting `enable_dataset_alias` must be True (default)."
)
elif AIRFLOW_VERSION < Version("2.10") or not settings.enable_dataset_alias:
elif not settings.enable_dataset_alias:
from airflow.utils.session import create_session

logger.info("Assigning inlets/outlets without DatasetAlias")
Expand Down Expand Up @@ -952,11 +944,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:

AbstractDbtLocalBase.__init__(self, **base_kwargs)
if AIRFLOW_VERSION.major < _AIRFLOW3_MAJOR_VERSION:
if (
kwargs.get("emit_datasets", True)
and settings.enable_dataset_alias
and AIRFLOW_VERSION >= Version("2.10")
):
if kwargs.get("emit_datasets", True) and settings.enable_dataset_alias:
from airflow.datasets import DatasetAlias

# ignoring the type because older versions of Airflow raise the following error in mypy
Expand Down
4 changes: 0 additions & 4 deletions cosmos/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

import airflow
from airflow.configuration import conf
from airflow.version import version as airflow_version
from packaging.version import Version

from cosmos.constants import (
DEFAULT_COSMOS_CACHE_DIR_NAME,
Expand Down Expand Up @@ -56,8 +54,6 @@
enable_setup_async_task = conf.getboolean("cosmos", "enable_setup_async_task", fallback=True)
enable_teardown_async_task = conf.getboolean("cosmos", "enable_teardown_async_task", fallback=True)

AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0")

# The following environment variable is populated in Astro Cloud
in_astro_cloud = os.getenv("ASTRONOMER_ENVIRONMENT") == "cloud"

Expand Down
2 changes: 1 addition & 1 deletion dev/dags/cosmos_callback_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"install_deps": True, # install any necessary dependencies before running any dbt command
"full_refresh": True, # used only in dbt commands that support this flag
# --------------------------------------------------------------
# Callback function to upload files using Airflow Object storage and Cosmos remote_target_path setting on Airflow 2.8 and above
# Callback function to upload files using Airflow Object storage and Cosmos remote_target_path setting
"callback": upload_to_cloud_storage,
# --------------------------------------------------------------
# Callback function to upload files to AWS S3, works for Airflow < 2.8 too
Expand Down
3 changes: 1 addition & 2 deletions dev/dags/example_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ def example_virtualenv() -> None:
# To avoid the additional latency observed while uploading files for each of the tasks, the
# callback functions below are commented out, but you can uncomment them if you'd like to
# enable callback execution.
# Callback function to upload files using Airflow Object storage and Cosmos remote_target_path setting on
# Airflow 2.8 and above
# Callback function to upload files using Airflow Object storage and Cosmos remote_target_path setting
# "callback": upload_to_cloud_storage,
# --------------------------------------------------------------------------
# Callback function if you'd like to upload files from the target directory to remote store e.g. AWS S3 that
Expand Down
4 changes: 2 additions & 2 deletions docs/compatibility-policy.rst
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ Python
Apache Airflow
~~~~~~~~~~~~~~

- **Minimum required**: Apache Airflow 2.6.0
- **Supported versions**: 2.6, 2.7, 2.8, 2.9, 2.10, 2.11, 3.0, 3.1
- **Minimum required**: Apache Airflow 2.10.0
- **Supported versions**: 2.10, 2.11, 3.0, 3.1

dbt Core
~~~~~~~~
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/callbacks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ using a single operator in an Airflow DAG:
Example: Using DbtDag or DbtTaskGroup
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you're using Airflow 2.8 or later, you can leverage the :ref:`remote_target_path` configuration to upload files
You can leverage the :ref:`remote_target_path` configuration to upload files
from the target directory to a remote storage. Below is an example of how to define a callback helper function in your
``DbtDag`` that utilizes this configuration:

Expand Down
4 changes: 2 additions & 2 deletions docs/configuration/cosmos-conf.rst
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ This page lists all available Airflow configurations that affect ``astronomer-co
in a remote location (an alternative to the Variable cache approach introduced in Cosmos 1.5.0)
using this configuration. The value for the remote cache directory can be any of the schemes that are supported by
the `Airflow Object Store <https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/objectstorage.html>`_
feature introduced in Airflow 2.8.0 (e.g. ``s3://your_s3_bucket/cache_dir/``, ``gs://your_gs_bucket/cache_dir/``,
feature (e.g. ``s3://your_s3_bucket/cache_dir/``, ``gs://your_gs_bucket/cache_dir/``,
``abfs://your_azure_container/cache_dir``, etc.)

This is an experimental feature available since Cosmos 1.6 to gather user feedback and will be merged into the
Expand Down Expand Up @@ -181,7 +181,7 @@ This page lists all available Airflow configurations that affect ``astronomer-co
the target directory.
The value for the remote target path can be any of the schemes that are supported by the
`Airflow Object Store <https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/objectstorage.html>`_
feature introduced in Airflow 2.8.0 (e.g. ``s3://your_s3_bucket/target_dir/``, ``gs://your_gs_bucket/target_dir/``,
feature (e.g. ``s3://your_s3_bucket/target_dir/``, ``gs://your_gs_bucket/target_dir/``,
``abfs://your_azure_container/cache_dir``, etc.)

- Default: ``None``
Expand Down
8 changes: 2 additions & 6 deletions docs/configuration/lineage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,13 @@ Contributions are also welcome in the `OpenLineage project <https://github.com/O
Installation
------------

If using Airflow 2.7 or higher, install ``apache-airflow-providers-openlineage``.

Otherwise, install Cosmos using ``astronomer-cosmos[openlineage]``.
Install ``apache-airflow-providers-openlineage``.


Configuration
-------------

If using Airflow 2.7, follow `the instructions <https://airflow.apache.org/docs/apache-airflow-providers-openlineage/1.0.2/guides/user.html>`_ on how to configure OpenLineage.

Otherwise, follow `these instructions <https://openlineage.io/docs/integrations/airflow/>`_.
Follow `the instructions <https://airflow.apache.org/docs/apache-airflow-providers-openlineage/1.0.2/guides/user.html>`_ on how to configure OpenLineage.


Namespace
Expand Down
5 changes: 2 additions & 3 deletions docs/configuration/parsing-methods.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,9 @@ If you already have a ``manifest.json`` file created by dbt, Cosmos will parse t
You can supply a ``manifest_path`` parameter on the DbtDag / DbtTaskGroup with a path to a ``manifest.json`` file.

Before Cosmos 1.6.0, the path to ``manifest.json`` supplied via the ``DbtDag`` / ``DbtTaskGroup`` ``manifest_path``
argument accepted only local paths. However, starting with Cosmos 1.6.0, if you have Airflow >= 2.8.0, you can supply
a remote path (e.g., an S3 URL) too. For supporting remote paths, Cosmos leverages the
argument accepted only local paths. However, starting with Cosmos 1.6.0, you can supply a remote path (e.g., an S3 URL) too. For supporting remote paths, Cosmos leverages the
`Airflow Object Storage <https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/objectstorage.html>`_
feature released in Airflow 2.8.0.
feature.
For remote paths, you can specify a ``manifest_conn_id``, which is an
Airflow connection ID containing the credentials to access the remote path. If you do not specify a
``manifest_conn_id``, Cosmos will use the default connection ID specific to the scheme, identified using the Airflow
Expand Down
Loading
Loading