Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
40bcd48
Assert Example DAGs' DagRunState
pankajkoti May 26, 2025
64ed30a
Apply suggestions from code review
pankajkoti May 27, 2025
353214e
Add compatibility with older versions of Airflow dag.test()
tatiana May 28, 2025
c1206cd
Merge branch 'main' into assert-example-dagrun-state
tatiana Jun 3, 2025
4c52924
Update dev/dags/cosmos_callback_dag.py
tatiana Jun 3, 2025
3269a8c
Update cosmos/io.py
tatiana Jun 3, 2025
73ae439
Update tests/test_io.py
tatiana Jun 3, 2025
05ba199
Update tests/operators/test_local.py
tatiana Jun 3, 2025
a01ff5c
Update dev/dags/example_cosmos_dbt_build.py
tatiana Jun 3, 2025
98c77d5
Update dev/dags/example_source_rendering.py
tatiana Jun 3, 2025
612e3c0
Fix some of the DAG test failures
tatiana Jun 6, 2025
c1ef13c
Fix AF2 int tests
tatiana Jun 6, 2025
5667c8d
Fix test for dbt 1.5.4
tatiana Jun 6, 2025
445f738
Fix test for dbt 1.5.4
tatiana Jun 6, 2025
4ce1f8a
Fix test for dbt 1.5.4
tatiana Jun 6, 2025
a577203
Skip async test for if Py <= 3.9 due to dbt-bigquery not supporting P…
tatiana Jun 6, 2025
0aebaee
Fix test_async_example_dag_without_setup_task
tatiana Jun 9, 2025
5d7ea92
Temporarily ignore test DAG that used non-ASCII characters in AF3
tatiana Jun 10, 2025
afa6147
Fix cosmos/io.py so it supports AF3 as well as AF2
tatiana Jun 10, 2025
a919d69
Install latest AF3
tatiana Jun 11, 2025
87d2ee3
Change example_cosmos_dbt_build DAG to use unaltered jaffle_shop project
tatiana Jun 11, 2025
514ef78
Try to fix test_async_example_dag_without_setup_task
tatiana Jun 12, 2025
51f40b0
Revert changes to test_example_dags
tatiana Jun 12, 2025
584fae8
Disable emiting outlet from `cosmos_see_dag.py` to avoid `AirflowInac…
tatiana Jun 12, 2025
24e7fae
Try to fix PyDantic errors in AF2.6 + Py 3.10
tatiana Jun 12, 2025
374a116
Revert unecessary change
tatiana Jun 12, 2025
344a478
Try to fix issue with pydantic
tatiana Jun 12, 2025
402ffe5
google provider installation
tatiana Jun 12, 2025
dcc782c
Fix running test_async_example_dag_without_setup_task in the CI
tatiana Jun 12, 2025
4f5b7dc
try to solve pydantic issue
tatiana Jun 12, 2025
a9c6e25
Troubleshoot in the CI
tatiana Jun 12, 2025
6c121c3
Troubleshoot in the CI
tatiana Jun 12, 2025
9306913
Attempt to avoid error seen in
tatiana Jun 12, 2025
521881e
Fix unittest
tatiana Jun 12, 2025
38b02c6
Give more visibility of what is going on
tatiana Jun 12, 2025
408c568
Attempt to solve CI issue with Py2.6
tatiana Jun 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

from cosmos import settings
from cosmos.config import RenderConfig
from cosmos.constants import (
DBT_SETUP_ASYNC_TASK_ID,
Expand All @@ -31,7 +32,6 @@
from cosmos.dbt.graph import DbtNode
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger
from cosmos.settings import enable_setup_async_task, enable_teardown_async_task

logger = get_logger(__name__)

Expand Down Expand Up @@ -666,7 +666,7 @@ def build_airflow_graph(
tasks_map[node_id] = test_task

create_airflow_task_dependencies(nodes, tasks_map)
if enable_setup_async_task:
if settings.enable_setup_async_task:
_add_dbt_setup_async_task(
dag,
execution_mode,
Expand All @@ -676,7 +676,7 @@ def build_airflow_graph(
render_config=render_config,
async_py_requirements=async_py_requirements,
)
if enable_teardown_async_task:
if settings.enable_teardown_async_task:
_add_teardown_task(
dag,
execution_mode,
Expand Down
5 changes: 4 additions & 1 deletion cosmos/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@ def upload_to_aws_s3(
hook = S3Hook(aws_conn_id=aws_conn_id)
context = kwargs["context"]

# Airflow 3 and Airflow 2 compatibility, respectively:
try_number = getattr(context["task_instance"], "try_number") or getattr(context["task_instance"], "_try_number")

# Iterate over the files in the target dir and upload them to S3
for dirpath, _, filenames in os.walk(target_dir):
for filename in filenames:
s3_key = (
f"{context['dag'].dag_id}"
f"/{context['run_id']}"
f"/{context['task_instance'].task_id}"
f"/{context['task_instance']._try_number}"
f"/{try_number}"
f"{dirpath.split(project_dir)[-1]}/{filename}"
)
hook.load_file(
Expand Down
20 changes: 13 additions & 7 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ def _configure_remote_target_path() -> tuple[Path, str] | tuple[None, None]:
target_path_schema = urlparse(target_path_str).scheme
remote_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(target_path_schema, None) # type: ignore[assignment]
if remote_conn_id is None:
logger.info(
"Remote target connection not set. Please, configure [cosmos][remote_target_path_conn_id] or set the environment variable AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID"
)
return None, None

if not settings.AIRFLOW_IO_AVAILABLE:
Expand Down Expand Up @@ -500,13 +503,16 @@ def _generate_dbt_flags(self, tmp_project_dir: str, profile_path: Path) -> list[
self.profile_config.target_name,
]
if self.invocation_mode == InvocationMode.DBT_RUNNER:
# PR #1484 introduced the use of dbtRunner during DAG parsing. As a result, invoking dbtRunner again
# during task execution can lead to task hangs—especially on Airflow 2.x. Investigation revealed that
# the issue stems from how dbtRunner handles static parsing. Cosmos copies the dbt project to temporary
# directories, and the use of different temp paths between parsing and execution appears to interfere
# with dbt's static parsing behavior. As a workaround, passing the --no-static-parser flag avoids these
# hangs and ensures reliable task execution.
dbt_flags.append("--no-static-parser")
from dbt.version import __version__ as dbt_version

if Version(dbt_version) >= Version("1.5.6"):
# PR #1484 introduced the use of dbtRunner during DAG parsing. As a result, invoking dbtRunner again
# during task execution can lead to task hangs—especially on Airflow 2.x. Investigation revealed that
# the issue stems from how dbtRunner handles static parsing. Cosmos copies the dbt project to temporary
# directories, and the use of different temp paths between parsing and execution appears to interfere
# with dbt's static parsing behavior. As a workaround, passing the --no-static-parser flag avoids these
# hangs and ensures reliable task execution.
dbt_flags.append("--no-static-parser")
return dbt_flags

def _install_dependencies(
Expand Down
7 changes: 5 additions & 2 deletions dev/dags/cosmos_seed_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from pathlib import Path

from airflow import DAG
from airflow.datasets import Dataset
from airflow.utils.task_group import TaskGroup
from pendulum import datetime

Expand Down Expand Up @@ -57,7 +56,11 @@
jaffle_shop_seed = DbtSeedOperator(
task_id="seed_jaffle_shop",
project_dir=DBT_ROOT_PATH / "jaffle_shop",
outlets=[Dataset("SEED://JAFFLE_SHOP")],
# the following line is commented out because Airflow 3 raises an `AirflowInactiveAssetInInletOrOutletException`
# we can re-enable it once the issue is solved in Airflow 3
# https://github.com/apache/airflow/issues/51644
# https://github.com/astronomer/astronomer-cosmos/issues/1804
# outlets=[Dataset("SEED://JAFFLE_SHOP")],
profile_config=profile_config,
install_deps=True,
)
Expand Down
1 change: 1 addition & 0 deletions dev/dags/dbt/jaffle_shop/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ target/
dbt_packages/
logs/
!target/manifest.json
dbt_internal_packages/
1 change: 1 addition & 0 deletions dev/dags/dbt/jaffle_shop_python/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
target/
dbt_packages/
logs/
dbt_internal_packages/
2 changes: 1 addition & 1 deletion dev/dags/example_cosmos_dbt_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
example_cosmos_dbt_build = DbtDag(
# dbt/cosmos-specific parameters
project_config=ProjectConfig(
DBT_ROOT_PATH / "altered_jaffle_shop",
DBT_ROOT_PATH / "jaffle_shop",
),
render_config=RenderConfig(
test_behavior=TestBehavior.BUILD,
Expand Down
9 changes: 3 additions & 6 deletions docs/airflow3_compatibility/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,12 @@ investigation.
Known Limitations
-----------------

We have observed that the `cosmos_callback_dag.py <https://github.com/astronomer/astronomer-cosmos/blob/main/dev/dags/cosmos_callback_dag.py>`_
DAG is experiencing noticeable **performance lag** in Airflow 3. This DAG leverages the ``ObjectStoragePath`` module
along with several libraries from the **fsspec** ecosystem.
We are prioritizing a deeper investigation into this issue and will provide updates as optimizations or fixes become
available in future releases.

