Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 3 additions & 21 deletions .github/workflows/cicd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: CI jobs

on:
push: # Run on pushes to the default branch
branches: [main]
branches: [main, bump_airflow]
pull_request_target: # Also run on pull requests originated from forks
branches: [main]
release:
Expand Down Expand Up @@ -35,15 +35,8 @@ jobs:
fail-fast: false
matrix:
python-version: [ "3.8", "3.9", "3.10", "3.11", "3.12" ]
airflow-version: [ "2.2", "2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10" ]
airflow-version: [ "2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10" ]
exclude:
# Apache Airflow versions prior to 2.3.0 have not been tested with Python 3.10
# See: https://airflow.apache.org/docs/apache-airflow/2.2.0/installation/prerequisites.html
- python-version: "3.10"
airflow-version: "2.2"
# Apache Airflow versions prior to 2.6.2 have not been tested with Python 3.11
- python-version: "3.11"
airflow-version: "2.2"
- python-version: "3.11"
airflow-version: "2.3"
- python-version: "3.11"
Expand All @@ -56,8 +49,6 @@ jobs:
# Official support for Python 3.12 and the corresponding constraints.txt are available only for Apache Airflow >= 2.9.0.
# See: https://github.com/apache/airflow/tree/2.9.0?tab=readme-ov-file#requirements
# See: https://github.com/apache/airflow/tree/2.8.4?tab=readme-ov-file#requirements
- python-version: "3.12"
airflow-version: "2.2"
- python-version: "3.12"
airflow-version: "2.3"
- python-version: "3.12"
Expand Down Expand Up @@ -110,15 +101,8 @@ jobs:
fail-fast: false
matrix:
python-version: [ "3.8", "3.9", "3.10", "3.11", "3.12" ]
airflow-version: [ "2.2", "2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10" ]
airflow-version: [ "2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10" ]
exclude:
# Apache Airflow versions prior to 2.3.0 have not been tested with Python 3.10
# See: https://airflow.apache.org/docs/apache-airflow/2.2.0/installation/prerequisites.html
- python-version: "3.10"
airflow-version: "2.2"
# Apache Airflow versions prior to 2.6.2 have not been tested with Python 3.11
- python-version: "3.11"
airflow-version: "2.2"
- python-version: "3.11"
airflow-version: "2.3"
- python-version: "3.11"
Expand All @@ -131,8 +115,6 @@ jobs:
# Official support for Python 3.12 and the corresponding constraints.txt are available only for Apache Airflow >= 2.9.0.
# See: https://github.com/apache/airflow/tree/2.9.0?tab=readme-ov-file#requirements
# See: https://github.com/apache/airflow/tree/2.8.4?tab=readme-ov-file#requirements
- python-version: "3.12"
airflow-version: "2.2"
- python-version: "3.12"
airflow-version: "2.3"
- python-version: "3.12"
Expand Down
13 changes: 6 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,20 @@ declaratively via configuration files.
The minimum requirements for **dag-factory** are:

