diff --git a/.github/workflows/cicd.yaml b/.github/workflows/cicd.yaml index d1cf7c50..47496225 100644 --- a/.github/workflows/cicd.yaml +++ b/.github/workflows/cicd.yaml @@ -185,7 +185,7 @@ jobs: AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0 AIRFLOW_HOME: ${{ github.workspace }} CONFIG_ROOT_DIR: ${{ github.workspace }}/dags - PYTHONPATH: ${{ github.workspace }}:${{ github.workspace }}/examples:$PYTHONPATH + PYTHONPATH: ${{ github.workspace }}:${{ github.workspace }}/dev/dags:$PYTHONPATH AUTO_CONVERT_TO_AF3: true - name: Upload coverage to Github diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index d8ee299c..4f58569b 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -3,8 +3,6 @@ from __future__ import annotations import ast - -# pylint: disable=ungrouped-imports import inspect import os import re @@ -14,8 +12,16 @@ from functools import partial, reduce from typing import Any, Callable, Dict, List, Tuple, Union -from airflow import DAG, configuration -from airflow.models import BaseOperator, Variable +from airflow import configuration + +try: + from airflow.sdk.bases.operator import BaseOperator + from airflow.sdk.definitions.dag import DAG + from airflow.sdk.definitions.variable import Variable +except ImportError: + from airflow.models import BaseOperator, Variable + from airflow.models.dag import DAG + from airflow.utils.module_loading import import_string from airflow.version import version as AIRFLOW_VERSION from packaging import version @@ -1069,8 +1075,12 @@ def adjust_general_task_params(task_params: dict(str, Any)): if utils.check_dict_key(task_params, "variables_as_arguments"): variables: List[Dict[str, str]] = task_params.get("variables_as_arguments") for variable in variables: - if Variable.get(variable["variable"], default_var=None) is not None: - task_params[variable["attribute"]] = Variable.get(variable["variable"], default_var=None) + default_argument_name = "default" + if INSTALLED_AIRFLOW_VERSION.major < AIRFLOW3_MAJOR_VERSION: + default_argument_name = "default_var" + variable_value = Variable.get(variable["variable"], **{default_argument_name: None}) + if variable_value is not None: + task_params[variable["attribute"]] = variable_value del task_params["variables_as_arguments"] if version.parse(AIRFLOW_VERSION) >= version.parse("2.4.0"): diff --git a/dagfactory/dagfactory.py b/dagfactory/dagfactory.py index b95b6b3b..5ecfb19a 100644 --- a/dagfactory/dagfactory.py +++ b/dagfactory/dagfactory.py @@ -8,7 +8,11 @@ import yaml from airflow.configuration import conf as airflow_conf -from airflow.models import DAG + +try: + from airflow.sdk.definitions.dag import DAG +except ImportError: + from airflow.models import DAG from airflow.version import version as AIRFLOW_VERSION from packaging import version diff --git a/dagfactory/listeners/runtime_event.py b/dagfactory/listeners/runtime_event.py index 20b57a65..79031dd6 100644 --- a/dagfactory/listeners/runtime_event.py +++ b/dagfactory/listeners/runtime_event.py @@ -1,7 +1,11 @@ from __future__ import annotations from airflow.listeners import hookimpl -from airflow.models.dag import DAG + +try: + from airflow.sdk.definitions.dag import DAG +except ImportError: + from airflow.models import DAG from airflow.models.dagrun import DagRun from dagfactory import telemetry diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index f84da6fb..770f8028 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -1,5 +1,9 @@ #!/bin/bash +set -v +set -x +set -e + AIRFLOW_VERSION="$1" PYTHON_VERSION="$2" @@ -13,13 +17,35 @@ fi echo "${VIRTUAL_ENV}" -CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-$AIRFLOW_VERSION.0/constraints-$PYTHON_VERSION.txt" -curl -sSL $CONSTRAINT_URL -o /tmp/constraint.txt +if [ "$AIRFLOW_VERSION" = "3.0" ] ; then + CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-$AIRFLOW_VERSION.2/constraints-$PYTHON_VERSION.txt" +else + CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-$AIRFLOW_VERSION.0/constraints-$PYTHON_VERSION.txt" +fi; + +curl -sSL "$CONSTRAINT_URL" -o /tmp/constraint.txt # Workaround to remove PyYAML constraint that will work on both Linux and MacOS sed '/PyYAML==/d' /tmp/constraint.txt > /tmp/constraint.txt.tmp mv /tmp/constraint.txt.tmp /tmp/constraint.txt -# Install Airflow with constraints -uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt -uv pip install apache-airflow-providers-cncf-kubernetes --constraint /tmp/constraint.txt +pip install uv +uv pip install pip --upgrade + + +if [ "$AIRFLOW_VERSION" = "3.0" ]; then + uv pip install "apache-airflow>=3.0.2" --constraint /tmp/constraint.txt +else + uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt + uv pip install apache-airflow-providers-cncf-kubernetes --constraint /tmp/constraint.txt +fi; + rm /tmp/constraint.txt + +actual_version=$(airflow version | cut -d. -f1,2) + +if [ "$actual_version" = $AIRFLOW_VERSION ]; then + echo "Version is as expected: $AIRFLOW_VERSION" +else + echo "Version does not match. Expected: $AIRFLOW_VERSION, but got: $actual_version" + exit 1 +fi diff --git a/tests/test_dagbuilder.py b/tests/test_dagbuilder.py index ca205695..efcb378d 100644 --- a/tests/test_dagbuilder.py +++ b/tests/test_dagbuilder.py @@ -8,9 +8,10 @@ import pytest try: - from airflow.sdk.definitions import DAG + from airflow.sdk.definitions.dag import DAG except ImportError: from airflow.models import DAG + import yaml from airflow.providers.common.sql.sensors.sql import SqlSensor from airflow.providers.http.sensors.http import HttpSensor diff --git a/tests/test_dagbuilder_httpoperator.py b/tests/test_dagbuilder_httpoperator.py index 0d5be9bc..c8d32c6f 100644 --- a/tests/test_dagbuilder_httpoperator.py +++ b/tests/test_dagbuilder_httpoperator.py @@ -5,7 +5,11 @@ import pendulum import pytest -from airflow import DAG + +try: + from airflow.sdk.definitions.dag import DAG # noqa: F401 +except ImportError: + from airflow.models.dag import DAG # noqa: F401 from dagfactory.dagbuilder import DagBuilder from dagfactory.exceptions import DagFactoryException @@ -47,14 +51,14 @@ "concurrency": 1, "max_active_runs": 1, "dagrun_timeout_sec": 600, - "schedule_interval": "0 1 * * *", + get_schedule_key(): "0 1 * * *", } # Basic DAG config for tests DAG_CONFIG = { "default_args": {"owner": "custom_owner"}, "description": "this is an example dag", - "schedule_interval": "0 3 * * *", + get_schedule_key(): "0 3 * * *", } diff --git a/tests/test_dagfactory.py b/tests/test_dagfactory.py index e542c7db..797f9e21 100644 --- a/tests/test_dagfactory.py +++ b/tests/test_dagfactory.py @@ -6,9 +6,10 @@ from airflow.version import version as AIRFLOW_VERSION try: - from airflow.sdk.definitions.variable import Variable + from airflow.sdk.definitions.variable import Variable # noqa: F401 except ImportError: - from airflow.models.variable import Variable + from airflow.models.variable import Variable # noqa: F401 + from packaging import version from tests.utils import get_bash_operator_path, get_schedule_key @@ -350,10 +351,7 @@ def test_kubernetes_pod_operator_dag_lt_2_7(): def test_variables_as_arguments_dag(monkeypatch): monkeypatch.setenv("AUTO_CONVERT_TO_AF3", "true") override_command = "value_from_variable" - if version.parse(AIRFLOW_VERSION) >= version.parse("1.10.10"): - os.environ["AIRFLOW_VAR_VAR1"] = override_command - else: - Variable.set("var1", override_command) + os.environ["AIRFLOW_VAR_VAR1"] = override_command td = dagfactory.DagFactory(DAG_FACTORY_VARIABLES_AS_ARGUMENTS) td.generate_dags(globals()) tasks = globals()["example_dag"].tasks diff --git a/tests/utils.py b/tests/utils.py index 1275ce80..974bc4f0 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -8,7 +8,11 @@ import yaml from airflow.configuration import secrets_backend_list from airflow.exceptions import AirflowSkipException -from airflow.models.dag import DAG + +try: + from airflow.sdk.definitions.dag import DAG +except ImportError: + from airflow.models.dag import DAG from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance from airflow.secrets.local_filesystem import LocalFilesystemBackend