There have been significant changes to how plugins work in Airflow 3.0, and more changes are coming in Airflow 3.1.
Even though the Cosmos dbt docs plugin is not currently working, we are actively working on supporting this feature.

Airflow 3 DatasetAlias no longer support ASCII characters. This issue has been reported to the `Airflow community <https://github.com/apache/airflow/issues/51566>`_
and we are also tracking it in the `Cosmos repository <https://github.com/astronomer/astronomer-cosmos/issues/1802>`_.

What's Next
-----------

Expand Down
9 changes: 9 additions & 0 deletions scripts/test/integration-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,12 @@ if python3 -c "import sys; print(sys.version_info >= (3, 9))" | grep -q 'True';
fi

pip install 'dbt-databricks!=1.9.0' 'dbt-bigquery' 'dbt-postgres' 'dbt-vertica' 'openlineage-airflow'

# To overcome CI issues when running Py 3.10 and AF 2.6 with dbt-core 1.9
# Such as:
# ERROR tests/operators/_asynchronous/test_base.py - pydantic.errors.PydanticUserError: A non-annotated attribute was detected: `dag_id = <class 'str'>`. All model fields require a type annotation; if `dag_id` is not meant to be a field, you may be able to resolve this error by annotating it as a `ClassVar` or updating `model_config['ignored_types']`.
if [ "$AIRFLOW_VERSION" = "2.6.0" ] ; then
pip install "pydantic<2"
pip freeze
pip freeze | grep -i pydantic
fi
18 changes: 15 additions & 3 deletions scripts/test/pre-install-airflow.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#!/bin/bash