- Python 3.8.0+
- [Apache Airflow®](https://airflow.apache.org) 2.0+
- [Apache Airflow®](https://airflow.apache.org) 2.3+

For a gentle introduction, please take a look at our [Quickstart Guide](https://astronomer.github.io/dag-factory/latest/getting-started/quick-start-airflow-standalone/). For more examples, please see the
[examples](/examples) folder.

- [Quickstart](https://astronomer.github.io/dag-factory/latest/getting-started/quick-start-astro-cli/)
- [Benefits](#benefits)
- [Features](#features)
- [Dynamically Mapped Tasks](https://astronomer.github.io/dag-factory/latest/features/dynamic_tasks/)
- [Multiple Configuration Files](#multiple-configuration-files)
- [Callbacks](#callbacks)
- [Custom Operators](#custom-operators)
- [Dynamically Mapped Tasks](https://astronomer.github.io/dag-factory/latest/features/dynamic_tasks/)
- [Multiple Configuration Files](#multiple-configuration-files)
- [Callbacks](https://astronomer.github.io/dag-factory/dev/features/callbacks/)
- [Custom Operators](#custom-operators)
- [Notes](#notes)
- [HttpSensor (since 1.0.0)](#httpsensor-since-100)
- [HttpSensor (since 1.0.0)](#httpsensor-since-100)
- [Contributing](https://astronomer.github.io/dag-factory/latest/contributing/howto/)

## Benefits
Expand Down Expand Up @@ -65,7 +65,6 @@ If you want to split your DAG configuration into multiple files, you can do so b

![custom_operators.png](img/custom_operators.png)


## Notes

### HttpSensor (since 1.0.0)
Expand Down
60 changes: 18 additions & 42 deletions dagfactory/dagbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import inspect
import os
import re
import warnings
from copy import deepcopy
from datetime import datetime, timedelta
from functools import partial
Expand All @@ -29,6 +30,8 @@
except ImportError:
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator

from airflow.providers.http.sensors.http import HttpSensor

# http operator was renamed in providers-http 4.11.0
try:
from airflow.providers.http.operators.http import HttpOperator
Expand All @@ -43,11 +46,6 @@
# Fall back to dynamically importing the operator
HTTP_OPERATOR_CLASS = None

# http sensor was moved in 2.4
try:
from airflow.providers.http.sensors.http import HttpSensor
except ImportError:
from airflow.sensors.http_sensor import HttpSensor

# sql sensor was moved in 2.4
try:
Expand Down Expand Up @@ -95,25 +93,14 @@
from airflow.contrib.kubernetes.volume_mount import VolumeMount
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

from airflow.models import MappedOperator
from airflow.timetables.base import Timetable
from airflow.utils.task_group import TaskGroup
from kubernetes.client.models import V1Container, V1Pod

from dagfactory import parsers, utils
from dagfactory.exceptions import DagFactoryConfigException, DagFactoryException

# TimeTable is introduced in Airflow 2.2.0
if version.parse(AIRFLOW_VERSION) >= version.parse("2.2.0"):
from airflow.timetables.base import Timetable
else:
Timetable = None
# pylint: disable=ungrouped-imports,invalid-name

if version.parse(AIRFLOW_VERSION) >= version.parse("2.3.0"):
from airflow.models import MappedOperator
else:
MappedOperator = None


if version.parse(AIRFLOW_VERSION) >= version.parse("2.4.0"):
from airflow.datasets import Dataset
else:
Expand Down Expand Up @@ -459,10 +446,7 @@ def make_task(operator: str, task_params: Dict[str, Any]) -> BaseOperator:
DagBuilder.adjust_general_task_params(task_params)

expand_kwargs: Dict[str, Union[Dict[str, Any], Any]] = {}
# expand available only in airflow >= 2.3.0
if (
utils.check_dict_key(task_params, "expand") or utils.check_dict_key(task_params, "partial")
) and version.parse(AIRFLOW_VERSION) >= version.parse("2.3.0"):
if utils.check_dict_key(task_params, "expand") or utils.check_dict_key(task_params, "partial"):
# Getting expand and partial kwargs from task_params
(task_params, expand_kwargs, partial_kwargs) = utils.get_expand_partial_kwargs(task_params)

Expand Down Expand Up @@ -795,19 +779,20 @@ def build(self) -> Dict[str, Union[str, DAG]]:

dag_kwargs["description"] = dag_params.get("description", None)

if version.parse(AIRFLOW_VERSION) >= version.parse("2.2.0"):
if "concurrency" in dag_params:
warnings.warn(
"`concurrency` param is deprecated. Please use max_active_tasks.", category=DeprecationWarning
)
dag_kwargs["max_active_tasks"] = dag_params["concurrency"]
else:
dag_kwargs["max_active_tasks"] = dag_params.get(
"max_active_tasks", configuration.conf.getint("core", "max_active_tasks_per_dag")
)

if dag_params.get("timetable"):
timetable_args = dag_params.get("timetable")
dag_kwargs["timetable"] = DagBuilder.make_timetable(
timetable_args.get("callable"), timetable_args.get("params")
)
else:
dag_kwargs["concurrency"] = dag_params.get(
"concurrency", configuration.conf.getint("core", "dag_concurrency")
if dag_params.get("timetable"):
timetable_args = dag_params.get("timetable")
dag_kwargs["timetable"] = DagBuilder.make_timetable(
timetable_args.get("callable"), timetable_args.get("params")
)

dag_kwargs["catchup"] = dag_params.get(
Expand All @@ -830,9 +815,7 @@ def build(self) -> Dict[str, Union[str, DAG]]:

dag_kwargs["template_searchpath"] = dag_params.get("template_searchpath", None)

# Jinja NativeEnvironment support has been added in Airflow 2.1.0
if version.parse(AIRFLOW_VERSION) >= version.parse("2.1.0"):
dag_kwargs["render_template_as_native_obj"] = dag_params.get("render_template_as_native_obj", False)
dag_kwargs["render_template_as_native_obj"] = dag_params.get("render_template_as_native_obj", False)

dag_kwargs["sla_miss_callback"] = dag_params.get("sla_miss_callback", None)

Expand Down Expand Up @@ -904,12 +887,8 @@ def build(self) -> Dict[str, Union[str, DAG]]:
if "operator" in task_conf:
operator: str = task_conf["operator"]

# Dynamic task mapping available only in Airflow >= 2.3.0
if task_conf.get("expand"):
if version.parse(AIRFLOW_VERSION) < version.parse("2.3.0"):
raise DagFactoryConfigException("Dynamic task mapping available only in Airflow >= 2.3.0")
else:
task_conf = self.replace_expand_values(task_conf, tasks_dict)
task_conf = self.replace_expand_values(task_conf, tasks_dict)

task: Union[BaseOperator, MappedOperator] = DagBuilder.make_task(operator=operator, task_params=params)
tasks_dict[task.task_id]: BaseOperator = task
Expand Down Expand Up @@ -1142,9 +1121,6 @@ def set_callback(parameters: Union[dict, str], callback_type: str, has_name_and_
:param has_name_and_file:
:returns: Callable
"""
# Check Airflow version, raise an exception otherwise
if version.parse(AIRFLOW_VERSION) < version.parse("2.0.0"):
raise DagFactoryException("Cannot parse callbacks with an Airflow version less than 2.0.0.")

# There is scenario where a callback is passed in via a file and a name. For the most part, this will be a
# Python callable that is treated similarly to a Python callable that the PythonOperator may leverage. That
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ classifiers = [
"Programming Language :: Python :: 3.12",
]
dependencies = [
"apache-airflow>=2.0",
"apache-airflow>=2.3",
"apache-airflow-providers-http>=2.0.0",
"apache-airflow-providers-cncf-kubernetes",
"pyyaml",
Expand Down Expand Up @@ -60,7 +60,7 @@ pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow}

[[tool.hatch.envs.tests.matrix]]
python = ["3.8", "3.9", "3.10", "3.11", "3.12"]
airflow = ["2.2", "2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"]
airflow = ["2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"]


[tool.hatch.envs.tests.scripts]
Expand Down
4 changes: 2 additions & 2 deletions scripts/test/integration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ ls $AIRFLOW_HOME

airflow db check

# Necessary for overcoming the following issue with Airflow 2.2:
# ERROR: Cannot install apache-airflow==2.2.0, apache-airflow==2.2.1, apache-airflow==2.2.2, apache-airflow==2.2.3, apache-airflow==2.2.4, apache-airflow==2.2.5, httpx>=0.25.0 and tabulate>=0.9.0 because these package versions have conflicting dependencies.
# Necessary for overcoming the following issue with Airflow 2.3 and 2.4:
# ImportError: Pandas requires version '0.9.0' or newer of 'tabulate' (version '0.8.9' currently installed)
pip install "tabulate>=0.9.0"

pytest -vv \
Expand Down