Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
92 commits
Select commit Hold shift + click to select a range
e4faf8f
Run tests against Airflow 3.1
tatiana Sep 16, 2025
caea171
Change temporarily the pre-install af script to support 3.1
tatiana Sep 16, 2025
cbb6e06
Build tests against current branch in CI
tatiana Sep 16, 2025
f3eb59f
Support installing AF pre-releases in install script
tatiana Sep 16, 2025
1a2e624
Reduce tests currently running against 3.1 to focus on potential issues
tatiana Sep 16, 2025
3aade35
Fix constraint file for af 3.1.0b2
tatiana Sep 16, 2025
70c706e
Drop support to Python 3.9 due to AF 3.1
tatiana Sep 16, 2025
0074a5e
Fixes for vertica mocking with Airflow 3.1
tatiana Sep 16, 2025
8126f79
Change how we mock our profiles so it is compatible with Airflow 3.1
tatiana Sep 16, 2025
5e2f9f4
Merge branch 'main' into af-31
tatiana Oct 13, 2025
b0dba70
Fix ObjectStoragePath importpath in cosmos.operators.local and its te…
tatiana Sep 16, 2025
d86d570
Update setup scripts to use stable AF 3.1 instead of beta
tatiana Oct 13, 2025
09a641f
Fix reference to AF3.1 in GH action workflow
tatiana Oct 13, 2025
0fe0e6a
Fix tests that had TaskInstance without dag_version_id=None (breaking…
tatiana Oct 13, 2025
95d108e
Fix importpath for ObjectStoragePath
tatiana Oct 13, 2025
58f90eb
Fix deprecation warning related to ObjectStoragePath
tatiana Oct 13, 2025
3ca1276
Update deprecated paths in AF3
tatiana Oct 13, 2025
4ea1256
Fix AF3 import warnings
tatiana Oct 13, 2025
4dafbe7
Fix ObjectStoragePath import paths
tatiana Oct 13, 2025
8b0f171
Fix tests related to the breaking change into TaskInstance requiring …
tatiana Oct 13, 2025
9a00a45
Try to solve ObjectStoragePath test issues
tatiana Oct 13, 2025
95a2918
Reduce warnings
tatiana Oct 13, 2025
b6a3d20
Try to solve target/manifest issue
tatiana Oct 13, 2025
d5c12b2
Fix tests for 3.10-3.1-1.10
tatiana Oct 13, 2025
f7d44a9
Fix tests.py3.10-3.0-1.10 unit tests
tatiana Oct 13, 2025
9769941
Merge branch 'main' into af-31
tatiana Oct 13, 2025
e6a6c44
Change the dbt jaffle_shop project to avoid arguments error
tatiana Oct 13, 2025
9bcba84
Fix tests that broke after changing the dbt project
tatiana Oct 13, 2025
5db86e2
update dbt fusion tests to exclude recently removed tests
tatiana Oct 13, 2025
408bbc0
Fix watcher test to reflect last changes to the dbt project itself
tatiana Oct 14, 2025
d461407
Solve unittest related to rich logging
tatiana Oct 14, 2025
4da701e
Simplify rch logging test
tatiana Oct 14, 2025
1a45a7f
Attempt to solve ModuleNotFoundError: No module named 'dbt.adapters.c…
tatiana Oct 14, 2025
3c835b9
Undo change that did not work
tatiana Oct 14, 2025
d672754
Fix No module named 'dbt.adapters.catalogs'
tatiana Oct 14, 2025
b88bc23
Solve 'source: not found'
tatiana Oct 14, 2025
3c86139
Fix pendulum TypeError: 'module' object is not callable
tatiana Oct 14, 2025
411a3f2
Attempt so solve annoying dep conflicts
tatiana Oct 14, 2025
4e22c7a
Remove DuckDB DAG and deps to see if deps conflicts stop
tatiana Oct 14, 2025
0f9a058
Attempt to solve pendulum issue
tatiana Oct 14, 2025
2810507
Attempt to solve pendulum issue
tatiana Oct 14, 2025
432b3e1
Try to fix dependency issues
tatiana Oct 14, 2025
0b5af20
Try to fix dependency issues
tatiana Oct 14, 2025
4678604
Try to fix dependency issues
tatiana Oct 14, 2025
6012dd4
Try to fix dependency issues
tatiana Oct 14, 2025
c4809ea
Try to fix dependency issues
tatiana Oct 14, 2025
94b93ac
Revert to 1a45a7f1ffd5ecf7b56beda0b932d692cead0e77
tatiana Oct 14, 2025
35ba9ee
Revert to 1a45a7f1ffd5ecf7b56beda0b932d692cead0e77
tatiana Oct 14, 2025
bd9513c
No module named 'dbt.adapters.catalogs'
tatiana Oct 15, 2025
30fa5e5
Fix AttributeError: module 'airflow.hooks' has no attribute 'base'
tatiana Oct 15, 2025
42139f0
Re-add deleted DuckDB DAG
tatiana Oct 15, 2025
2ec0f1f
Re-add deleted DuckDB DAG
tatiana Oct 15, 2025
20d4314
Attempt to fix airflow.exceptions.AirflowException: Cannot create Dag…
tatiana Oct 15, 2025
63275cb
Fix AF3 import path warnings
tatiana Oct 15, 2025
43b540d
Restore incorrectly deleted dag example
tatiana Oct 15, 2025
48a078f
Attempt to fix airflow.exceptions.AirflowException: Cannot create Dag…
tatiana Oct 15, 2025
86bbff2
Revert DAG import since it caused more errors
tatiana Oct 15, 2025
65d15f9
Attempt to fix AF3.1 'DbtDag' object has no attribute 'create_dagrun'
tatiana Oct 15, 2025
a4596fb
Try to fix listener test
tatiana Oct 15, 2025
54e5537
Fix TaskGroup issue
tatiana Oct 16, 2025
13dd3a1
Fix Cosmos TaskGroup issue
tatiana Oct 16, 2025
786db22
Working solution for DbtTaskGroup with WATCHER mode
tatiana Oct 17, 2025
42a2209
Merge branch 'main' into af-31
tatiana Oct 17, 2025
9442879
Working solution for DbtTaskGroup with WATCHER mode
tatiana Oct 17, 2025
aecaa44
Fix AF3.1 installation error
tatiana Oct 17, 2025
0a9fdbb
Try to fix listener test for Airflow 3.0
tatiana Oct 17, 2025
c162cd5
Skip listener test in Airflow 3.1
tatiana Oct 20, 2025
d8ccaf8
Fix integration test skip
tatiana Oct 20, 2025
697ad94
Apply suggestion from @tatiana
tatiana Oct 20, 2025
c3059bd
Fix listener test in Airflow 3.0
tatiana Oct 20, 2025
79db770
Fix test_on_dag_run_success for AF3.0
tatiana Oct 20, 2025
4f3953d
Fix some integration tests
tatiana Oct 20, 2025
4997fa7
Fix unittest issues
tatiana Oct 21, 2025
f0fb6b7
Try to fix log-related tests that broke after last AF3.1 fix
tatiana Oct 21, 2025
174e0bc
Remove log level
tatiana Oct 21, 2025
a286246
Clean duplicated imports of ObjectStorage
tatiana Oct 21, 2025
383fc45
Fix bug introduced in test log
tatiana Oct 21, 2025
8a5f9be
Fix a few of the integraiton tests that are failing
tatiana Oct 21, 2025
70cafd2
Apply suggestion from @tatiana
tatiana Oct 21, 2025
40f0b0c
Skip last failing test in Airflow 3.1 https://github.com/astronomer/a…
tatiana Oct 21, 2025
d0c1ccb
Revert TaskGroup fixes that are being added in separate PR https://gi…
tatiana Oct 21, 2025
6762eca
Fix 3.11, 2.6, 1.10
tatiana Oct 21, 2025
eda9844
Merge branch 'main' into af-31
tatiana Oct 21, 2025
b0db04e
Apply suggestion from @tatiana
tatiana Oct 21, 2025
322e660
Apply suggestion from @tatiana
tatiana Oct 21, 2025
5dae9c7
Update tests/operators/test_local.py
tatiana Oct 22, 2025
41848cb
Apply suggestion from @tatiana
tatiana Oct 22, 2025
1420ffd
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Oct 22, 2025
508cbce
Apply suggestion from @tatiana
tatiana Oct 22, 2025
583dfa1
Apply suggestion from @Copilot
tatiana Oct 22, 2025
f5bde1c
Apply suggestion from @Copilot
tatiana Oct 22, 2025
74b1271
Apply suggestion from @tatiana
tatiana Oct 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.9", "3.10", "3.11", "3.12"]
airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10", "2.11", "3.0"]
python-version: ["3.10", "3.11", "3.12"]
airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10", "2.11", "3.0", "3.1"]
dbt-version: ["1.10"]
exclude:
- python-version: "3.11"
Expand Down Expand Up @@ -123,8 +123,8 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.9", "3.10", "3.11"]
airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10", "2.11", "3.0"]
python-version: ["3.10", "3.11"]
airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10", "2.11", "3.0", "3.1"]
dbt-version: ["1.10"]
exclude:
- python-version: "3.11"
Expand Down Expand Up @@ -551,7 +551,7 @@ jobs:
fail-fast: false
matrix:
python-version: ["3.11"]
airflow-version: ["2.10", "3.0" ]
airflow-version: ["2.10", "3.0"]
dbt-version: ["1.9"]
num-models: [1, 10, 50, 100, 500]
services:
Expand Down
2 changes: 1 addition & 1 deletion cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from cosmos import settings

__version__ = "1.11.0a4"
__version__ = "1.11.0a10"

if not settings.enable_memory_optimised_imports:
from cosmos.airflow.dag import DbtDag
Expand Down
2 changes: 1 addition & 1 deletion cosmos/airflow/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from typing import Any

from airflow.models.dag import DAG
from airflow.models.dag import DAG # type: ignore[assignment]

from cosmos.converter import DbtToAirflowConverter, airflow_kwargs, specific_kwargs

Expand Down
7 changes: 6 additions & 1 deletion cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@

from airflow.models.base import ID_LEN as AIRFLOW_MAX_ID_LENGTH
from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

try:
# Airflow 3.1 onwards
from airflow.sdk import TaskGroup
except ImportError:
from airflow.utils.task_group import TaskGroup

from cosmos import settings
from cosmos.config import RenderConfig
Expand Down
6 changes: 5 additions & 1 deletion cosmos/airflow/task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@

from typing import Any

from airflow.utils.task_group import TaskGroup
try:
# Airflow 3.1 onwards
from airflow.sdk import TaskGroup
except ImportError:
from airflow.utils.task_group import TaskGroup

from cosmos.converter import DbtToAirflowConverter, airflow_kwargs, specific_kwargs

Expand Down
15 changes: 11 additions & 4 deletions cosmos/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from airflow.models import DagRun, Variable
from airflow.models.dag import DAG
from airflow.utils.session import provide_session
from airflow.utils.task_group import TaskGroup
from airflow.version import version as airflow_version
from sqlalchemy import select
from sqlalchemy.orm import Session
Expand All @@ -26,14 +25,16 @@

if TYPE_CHECKING:
try:
# Airflow 3 onwards
from airflow.sdk import ObjectStoragePath
from airflow.utils.task_group import TaskGroup
except ImportError:
try:
from airflow.io.path import ObjectStoragePath
except ImportError:
pass
except ImportError:
pass
from airflow.utils.task_group import TaskGroup

from cosmos.constants import (
DBT_MANIFEST_FILE_NAME,
DBT_TARGET_DIR_NAME,
Expand Down Expand Up @@ -83,7 +84,13 @@ def _configure_remote_cache_dir() -> Path | ObjectStoragePath | None:
"Airflow 2.8 or later."
)

from airflow.io.path import ObjectStoragePath
try:
from airflow.sdk import ObjectStoragePath
except ImportError:
try:
from airflow.io.path import ObjectStoragePath
except ImportError:
pass

_configured_cache_dir = ObjectStoragePath(cache_dir_str, conn_id=remote_cache_conn_id)

Expand Down
9 changes: 4 additions & 5 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@

from cosmos import settings

if TYPE_CHECKING:
if settings.AIRFLOW_IO_AVAILABLE or TYPE_CHECKING:
try:
from airflow.io.path import ObjectStoragePath
from airflow.sdk import ObjectStoragePath
except ImportError:
pass
from airflow.io.path import ObjectStoragePath

from cosmos.cache import create_cache_profile, get_cached_profile, is_profile_cache_enabled
from cosmos.constants import (
DEFAULT_PROFILES_FILE_NAME,
Expand Down Expand Up @@ -236,8 +237,6 @@ def __init__(
)

if settings.AIRFLOW_IO_AVAILABLE:
from airflow.io.path import ObjectStoragePath

self.manifest_path = ObjectStoragePath(manifest_path_str, conn_id=manifest_conn_id)
else:
self.manifest_path = Path(manifest_path_str)
Expand Down
7 changes: 6 additions & 1 deletion cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
from warnings import warn

from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

try:
# Airflow 3.1 onwards
from airflow.sdk import TaskGroup
except ImportError:
from airflow.utils.task_group import TaskGroup

from cosmos import cache, settings
from cosmos.airflow.graph import build_airflow_graph
Expand Down
7 changes: 6 additions & 1 deletion cosmos/core/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
except ImportError: # Airflow 2
from airflow.models import BaseOperator
from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

try:
# Airflow 3.1 onwards
from airflow.sdk import TaskGroup
except ImportError:
from airflow.utils.task_group import TaskGroup

from cosmos.core.graph.entities import Task
from cosmos.log import get_logger
Expand Down
12 changes: 11 additions & 1 deletion cosmos/core/graph/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@
from dataclasses import dataclass, field
from typing import Any, Dict, List

import airflow
from packaging.version import Version

from cosmos.log import get_logger

logger = get_logger(__name__)


AIRFLOW_VERSION = Version(airflow.__version__)


@dataclass
class CosmosEntity:
"""
Expand Down Expand Up @@ -58,6 +64,10 @@ class Task(CosmosEntity):
"""

owner: str = ""
operator_class: str = "airflow.operators.empty.EmptyOperator"
operator_class: str = (
"airflow.operators.empty.EmptyOperator"
if AIRFLOW_VERSION < Version("3.0")
else "airflow.providers.standard.operators.empty.EmptyOperator"
)
arguments: Dict[str, Any] = field(default_factory=dict)
extra_context: Dict[str, Any] = field(default_factory=dict)
10 changes: 9 additions & 1 deletion cosmos/dataset.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from airflow import DAG
from airflow.utils.task_group import TaskGroup

if TYPE_CHECKING:
try:
# Airflow 3.1 onwards
from airflow.utils.task_group import TaskGroup
except ImportError:
from airflow.utils.task_group import TaskGroup


def get_dataset_alias_name(dag: DAG | None, task_group: TaskGroup | None, task_id: str) -> str:
Expand Down
8 changes: 6 additions & 2 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@

if TYPE_CHECKING:
try:
from airflow.io.path import ObjectStoragePath
# Airflow 3 onwards
from airflow.sdk import ObjectStoragePath
except ImportError:
pass
try:
from airflow.io.path import ObjectStoragePath
except ImportError:
pass

import cosmos.dbt.runner as dbt_runner
from cosmos import cache, settings
Expand Down
8 changes: 6 additions & 2 deletions cosmos/hooks/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
from tempfile import TemporaryDirectory, gettempdir
from typing import NamedTuple

from airflow.hooks.base import BaseHook
try:
# Airflow 3.1 onwards
from airflow.sdk.bases.hook import BaseHook
except ImportError:
from airflow.hooks.base import BaseHook


class FullOutputSubprocessResult(NamedTuple):
Expand All @@ -20,7 +24,7 @@ class FullOutputSubprocessResult(NamedTuple):
full_output: list[str]


class FullOutputSubprocessHook(BaseHook):
class FullOutputSubprocessHook(BaseHook): # type: ignore[misc]
"""Hook for running processes with the ``subprocess`` module."""

def __init__(self) -> None:
Expand Down
13 changes: 8 additions & 5 deletions cosmos/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@
from typing import Any
from urllib.parse import urlparse

try:
from airflow.sdk import ObjectStoragePath
except ImportError:
try:
from airflow.io.path import ObjectStoragePath
except ImportError:
pass

from cosmos import settings
from cosmos.constants import DEFAULT_TARGET_PATH, FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP
from cosmos.exceptions import CosmosValueError
Expand Down Expand Up @@ -158,9 +166,6 @@ def _configure_remote_target_path() -> tuple[Path, str] | tuple[None, None]:
f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to "
"Airflow 2.8 or later."
)

from airflow.io.path import ObjectStoragePath

_configured_target_path = ObjectStoragePath(target_path_str, conn_id=remote_conn_id)

if not _configured_target_path.exists(): # type: ignore[no-untyped-call]
Expand Down Expand Up @@ -207,8 +212,6 @@ def upload_to_cloud_storage(project_dir: str, source_subpath: str = DEFAULT_TARG
if not dest_target_dir:
raise CosmosValueError("You're trying to upload artifact files, but the remote target path is not configured.")

from airflow.io.path import ObjectStoragePath

source_target_dir = Path(project_dir) / f"{source_subpath}"
files = [str(file) for file in source_target_dir.rglob("*") if file.is_file()]
for file_path in files:
Expand Down
10 changes: 8 additions & 2 deletions cosmos/operators/_asynchronous/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any

try:
from airflow.sdk import ObjectStoragePath
except ImportError:
try:
from airflow.io.path import ObjectStoragePath
except ImportError:
pass

from cosmos.operators.local import DbtRunLocalOperator

if TYPE_CHECKING: # pragma: no cover
Expand Down Expand Up @@ -59,8 +67,6 @@ def execute(self, context: Context, **kwargs: Any) -> Any:

dest_target_dir, dest_conn_id = self._configure_remote_target_path()

from airflow.io.path import ObjectStoragePath

dag_task_group_identifier = self.extra_context["dbt_dag_task_group_identifier"]
run_id = context["run_id"]
run_dir_path_str = f"{str(dest_target_dir).rstrip('/')}/{dag_task_group_identifier}/{run_id}"
Expand Down
5 changes: 4 additions & 1 deletion cosmos/operators/_asynchronous/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,10 @@ def get_remote_sql(self) -> str:

if not settings.AIRFLOW_IO_AVAILABLE: # pragma: no cover
raise CosmosValueError(f"Cosmos async support is only available starting in Airflow 2.8 or later.")
from airflow.io.path import ObjectStoragePath
try:
Comment thread
tatiana marked this conversation as resolved.
from airflow.sdk import ObjectStoragePath
except ImportError:
from airflow.io.path import ObjectStoragePath

file_path = self.async_context["dbt_node_config"]["file_path"] # type: ignore
dbt_dag_task_group_identifier = self.async_context["dbt_dag_task_group_identifier"]
Expand Down
4 changes: 2 additions & 2 deletions cosmos/operators/_asynchronous/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@

from typing import Any

from airflow.utils.context import Context
from airflow.utils.context import Context # type: ignore[attr-defined]

try:
from airflow.sdk.bases.operator import BaseOperator # Airflow 3
except ImportError:
except (ImportError, AttributeError):
from airflow.models import BaseOperator # Airflow 2


Expand Down
Loading