set -x
set -v
set -x
set -e

AIRFLOW_VERSION="$1"
PYTHON_VERSION="$2"
Expand All @@ -16,7 +17,12 @@ fi

echo "${VIRTUAL_ENV}"

CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-$AIRFLOW_VERSION.0/constraints-$PYTHON_VERSION.txt"
if [ "$AIRFLOW_VERSION" = "3.0" ] ; then
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-$AIRFLOW_VERSION.2/constraints-$PYTHON_VERSION.txt"
else
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-$AIRFLOW_VERSION.0/constraints-$PYTHON_VERSION.txt"
fi;

curl -sSL $CONSTRAINT_URL -o /tmp/constraint.txt
# Workaround to remove PyYAML constraint that will work on both Linux and MacOS
sed '/PyYAML==/d' /tmp/constraint.txt > /tmp/constraint.txt.tmp
Expand All @@ -37,6 +43,12 @@ if [ "$AIRFLOW_VERSION" = "2.4" ] || [ "$AIRFLOW_VERSION" = "2.5" ] || [ "$AIRFL
uv pip install "apache-airflow-providers-google<10.11" "apache-airflow==$AIRFLOW_VERSION"
uv pip install "apache-airflow-providers-microsoft-azure" "apache-airflow==$AIRFLOW_VERSION"
uv pip install pyopenssl --upgrade
elif [ "$AIRFLOW_VERSION" = "2.6" ] ; then
uv pip install "apache-airflow-providers-amazon" --constraint /tmp/constraint.txt
uv pip install "apache-airflow-providers-cncf-kubernetes" --constraint /tmp/constraint.txt
uv pip install "apache-airflow-providers-google" --constraint /tmp/constraint.txt
uv pip install apache-airflow-providers-microsoft-azure --constraint /tmp/constraint.txt
uv pip install "pydantic<2.0"
elif [ "$AIRFLOW_VERSION" = "2.7" ] ; then
uv pip install "apache-airflow-providers-amazon" --constraint /tmp/constraint.txt
uv pip install "apache-airflow-providers-cncf-kubernetes" --constraint /tmp/constraint.txt
Expand Down Expand Up @@ -68,7 +80,7 @@ elif [ "$AIRFLOW_VERSION" = "2.9" ] ; then
else
uv pip install "apache-airflow-providers-amazon[s3fs]" --constraint /tmp/constraint.txt
uv pip install "apache-airflow-providers-cncf-kubernetes" --constraint /tmp/constraint.txt
uv pip install "apache-airflow-providers-google" --constraint /tmp/constraint.txt
uv pip install "apache-airflow-providers-google" --constraint /tmp/constraint.txt
uv pip install "apache-airflow-providers-microsoft-azure" --constraint /tmp/constraint.txt
fi

Expand Down
4 changes: 2 additions & 2 deletions tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -1882,9 +1882,9 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir
assert hash_args == "d41d8cd98f00b204e9800998ecf8427e"
if sys.platform == "darwin":
# We faced inconsistent hashing versions depending on the version of MacOS/Linux - the following line aims to address these.
assert hash_dir in ("391db5c7e1fb90214d829dd0476059a1", "0148da6f5f7fd260c9fa55c3b3c45168")
assert hash_dir in ("64934a984040076870accfc177706353", "159b4a3432c3d0ebad32080a55697089")
else:
assert hash_dir == "0148da6f5f7fd260c9fa55c3b3c45168"
assert hash_dir == "159b4a3432c3d0ebad32080a55697089"


@pytest.mark.integration
Expand Down
2 changes: 1 addition & 1 deletion tests/operators/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ def test_run_operator_dataset_with_airflow_3_and_enabled_dataset_alias_false_fai

caplog.set_level(logging.ERROR)
caplog.clear()
run_test_dag(dag)
run_test_dag(dag, expect_success=False)

assert "AirflowCompatibilityError" in caplog.text
assert "ERROR" in caplog.text
Expand Down
35 changes: 24 additions & 11 deletions tests/test_example_dags.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

import os
import sys
from pathlib import Path
from unittest.mock import patch

try:
from functools import cache
Expand Down Expand Up @@ -53,7 +55,7 @@ def session():


@cache
def get_dag_bag() -> DagBag:
def get_dag_bag() -> DagBag: # noqa: C901
"""Create a DagBag by adding the files that are not supported to .airflowignore"""

if AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS:
Expand All @@ -74,6 +76,10 @@ def get_dag_bag() -> DagBag:
if _PYTHON_VERSION < (3, 9):
file.writelines(["example_duckdb_dag.py\n"])

if _PYTHON_VERSION < (3, 9):
# dbt-bigquery 1.9 supports Python 3.9 onwards
file.writelines(["simple_dag_async.py\n"])

if DBT_VERSION < Version("1.6.0"):
file.writelines(["simple_dag_async.py\n"])
file.writelines(["example_model_version.py\n"])
Expand All @@ -85,6 +91,12 @@ def get_dag_bag() -> DagBag:
if AIRFLOW_VERSION < Version("2.8.0"):
file.writelines("example_cosmos_dbt_build.py\n")

# Disabling this DAG temporarily due to an Airflow 3 bug on processing DatasetAlias that contain non-ASCII characters:
# https://github.com/apache/airflow/issues/51566
# https://github.com/astronomer/astronomer-cosmos/issues/1802
if AIRFLOW_VERSION >= Version("3.0.0"):
file.writelines("example_source_rendering.py\n")

print(".airflowignore contents: ")
print(AIRFLOW_IGNORE_FILE.read_text())
db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False)
Expand All @@ -101,6 +113,7 @@ def get_dag_ids() -> list[str]:
def run_dag(dag_id: str):
dag_bag = get_dag_bag()
dag = dag_bag.get_dag(dag_id)
assert dag
test_utils.run_dag(dag)


