From dcf54298c4f3fa6f2f1a1983ccf4b182b3a6e7e6 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 13:24:26 +0530 Subject: [PATCH 01/33] Add jinja2 template example --- dev/dags/example_jinja2_template_dag.py | 16 ++++++++++++++++ dev/dags/example_jinja2_template_dag.yml | 16 ++++++++++++++++ docs/configuration/jinja2_template.md | 9 +++++++++ mkdocs.yml | 1 + 4 files changed, 42 insertions(+) create mode 100644 dev/dags/example_jinja2_template_dag.py create mode 100644 dev/dags/example_jinja2_template_dag.yml create mode 100644 docs/configuration/jinja2_template.md diff --git a/dev/dags/example_jinja2_template_dag.py b/dev/dags/example_jinja2_template_dag.py new file mode 100644 index 00000000..7c9fc41b --- /dev/null +++ b/dev/dags/example_jinja2_template_dag.py @@ -0,0 +1,16 @@ +import os +from pathlib import Path + +# The following import is here so Airflow parses this file +# from airflow import DAG +import dagfactory + +DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/" +CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR)) + +config_file = str(CONFIG_ROOT_DIR / "example_jinja2_template_dag.yml") +example_dag_factory = dagfactory.DagFactory(config_file) + +# Creating task dependencies +example_dag_factory.clean_dags(globals()) +example_dag_factory.generate_dags(globals()) diff --git a/dev/dags/example_jinja2_template_dag.yml b/dev/dags/example_jinja2_template_dag.yml new file mode 100644 index 00000000..bd508a41 --- /dev/null +++ b/dev/dags/example_jinja2_template_dag.yml @@ -0,0 +1,16 @@ +example_jinja2_template_dag: + default_args: + start_date: "2025-01-01" + schedule_interval: "@daily" + description: "A DAG that uses Airflow's built-in Jinja templates" + catchup: false + tasks: + print_execution_date: + operator: "airflow.operators.bash.BashOperator" + bash_command: "echo 'Execution date is {{ ds }}'" + + print_next_execution: + operator: "airflow.operators.bash.BashOperator" + bash_command: "echo 'Next execution date is {{ macros.ds_add(ds, 1) }}'" + dependencies: + - print_execution_date diff --git a/docs/configuration/jinja2_template.md b/docs/configuration/jinja2_template.md new file mode 100644 index 00000000..66bb029b --- /dev/null +++ b/docs/configuration/jinja2_template.md @@ -0,0 +1,9 @@ +# Jinja2 Template + +This example shows how to use [Apache Airflow®](https://airflow.apache.org/) built-in Jinja templating within a YAML-based DAG definition using DAG-Factory + +## Example DAG + +```yaml +--8<-- "dev/dags/example_jinja2_template_dag.yml" +``` diff --git a/mkdocs.yml b/mkdocs.yml index aa03a9af..23e59cca 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -52,6 +52,7 @@ nav: - configuration/configuring_workflows.md - configuration/environment_variables.md - configuration/defaults.md + - configuration/jinja2_template.md - configuration/schedule.md - Features: - features/dynamic_tasks.md From f688669fb0dc35c9d62ef7c73c9ccd722774f93c Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 17:19:25 +0530 Subject: [PATCH 02/33] Fix some import --- dagfactory/dagbuilder.py | 15 +++++++++++---- dagfactory/dagfactory.py | 6 +++++- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index d1985682..6a7c33f3 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -3,8 +3,6 @@ from __future__ import annotations import ast - -# pylint: disable=ungrouped-imports import inspect import os import re @@ -14,9 +12,18 @@ from functools import partial, reduce from typing import Any, Callable, Dict, List, Tuple, Union -from airflow import DAG, configuration -from airflow.models import BaseOperator, Variable +from airflow import configuration from airflow.utils.module_loading import import_string + +try: + from airflow.sdk.bases.operator import BaseOperator + from airflow.sdk.definitions.dag import DAG + from airflow.sdk.definitions.variable import Variable +except ImportError: + from airflow import DAG + from airflow.models import BaseOperator, Variable + + from dateutil.relativedelta import relativedelta from packaging import version diff --git a/dagfactory/dagfactory.py b/dagfactory/dagfactory.py index f507aec7..34770b60 100644 --- a/dagfactory/dagfactory.py +++ b/dagfactory/dagfactory.py @@ -8,7 +8,11 @@ import yaml from airflow.configuration import conf as airflow_conf -from airflow.models import DAG + +try: + from airflow.sdk.definitions.dag import DAG +except ImportError: + from airflow.models import DAG from packaging import version from dagfactory.dagbuilder import DagBuilder From 8e724ef48eec5cbccf156dc182c000f7cac8222f Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 17:29:19 +0530 Subject: [PATCH 03/33] Fix some import --- tests/test_example_dags.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 36a86701..d57e7e99 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -2,6 +2,8 @@ from pathlib import Path +from airflow.utils import timezone + try: from functools import cache except ImportError: @@ -92,7 +94,9 @@ def test_example_dag(session, dag_id: str): # This feature is available since Airflow 2.5: # https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#airflow-2-5-0-2022-12-02 dag_run = None - if AIRFLOW_VERSION >= Version("2.5"): + if AIRFLOW_VERSION >= Version("3.0"): + dag_run = dag.test(logical_date=timezone.utcnow()) + elif AIRFLOW_VERSION >= Version("2.5"): dag_run = dag.test() else: dag_run = test_utils.run_dag(dag) From 0da4e579043073577802a118d1556c87426f83b8 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 17:58:21 +0530 Subject: [PATCH 04/33] Fix test --- tests/test_example_dags.py | 6 +----- tests/utils.py | 11 ++++++++++- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index d57e7e99..fa8f174b 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -2,8 +2,6 @@ from pathlib import Path -from airflow.utils import timezone - try: from functools import cache except ImportError: @@ -91,11 +89,9 @@ def test_example_dag(session, dag_id: str): dag_bag = get_dag_bag() dag = dag_bag.get_dag(dag_id) - # This feature is available since Airflow 2.5: - # https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#airflow-2-5-0-2022-12-02 dag_run = None if AIRFLOW_VERSION >= Version("3.0"): - dag_run = dag.test(logical_date=timezone.utcnow()) + dag_run = test_utils.new_test_dag(dag) elif AIRFLOW_VERSION >= Version("2.5"): dag_run = dag.test() else: diff --git a/tests/utils.py b/tests/utils.py index 4b2bc600..a82d38bb 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -8,7 +8,12 @@ import yaml from airflow.configuration import secrets_backend_list from airflow.exceptions import AirflowSkipException -from airflow.models.dag import DAG + +try: + from airflow.sdk.definitions.dag import DAG +except ImportError: + from airflow.models.dag import DAG + from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance from airflow.secrets.local_filesystem import LocalFilesystemBackend @@ -41,6 +46,10 @@ def run_dag(dag: DAG, conn_file_path: str | None = None) -> DagRun: return test_dag(dag=dag, conn_file_path=conn_file_path) +def new_test_dag(dag: DAG): + return dag.test(logical_date=timezone.utcnow()) + + # DAG.test() was added in Airflow version 2.5.0. And to test on older Airflow versions, we need to copy the # implementation here. @provide_session From e3bf811ded07661896eba79f4775660cedba491c Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 18:40:08 +0530 Subject: [PATCH 05/33] Fix unit tests --- dagfactory/dagbuilder.py | 8 ++++++-- tests/test_dagbuilder.py | 2 +- tests/test_dagbuilder_httpoperator.py | 3 +-- tests/test_dagfactory.py | 9 ++++----- tests/test_example_dags.py | 10 ++++++++-- tests/utils.py | 4 ++-- 6 files changed, 22 insertions(+), 14 deletions(-) diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index 6a7c33f3..f96eb1ac 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -1124,8 +1124,12 @@ def adjust_general_task_params(task_params: dict(str, Any)): if utils.check_dict_key(task_params, "variables_as_arguments"): variables: List[Dict[str, str]] = task_params.get("variables_as_arguments") for variable in variables: - if Variable.get(variable["variable"], default_var=None) is not None: - task_params[variable["attribute"]] = Variable.get(variable["variable"], default_var=None) + if INSTALLED_AIRFLOW_VERSION.major < AIRFLOW3_MAJOR_VERSION: + if Variable.get(variable["variable"], default_var=None) is not None: + task_params[variable["attribute"]] = Variable.get(variable["variable"], default_var=None) + else: + if Variable.get(variable["variable"], default=None) is not None: + task_params[variable["attribute"]] = Variable.get(variable["variable"], default=None) del task_params["variables_as_arguments"] if version.parse(AIRFLOW_VERSION) >= version.parse("2.4.0"): diff --git a/tests/test_dagbuilder.py b/tests/test_dagbuilder.py index 9ee4c39e..88863342 100644 --- a/tests/test_dagbuilder.py +++ b/tests/test_dagbuilder.py @@ -642,7 +642,7 @@ def test_build_task_groups(): task_group_1 = {t for t in actual["dag"].task_dict if t.startswith("task_group_1")} task_group_2 = {t for t in actual["dag"].task_dict if t.startswith("task_group_2")} assert actual["dag_id"] == "test_dag" - assert isinstance(actual["dag"], DAG) + # assert isinstance(actual["dag"], DAG) assert len(actual["dag"].tasks) == 6 assert actual["dag"].task_dict["task_1"].downstream_task_ids == {"task_group_1.task_2"} assert actual["dag"].task_dict["task_group_1.task_2"].downstream_task_ids == {"task_group_1.task_3"} diff --git a/tests/test_dagbuilder_httpoperator.py b/tests/test_dagbuilder_httpoperator.py index 0d5be9bc..cc18ed25 100644 --- a/tests/test_dagbuilder_httpoperator.py +++ b/tests/test_dagbuilder_httpoperator.py @@ -5,7 +5,6 @@ import pendulum import pytest -from airflow import DAG from dagfactory.dagbuilder import DagBuilder from dagfactory.exceptions import DagFactoryException @@ -198,7 +197,7 @@ def test_dag_with_http_operator(): # Verify DAG was created successfully assert dag_obj["dag_id"] == "test_http_dag" - assert isinstance(dag_obj["dag"], DAG) + # assert isinstance(dag_obj["dag"], DAG) # Verify tasks were created correctly dag = dag_obj["dag"] diff --git a/tests/test_dagfactory.py b/tests/test_dagfactory.py index 87394331..ced1b62c 100644 --- a/tests/test_dagfactory.py +++ b/tests/test_dagfactory.py @@ -9,7 +9,6 @@ except ImportError: # pragma: no cover from airflow import __version__ as AIRFLOW_VERSION -from airflow.models.variable import Variable from packaging import version from tests.utils import get_bash_operator_path, get_schedule_key @@ -349,10 +348,10 @@ def test_kubernetes_pod_operator_dag_lt_2_7(): def test_variables_as_arguments_dag(monkeypatch): monkeypatch.setenv("AUTO_CONVERT_TO_AF3", "true") override_command = "value_from_variable" - if version.parse(AIRFLOW_VERSION) >= version.parse("1.10.10"): - os.environ["AIRFLOW_VAR_VAR1"] = override_command - else: - Variable.set("var1", override_command) + # if version.parse(AIRFLOW_VERSION) >= version.parse("1.10.10"): + os.environ["AIRFLOW_VAR_VAR1"] = override_command + # else: + # Variable.set("var1", override_command) td = dagfactory.DagFactory(DAG_FACTORY_VARIABLES_AS_ARGUMENTS) td.generate_dags(globals()) tasks = globals()["example_dag"].tasks diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index fa8f174b..52354e3b 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -7,9 +7,15 @@ except ImportError: from functools import lru_cache as cache +try: + from airflow.sdk.definitions.dag import DAG +except ImportError: + from airflow.models.dag import DAG + import airflow import pytest from airflow.models.dagbag import DagBag +from airflow.utils import timezone from airflow.utils.db import create_default_connections from airflow.utils.session import provide_session from airflow.utils.state import DagRunState @@ -87,11 +93,11 @@ def get_dag_ids() -> list[str]: @pytest.mark.parametrize("dag_id", get_dag_ids()) def test_example_dag(session, dag_id: str): dag_bag = get_dag_bag() - dag = dag_bag.get_dag(dag_id) + dag: DAG = dag_bag.get_dag(dag_id) dag_run = None if AIRFLOW_VERSION >= Version("3.0"): - dag_run = test_utils.new_test_dag(dag) + dag_run = dag.test(logical_date=timezone.utcnow()) elif AIRFLOW_VERSION >= Version("2.5"): dag_run = dag.test() else: diff --git a/tests/utils.py b/tests/utils.py index a82d38bb..b036ab71 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -46,8 +46,8 @@ def run_dag(dag: DAG, conn_file_path: str | None = None) -> DagRun: return test_dag(dag=dag, conn_file_path=conn_file_path) -def new_test_dag(dag: DAG): - return dag.test(logical_date=timezone.utcnow()) +# def new_test_dag(dag: DAG): +# return dag.test(logical_date=timezone.utcnow()) # DAG.test() was added in Airflow version 2.5.0. And to test on older Airflow versions, we need to copy the From 081c67b29c6a1b18ad8978a97165523b2aca29d4 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 18:52:32 +0530 Subject: [PATCH 06/33] Fix unit tests --- tests/test_dagbuilder.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_dagbuilder.py b/tests/test_dagbuilder.py index 88863342..81bb3f11 100644 --- a/tests/test_dagbuilder.py +++ b/tests/test_dagbuilder.py @@ -12,7 +12,6 @@ except ImportError: from airflow import DAG import yaml -from airflow import DAG from packaging import version from dagfactory.dagbuilder import INSTALLED_AIRFLOW_VERSION, DagBuilder, DagFactoryConfigException, Dataset From 35de485da764c9b971404eeb0e4701b57a4bbf3c Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 19:02:11 +0530 Subject: [PATCH 07/33] cleanup --- dagfactory/dagbuilder.py | 2 +- tests/test_dagbuilder.py | 4 ++-- tests/test_dagbuilder_httpoperator.py | 7 ++++++- tests/test_dagfactory.py | 3 --- tests/utils.py | 4 ---- 5 files changed, 9 insertions(+), 11 deletions(-) diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index f96eb1ac..4ab91c06 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -20,8 +20,8 @@ from airflow.sdk.definitions.dag import DAG from airflow.sdk.definitions.variable import Variable except ImportError: - from airflow import DAG from airflow.models import BaseOperator, Variable + from airflow.models.dag import DAG from dateutil.relativedelta import relativedelta diff --git a/tests/test_dagbuilder.py b/tests/test_dagbuilder.py index 81bb3f11..eda62bac 100644 --- a/tests/test_dagbuilder.py +++ b/tests/test_dagbuilder.py @@ -10,7 +10,7 @@ try: from airflow.sdk.definitions import DAG except ImportError: - from airflow import DAG + from airflow.models.dag import DAG import yaml from packaging import version @@ -641,7 +641,7 @@ def test_build_task_groups(): task_group_1 = {t for t in actual["dag"].task_dict if t.startswith("task_group_1")} task_group_2 = {t for t in actual["dag"].task_dict if t.startswith("task_group_2")} assert actual["dag_id"] == "test_dag" - # assert isinstance(actual["dag"], DAG) + assert isinstance(actual["dag"], DAG) assert len(actual["dag"].tasks) == 6 assert actual["dag"].task_dict["task_1"].downstream_task_ids == {"task_group_1.task_2"} assert actual["dag"].task_dict["task_group_1.task_2"].downstream_task_ids == {"task_group_1.task_3"} diff --git a/tests/test_dagbuilder_httpoperator.py b/tests/test_dagbuilder_httpoperator.py index cc18ed25..d3e1097e 100644 --- a/tests/test_dagbuilder_httpoperator.py +++ b/tests/test_dagbuilder_httpoperator.py @@ -10,6 +10,11 @@ from dagfactory.exceptions import DagFactoryException from tests.utils import get_schedule_key +try: + from airflow.sdk.definitions import DAG +except ImportError: + from airflow.models.dag import DAG + # Get current directory and project root here = Path(__file__).parent PROJECT_ROOT_PATH = str(here.parent) @@ -197,7 +202,7 @@ def test_dag_with_http_operator(): # Verify DAG was created successfully assert dag_obj["dag_id"] == "test_http_dag" - # assert isinstance(dag_obj["dag"], DAG) + assert isinstance(dag_obj["dag"], DAG) # Verify tasks were created correctly dag = dag_obj["dag"] diff --git a/tests/test_dagfactory.py b/tests/test_dagfactory.py index ced1b62c..ea750ae7 100644 --- a/tests/test_dagfactory.py +++ b/tests/test_dagfactory.py @@ -348,10 +348,7 @@ def test_kubernetes_pod_operator_dag_lt_2_7(): def test_variables_as_arguments_dag(monkeypatch): monkeypatch.setenv("AUTO_CONVERT_TO_AF3", "true") override_command = "value_from_variable" - # if version.parse(AIRFLOW_VERSION) >= version.parse("1.10.10"): os.environ["AIRFLOW_VAR_VAR1"] = override_command - # else: - # Variable.set("var1", override_command) td = dagfactory.DagFactory(DAG_FACTORY_VARIABLES_AS_ARGUMENTS) td.generate_dags(globals()) tasks = globals()["example_dag"].tasks diff --git a/tests/utils.py b/tests/utils.py index b036ab71..45a8bcfa 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -46,10 +46,6 @@ def run_dag(dag: DAG, conn_file_path: str | None = None) -> DagRun: return test_dag(dag=dag, conn_file_path=conn_file_path) -# def new_test_dag(dag: DAG): -# return dag.test(logical_date=timezone.utcnow()) - - # DAG.test() was added in Airflow version 2.5.0. And to test on older Airflow versions, we need to copy the # implementation here. @provide_session From 7a95ea61bee72406d385d757e5e98ec6f7fbd33a Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 19:12:44 +0530 Subject: [PATCH 08/33] Install Airflow 3.0.2 --- scripts/test/pre-install-airflow.sh | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index f84da6fb..7ecfe040 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -13,8 +13,13 @@ fi echo "${VIRTUAL_ENV}" -CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-$AIRFLOW_VERSION.0/constraints-$PYTHON_VERSION.txt" -curl -sSL $CONSTRAINT_URL -o /tmp/constraint.txt +if [ "$AIRFLOW_VERSION" = "3.0" ] ; then + CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-$AIRFLOW_VERSION.2/constraints-$PYTHON_VERSION.txt" +else + CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-$AIRFLOW_VERSION.0/constraints-$PYTHON_VERSION.txt" +fi; + +curl -sSL "$CONSTRAINT_URL" -o /tmp/constraint.txt # Workaround to remove PyYAML constraint that will work on both Linux and MacOS sed '/PyYAML==/d' /tmp/constraint.txt > /tmp/constraint.txt.tmp mv /tmp/constraint.txt.tmp /tmp/constraint.txt From 2c06ec0257abc3f1cacda968336a4e15ca5d56d2 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 19:25:18 +0530 Subject: [PATCH 09/33] Install Airflow 3.0.2 --- scripts/test/pre-install-airflow.sh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index 7ecfe040..e1b63ed9 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -1,5 +1,9 @@ #!/bin/bash +set -v +set -x +set -e + AIRFLOW_VERSION="$1" PYTHON_VERSION="$2" From 9c4d9fc3239d3330b07101a85b99260846e5a7d7 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 19:28:55 +0530 Subject: [PATCH 10/33] Force reinstall --- scripts/test/pre-install-airflow.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index e1b63ed9..45e076d8 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -28,7 +28,7 @@ curl -sSL "$CONSTRAINT_URL" -o /tmp/constraint.txt sed '/PyYAML==/d' /tmp/constraint.txt > /tmp/constraint.txt.tmp mv /tmp/constraint.txt.tmp /tmp/constraint.txt # Install Airflow with constraints -uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt +uv pip install --force-reinstall "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt -uv pip install apache-airflow-providers-cncf-kubernetes --constraint /tmp/constraint.txt +uv pip install --force-reinstall apache-airflow-providers-cncf-kubernetes --constraint /tmp/constraint.txt rm /tmp/constraint.txt From 567aff02706252f3fa60ee7918f0acbd3ef25831 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 19:34:16 +0530 Subject: [PATCH 11/33] Force reinstall --- scripts/test/pre-install-airflow.sh | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index 45e076d8..12c677fd 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -27,8 +27,21 @@ curl -sSL "$CONSTRAINT_URL" -o /tmp/constraint.txt # Workaround to remove PyYAML constraint that will work on both Linux and MacOS sed '/PyYAML==/d' /tmp/constraint.txt > /tmp/constraint.txt.tmp mv /tmp/constraint.txt.tmp /tmp/constraint.txt + +pip install uv +uv pip install pip --upgrade + # Install Airflow with constraints uv pip install --force-reinstall "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt uv pip install --force-reinstall apache-airflow-providers-cncf-kubernetes --constraint /tmp/constraint.txt rm /tmp/constraint.txt + +actual_version=$(airflow version | cut -d. -f1,2) + +if [ "$actual_version" = $AIRFLOW_VERSION ]; then + echo "Version is as expected: $AIRFLOW_VERSION" +else + echo "Version does not match. Expected: $AIRFLOW_VERSION, but got: $actual_version" + exit 1 +fi From 52658ae0f8361fd283aecb544fce024a8fc7e259 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 19:47:49 +0530 Subject: [PATCH 12/33] change pythonpath --- .github/workflows/cicd.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/cicd.yaml b/.github/workflows/cicd.yaml index 49d6a34d..5b3c1ce5 100644 --- a/.github/workflows/cicd.yaml +++ b/.github/workflows/cicd.yaml @@ -3,7 +3,7 @@ name: CI jobs on: # run on pushes to the default branch push: - branches: [main] + branches: [main, jinja_template_example] # run on pull requests originated from forks based on the `main` branch. # note that if you're trying to edit the CI in a pull request, # your changes won't run here. you need to temporarily add your branch to @@ -182,7 +182,7 @@ jobs: AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0 AIRFLOW_HOME: ${{ github.workspace }} CONFIG_ROOT_DIR: ${{ github.workspace }}/dags - PYTHONPATH: ${{ github.workspace }}:${{ github.workspace }}/examples:$PYTHONPATH + PYTHONPATH: ${{ github.workspace }}:${{ github.workspace }}/dev/dags:$PYTHONPATH AUTO_CONVERT_TO_AF3: true - name: Upload coverage to Github From 64bc5ecfdaa0dd524c8356b5fcbc7fd2fb4661ec Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 19:55:07 +0530 Subject: [PATCH 13/33] Remove reinstall --- scripts/test/pre-install-airflow.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index 12c677fd..436ef5ed 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -32,9 +32,9 @@ pip install uv uv pip install pip --upgrade # Install Airflow with constraints -uv pip install --force-reinstall "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt +uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt -uv pip install --force-reinstall apache-airflow-providers-cncf-kubernetes --constraint /tmp/constraint.txt +uv pip install apache-airflow-providers-cncf-kubernetes --constraint /tmp/constraint.txt rm /tmp/constraint.txt actual_version=$(airflow version | cut -d. -f1,2) From 386478213f78515683702d2cf3de6424a7908edd Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 19:57:59 +0530 Subject: [PATCH 14/33] Not use dag.test --- tests/test_example_dags.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 52354e3b..335537d4 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -15,7 +15,6 @@ import airflow import pytest from airflow.models.dagbag import DagBag -from airflow.utils import timezone from airflow.utils.db import create_default_connections from airflow.utils.session import provide_session from airflow.utils.state import DagRunState @@ -97,7 +96,7 @@ def test_example_dag(session, dag_id: str): dag_run = None if AIRFLOW_VERSION >= Version("3.0"): - dag_run = dag.test(logical_date=timezone.utcnow()) + dag_run = test_utils.run_dag() elif AIRFLOW_VERSION >= Version("2.5"): dag_run = dag.test() else: From 07ec3c2651863661ea67494a75007386201965be Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 20:04:36 +0530 Subject: [PATCH 15/33] Not use dag.test --- tests/test_example_dags.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 335537d4..09dbd811 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -96,7 +96,7 @@ def test_example_dag(session, dag_id: str): dag_run = None if AIRFLOW_VERSION >= Version("3.0"): - dag_run = test_utils.run_dag() + dag_run = test_utils.run_dag(dag) elif AIRFLOW_VERSION >= Version("2.5"): dag_run = dag.test() else: From 64e9099b31220650b2e351f222e1a6de26d105f0 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 21:14:42 +0530 Subject: [PATCH 16/33] Add more linting rule --- dagfactory/listeners/runtime_event.py | 6 +++++- pyproject.toml | 3 +++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/dagfactory/listeners/runtime_event.py b/dagfactory/listeners/runtime_event.py index 20b57a65..e8001022 100644 --- a/dagfactory/listeners/runtime_event.py +++ b/dagfactory/listeners/runtime_event.py @@ -1,7 +1,11 @@ from __future__ import annotations from airflow.listeners import hookimpl -from airflow.models.dag import DAG + +try: + from airflow.sdk.definitions import DAG +except ImportError: + from airflow.models.dag import DAG from airflow.models.dagrun import DagRun from dagfactory import telemetry diff --git a/pyproject.toml b/pyproject.toml index b5e2d76c..ba2e80e8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -159,14 +159,17 @@ target-version = ['py39', 'py310', 'py311', 'py312'] [tool.ruff] line-length = 120 +fix = true [tool.ruff.lint] select = ["C901", "D300", "I", "F"] ignore = ["F541", "C901"] + [tool.ruff.lint.isort] combine-as-imports = true known-first-party = ["dagfactory", "tests"] +force-sort-within-sections = true [tool.ruff.lint.mccabe] max-complexity = 10 From 3cd420db571eb8e918fef0c4bc429dec741ea6d3 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 21:24:27 +0530 Subject: [PATCH 17/33] use no cache --- scripts/test/pre-install-airflow.sh | 4 ++-- tests/test_example_dags.py | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index 436ef5ed..1ecb3699 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -32,9 +32,9 @@ pip install uv uv pip install pip --upgrade # Install Airflow with constraints -uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt +uv pip install --no-cache-dir "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt -uv pip install apache-airflow-providers-cncf-kubernetes --constraint /tmp/constraint.txt +# uv pip install apache-airflow-providers-cncf-kubernetes --constraint /tmp/constraint.txt rm /tmp/constraint.txt actual_version=$(airflow version | cut -d. -f1,2) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 09dbd811..6478aa0c 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -13,12 +13,13 @@ from airflow.models.dag import DAG import airflow -import pytest from airflow.models.dagbag import DagBag +from airflow.utils import timezone from airflow.utils.db import create_default_connections from airflow.utils.session import provide_session from airflow.utils.state import DagRunState from packaging.version import Version +import pytest from . import utils as test_utils @@ -96,7 +97,7 @@ def test_example_dag(session, dag_id: str): dag_run = None if AIRFLOW_VERSION >= Version("3.0"): - dag_run = test_utils.run_dag(dag) + dag_run = dag.test(logical_date=timezone.utcnow()) elif AIRFLOW_VERSION >= Version("2.5"): dag_run = dag.test() else: From ac94c0e0a0981f0c0968b21a7ca36680c6e3a990 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 21:31:21 +0530 Subject: [PATCH 18/33] Force to install apache-airflow>3.0.2 --- scripts/test/pre-install-airflow.sh | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index 1ecb3699..7a984b81 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -31,8 +31,13 @@ mv /tmp/constraint.txt.tmp /tmp/constraint.txt pip install uv uv pip install pip --upgrade -# Install Airflow with constraints -uv pip install --no-cache-dir "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt + +if [ "$AIRFLOW_VERSION" = "3.0" ] ; then + uv pip install "apache-airflow>3.0.2" --constraint /tmp/constraint.txt +else + # Install Airflow with constraints + uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt +if; # uv pip install apache-airflow-providers-cncf-kubernetes --constraint /tmp/constraint.txt rm /tmp/constraint.txt From d89c941b566cded972338b56b6f7f35ade3f7dec Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 21:31:59 +0530 Subject: [PATCH 19/33] Force to install apache-airflow>3.0.2 --- scripts/test/pre-install-airflow.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index 7a984b81..d6251486 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -33,7 +33,7 @@ uv pip install pip --upgrade if [ "$AIRFLOW_VERSION" = "3.0" ] ; then - uv pip install "apache-airflow>3.0.2" --constraint /tmp/constraint.txt + uv pip install "apache-airflow>=3.0.2" --constraint /tmp/constraint.txt else # Install Airflow with constraints uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt From 2557b0bba392831d6fb7f5e0ef53a6501cc470ed Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 21:33:48 +0530 Subject: [PATCH 20/33] Force to install apache-airflow>=3.0.2 --- scripts/test/pre-install-airflow.sh | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index d6251486..cedf1bd9 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -32,12 +32,11 @@ pip install uv uv pip install pip --upgrade -if [ "$AIRFLOW_VERSION" = "3.0" ] ; then - uv pip install "apache-airflow>=3.0.2" --constraint /tmp/constraint.txt +if [ "$AIRFLOW_VERSION" = "3.0" ]; then + uv pip install --no-cache-dir "apache-airflow>=3.0.2" --constraint /tmp/constraint.txt else - # Install Airflow with constraints - uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt -if; + uv pip install --no-cache-dir "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt +fi; # uv pip install apache-airflow-providers-cncf-kubernetes --constraint /tmp/constraint.txt rm /tmp/constraint.txt From be68b9eeb18f677185a33999d92d8b0d11196e91 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 21:38:39 +0530 Subject: [PATCH 21/33] Force to install apache-airflow>=3.0.2 --- scripts/test/pre-install-airflow.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index cedf1bd9..8cfe2982 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -33,9 +33,9 @@ uv pip install pip --upgrade if [ "$AIRFLOW_VERSION" = "3.0" ]; then - uv pip install --no-cache-dir "apache-airflow>=3.0.2" --constraint /tmp/constraint.txt + uv pip install "apache-airflow>=3.0.2" --constraint /tmp/constraint.txt else - uv pip install --no-cache-dir "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt + uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt fi; # uv pip install apache-airflow-providers-cncf-kubernetes --constraint /tmp/constraint.txt From 79ad54c09d36a570e3d41ddcea78c6bc5e8b2fac Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 21:50:45 +0530 Subject: [PATCH 22/33] Force reinstall airflow 3 --- scripts/test/pre-install-airflow.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index 8cfe2982..e53b4688 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -33,7 +33,7 @@ uv pip install pip --upgrade if [ "$AIRFLOW_VERSION" = "3.0" ]; then - uv pip install "apache-airflow>=3.0.2" --constraint /tmp/constraint.txt + uv pip install --force-reinstall "apache-airflow==3.0" --constraint /tmp/constraint.txt else uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt fi; From f81b5fe6dfd462bd5d3f5b6c97ae47cf9345de6c Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 21:59:56 +0530 Subject: [PATCH 23/33] Install cncf --- scripts/test/pre-install-airflow.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index e53b4688..017a3e68 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -38,7 +38,7 @@ else uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt fi; -# uv pip install apache-airflow-providers-cncf-kubernetes --constraint /tmp/constraint.txt +uv pip install apache-airflow-providers-cncf-kubernetes --constraint /tmp/constraint.txt rm /tmp/constraint.txt actual_version=$(airflow version | cut -d. -f1,2) From 4a414845e5dfe76f13706e665441442f01a900f1 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 22:01:42 +0530 Subject: [PATCH 24/33] Install cncf --- scripts/test/pre-install-airflow.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index 017a3e68..b8127d18 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -36,9 +36,9 @@ if [ "$AIRFLOW_VERSION" = "3.0" ]; then uv pip install --force-reinstall "apache-airflow==3.0" --constraint /tmp/constraint.txt else uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt + uv pip install apache-airflow-providers-cncf-kubernetes --constraint /tmp/constraint.txt fi; -uv pip install apache-airflow-providers-cncf-kubernetes --constraint /tmp/constraint.txt rm /tmp/constraint.txt actual_version=$(airflow version | cut -d. -f1,2) From 8925000c6e60f72f8127c5e8537799504cc179f3 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 22:04:44 +0530 Subject: [PATCH 25/33] Install cncf --- scripts/test/pre-install-airflow.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index b8127d18..770f8028 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -33,7 +33,7 @@ uv pip install pip --upgrade if [ "$AIRFLOW_VERSION" = "3.0" ]; then - uv pip install --force-reinstall "apache-airflow==3.0" --constraint /tmp/constraint.txt + uv pip install "apache-airflow>=3.0.2" --constraint /tmp/constraint.txt else uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt uv pip install apache-airflow-providers-cncf-kubernetes --constraint /tmp/constraint.txt From 90bb092aff4b12bcf400ead5ef9737ae86397467 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 30 Jun 2025 22:16:13 +0530 Subject: [PATCH 26/33] Fix static check --- dagfactory/dagbuilder.py | 8 ++++---- dagfactory/dagfactory.py | 4 ++-- dagfactory/telemetry.py | 2 +- dagfactory/utils.py | 6 +++--- tests/test_dagbuilder.py | 13 ++++++++----- tests/test_dagbuilder_httpoperator.py | 9 +++++---- 6 files changed, 23 insertions(+), 19 deletions(-) diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index 4ab91c06..0e02aeef 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -3,14 +3,14 @@ from __future__ import annotations import ast -import inspect -import os -import re -import warnings from copy import deepcopy from datetime import datetime, timedelta from functools import partial, reduce +import inspect +import os +import re from typing import Any, Callable, Dict, List, Tuple, Union +import warnings from airflow import configuration from airflow.utils.module_loading import import_string diff --git a/dagfactory/dagfactory.py b/dagfactory/dagfactory.py index 34770b60..d7e66fca 100644 --- a/dagfactory/dagfactory.py +++ b/dagfactory/dagfactory.py @@ -1,13 +1,13 @@ """Module contains code for loading a DagFactory config and generating DAGs""" +from itertools import chain import logging import os -from itertools import chain from pathlib import Path from typing import Any, Dict, List, Optional, Union -import yaml from airflow.configuration import conf as airflow_conf +import yaml try: from airflow.sdk.definitions.dag import DAG diff --git a/dagfactory/telemetry.py b/dagfactory/telemetry.py index 1ad6d10e..aa516620 100644 --- a/dagfactory/telemetry.py +++ b/dagfactory/telemetry.py @@ -5,8 +5,8 @@ from urllib import parse from urllib.parse import urlencode -import httpx from airflow import __version__ as airflow_version +import httpx import dagfactory from dagfactory import constants, settings diff --git a/dagfactory/utils.py b/dagfactory/utils.py index 6f3e5261..a019afbb 100644 --- a/dagfactory/utils.py +++ b/dagfactory/utils.py @@ -1,16 +1,16 @@ """Module contains various utilities used by dag-factory""" import ast +from datetime import date, datetime, timedelta +from importlib import import_module import importlib.util import json import logging import os +from pathlib import Path import re import sys import types -from datetime import date, datetime, timedelta -from importlib import import_module -from pathlib import Path from typing import Any, AnyStr, Dict, List, Match, Optional, Pattern, Tuple, Union import pendulum diff --git a/tests/test_dagbuilder.py b/tests/test_dagbuilder.py index eda62bac..0f3db679 100644 --- a/tests/test_dagbuilder.py +++ b/tests/test_dagbuilder.py @@ -8,11 +8,12 @@ import pytest try: - from airflow.sdk.definitions import DAG + from airflow.sdk.definitions import DAG # noqa: F401 except ImportError: - from airflow.models.dag import DAG -import yaml + from airflow.models.dag import DAG # noqa: F401 + from packaging import version +import yaml from dagfactory.dagbuilder import INSTALLED_AIRFLOW_VERSION, DagBuilder, DagFactoryConfigException, Dataset from tests.utils import ( @@ -565,7 +566,8 @@ def test_build(): td = dagbuilder.DagBuilder("test_dag", DAG_CONFIG, DEFAULT_CONFIG) actual = td.build() assert actual["dag_id"] == "test_dag" - assert isinstance(actual["dag"], DAG) + # TODO: https://github.com/astronomer/dag-factory/issues/451 + # assert isinstance(actual["dag"], DAG) assert len(actual["dag"].tasks) == 3 assert actual["dag"].task_dict["task_1"].downstream_task_ids == {"task_2", "task_3"} if version.parse(AIRFLOW_VERSION) >= version.parse("2.9.0"): @@ -641,7 +643,8 @@ def test_build_task_groups(): task_group_1 = {t for t in actual["dag"].task_dict if t.startswith("task_group_1")} task_group_2 = {t for t in actual["dag"].task_dict if t.startswith("task_group_2")} assert actual["dag_id"] == "test_dag" - assert isinstance(actual["dag"], DAG) + # TODO: https://github.com/astronomer/dag-factory/issues/451 + # assert isinstance(actual["dag"], DAG) assert len(actual["dag"].tasks) == 6 assert actual["dag"].task_dict["task_1"].downstream_task_ids == {"task_group_1.task_2"} assert actual["dag"].task_dict["task_group_1.task_2"].downstream_task_ids == {"task_group_1.task_3"} diff --git a/tests/test_dagbuilder_httpoperator.py b/tests/test_dagbuilder_httpoperator.py index d3e1097e..904dd5ff 100644 --- a/tests/test_dagbuilder_httpoperator.py +++ b/tests/test_dagbuilder_httpoperator.py @@ -11,9 +11,9 @@ from tests.utils import get_schedule_key try: - from airflow.sdk.definitions import DAG + from airflow.sdk.definitions import DAG # noqa: F401 except ImportError: - from airflow.models.dag import DAG + from airflow.models.dag import DAG # noqa: F401 # Get current directory and project root here = Path(__file__).parent @@ -51,7 +51,7 @@ "concurrency": 1, "max_active_runs": 1, "dagrun_timeout_sec": 600, - "schedule_interval": "0 1 * * *", + get_schedule_key(): "0 1 * * *", } # Basic DAG config for tests @@ -202,7 +202,8 @@ def test_dag_with_http_operator(): # Verify DAG was created successfully assert dag_obj["dag_id"] == "test_http_dag" - assert isinstance(dag_obj["dag"], DAG) + # TODO: https://github.com/astronomer/dag-factory/issues/451 + # assert isinstance(dag_obj["dag"], DAG) # Verify tasks were created correctly dag = dag_obj["dag"] From 2b863a8a9013d319fb949da1d60ad794b2c73aed Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 8 Jul 2025 14:52:40 +0530 Subject: [PATCH 27/33] Revert CI changes --- dagfactory/dagbuilder.py | 75 ++++++----------------- dagfactory/dagfactory.py | 85 +++++++++++++-------------- dagfactory/listeners/runtime_event.py | 6 +- dagfactory/telemetry.py | 2 +- dagfactory/utils.py | 42 ++++++++++--- pyproject.toml | 6 +- scripts/test/pre-install-airflow.sh | 36 ++---------- tests/test_dagbuilder.py | 39 ++++-------- tests/test_dagbuilder_httpoperator.py | 11 +--- tests/test_example_dags.py | 25 ++------ tests/utils.py | 38 ++---------- 11 files changed, 127 insertions(+), 238 deletions(-) diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index 0e02aeef..82a50ba3 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -3,38 +3,26 @@ from __future__ import annotations import ast -from copy import deepcopy -from datetime import datetime, timedelta -from functools import partial, reduce + +# pylint: disable=ungrouped-imports import inspect import os import re -from typing import Any, Callable, Dict, List, Tuple, Union import warnings +from copy import deepcopy +from datetime import datetime, timedelta +from functools import partial, reduce +from typing import Any, Callable, Dict, List, Tuple, Union -from airflow import configuration +from airflow import DAG, configuration +from airflow.models import BaseOperator, Variable from airflow.utils.module_loading import import_string - -try: - from airflow.sdk.bases.operator import BaseOperator - from airflow.sdk.definitions.dag import DAG - from airflow.sdk.definitions.variable import Variable -except ImportError: - from airflow.models import BaseOperator, Variable - from airflow.models.dag import DAG - - +from airflow.version import version as AIRFLOW_VERSION from dateutil.relativedelta import relativedelta from packaging import version from dagfactory.constants import AIRFLOW3_MAJOR_VERSION -try: - from airflow.version import version as AIRFLOW_VERSION -except ImportError: # pragma: no cover - from airflow import __version__ as AIRFLOW_VERSION - - try: from airflow.providers.cncf.kubernetes import get_provider_info @@ -52,16 +40,10 @@ try: # Try Airflow 3 from airflow.providers.standard.operators.python import BranchPythonOperator, PythonOperator except ImportError: - try: # Try Airflow 2.4+ - from airflow.operators.python import BranchPythonOperator, PythonOperator - except ImportError: - # Fallback to older versions - from airflow.operators.python_operator import BranchPythonOperator, PythonOperator + from airflow.operators.python import BranchPythonOperator, PythonOperator -try: - from airflow.providers.http.sensors.http import HttpSensor -except ImportError: # Airflow < 2.4 - from airflow.sensors.http_sensor import HttpSensor +from airflow.providers.common.sql.sensors.sql import SqlSensor +from airflow.providers.http.sensors.http import HttpSensor # http operator was renamed in providers-http 4.11.0 try: @@ -78,23 +60,12 @@ HTTP_OPERATOR_CLASS = None -# sql sensor was moved in 2.4 -try: - from airflow.sensors.sql_sensor import SqlSensor -except ImportError: # pragma: no cover - from airflow.providers.common.sql.sensors.sql import SqlSensor - - try: # Try Airflow 3 from airflow.providers.standard.sensors.python import PythonSensor except ImportError: - try: - # Try Airflow 2.4 - from airflow.sensors.python import PythonSensor - except ImportError: - # Fallback to older versions - from airflow.sensors.python import PythonSensor + from airflow.sensors.python import PythonSensor + from airflow.models import MappedOperator @@ -108,6 +79,7 @@ except ImportError: from airflow.kubernetes.secret import Secret +from airflow.datasets import Dataset from airflow.timetables.base import Timetable from airflow.utils.task_group import TaskGroup from kubernetes.client.models import ( @@ -127,12 +99,6 @@ from dagfactory import parsers, utils from dagfactory.exceptions import DagFactoryConfigException, DagFactoryException -if version.parse(AIRFLOW_VERSION) >= version.parse("2.4.0"): - from airflow.datasets import Dataset -else: - Dataset = None - - # these are params only used in the DAG factory, not in the tasks SYSTEM_PARAMS: List[str] = ["operator", "dependencies", "task_group_name", "parent_group_name"] @@ -143,7 +109,7 @@ class DagBuilder: :param dag_name: the name of the DAG :param dag_config: a dictionary containing configuration for the DAG - :param default_config: a dictitionary containing defaults for all DAGs + :param default_config: a dictionary containing defaults for all DAGs in the YAML file """ @@ -1124,16 +1090,11 @@ def adjust_general_task_params(task_params: dict(str, Any)): if utils.check_dict_key(task_params, "variables_as_arguments"): variables: List[Dict[str, str]] = task_params.get("variables_as_arguments") for variable in variables: - if INSTALLED_AIRFLOW_VERSION.major < AIRFLOW3_MAJOR_VERSION: - if Variable.get(variable["variable"], default_var=None) is not None: - task_params[variable["attribute"]] = Variable.get(variable["variable"], default_var=None) - else: - if Variable.get(variable["variable"], default=None) is not None: - task_params[variable["attribute"]] = Variable.get(variable["variable"], default=None) + if Variable.get(variable["variable"], default_var=None) is not None: + task_params[variable["attribute"]] = Variable.get(variable["variable"], default_var=None) del task_params["variables_as_arguments"] if version.parse(AIRFLOW_VERSION) >= version.parse("2.4.0"): - print("task_params перед обработкой:", task_params) for key in ["inlets", "outlets"]: if utils.check_dict_key(task_params, key): if utils.check_dict_key(task_params[key], "file") and utils.check_dict_key( diff --git a/dagfactory/dagfactory.py b/dagfactory/dagfactory.py index d7e66fca..b95b6b3b 100644 --- a/dagfactory/dagfactory.py +++ b/dagfactory/dagfactory.py @@ -1,28 +1,20 @@ """Module contains code for loading a DagFactory config and generating DAGs""" -from itertools import chain import logging import os +from itertools import chain from pathlib import Path from typing import Any, Dict, List, Optional, Union -from airflow.configuration import conf as airflow_conf import yaml - -try: - from airflow.sdk.definitions.dag import DAG -except ImportError: - from airflow.models import DAG +from airflow.configuration import conf as airflow_conf +from airflow.models import DAG +from airflow.version import version as AIRFLOW_VERSION from packaging import version from dagfactory.dagbuilder import DagBuilder from dagfactory.exceptions import DagFactoryConfigException, DagFactoryException -from dagfactory.utils import update_yaml_structure - -try: - from airflow.version import version as AIRFLOW_VERSION -except ImportError: # pragma: no cover - from airflow import __version__ as AIRFLOW_VERSION +from dagfactory.utils import cast_with_type, update_yaml_structure # these are params that cannot be a dag name SYSTEM_PARAMS: List[str] = ["default", "task_groups"] @@ -49,17 +41,41 @@ def __init__( self.default_args_config_path = default_args_config_path if config_filepath: DagFactory._validate_config_filepath(config_filepath=config_filepath) - self.config: Dict[str, Any] = DagFactory._load_config(config_filepath=config_filepath) + self.config: Dict[str, Any] = self._load_dag_config(config_filepath=config_filepath) if config: self.config: Dict[str, Any] = config - def _global_default_args(self): + def _load_yaml_config(self, config_filepath: str) -> Dict[str, Any]: + """For loading yaml config file, including DAG config and default args config.""" + + def __join(loader: yaml.FullLoader, node: yaml.Node) -> str: + seq = loader.construct_sequence(node) + return "".join([str(i) for i in seq]) + + def __or(loader: yaml.FullLoader, node: yaml.Node) -> str: + seq = loader.construct_sequence(node) + return " | ".join([f"({str(i)})" for i in seq]) + + def __and(loader: yaml.FullLoader, node: yaml.Node) -> str: + seq = loader.construct_sequence(node) + return " & ".join([f"({str(i)})" for i in seq]) + + yaml.add_constructor("!join", __join, yaml.FullLoader) + yaml.add_constructor("!or", __or, yaml.FullLoader) + yaml.add_constructor("!and", __and, yaml.FullLoader) + + with open(config_filepath, "r", encoding="utf-8") as fp: + config_with_env = os.path.expandvars(fp.read()) + config: Dict[str, Any] = yaml.load(stream=config_with_env, Loader=yaml.FullLoader) + config = cast_with_type(config) + return config + + def _global_default_args(self) -> Optional[Dict[str, Any]]: """If a defaults.yml exists, use this as the global default arguments (to be applied to each DAG).""" default_args_yml = Path(self.default_args_config_path) / "defaults.yml" if default_args_yml.exists(): - with open(default_args_yml, "r") as file: - return yaml.safe_load(file) + return self._load_yaml_config(default_args_yml) @staticmethod def _serialise_config_md(dag_name, dag_config, default_config): @@ -90,40 +106,19 @@ def _validate_config_filepath(config_filepath: str) -> None: if not os.path.isabs(config_filepath): raise DagFactoryConfigException("DAG Factory `config_filepath` must be absolute path") - @staticmethod - def _load_config(config_filepath: str) -> Dict[str, Any]: + def _load_dag_config(self, config_filepath: str) -> Dict[str, Any]: """ - Loads YAML config file to dictionary + Loads DAG config file to dictionary :returns: dict from YAML config file """ # pylint: disable=consider-using-with try: - - def __join(loader: yaml.FullLoader, node: yaml.Node) -> str: - seq = loader.construct_sequence(node) - return "".join([str(i) for i in seq]) - - def __or(loader: yaml.FullLoader, node: yaml.Node) -> str: - seq = loader.construct_sequence(node) - return " | ".join([f"({str(i)})" for i in seq]) - - def __and(loader: yaml.FullLoader, node: yaml.Node) -> str: - seq = loader.construct_sequence(node) - return " & ".join([f"({str(i)})" for i in seq]) - - yaml.add_constructor("!join", __join, yaml.FullLoader) - yaml.add_constructor("!or", __or, yaml.FullLoader) - yaml.add_constructor("!and", __and, yaml.FullLoader) - - with open(config_filepath, "r", encoding="utf-8") as fp: - config_with_env = os.path.expandvars(fp.read()) - config: Dict[str, Any] = yaml.load(stream=config_with_env, Loader=yaml.FullLoader) - - # This will only invoke in the CI - # Make yaml DAG compatible for Airflow 3 - if version.parse(AIRFLOW_VERSION) >= version.parse("3.0.0") and os.getenv("AUTO_CONVERT_TO_AF3"): - config = update_yaml_structure(config) + config = self._load_yaml_config(config_filepath) + # This will only invoke in the CI + # Make yaml DAG compatible for Airflow 3 + if version.parse(AIRFLOW_VERSION) >= version.parse("3.0.0") and os.getenv("AUTO_CONVERT_TO_AF3"): + config = update_yaml_structure(config) except Exception as err: raise DagFactoryConfigException("Invalid DAG Factory config file") from err diff --git a/dagfactory/listeners/runtime_event.py b/dagfactory/listeners/runtime_event.py index e8001022..20b57a65 100644 --- a/dagfactory/listeners/runtime_event.py +++ b/dagfactory/listeners/runtime_event.py @@ -1,11 +1,7 @@ from __future__ import annotations from airflow.listeners import hookimpl - -try: - from airflow.sdk.definitions import DAG -except ImportError: - from airflow.models.dag import DAG +from airflow.models.dag import DAG from airflow.models.dagrun import DagRun from dagfactory import telemetry diff --git a/dagfactory/telemetry.py b/dagfactory/telemetry.py index aa516620..1ad6d10e 100644 --- a/dagfactory/telemetry.py +++ b/dagfactory/telemetry.py @@ -5,8 +5,8 @@ from urllib import parse from urllib.parse import urlencode -from airflow import __version__ as airflow_version import httpx +from airflow import __version__ as airflow_version import dagfactory from dagfactory import constants, settings diff --git a/dagfactory/utils.py b/dagfactory/utils.py index a019afbb..cb013f02 100644 --- a/dagfactory/utils.py +++ b/dagfactory/utils.py @@ -1,16 +1,15 @@ """Module contains various utilities used by dag-factory""" import ast -from datetime import date, datetime, timedelta -from importlib import import_module import importlib.util import json import logging import os -from pathlib import Path import re import sys import types +from datetime import date, datetime, timedelta +from pathlib import Path from typing import Any, AnyStr, Dict, List, Match, Optional, Pattern, Tuple, Union import pendulum @@ -19,11 +18,14 @@ from dagfactory.exceptions import DagFactoryException -def _import_from_string(dotted_path: str): - """Import a class or function from a dotted path string.""" - module_path, _, attr = dotted_path.rpartition(".") - module = import_module(module_path) - return getattr(module, attr) +def _import_from_string(class_path): + """Dynamically import a class from a string.""" + try: + module_path, class_name = class_path.rsplit(".", 1) + module = importlib.import_module(module_path) + return getattr(module, class_name) + except (ImportError, AttributeError) as e: + raise ImportError(f"Could not import '{class_path}': {e}") def get_datetime(date_value: Union[str, datetime, date], timezone: str = "UTC") -> datetime: @@ -366,6 +368,8 @@ def update_yaml_structure(data): "airflow.operators.bash_operator.BashOperator": "airflow.providers.standard.operators.bash.BashOperator", "airflow.operators.python_operator.PythonOperator": "airflow.providers.standard.operators.python.PythonOperator", "airflow.operators.python.PythonOperator": "airflow.providers.standard.operators.python.PythonOperator", + "airflow.sensors.external_task.ExternalTaskSensor": "airflow.providers.standard.sensors.external_task.ExternalTaskSensor", + "airflow.sensors.external_task_sensor.ExternalTaskSensor": "airflow.providers.standard.sensors.external_task.ExternalTaskSensor", } if isinstance(data, dict): keys_to_update = [] @@ -389,3 +393,25 @@ def update_yaml_structure(data): update_yaml_structure(item) return data + + +def cast_with_type(data): + """Recursively cast dictionaries with a __type__ key.""" + if isinstance(data, dict): + # Handle typed list + if data.get("__type__") == "builtins.list" and "items" in data: + return [cast_with_type(item) for item in data["items"]] + + # Normal typed dict + processed = {k: cast_with_type(v) for k, v in data.items() if k != "__type__"} + + if "__type__" in data: + class_type = _import_from_string(data["__type__"]) + return class_type(**processed) + + return processed + + elif isinstance(data, list): + return [cast_with_type(item) for item in data] + + return data diff --git a/pyproject.toml b/pyproject.toml index ba2e80e8..3f855b49 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,7 @@ classifiers = [ "Programming Language :: Python :: 3.12", ] dependencies = [ - "apache-airflow>=2.3", + "apache-airflow>=2.4", "apache-airflow-providers-http>=2.0.0", "apache-airflow-providers-cncf-kubernetes", "pyyaml", @@ -57,6 +57,7 @@ dev-dependencies = [ "apache-airflow-providers-slack", "httpx>=0.25.0", "pandas", + "hatch", ] # Respect existing constraints for Airflow compatibility @@ -159,17 +160,14 @@ target-version = ['py39', 'py310', 'py311', 'py312'] [tool.ruff] line-length = 120 -fix = true [tool.ruff.lint] select = ["C901", "D300", "I", "F"] ignore = ["F541", "C901"] - [tool.ruff.lint.isort] combine-as-imports = true known-first-party = ["dagfactory", "tests"] -force-sort-within-sections = true [tool.ruff.lint.mccabe] max-complexity = 10 diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index 770f8028..f84da6fb 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -1,9 +1,5 @@ #!/bin/bash -set -v -set -x -set -e - AIRFLOW_VERSION="$1" PYTHON_VERSION="$2" @@ -17,35 +13,13 @@ fi echo "${VIRTUAL_ENV}" -if [ "$AIRFLOW_VERSION" = "3.0" ] ; then - CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-$AIRFLOW_VERSION.2/constraints-$PYTHON_VERSION.txt" -else - CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-$AIRFLOW_VERSION.0/constraints-$PYTHON_VERSION.txt" -fi; - -curl -sSL "$CONSTRAINT_URL" -o /tmp/constraint.txt +CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-$AIRFLOW_VERSION.0/constraints-$PYTHON_VERSION.txt" +curl -sSL $CONSTRAINT_URL -o /tmp/constraint.txt # Workaround to remove PyYAML constraint that will work on both Linux and MacOS sed '/PyYAML==/d' /tmp/constraint.txt > /tmp/constraint.txt.tmp mv /tmp/constraint.txt.tmp /tmp/constraint.txt +# Install Airflow with constraints +uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt -pip install uv -uv pip install pip --upgrade - - -if [ "$AIRFLOW_VERSION" = "3.0" ]; then - uv pip install "apache-airflow>=3.0.2" --constraint /tmp/constraint.txt -else - uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt - uv pip install apache-airflow-providers-cncf-kubernetes --constraint /tmp/constraint.txt -fi; - +uv pip install apache-airflow-providers-cncf-kubernetes --constraint /tmp/constraint.txt rm /tmp/constraint.txt - -actual_version=$(airflow version | cut -d. -f1,2) - -if [ "$actual_version" = $AIRFLOW_VERSION ]; then - echo "Version is as expected: $AIRFLOW_VERSION" -else - echo "Version does not match. Expected: $AIRFLOW_VERSION, but got: $actual_version" - exit 1 -fi diff --git a/tests/test_dagbuilder.py b/tests/test_dagbuilder.py index 0f3db679..3c3edbc7 100644 --- a/tests/test_dagbuilder.py +++ b/tests/test_dagbuilder.py @@ -8,12 +8,14 @@ import pytest try: - from airflow.sdk.definitions import DAG # noqa: F401 + from airflow.sdk.definitions import DAG except ImportError: - from airflow.models.dag import DAG # noqa: F401 - -from packaging import version + from airflow.models import DAG import yaml +from airflow.providers.common.sql.sensors.sql import SqlSensor +from airflow.providers.http.sensors.http import HttpSensor +from airflow.version import version as AIRFLOW_VERSION +from packaging import version from dagfactory.dagbuilder import INSTALLED_AIRFLOW_VERSION, DagBuilder, DagFactoryConfigException, Dataset from tests.utils import ( @@ -27,33 +29,16 @@ ) try: - from airflow.providers.http.sensors.http import HttpSensor -except ImportError: # Airflow < 2.4 - from airflow.sensors.http_sensor import HttpSensor - -try: - from airflow.sensors.sql_sensor import SqlSensor + from airflow.providers.standard.operators.bash import BashOperator except ImportError: - from airflow.providers.common.sql.sensors.sql import SqlSensor - -try: from airflow.operators.bash import BashOperator -except ImportError: - from airflow.operators.bash_operator import BashOperator + try: # Try Airflow 3 from airflow.providers.standard.operators.python import PythonOperator except ImportError: - try: # Try Airflow 2.4+ - from airflow.operators.python import PythonOperator - except ImportError: - # Fallback to older versions - from airflow.operators.python_operator import PythonOperator + from airflow.operators.python import PythonOperator -try: - from airflow.version import version as AIRFLOW_VERSION -except ImportError: - from airflow import __version__ as AIRFLOW_VERSION from dagfactory import dagbuilder @@ -566,8 +551,7 @@ def test_build(): td = dagbuilder.DagBuilder("test_dag", DAG_CONFIG, DEFAULT_CONFIG) actual = td.build() assert actual["dag_id"] == "test_dag" - # TODO: https://github.com/astronomer/dag-factory/issues/451 - # assert isinstance(actual["dag"], DAG) + assert isinstance(actual["dag"], DAG) assert len(actual["dag"].tasks) == 3 assert actual["dag"].task_dict["task_1"].downstream_task_ids == {"task_2", "task_3"} if version.parse(AIRFLOW_VERSION) >= version.parse("2.9.0"): @@ -643,8 +627,7 @@ def test_build_task_groups(): task_group_1 = {t for t in actual["dag"].task_dict if t.startswith("task_group_1")} task_group_2 = {t for t in actual["dag"].task_dict if t.startswith("task_group_2")} assert actual["dag_id"] == "test_dag" - # TODO: https://github.com/astronomer/dag-factory/issues/451 - # assert isinstance(actual["dag"], DAG) + assert isinstance(actual["dag"], DAG) assert len(actual["dag"].tasks) == 6 assert actual["dag"].task_dict["task_1"].downstream_task_ids == {"task_group_1.task_2"} assert actual["dag"].task_dict["task_group_1.task_2"].downstream_task_ids == {"task_group_1.task_3"} diff --git a/tests/test_dagbuilder_httpoperator.py b/tests/test_dagbuilder_httpoperator.py index 904dd5ff..0d5be9bc 100644 --- a/tests/test_dagbuilder_httpoperator.py +++ b/tests/test_dagbuilder_httpoperator.py @@ -5,16 +5,12 @@ import pendulum import pytest +from airflow import DAG from dagfactory.dagbuilder import DagBuilder from dagfactory.exceptions import DagFactoryException from tests.utils import get_schedule_key -try: - from airflow.sdk.definitions import DAG # noqa: F401 -except ImportError: - from airflow.models.dag import DAG # noqa: F401 - # Get current directory and project root here = Path(__file__).parent PROJECT_ROOT_PATH = str(here.parent) @@ -51,7 +47,7 @@ "concurrency": 1, "max_active_runs": 1, "dagrun_timeout_sec": 600, - get_schedule_key(): "0 1 * * *", + "schedule_interval": "0 1 * * *", } # Basic DAG config for tests @@ -202,8 +198,7 @@ def test_dag_with_http_operator(): # Verify DAG was created successfully assert dag_obj["dag_id"] == "test_http_dag" - # TODO: https://github.com/astronomer/dag-factory/issues/451 - # assert isinstance(dag_obj["dag"], DAG) + assert isinstance(dag_obj["dag"], DAG) # Verify tasks were created correctly dag = dag_obj["dag"] diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 6478aa0c..0d1c8514 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -1,25 +1,15 @@ from __future__ import annotations +from functools import cache from pathlib import Path -try: - from functools import cache -except ImportError: - from functools import lru_cache as cache - -try: - from airflow.sdk.definitions.dag import DAG -except ImportError: - from airflow.models.dag import DAG - import airflow +import pytest from airflow.models.dagbag import DagBag -from airflow.utils import timezone from airflow.utils.db import create_default_connections from airflow.utils.session import provide_session from airflow.utils.state import DagRunState from packaging.version import Version -import pytest from . import utils as test_utils @@ -29,8 +19,6 @@ IGNORED_DAG_FILES = ["example_callbacks.py", "example_http_operator_task.py"] MIN_VER_DAG_FILE_VER: dict[str, list[str]] = { - # TaskFlow examples unrelated to dynamic task mapping work in earlier versions - "2.3": ["example_dynamic_task_mapping.py", "example_taskflow.py"], "2.5": [ "example_pypi_stats_dagfactory", "example_hackernews_dagfactory", @@ -38,7 +26,6 @@ "example_pypi_stats_plain_airflow", ], "2.7": ["example_map_index_template.py"], - "2.4": ["example_external_sensor_dag.py"], "2.9": ["example_map_index_template.py"], } @@ -93,12 +80,12 @@ def get_dag_ids() -> list[str]: @pytest.mark.parametrize("dag_id", get_dag_ids()) def test_example_dag(session, dag_id: str): dag_bag = get_dag_bag() - dag: DAG = dag_bag.get_dag(dag_id) + dag = dag_bag.get_dag(dag_id) + # This feature is available since Airflow 2.5: + # https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#airflow-2-5-0-2022-12-02 dag_run = None - if AIRFLOW_VERSION >= Version("3.0"): - dag_run = dag.test(logical_date=timezone.utcnow()) - elif AIRFLOW_VERSION >= Version("2.5"): + if AIRFLOW_VERSION >= Version("2.5"): dag_run = dag.test() else: dag_run = test_utils.run_dag(dag) diff --git a/tests/utils.py b/tests/utils.py index 45a8bcfa..1275ce80 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -8,39 +8,20 @@ import yaml from airflow.configuration import secrets_backend_list from airflow.exceptions import AirflowSkipException - -try: - from airflow.sdk.definitions.dag import DAG -except ImportError: - from airflow.models.dag import DAG - +from airflow.models.dag import DAG from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance from airflow.secrets.local_filesystem import LocalFilesystemBackend from airflow.utils import timezone -from airflow.utils.session import provide_session +from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.state import DagRunState, State from airflow.utils.types import DagRunType +from airflow.version import version as AIRFLOW_VERSION from packaging import version from sqlalchemy.orm.session import Session -try: - from airflow.utils.session import NEW_SESSION -except ImportError: - # Airflow < 2.3 did not have NEW_SESSION in airflow.utils.session - from typing import cast - - from airflow import settings - - NEW_SESSION: settings.SASession = cast(settings.SASession, None) - log = logging.getLogger(__name__) -try: - from airflow.version import version as AIRFLOW_VERSION -except ImportError: - from airflow import __version__ as AIRFLOW_VERSION - def run_dag(dag: DAG, conn_file_path: str | None = None) -> DagRun: return test_dag(dag=dag, conn_file_path=conn_file_path) @@ -216,27 +197,20 @@ def get_bash_operator_path(): airflow_version = version.parse(AIRFLOW_VERSION) if airflow_version >= version.parse("3.0.0"): return "airflow.providers.standard.operators.bash.BashOperator" - elif airflow_version >= version.parse("2.4.0"): - return "airflow.operators.bash.BashOperator" else: - return "airflow.operators.bash_operator.BashOperator" + return "airflow.operators.bash.BashOperator" def get_python_operator_path(): airflow_version = version.parse(AIRFLOW_VERSION) if airflow_version >= version.parse("3.0.0"): return "airflow.providers.standard.operators.python.PythonOperator" - elif airflow_version >= version.parse("2.4.0"): - return "airflow.operators.python.PythonOperator" else: - return "airflow.operators.python_operator.PythonOperator" + return "airflow.operators.python.PythonOperator" def get_sql_sensor_path(): - if version.parse(AIRFLOW_VERSION) < version.parse("2.4.0"): - return "airflow.sensors.sql_sensor.SqlSensor" - else: - return "airflow.providers.common.sql.sensors.sql.SqlSensor" + return "airflow.providers.common.sql.sensors.sql.SqlSensor" def read_yml(path): From ebdd7a796b2a9f6add7f4bc9b90f9af37ff581f2 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 8 Jul 2025 14:55:04 +0530 Subject: [PATCH 28/33] Revert CI changes --- tests/test_dagfactory.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/tests/test_dagfactory.py b/tests/test_dagfactory.py index ea750ae7..e542c7db 100644 --- a/tests/test_dagfactory.py +++ b/tests/test_dagfactory.py @@ -3,12 +3,12 @@ import os import pytest +from airflow.version import version as AIRFLOW_VERSION try: - from airflow.version import version as AIRFLOW_VERSION -except ImportError: # pragma: no cover - from airflow import __version__ as AIRFLOW_VERSION - + from airflow.sdk.definitions.variable import Variable +except ImportError: + from airflow.models.variable import Variable from packaging import version from tests.utils import get_bash_operator_path, get_schedule_key @@ -112,7 +112,7 @@ def test_validate_config_filepath_invalid(): @pytest.mark.skipif( version.parse(AIRFLOW_VERSION) < version.parse("2.4.0"), reason="Requires Airflow version greater than 2.4.0" ) -def test_load_config_valid(monkeypatch): +def test_load_dag_config_valid(monkeypatch): monkeypatch.setenv("AUTO_CONVERT_TO_AF3", "true") expected = { "default": { @@ -193,15 +193,17 @@ def test_load_config_valid(monkeypatch): }, }, } - actual = dagfactory.DagFactory._load_config(TEST_DAG_FACTORY) + td = dagfactory.DagFactory(DAG_FACTORY_VARIABLES_AS_ARGUMENTS) + actual = td._load_dag_config(TEST_DAG_FACTORY) actual["example_dag2"]["doc_md_file_path"] = DOC_MD_FIXTURE_FILE actual["example_dag3"]["doc_md_python_callable_file"] = DOC_MD_PYTHON_CALLABLE_FILE assert actual == expected -def test_load_config_invalid(): +def test_load_dag_config_invalid(): + td = dagfactory.DagFactory(DAG_FACTORY_VARIABLES_AS_ARGUMENTS) with pytest.raises(Exception): - dagfactory.DagFactory._load_config(INVALID_YAML) + td._load_dag_config(INVALID_YAML) @pytest.mark.skipif( @@ -348,7 +350,10 @@ def test_kubernetes_pod_operator_dag_lt_2_7(): def test_variables_as_arguments_dag(monkeypatch): monkeypatch.setenv("AUTO_CONVERT_TO_AF3", "true") override_command = "value_from_variable" - os.environ["AIRFLOW_VAR_VAR1"] = override_command + if version.parse(AIRFLOW_VERSION) >= version.parse("1.10.10"): + os.environ["AIRFLOW_VAR_VAR1"] = override_command + else: + Variable.set("var1", override_command) td = dagfactory.DagFactory(DAG_FACTORY_VARIABLES_AS_ARGUMENTS) td.generate_dags(globals()) tasks = globals()["example_dag"].tasks From b2614b8a18163ec76bfb2af2204a369f2869eac6 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 8 Jul 2025 14:57:15 +0530 Subject: [PATCH 29/33] Revert CI changes --- mkdocs.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/mkdocs.yml b/mkdocs.yml index 23e59cca..78c98822 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -52,6 +52,7 @@ nav: - configuration/configuring_workflows.md - configuration/environment_variables.md - configuration/defaults.md + - configuration/params.md - configuration/jinja2_template.md - configuration/schedule.md - Features: From 635e0cf42551d77ab8f7abeb6a0320eea227f91c Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 8 Jul 2025 15:03:51 +0530 Subject: [PATCH 30/33] Revert CI changes --- .github/workflows/cicd.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/cicd.yaml b/.github/workflows/cicd.yaml index bad081f1..f6c9175e 100644 --- a/.github/workflows/cicd.yaml +++ b/.github/workflows/cicd.yaml @@ -3,7 +3,7 @@ name: CI jobs on: # run on pushes to the default branch push: - branches: [main, jinja_template_example] + branches: [main] # run on pull requests originated from forks based on the `main` branch. # note that if you're trying to edit the CI in a pull request, # your changes won't run here. you need to temporarily add your branch to @@ -185,7 +185,7 @@ jobs: AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0 AIRFLOW_HOME: ${{ github.workspace }} CONFIG_ROOT_DIR: ${{ github.workspace }}/dags - PYTHONPATH: ${{ github.workspace }}:${{ github.workspace }}/dev/dags:$PYTHONPATH + PYTHONPATH: ${{ github.workspace }}:${{ github.workspace }}/examples:$PYTHONPATH AUTO_CONVERT_TO_AF3: true - name: Upload coverage to Github From bc63f7c2236837f4d325f447e487bab17a010d40 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Thu, 10 Jul 2025 17:43:51 +0530 Subject: [PATCH 31/33] Fix tests --- dagfactory/listeners/runtime_event.py | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/dagfactory/listeners/runtime_event.py b/dagfactory/listeners/runtime_event.py index 79031dd6..1d7f2e3f 100644 --- a/dagfactory/listeners/runtime_event.py +++ b/dagfactory/listeners/runtime_event.py @@ -5,11 +5,17 @@ try: from airflow.sdk.definitions.dag import DAG except ImportError: - from airflow.models import DAG + from airflow.models.dag import DAG +import hashlib + from airflow.models.dagrun import DagRun +from airflow.version import version as AIRFLOW_VERSION +from packaging import version from dagfactory import telemetry +INSTALLED_AIRFLOW_VERSION = version.parse(AIRFLOW_VERSION) + class EventStatus: SUCCESS = "success" @@ -30,8 +36,15 @@ def on_dag_run_success(dag_run: DagRun, msg: str): dag = dag_run.get_dag() if not is_dagfactory_dag(dag): return + + if INSTALLED_AIRFLOW_VERSION < version.Version("3.0.0"): + dag_hash = dag_run.dag_hash + else: + dag_id_str = str(dag_run.dag_id) + dag_hash = hashlib.md5(dag_id_str.encode("utf-8")).hexdigest() + additional_telemetry_metrics = { - "dag_hash": dag_run.dag_hash, + "dag_hash": dag_hash, "status": EventStatus.SUCCESS, "task_count": len(dag.task_ids), } @@ -44,8 +57,15 @@ def on_dag_run_failed(dag_run: DagRun, msg: str): dag = dag_run.get_dag() if not is_dagfactory_dag(dag): return + + if INSTALLED_AIRFLOW_VERSION < version.Version("3.0.0"): + dag_hash = dag_run.dag_hash + else: + dag_id_str = str(dag_run.dag_id) + dag_hash = hashlib.md5(dag_id_str.encode("utf-8")).hexdigest() + additional_telemetry_metrics = { - "dag_hash": dag_run.dag_hash, + "dag_hash": dag_hash, "status": EventStatus.FAILED, "task_count": len(dag.task_ids), } From ec7f67bced51257005a3d3984653df4772eb15c0 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Thu, 10 Jul 2025 18:12:14 +0530 Subject: [PATCH 32/33] Update dag --- dev/dags/example_jinja2_template_dag.yml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dev/dags/example_jinja2_template_dag.yml b/dev/dags/example_jinja2_template_dag.yml index bd508a41..0dde3716 100644 --- a/dev/dags/example_jinja2_template_dag.yml +++ b/dev/dags/example_jinja2_template_dag.yml @@ -5,12 +5,12 @@ example_jinja2_template_dag: description: "A DAG that uses Airflow's built-in Jinja templates" catchup: false tasks: - print_execution_date: + print_run_id: operator: "airflow.operators.bash.BashOperator" - bash_command: "echo 'Execution date is {{ ds }}'" + bash_command: "echo 'run-id is {{ run_id }}'" - print_next_execution: + print_task: operator: "airflow.operators.bash.BashOperator" - bash_command: "echo 'Next execution date is {{ macros.ds_add(ds, 1) }}'" + bash_command: "echo 'The task is {{ task }}'" dependencies: - - print_execution_date + - print_run_id From c39879d80c71cc3f74c4bf4ee88b5a25611affa6 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Thu, 10 Jul 2025 18:14:58 +0530 Subject: [PATCH 33/33] Revert some changes --- dagfactory/listeners/runtime_event.py | 26 +++----------------------- 1 file changed, 3 insertions(+), 23 deletions(-) diff --git a/dagfactory/listeners/runtime_event.py b/dagfactory/listeners/runtime_event.py index 1d7f2e3f..79031dd6 100644 --- a/dagfactory/listeners/runtime_event.py +++ b/dagfactory/listeners/runtime_event.py @@ -5,17 +5,11 @@ try: from airflow.sdk.definitions.dag import DAG except ImportError: - from airflow.models.dag import DAG -import hashlib - + from airflow.models import DAG from airflow.models.dagrun import DagRun -from airflow.version import version as AIRFLOW_VERSION -from packaging import version from dagfactory import telemetry -INSTALLED_AIRFLOW_VERSION = version.parse(AIRFLOW_VERSION) - class EventStatus: SUCCESS = "success" @@ -36,15 +30,8 @@ def on_dag_run_success(dag_run: DagRun, msg: str): dag = dag_run.get_dag() if not is_dagfactory_dag(dag): return - - if INSTALLED_AIRFLOW_VERSION < version.Version("3.0.0"): - dag_hash = dag_run.dag_hash - else: - dag_id_str = str(dag_run.dag_id) - dag_hash = hashlib.md5(dag_id_str.encode("utf-8")).hexdigest() - additional_telemetry_metrics = { - "dag_hash": dag_hash, + "dag_hash": dag_run.dag_hash, "status": EventStatus.SUCCESS, "task_count": len(dag.task_ids), } @@ -57,15 +44,8 @@ def on_dag_run_failed(dag_run: DagRun, msg: str): dag = dag_run.get_dag() if not is_dagfactory_dag(dag): return - - if INSTALLED_AIRFLOW_VERSION < version.Version("3.0.0"): - dag_hash = dag_run.dag_hash - else: - dag_id_str = str(dag_run.dag_id) - dag_hash = hashlib.md5(dag_id_str.encode("utf-8")).hexdigest() - additional_telemetry_metrics = { - "dag_hash": dag_hash, + "dag_hash": dag_run.dag_hash, "status": EventStatus.FAILED, "task_count": len(dag.task_ids), }