Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
e4faf8f
Run tests against Airflow 3.1
tatiana Sep 16, 2025
caea171
Change temporarily the pre-install af script to support 3.1
tatiana Sep 16, 2025
cbb6e06
Build tests against current branch in CI
tatiana Sep 16, 2025
f3eb59f
Support installing AF pre-releases in install script
tatiana Sep 16, 2025
1a2e624
Reduce tests currently running against 3.1 to focus on potential issues
tatiana Sep 16, 2025
3aade35
Fix constraint file for af 3.1.0b2
tatiana Sep 16, 2025
70c706e
Drop support to Python 3.9 due to AF 3.1
tatiana Sep 16, 2025
0074a5e
Fixes for vertica mocking with Airflow 3.1
tatiana Sep 16, 2025
8126f79
Change how we mock our profiles so it is compatible with Airflow 3.1
tatiana Sep 16, 2025
5e2f9f4
Merge branch 'main' into af-31
tatiana Oct 13, 2025
b0dba70
Fix ObjectStoragePath importpath in cosmos.operators.local and its te…
tatiana Sep 16, 2025
d86d570
Update setup scripts to use stable AF 3.1 instead of beta
tatiana Oct 13, 2025
09a641f
Fix reference to AF3.1 in GH action workflow
tatiana Oct 13, 2025
0fe0e6a
Fix tests that had TaskInstance without dag_version_id=None (breaking…
tatiana Oct 13, 2025
95d108e
Fix importpath for ObjectStoragePath
tatiana Oct 13, 2025
58f90eb
Fix deprecation warning related to ObjectStoragePath
tatiana Oct 13, 2025
3ca1276
Update deprecated paths in AF3
tatiana Oct 13, 2025
4ea1256
Fix AF3 import warnings
tatiana Oct 13, 2025
4dafbe7
Fix ObjectStoragePath import paths
tatiana Oct 13, 2025
8b0f171
Fix tests related to the breaking change into TaskInstance requiring …
tatiana Oct 13, 2025
9a00a45
Try to solve ObjectStoragePath test issues
tatiana Oct 13, 2025
95a2918
Reduce warnings
tatiana Oct 13, 2025
b6a3d20
Try to solve target/manifest issue
tatiana Oct 13, 2025
d5c12b2
Fix tests for 3.10-3.1-1.10
tatiana Oct 13, 2025
f7d44a9
Fix tests.py3.10-3.0-1.10 unit tests
tatiana Oct 13, 2025
9769941
Merge branch 'main' into af-31
tatiana Oct 13, 2025
e6a6c44
Change the dbt jaffle_shop project to avoid arguments error
tatiana Oct 13, 2025
9bcba84
Fix tests that broke after changing the dbt project
tatiana Oct 13, 2025
5db86e2
update dbt fusion tests to exclude recently removed tests
tatiana Oct 13, 2025
408bbc0
Fix watcher test to reflect last changes to the dbt project itself
tatiana Oct 14, 2025
d461407
Solve unittest related to rich logging
tatiana Oct 14, 2025
4da701e
Simplify rch logging test
tatiana Oct 14, 2025
1a45a7f
Attempt to solve ModuleNotFoundError: No module named 'dbt.adapters.c…
tatiana Oct 14, 2025
3c835b9
Undo change that did not work
tatiana Oct 14, 2025
d672754
Fix No module named 'dbt.adapters.catalogs'
tatiana Oct 14, 2025
b88bc23
Solve 'source: not found'
tatiana Oct 14, 2025
3c86139
Fix pendulum TypeError: 'module' object is not callable
tatiana Oct 14, 2025
411a3f2
Attempt so solve annoying dep conflicts
tatiana Oct 14, 2025
4e22c7a
Remove DuckDB DAG and deps to see if deps conflicts stop
tatiana Oct 14, 2025
0f9a058
Attempt to solve pendulum issue
tatiana Oct 14, 2025
2810507
Attempt to solve pendulum issue
tatiana Oct 14, 2025
432b3e1
Try to fix dependency issues
tatiana Oct 14, 2025
0b5af20
Try to fix dependency issues
tatiana Oct 14, 2025
4678604
Try to fix dependency issues
tatiana Oct 14, 2025
6012dd4
Try to fix dependency issues
tatiana Oct 14, 2025
c4809ea
Try to fix dependency issues
tatiana Oct 14, 2025
94b93ac
Revert to 1a45a7f1ffd5ecf7b56beda0b932d692cead0e77
tatiana Oct 14, 2025
35ba9ee
Revert to 1a45a7f1ffd5ecf7b56beda0b932d692cead0e77
tatiana Oct 14, 2025
bd9513c
No module named 'dbt.adapters.catalogs'
tatiana Oct 15, 2025
30fa5e5
Fix AttributeError: module 'airflow.hooks' has no attribute 'base'
tatiana Oct 15, 2025
42139f0
Re-add deleted DuckDB DAG
tatiana Oct 15, 2025
2ec0f1f
Re-add deleted DuckDB DAG
tatiana Oct 15, 2025
20d4314
Attempt to fix airflow.exceptions.AirflowException: Cannot create Dag…
tatiana Oct 15, 2025
63275cb
Fix AF3 import path warnings
tatiana Oct 15, 2025
43b540d
Restore incorrectly deleted dag example
tatiana Oct 15, 2025
48a078f
Attempt to fix airflow.exceptions.AirflowException: Cannot create Dag…
tatiana Oct 15, 2025
86bbff2
Revert DAG import since it caused more errors
tatiana Oct 15, 2025
65d15f9
Attempt to fix AF3.1 'DbtDag' object has no attribute 'create_dagrun'
tatiana Oct 15, 2025
a4596fb
Try to fix listener test
tatiana Oct 15, 2025
54e5537
Fix TaskGroup issue
tatiana Oct 16, 2025
13dd3a1
Fix Cosmos TaskGroup issue
tatiana Oct 16, 2025
cf8a3cb
Fix `dag.test()` for Airflow 3.1+ by syncing DAG to database
kaxil Oct 16, 2025
952542c
Update test.yml
kaxil Oct 16, 2025
dce0edf
Update pre-install-airflow.sh
kaxil Oct 16, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: test

on:
push: # Run on pushes to the default branch
branches: [main]
branches: [main, af-31, dag-test-backcompat]
# Also run on pull requests originating from forks. Although this is insecure by default, we need it to run
# integration tests on forked PRs. As a guardrail, we’ve added an Authorize step to each job, which requires manually
# approving the workflow run for each pushed commit. Approval only happens after a careful code review of the changes.
Expand Down Expand Up @@ -51,8 +51,8 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.9", "3.10", "3.11", "3.12"]
airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10", "2.11", "3.0"]
python-version: ["3.10", "3.11", "3.12"]
airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10", "2.11", "3.0", "3.1"]
dbt-version: ["1.10"]
exclude:
- python-version: "3.11"
Expand Down Expand Up @@ -120,8 +120,8 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.9", "3.10", "3.11"]
airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10", "2.11", "3.0"]
python-version: ["3.10", "3.11"]
airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10", "2.11", "3.0", "3.1"]
dbt-version: ["1.10"]
exclude:
- python-version: "3.11"
Expand Down Expand Up @@ -548,7 +548,7 @@ jobs:
fail-fast: false
matrix:
python-version: ["3.11"]
airflow-version: ["2.10", "3.0" ]
airflow-version: ["2.10", "3.0"]
dbt-version: ["1.9"]
num-models: [1, 10, 50, 100, 500]
services:
Expand Down
2 changes: 1 addition & 1 deletion cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from cosmos import settings

__version__ = "1.11.0a4"
__version__ = "1.11.0a9"

if not settings.enable_memory_optimised_imports:
from cosmos.airflow.dag import DbtDag
Expand Down
2 changes: 1 addition & 1 deletion cosmos/airflow/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from typing import Any

from airflow.models.dag import DAG
from airflow.models.dag import DAG # type: ignore[assignment]

from cosmos.converter import DbtToAirflowConverter, airflow_kwargs, specific_kwargs

Expand Down
54 changes: 37 additions & 17 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@

from airflow.models.base import ID_LEN as AIRFLOW_MAX_ID_LENGTH
from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

try:
# Airflow 3.1 onwards
from airflow.sdk import TaskGroup
except ImportError:
from airflow.utils.task_group import TaskGroup

from cosmos import settings
from cosmos.config import RenderConfig
Expand Down Expand Up @@ -541,14 +546,14 @@ def _add_dbt_setup_async_task(
tasks_map[DBT_SETUP_ASYNC_TASK_ID] = setup_airflow_task


def _add_producer_watcher(
def _add_producer_watcher_and_dependencies(
dag: DAG,
task_args: dict[str, Any],
tasks_map: dict[str, Any],
task_group: TaskGroup | None,
render_config: RenderConfig | None = None,
nodes: dict[str, DbtNode] | None = None,
) -> str:

producer_task_args = task_args.copy()

if render_config is not None:
Expand All @@ -562,11 +567,26 @@ def _add_producer_watcher(
arguments=producer_task_args,
)
producer_airflow_task = create_airflow_task(producer_task_metadata, dag, task_group=task_group)
for task_id, task in tasks_map.items():

# Consumer tasks will need to be updated to use the producer task as a dependency
for node_id, task_or_taskgroup in tasks_map.items():
# we want to make the producer task to be the parent of the root dbt nodes, without blocking them from sensing XCom
if not task.upstream_list:
producer_airflow_task >> task
task.trigger_rule = task_args.get("trigger_rule", "always")
node_tasks = (
list(task_or_taskgroup.children.values())
if isinstance(task_or_taskgroup, TaskGroup)
else [task_or_taskgroup]
)

# First, we tackle dbt graph nodes that are root nodes
if nodes and node_id in nodes and not nodes[node_id].depends_on:
producer_airflow_task >> task_or_taskgroup
for root_task in node_tasks:
if hasattr(root_task, "trigger_rule"):
root_task.trigger_rule = task_args.get("trigger_rule", "always")

# We also need to set the producer task id too all consumer tasks, regardless if they are root or not
for task in node_tasks:
task.producer_task_id = producer_airflow_task.task_id # type: ignore[attr-defined]

tasks_map[PRODUCER_WATCHER_TASK_ID] = producer_airflow_task
return producer_airflow_task.task_id
Expand Down Expand Up @@ -745,16 +765,6 @@ def build_airflow_graph( # noqa: C901 TODO: https://github.com/astronomer/astro
logger.debug(f"Conversion of <{node.unique_id}> was successful!")
tasks_map[node_id] = task_or_group

if execution_mode == ExecutionMode.WATCHER:
producer_watcher_task_id = _add_producer_watcher(
dag,
task_args,
tasks_map,
task_group,
render_config=render_config,
)
task_args["producer_watcher_task_id"] = producer_watcher_task_id

# If test_behaviour=="after_all", there will be one test task, run by the end of the DAG
# The end of a DAG is defined by the DAG leaf tasks (tasks which do not have downstream tasks)
if test_behavior == TestBehavior.AFTER_ALL:
Expand Down Expand Up @@ -790,6 +800,16 @@ def build_airflow_graph( # noqa: C901 TODO: https://github.com/astronomer/astro

create_airflow_task_dependencies(nodes, tasks_map)

if execution_mode == ExecutionMode.WATCHER:
_add_producer_watcher_and_dependencies(
dag=dag,
task_args=task_args,
tasks_map=tasks_map,
task_group=task_group,
render_config=render_config,
nodes=nodes,
)

if settings.enable_setup_async_task:
_add_dbt_setup_async_task(
dag,
Expand Down
6 changes: 5 additions & 1 deletion cosmos/airflow/task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@

from typing import Any

from airflow.utils.task_group import TaskGroup
try:
# Airflow 3.1 onwards
from airflow.sdk import TaskGroup
except ImportError:
from airflow.utils.task_group import TaskGroup

from cosmos.converter import DbtToAirflowConverter, airflow_kwargs, specific_kwargs

Expand Down
15 changes: 11 additions & 4 deletions cosmos/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from airflow.models import DagRun, Variable
from airflow.models.dag import DAG
from airflow.utils.session import provide_session
from airflow.utils.task_group import TaskGroup
from airflow.version import version as airflow_version
from sqlalchemy import select
from sqlalchemy.orm import Session
Expand All @@ -26,14 +25,16 @@

if TYPE_CHECKING:
try:
# Airflow 3 onwards
from airflow.sdk import ObjectStoragePath
from airflow.utils.task_group import TaskGroup
except ImportError:
try:
from airflow.io.path import ObjectStoragePath
except ImportError:
pass
except ImportError:
pass
from airflow.utils.task_group import TaskGroup

from cosmos.constants import (
DBT_MANIFEST_FILE_NAME,
DBT_TARGET_DIR_NAME,
Expand Down Expand Up @@ -83,7 +84,13 @@ def _configure_remote_cache_dir() -> Path | ObjectStoragePath | None:
"Airflow 2.8 or later."
)

from airflow.io.path import ObjectStoragePath
try:
from airflow.sdk import ObjectStoragePath
except ImportError:
try:
from airflow.io.path import ObjectStoragePath
except ImportError:
pass

_configured_cache_dir = ObjectStoragePath(cache_dir_str, conn_id=remote_cache_conn_id)

Expand Down
9 changes: 4 additions & 5 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@

from cosmos import settings

if TYPE_CHECKING:
if settings.AIRFLOW_IO_AVAILABLE or TYPE_CHECKING:
try:
from airflow.io.path import ObjectStoragePath
from airflow.sdk import ObjectStoragePath
except ImportError:
pass
from airflow.io.path import ObjectStoragePath

from cosmos.cache import create_cache_profile, get_cached_profile, is_profile_cache_enabled
from cosmos.constants import (
DEFAULT_PROFILES_FILE_NAME,
Expand Down Expand Up @@ -236,8 +237,6 @@ def __init__(
)

if settings.AIRFLOW_IO_AVAILABLE:
from airflow.io.path import ObjectStoragePath

self.manifest_path = ObjectStoragePath(manifest_path_str, conn_id=manifest_conn_id)
else:
self.manifest_path = Path(manifest_path_str)
Expand Down
4 changes: 4 additions & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class TestBehavior(Enum):
Behavior of the tests.
"""

__test__ = False

BUILD = "build"
NONE = "none"
AFTER_EACH = "after_each"
Expand Down Expand Up @@ -116,6 +118,8 @@ class TestIndirectSelection(Enum):
Modes to configure the test behavior when performing indirect selection.
"""

__test__ = False

EAGER = "eager"
CAUTIOUS = "cautious"
BUILDABLE = "buildable"
Expand Down
7 changes: 6 additions & 1 deletion cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
from warnings import warn

from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

try:
# Airflow 3.1 onwards
from airflow.sdk import TaskGroup
except ImportError:
from airflow.utils.task_group import TaskGroup

from cosmos import cache, settings
from cosmos.airflow.graph import build_airflow_graph
Expand Down
7 changes: 6 additions & 1 deletion cosmos/core/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
except ImportError: # Airflow 2
from airflow.models import BaseOperator
from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

try:
# Airflow 3.1 onwards
from airflow.sdk import TaskGroup
except ImportError:
from airflow.utils.task_group import TaskGroup

from cosmos.core.graph.entities import Task
from cosmos.log import get_logger
Expand Down
12 changes: 11 additions & 1 deletion cosmos/core/graph/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@
from dataclasses import dataclass, field
from typing import Any, Dict, List

import airflow
from packaging.version import Version

from cosmos.log import get_logger

logger = get_logger(__name__)


AIRFLOW_VERSION = Version(airflow.__version__)


@dataclass
class CosmosEntity:
"""
Expand Down Expand Up @@ -58,6 +64,10 @@ class Task(CosmosEntity):
"""

owner: str = ""
operator_class: str = "airflow.operators.empty.EmptyOperator"
operator_class: str = (
"airflow.operators.empty.EmptyOperator"
if AIRFLOW_VERSION < Version("3.0")
else "airflow.providers.standard.operators.empty.EmptyOperator"
)
arguments: Dict[str, Any] = field(default_factory=dict)
extra_context: Dict[str, Any] = field(default_factory=dict)
10 changes: 9 additions & 1 deletion cosmos/dataset.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from airflow import DAG
from airflow.utils.task_group import TaskGroup

if TYPE_CHECKING:
try:
# Airflow 3.1 onwards
from airflow.utils.task_group import TaskGroup
except ImportError:
from airflow.utils.task_group import TaskGroup


def get_dataset_alias_name(dag: DAG | None, task_group: TaskGroup | None, task_id: str) -> str:
Expand Down
8 changes: 6 additions & 2 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@

if TYPE_CHECKING:
try:
from airflow.io.path import ObjectStoragePath
# Airflow 3 onwards
from airflow.sdk import ObjectStoragePath
except ImportError:
pass
try:
from airflow.io.path import ObjectStoragePath
except ImportError:
pass

import cosmos.dbt.runner as dbt_runner
from cosmos import cache, settings
Expand Down
8 changes: 6 additions & 2 deletions cosmos/hooks/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
from tempfile import TemporaryDirectory, gettempdir
from typing import NamedTuple

from airflow.hooks.base import BaseHook
try:
# Airflow 3.1 onwards
from airflow.sdk.bases.hook import BaseHook
except ImportError:
from airflow.hooks.base import BaseHook


class FullOutputSubprocessResult(NamedTuple):
Expand All @@ -20,7 +24,7 @@ class FullOutputSubprocessResult(NamedTuple):
full_output: list[str]


class FullOutputSubprocessHook(BaseHook):
class FullOutputSubprocessHook(BaseHook): # type: ignore[misc]
"""Hook for running processes with the ``subprocess`` module."""

def __init__(self) -> None:
Expand Down
Loading
Loading