Expand All @@ -116,17 +129,17 @@ def test_example_dag(session, dag_id: str):
run_dag(dag_id)


async_dag_ids = ["simple_dag_async"]


@pytest.mark.skipif(
AIRFLOW_VERSION < Version("2.8") or AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS,
reason="See PR: https://github.com/apache/airflow/pull/34585 and Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs, and Cosmos errors if `emit_datasets` is not False",
_PYTHON_VERSION < (3, 9)
or AIRFLOW_VERSION < Version("2.8")
or AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS,
reason="dbt-bigquery only supports Python 3.9 onwards. See PR: https://github.com/apache/airflow/pull/34585 and Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs, and Cosmos errors if `emit_datasets` is not False",
)
@patch.dict(
os.environ,
{"AIRFLOW__COSMOS__ENABLE_SETUP_ASYNC_TASK": "false", "AIRFLOW__COSMOS__ENABLE_TEARDOWN_ASYNC_TASK": "false"},
)
@pytest.mark.integration
def test_async_example_dag_without_setup_task(session, monkeypatch):
monkeypatch.setenv("AIRFLOW__COSMOS__ENABLE_SETUP_ASYNC_TASK", "false")
monkeypatch.setenv("AIRFLOW__COSMOS__ENABLE_TEARDOWN_ASYNC_TASK", "false")

