diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index c7c37a794a..1874099006 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -13,6 +13,7 @@ from airflow.models.dag import DAG from airflow.utils.task_group import TaskGroup +from cosmos import settings from cosmos.config import RenderConfig from cosmos.constants import ( DBT_SETUP_ASYNC_TASK_ID, @@ -31,7 +32,6 @@ from cosmos.dbt.graph import DbtNode from cosmos.exceptions import CosmosValueError from cosmos.log import get_logger -from cosmos.settings import enable_setup_async_task, enable_teardown_async_task logger = get_logger(__name__) @@ -666,7 +666,7 @@ def build_airflow_graph( tasks_map[node_id] = test_task create_airflow_task_dependencies(nodes, tasks_map) - if enable_setup_async_task: + if settings.enable_setup_async_task: _add_dbt_setup_async_task( dag, execution_mode, @@ -676,7 +676,7 @@ def build_airflow_graph( render_config=render_config, async_py_requirements=async_py_requirements, ) - if enable_teardown_async_task: + if settings.enable_teardown_async_task: _add_teardown_task( dag, execution_mode, diff --git a/cosmos/io.py b/cosmos/io.py index e702c09500..a2cbabd35b 100644 --- a/cosmos/io.py +++ b/cosmos/io.py @@ -32,6 +32,9 @@ def upload_to_aws_s3( hook = S3Hook(aws_conn_id=aws_conn_id) context = kwargs["context"] + # Airflow 3 and Airflow 2 compatibility, respectively: + try_number = getattr(context["task_instance"], "try_number") or getattr(context["task_instance"], "_try_number") + # Iterate over the files in the target dir and upload them to S3 for dirpath, _, filenames in os.walk(target_dir): for filename in filenames: @@ -39,7 +42,7 @@ def upload_to_aws_s3( f"{context['dag'].dag_id}" f"/{context['run_id']}" f"/{context['task_instance'].task_id}" - f"/{context['task_instance']._try_number}" + f"/{try_number}" f"{dirpath.split(project_dir)[-1]}/{filename}" ) hook.load_file( diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 
a4c2098803..1ace45a596 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -298,6 +298,9 @@ def _configure_remote_target_path() -> tuple[Path, str] | tuple[None, None]: target_path_schema = urlparse(target_path_str).scheme remote_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(target_path_schema, None) # type: ignore[assignment] if remote_conn_id is None: + logger.info( + "Remote target connection not set. Please, configure [cosmos][remote_target_path_conn_id] or set the environment variable AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID" + ) return None, None if not settings.AIRFLOW_IO_AVAILABLE: @@ -500,13 +503,16 @@ def _generate_dbt_flags(self, tmp_project_dir: str, profile_path: Path) -> list[ self.profile_config.target_name, ] if self.invocation_mode == InvocationMode.DBT_RUNNER: - # PR #1484 introduced the use of dbtRunner during DAG parsing. As a result, invoking dbtRunner again - # during task execution can lead to task hangs—especially on Airflow 2.x. Investigation revealed that - # the issue stems from how dbtRunner handles static parsing. Cosmos copies the dbt project to temporary - # directories, and the use of different temp paths between parsing and execution appears to interfere - # with dbt's static parsing behavior. As a workaround, passing the --no-static-parser flag avoids these - # hangs and ensures reliable task execution. - dbt_flags.append("--no-static-parser") + from dbt.version import __version__ as dbt_version + + if Version(dbt_version) >= Version("1.5.6"): + # PR #1484 introduced the use of dbtRunner during DAG parsing. As a result, invoking dbtRunner again + # during task execution can lead to task hangs—especially on Airflow 2.x. Investigation revealed that + # the issue stems from how dbtRunner handles static parsing. Cosmos copies the dbt project to temporary + # directories, and the use of different temp paths between parsing and execution appears to interfere + # with dbt's static parsing behavior. 
As a workaround, passing the --no-static-parser flag avoids these + # hangs and ensures reliable task execution. + dbt_flags.append("--no-static-parser") return dbt_flags def _install_dependencies( diff --git a/dev/dags/cosmos_seed_dag.py b/dev/dags/cosmos_seed_dag.py index 1403c7af15..6af5e5168c 100644 --- a/dev/dags/cosmos_seed_dag.py +++ b/dev/dags/cosmos_seed_dag.py @@ -14,7 +14,6 @@ from pathlib import Path from airflow import DAG -from airflow.datasets import Dataset from airflow.utils.task_group import TaskGroup from pendulum import datetime @@ -57,7 +56,11 @@ jaffle_shop_seed = DbtSeedOperator( task_id="seed_jaffle_shop", project_dir=DBT_ROOT_PATH / "jaffle_shop", - outlets=[Dataset("SEED://JAFFLE_SHOP")], + # the following line is commented out because Airflow 3 raises an `AirflowInactiveAssetInInletOrOutletException` + # we can re-enable it once the issue is solved in Airflow 3 + # https://github.com/apache/airflow/issues/51644 + # https://github.com/astronomer/astronomer-cosmos/issues/1804 + # outlets=[Dataset("SEED://JAFFLE_SHOP")], profile_config=profile_config, install_deps=True, ) diff --git a/dev/dags/dbt/jaffle_shop/.gitignore b/dev/dags/dbt/jaffle_shop/.gitignore index 45d294b9af..b2bc277ea2 100644 --- a/dev/dags/dbt/jaffle_shop/.gitignore +++ b/dev/dags/dbt/jaffle_shop/.gitignore @@ -3,3 +3,4 @@ target/ dbt_packages/ logs/ !target/manifest.json +dbt_internal_packages/ diff --git a/dev/dags/dbt/jaffle_shop_python/.gitignore b/dev/dags/dbt/jaffle_shop_python/.gitignore index 49f147cb98..1c22a22b50 100644 --- a/dev/dags/dbt/jaffle_shop_python/.gitignore +++ b/dev/dags/dbt/jaffle_shop_python/.gitignore @@ -2,3 +2,4 @@ target/ dbt_packages/ logs/ +dbt_internal_packages/ diff --git a/dev/dags/example_cosmos_dbt_build.py b/dev/dags/example_cosmos_dbt_build.py index efd2201008..2f2a9f6016 100644 --- a/dev/dags/example_cosmos_dbt_build.py +++ b/dev/dags/example_cosmos_dbt_build.py @@ -26,7 +26,7 @@ example_cosmos_dbt_build = DbtDag( # dbt/cosmos-specific 
parameters project_config=ProjectConfig( - DBT_ROOT_PATH / "altered_jaffle_shop", + DBT_ROOT_PATH / "jaffle_shop", ), render_config=RenderConfig( test_behavior=TestBehavior.BUILD, diff --git a/docs/airflow3_compatibility/index.rst b/docs/airflow3_compatibility/index.rst index f5da5c0911..26a14d8c1a 100644 --- a/docs/airflow3_compatibility/index.rst +++ b/docs/airflow3_compatibility/index.rst @@ -58,15 +58,12 @@ investigation. Known Limitations ----------------- -We have observed that the `cosmos_callback_dag.py `_ -DAG is experiencing noticeable **performance lag** in Airflow 3. This DAG leverages the ``ObjectStoragePath`` module -along with several libraries from the **fsspec** ecosystem. -We are prioritizing a deeper investigation into this issue and will provide updates as optimizations or fixes become -available in future releases. - There have been significant changes to how plugins work in Airflow 3.0, and more changes are coming in Airflow 3.1. Even though the Cosmos dbt docs plugin is not currently working, we are actively working on supporting this feature. +Airflow 3 DatasetAlias no longer supports non-ASCII characters. This issue has been reported to the `Airflow community <https://github.com/apache/airflow/issues/51566>`_ +and we are also tracking it in the `Cosmos repository <https://github.com/astronomer/astronomer-cosmos/issues/1802>`_. + What's Next ----------- diff --git a/scripts/test/integration-setup.sh b/scripts/test/integration-setup.sh index 8f009c66c5..7b397aafd0 100644 --- a/scripts/test/integration-setup.sh +++ b/scripts/test/integration-setup.sh @@ -26,3 +26,12 @@ if python3 -c "import sys; print(sys.version_info >= (3, 9))" | grep -q 'True'; fi pip install 'dbt-databricks!=1.9.0' 'dbt-bigquery' 'dbt-postgres' 'dbt-vertica' 'openlineage-airflow' + +# To overcome CI issues when running Py 3.10 and AF 2.6 with dbt-core 1.9 +# Such as: +# ERROR tests/operators/_asynchronous/test_base.py - pydantic.errors.PydanticUserError: A non-annotated attribute was detected: `dag_id = `.
All model fields require a type annotation; if `dag_id` is not meant to be a field, you may be able to resolve this error by annotating it as a `ClassVar` or updating `model_config['ignored_types']`. +if [ "$AIRFLOW_VERSION" = "2.6.0" ] ; then + pip install "pydantic<2" + pip freeze + pip freeze | grep -i pydantic +fi diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index e56cd79903..dbe0e6effb 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -1,7 +1,8 @@ #!/bin/bash -set -x set -v +set -x +set -e AIRFLOW_VERSION="$1" PYTHON_VERSION="$2" @@ -16,7 +17,12 @@ fi echo "${VIRTUAL_ENV}" -CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-$AIRFLOW_VERSION.0/constraints-$PYTHON_VERSION.txt" +if [ "$AIRFLOW_VERSION" = "3.0" ] ; then + CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-$AIRFLOW_VERSION.2/constraints-$PYTHON_VERSION.txt" +else + CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-$AIRFLOW_VERSION.0/constraints-$PYTHON_VERSION.txt" +fi; + curl -sSL $CONSTRAINT_URL -o /tmp/constraint.txt # Workaround to remove PyYAML constraint that will work on both Linux and MacOS sed '/PyYAML==/d' /tmp/constraint.txt > /tmp/constraint.txt.tmp @@ -37,6 +43,12 @@ if [ "$AIRFLOW_VERSION" = "2.4" ] || [ "$AIRFLOW_VERSION" = "2.5" ] || [ "$AIRFL uv pip install "apache-airflow-providers-google<10.11" "apache-airflow==$AIRFLOW_VERSION" uv pip install "apache-airflow-providers-microsoft-azure" "apache-airflow==$AIRFLOW_VERSION" uv pip install pyopenssl --upgrade +elif [ "$AIRFLOW_VERSION" = "2.6" ] ; then + uv pip install "apache-airflow-providers-amazon" --constraint /tmp/constraint.txt + uv pip install "apache-airflow-providers-cncf-kubernetes" --constraint /tmp/constraint.txt + uv pip install "apache-airflow-providers-google" --constraint /tmp/constraint.txt + uv pip install apache-airflow-providers-microsoft-azure 
--constraint /tmp/constraint.txt + uv pip install "pydantic<2.0" elif [ "$AIRFLOW_VERSION" = "2.7" ] ; then uv pip install "apache-airflow-providers-amazon" --constraint /tmp/constraint.txt uv pip install "apache-airflow-providers-cncf-kubernetes" --constraint /tmp/constraint.txt @@ -68,7 +80,7 @@ elif [ "$AIRFLOW_VERSION" = "2.9" ] ; then else uv pip install "apache-airflow-providers-amazon[s3fs]" --constraint /tmp/constraint.txt uv pip install "apache-airflow-providers-cncf-kubernetes" --constraint /tmp/constraint.txt - uv pip install "apache-airflow-providers-google" --constraint /tmp/constraint.txt + uv pip install "apache-airflow-providers-google" --constraint /tmp/constraint.txt uv pip install "apache-airflow-providers-microsoft-azure" --constraint /tmp/constraint.txt fi diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 9a5e5cd377..6e18f1da5f 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1882,9 +1882,9 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir assert hash_args == "d41d8cd98f00b204e9800998ecf8427e" if sys.platform == "darwin": # We faced inconsistent hashing versions depending on the version of MacOS/Linux - the following line aims to address these. 
- assert hash_dir in ("391db5c7e1fb90214d829dd0476059a1", "0148da6f5f7fd260c9fa55c3b3c45168") + assert hash_dir in ("64934a984040076870accfc177706353", "159b4a3432c3d0ebad32080a55697089") else: - assert hash_dir == "0148da6f5f7fd260c9fa55c3b3c45168" + assert hash_dir == "159b4a3432c3d0ebad32080a55697089" @pytest.mark.integration diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index ba03b93961..b12cc207fa 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -592,7 +592,7 @@ def test_run_operator_dataset_with_airflow_3_and_enabled_dataset_alias_false_fai caplog.set_level(logging.ERROR) caplog.clear() - run_test_dag(dag) + run_test_dag(dag, expect_success=False) assert "AirflowCompatibilityError" in caplog.text assert "ERROR" in caplog.text diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 669ab667c7..00e6e0219f 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -1,7 +1,9 @@ from __future__ import annotations +import os import sys from pathlib import Path +from unittest.mock import patch try: from functools import cache @@ -53,7 +55,7 @@ def session(): @cache -def get_dag_bag() -> DagBag: +def get_dag_bag() -> DagBag: # noqa: C901 """Create a DagBag by adding the files that are not supported to .airflowignore""" if AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS: @@ -74,6 +76,10 @@ def get_dag_bag() -> DagBag: if _PYTHON_VERSION < (3, 9): file.writelines(["example_duckdb_dag.py\n"]) + if _PYTHON_VERSION < (3, 9): + # dbt-bigquery 1.9 supports Python 3.9 onwards + file.writelines(["simple_dag_async.py\n"]) + if DBT_VERSION < Version("1.6.0"): file.writelines(["simple_dag_async.py\n"]) file.writelines(["example_model_version.py\n"]) @@ -85,6 +91,12 @@ def get_dag_bag() -> DagBag: if AIRFLOW_VERSION < Version("2.8.0"): file.writelines("example_cosmos_dbt_build.py\n") + # Disabling this DAG temporarily due to an Airflow 3 bug on processing DatasetAlias that 
contain non-ASCII characters: + # https://github.com/apache/airflow/issues/51566 + # https://github.com/astronomer/astronomer-cosmos/issues/1802 + if AIRFLOW_VERSION >= Version("3.0.0"): + file.writelines("example_source_rendering.py\n") + print(".airflowignore contents: ") print(AIRFLOW_IGNORE_FILE.read_text()) db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False) @@ -101,6 +113,7 @@ def get_dag_ids() -> list[str]: def run_dag(dag_id: str): dag_bag = get_dag_bag() dag = dag_bag.get_dag(dag_id) + assert dag test_utils.run_dag(dag) @@ -116,17 +129,17 @@ def test_example_dag(session, dag_id: str): run_dag(dag_id) -async_dag_ids = ["simple_dag_async"] - - @pytest.mark.skipif( - AIRFLOW_VERSION < Version("2.8") or AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, - reason="See PR: https://github.com/apache/airflow/pull/34585 and Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs, and Cosmos errors if `emit_datasets` is not False", + _PYTHON_VERSION < (3, 9) + or AIRFLOW_VERSION < Version("2.8") + or AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, + reason="dbt-bigquery only supports Python 3.9 onwards. 
See PR: https://github.com/apache/airflow/pull/34585 and Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs, and Cosmos errors if `emit_datasets` is not False", +) +@patch.dict( + os.environ, + {"AIRFLOW__COSMOS__ENABLE_SETUP_ASYNC_TASK": "false", "AIRFLOW__COSMOS__ENABLE_TEARDOWN_ASYNC_TASK": "false"}, ) @pytest.mark.integration def test_async_example_dag_without_setup_task(session, monkeypatch): - monkeypatch.setenv("AIRFLOW__COSMOS__ENABLE_SETUP_ASYNC_TASK", "false") - monkeypatch.setenv("AIRFLOW__COSMOS__ENABLE_TEARDOWN_ASYNC_TASK", "false") - - for dag_id in async_dag_ids: - run_dag(dag_id) + async_dag_id = "simple_dag_async" + run_dag(async_dag_id) diff --git a/tests/utils.py b/tests/utils.py index f75a155afb..0f4a4a9fd8 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -31,12 +31,37 @@ def run_dag(dag: DAG, conn_file_path: str | None = None) -> DagRun: return test_dag(dag=dag, conn_file_path=conn_file_path) -def test_dag(dag, conn_file_path: str | None = None, custom_tester: bool = False) -> DagRun: +def check_dag_success(dag_run: DagRun | None, expect_success: bool = True) -> bool: + """Check if a DAG was successful, if that Airflow version allows it.""" + if dag_run is not None: + if expect_success: + return dag_run.state == DagRunState.SUCCESS + else: + return dag_run.state == DagRunState.FAILED + return True + + +def new_test_dag(dag: DAG) -> DagRun: + if AIRFLOW_VERSION >= version.Version("3.0"): + dr = dag.test(logical_date=timezone.utcnow()) + else: + dr = dag.test() + return dr + + +def test_dag( + dag, conn_file_path: str | None = None, custom_tester: bool = False, expect_success: bool = True +) -> DagRun: + dr = None if custom_tester: - return test_old_dag(dag, conn_file_path) + dr = test_old_dag(dag, conn_file_path) + assert check_dag_success(dr, expect_success), f"Dag {dag.dag_id} did not run successfully. State: {dr.state}. 
" elif AIRFLOW_VERSION >= version.Version("2.5"): if AIRFLOW_VERSION not in (Version("2.10.0"), Version("2.10.1"), Version("2.10.2")): - return dag.test() + dr = new_test_dag(dag) + assert check_dag_success( + dr, expect_success + ), f"Dag {dag.dag_id} did not run successfully. State: {dr.state}. " else: # This is a work around until we fix the issue in Airflow: # https://github.com/apache/airflow/issues/42495 @@ -49,13 +74,19 @@ def test_dag(dag, conn_file_path: str | None = None, custom_tester: bool = False FAILED tests/test_example_dags.py::test_example_dag[user_defined_profile] """ try: - dag.test() + dr = new_test_dag(dag) + assert check_dag_success( + dr, expect_success + ), f"Dag {dag.dag_id} did not run successfully. State: {dr.state}. " except sqlalchemy.exc.PendingRollbackError: warnings.warn( "Early versions of Airflow 2.10 have issues when running the test command with DatasetAlias / Datasets" ) else: - return test_old_dag(dag, conn_file_path) + dr = test_old_dag(dag, conn_file_path) + assert check_dag_success(dr), f"Dag {dag.dag_id} did not run successfully. State: {dr.state}. " + + return dr # DAG.test() was added in Airflow version 2.5.0. And to test on older Airflow versions, we need to copy the @@ -121,7 +152,7 @@ def test_old_dag( print("conn_file_path", conn_file_path) - return dr, session + return dr def add_logger_if_needed(dag: DAG, ti: TaskInstance): @@ -201,4 +232,5 @@ def _get_or_create_dagrun( conf=conf, ) log.info("created dagrun %s", str(dr)) + return dr