diff --git a/.github/workflows/cicd.yaml b/.github/workflows/cicd.yaml index d10e16c8..bed9ae47 100644 --- a/.github/workflows/cicd.yaml +++ b/.github/workflows/cicd.yaml @@ -2,7 +2,7 @@ name: CI jobs on: push: # Run on pushes to the default branch - branches: [main] + branches: [main, bump_airflow] pull_request_target: # Also run on pull requests originated from forks branches: [main] release: @@ -35,15 +35,8 @@ jobs: fail-fast: false matrix: python-version: [ "3.8", "3.9", "3.10", "3.11", "3.12" ] - airflow-version: [ "2.2", "2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10" ] + airflow-version: [ "2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10" ] exclude: - # Apache Airflow versions prior to 2.3.0 have not been tested with Python 3.10 - # See: https://airflow.apache.org/docs/apache-airflow/2.2.0/installation/prerequisites.html - - python-version: "3.10" - airflow-version: "2.2" - # Apache Airflow versions prior to 2.6.2 have not been tested with Python 3.11 - - python-version: "3.11" - airflow-version: "2.2" - python-version: "3.11" airflow-version: "2.3" - python-version: "3.11" @@ -56,8 +49,6 @@ jobs: # Official support for Python 3.12 and the corresponding constraints.txt are available only for Apache Airflow >= 2.9.0. # See: https://github.com/apache/airflow/tree/2.9.0?tab=readme-ov-file#requirements # See: https://github.com/apache/airflow/tree/2.8.4?tab=readme-ov-file#requirements - - python-version: "3.12" - airflow-version: "2.2" - python-version: "3.12" airflow-version: "2.3" - python-version: "3.12" @@ -110,15 +101,8 @@ jobs: fail-fast: false matrix: python-version: [ "3.8", "3.9", "3.10", "3.11", "3.12" ] - airflow-version: [ "2.2", "2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10" ] + airflow-version: [ "2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10" ] exclude: - # Apache Airflow versions prior to 2.3.0 have not been tested with Python 3.10 - # See: https://airflow.apache.org/docs/apache-airflow/2.2.0/installation/prerequisites.html - - python-version: "3.10" - airflow-version: "2.2" - # Apache Airflow versions prior to 2.6.2 have not been tested with Python 3.11 - - python-version: "3.11" - airflow-version: "2.2" - python-version: "3.11" airflow-version: "2.3" - python-version: "3.11" @@ -131,8 +115,6 @@ jobs: # Official support for Python 3.12 and the corresponding constraints.txt are available only for Apache Airflow >= 2.9.0. # See: https://github.com/apache/airflow/tree/2.9.0?tab=readme-ov-file#requirements # See: https://github.com/apache/airflow/tree/2.8.4?tab=readme-ov-file#requirements - - python-version: "3.12" - airflow-version: "2.2" - python-version: "3.12" airflow-version: "2.3" - python-version: "3.12" diff --git a/README.md b/README.md index 407eba6c..f855b650 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ declaratively via configuration files. The minimum requirements for **dag-factory** are: - Python 3.8.0+ -- [Apache Airflow®](https://airflow.apache.org) 2.0+ +- [Apache Airflow®](https://airflow.apache.org) 2.3+ For a gentle introduction, please take a look at our [Quickstart Guide](https://astronomer.github.io/dag-factory/latest/getting-started/quick-start-airflow-standalone/). For more examples, please see the [examples](/examples) folder. @@ -22,12 +22,12 @@ For a gentle introduction, please take a look at our [Quickstart Guide](https:// - [Quickstart](https://astronomer.github.io/dag-factory/latest/getting-started/quick-start-astro-cli/) - [Benefits](#benefits) - [Features](#features) - - [Dynamically Mapped Tasks](https://astronomer.github.io/dag-factory/latest/features/dynamic_tasks/) - - [Multiple Configuration Files](#multiple-configuration-files) - - [Callbacks](#callbacks) - - [Custom Operators](#custom-operators) + - [Dynamically Mapped Tasks](https://astronomer.github.io/dag-factory/latest/features/dynamic_tasks/) + - [Multiple Configuration Files](#multiple-configuration-files) + - [Callbacks](https://astronomer.github.io/dag-factory/dev/features/callbacks/) + - [Custom Operators](#custom-operators) - [Notes](#notes) - - [HttpSensor (since 1.0.0)](#httpsensor-since-100) + - [HttpSensor (since 1.0.0)](#httpsensor-since-100) - [Contributing](https://astronomer.github.io/dag-factory/latest/contributing/howto/) ## Benefits @@ -65,7 +65,6 @@ If you want to split your DAG configuration into multiple files, you can do so b ![custom_operators.png](img/custom_operators.png) - ## Notes ### HttpSensor (since 1.0.0) diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index c36ab49b..34809eeb 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -8,6 +8,7 @@ import inspect import os import re +import warnings from copy import deepcopy from datetime import datetime, timedelta from functools import partial @@ -29,6 +30,8 @@ except ImportError: from airflow.operators.python_operator import BranchPythonOperator, PythonOperator +from airflow.providers.http.sensors.http import HttpSensor + # http operator was renamed in providers-http 4.11.0 try: from airflow.providers.http.operators.http import HttpOperator @@ -43,11 +46,6 @@ # Fall back to dynamically importing the operator HTTP_OPERATOR_CLASS = None -# http sensor was moved in 2.4 -try: - from airflow.providers.http.sensors.http import HttpSensor -except ImportError: - from airflow.sensors.http_sensor import HttpSensor # sql sensor was moved in 2.4 try: @@ -95,25 +93,14 @@ from airflow.contrib.kubernetes.volume_mount import VolumeMount from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator +from airflow.models import MappedOperator +from airflow.timetables.base import Timetable from airflow.utils.task_group import TaskGroup from kubernetes.client.models import V1Container, V1Pod from dagfactory import parsers, utils from dagfactory.exceptions import DagFactoryConfigException, DagFactoryException -# TimeTable is introduced in Airflow 2.2.0 -if version.parse(AIRFLOW_VERSION) >= version.parse("2.2.0"): - from airflow.timetables.base import Timetable -else: - Timetable = None -# pylint: disable=ungrouped-imports,invalid-name - -if version.parse(AIRFLOW_VERSION) >= version.parse("2.3.0"): - from airflow.models import MappedOperator -else: - MappedOperator = None - - if version.parse(AIRFLOW_VERSION) >= version.parse("2.4.0"): from airflow.datasets import Dataset else: @@ -459,10 +446,7 @@ def make_task(operator: str, task_params: Dict[str, Any]) -> BaseOperator: DagBuilder.adjust_general_task_params(task_params) expand_kwargs: Dict[str, Union[Dict[str, Any], Any]] = {} - # expand available only in airflow >= 2.3.0 - if ( - utils.check_dict_key(task_params, "expand") or utils.check_dict_key(task_params, "partial") - ) and version.parse(AIRFLOW_VERSION) >= version.parse("2.3.0"): + if utils.check_dict_key(task_params, "expand") or utils.check_dict_key(task_params, "partial"): # Getting expand and partial kwargs from task_params (task_params, expand_kwargs, partial_kwargs) = utils.get_expand_partial_kwargs(task_params) @@ -795,19 +779,20 @@ def build(self) -> Dict[str, Union[str, DAG]]: dag_kwargs["description"] = dag_params.get("description", None) - if version.parse(AIRFLOW_VERSION) >= version.parse("2.2.0"): + if "concurrency" in dag_params: + warnings.warn( + "`concurrency` param is deprecated. Please use max_active_tasks.", category=DeprecationWarning + ) + dag_kwargs["max_active_tasks"] = dag_params["concurrency"] + else: dag_kwargs["max_active_tasks"] = dag_params.get( "max_active_tasks", configuration.conf.getint("core", "max_active_tasks_per_dag") ) - if dag_params.get("timetable"): - timetable_args = dag_params.get("timetable") - dag_kwargs["timetable"] = DagBuilder.make_timetable( - timetable_args.get("callable"), timetable_args.get("params") - ) - else: - dag_kwargs["concurrency"] = dag_params.get( - "concurrency", configuration.conf.getint("core", "dag_concurrency") + if dag_params.get("timetable"): + timetable_args = dag_params.get("timetable") + dag_kwargs["timetable"] = DagBuilder.make_timetable( + timetable_args.get("callable"), timetable_args.get("params") ) dag_kwargs["catchup"] = dag_params.get( @@ -830,9 +815,7 @@ def build(self) -> Dict[str, Union[str, DAG]]: dag_kwargs["template_searchpath"] = dag_params.get("template_searchpath", None) - # Jinja NativeEnvironment support has been added in Airflow 2.1.0 - if version.parse(AIRFLOW_VERSION) >= version.parse("2.1.0"): - dag_kwargs["render_template_as_native_obj"] = dag_params.get("render_template_as_native_obj", False) + dag_kwargs["render_template_as_native_obj"] = dag_params.get("render_template_as_native_obj", False) dag_kwargs["sla_miss_callback"] = dag_params.get("sla_miss_callback", None) @@ -904,12 +887,8 @@ def build(self) -> Dict[str, Union[str, DAG]]: if "operator" in task_conf: operator: str = task_conf["operator"] - # Dynamic task mapping available only in Airflow >= 2.3.0 if task_conf.get("expand"): - if version.parse(AIRFLOW_VERSION) < version.parse("2.3.0"): - raise DagFactoryConfigException("Dynamic task mapping available only in Airflow >= 2.3.0") - else: - task_conf = self.replace_expand_values(task_conf, tasks_dict) + task_conf = self.replace_expand_values(task_conf, tasks_dict) task: Union[BaseOperator, MappedOperator] = DagBuilder.make_task(operator=operator, task_params=params) tasks_dict[task.task_id]: BaseOperator = task @@ -1142,9 +1121,6 @@ def set_callback(parameters: Union[dict, str], callback_type: str, has_name_and_ :param has_name_and_file: :returns: Callable """ - # Check Airflow version, raise an exception otherwise - if version.parse(AIRFLOW_VERSION) < version.parse("2.0.0"): - raise DagFactoryException("Cannot parse callbacks with an Airflow version less than 2.0.0.") # There is scenario where a callback is passed in via a file and a name. For the most part, this will be a # Python callable that is treated similarly to a Python callable that the PythonOperator may leverage. That diff --git a/pyproject.toml b/pyproject.toml index df834b0c..fe6b1ed8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,7 @@ classifiers = [ "Programming Language :: Python :: 3.12", ] dependencies = [ - "apache-airflow>=2.0", + "apache-airflow>=2.3", "apache-airflow-providers-http>=2.0.0", "apache-airflow-providers-cncf-kubernetes", "pyyaml", @@ -60,7 +60,7 @@ pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} [[tool.hatch.envs.tests.matrix]] python = ["3.8", "3.9", "3.10", "3.11", "3.12"] -airflow = ["2.2", "2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"] +airflow = ["2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"] [tool.hatch.envs.tests.scripts] diff --git a/scripts/test/integration.sh b/scripts/test/integration.sh index 6215e2dd..d7432ca0 100644 --- a/scripts/test/integration.sh +++ b/scripts/test/integration.sh @@ -10,8 +10,8 @@ ls $AIRFLOW_HOME airflow db check -# Necessary for overcoming the following issue with Airflow 2.2: -# ERROR: Cannot install apache-airflow==2.2.0, apache-airflow==2.2.1, apache-airflow==2.2.2, apache-airflow==2.2.3, apache-airflow==2.2.4, apache-airflow==2.2.5, httpx>=0.25.0 and tabulate>=0.9.0 because these package versions have conflicting dependencies. +# Necessary for overcoming the following issue with Airflow 2.3 and 2.4: +# ImportError: Pandas requires version '0.9.0' or newer of 'tabulate' (version '0.8.9' currently installed) pip install "tabulate>=0.9.0" pytest -vv \