for dag_id in async_dag_ids:
run_dag(dag_id)
async_dag_id = "simple_dag_async"
run_dag(async_dag_id)
44 changes: 38 additions & 6 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,37 @@ def run_dag(dag: DAG, conn_file_path: str | None = None) -> DagRun:
return test_dag(dag=dag, conn_file_path=conn_file_path)


def test_dag(dag, conn_file_path: str | None = None, custom_tester: bool = False) -> DagRun:
def check_dag_success(dag_run: DagRun | None, expect_success: bool = True) -> bool:
"""Check if a DAG was successful, if that Airflow version allows it."""
if dag_run is not None:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have a hint here on when/why the dag_run could be None?

if expect_success:
return dag_run.state == DagRunState.SUCCESS
else:
return dag_run.state == DagRunState.FAILED
return True


def new_test_dag(dag: DAG) -> DagRun:
if AIRFLOW_VERSION >= version.Version("3.0"):
dr = dag.test(logical_date=timezone.utcnow())
else:
dr = dag.test()
return dr


def test_dag(
dag, conn_file_path: str | None = None, custom_tester: bool = False, expect_success: bool = True
) -> DagRun:
dr = None
if custom_tester:
return test_old_dag(dag, conn_file_path)
dr = test_old_dag(dag, conn_file_path)
assert check_dag_success(dr, expect_success), f"Dag {dag.dag_id} did not run successfully. State: {dr.state}. "
elif AIRFLOW_VERSION >= version.Version("2.5"):
if AIRFLOW_VERSION not in (Version("2.10.0"), Version("2.10.1"), Version("2.10.2")):
return dag.test()
dr = new_test_dag(dag)
assert check_dag_success(
dr, expect_success
), f"Dag {dag.dag_id} did not run successfully. State: {dr.state}. "
else:
# This is a work around until we fix the issue in Airflow:
# https://github.com/apache/airflow/issues/42495
Expand All @@ -49,13 +74,19 @@ def test_dag(dag, conn_file_path: str | None = None, custom_tester: bool = False
FAILED tests/test_example_dags.py::test_example_dag[user_defined_profile]
"""
try:
dag.test()
dr = new_test_dag(dag)
assert check_dag_success(
dr, expect_success
), f"Dag {dag.dag_id} did not run successfully. State: {dr.state}. "
except sqlalchemy.exc.PendingRollbackError:
warnings.warn(
"Early versions of Airflow 2.10 have issues when running the test command with DatasetAlias / Datasets"
)
else:
return test_old_dag(dag, conn_file_path)
dr = test_old_dag(dag, conn_file_path)
assert check_dag_success(dr), f"Dag {dag.dag_id} did not run successfully. State: {dr.state}. "

return dr


# DAG.test() was added in Airflow version 2.5.0. And to test on older Airflow versions, we need to copy the
Expand Down Expand Up @@ -121,7 +152,7 @@ def test_old_dag(

print("conn_file_path", conn_file_path)

return dr, session
return dr


def add_logger_if_needed(dag: DAG, ti: TaskInstance):
Expand Down Expand Up @@ -201,4 +232,5 @@ def _get_or_create_dagrun(
conf=conf,
)
log.info("created dagrun %s", str(dr))

return dr