diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index fc2e3e9b..a825efed 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -6,7 +6,7 @@ jobs: build: strategy: matrix: - python: ['3.7', '3.8', '3.9', '3.10'] + python: ['3.8', '3.9', '3.10'] runs-on: 'ubuntu-latest' steps: - uses: actions/checkout@master diff --git a/CHANGELOG.md b/CHANGELOG.md index b9898b52..79d0db8b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Changed +- Removed support for Python 3.7 ## [0.19.0] - 2023-07-19 ### Added diff --git a/README.md b/README.md index c23d7894..d1ca1796 100644 --- a/README.md +++ b/README.md @@ -6,39 +6,47 @@ [![Code Style](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/ambv/black) [![Downloads](https://pepy.tech/badge/dag-factory)](https://pepy.tech/project/dag-factory) -*dag-factory* is a library for dynamically generating [Apache Airflow](https://github.com/apache/incubator-airflow) DAGs from YAML configuration files. -- [Installation](#installation) -- [Usage](#usage) +Welcome to *dag-factory*! *dag-factory* is a library for [Apache Airflow](https://github.com/apache/incubator-airflow) to construct DAGs declaratively via configuration files. + +The minimum requirements for **dag-factory** are: +- Python 3.8.0+ +- Apache Airflow 2.0+ + +For a gentle introduction, please take a look at our [Quickstart Guide](#quickstart). For more examples, please see the [examples](/examples) folder. 
+ +- [Quickstart](#quickstart) +- [Features](#features) + - [Multiple Configuration Files](#multiple-configuration-files) + - [Dynamically Mapped Tasks](#dynamically-mapped-tasks) + - [Datasets](#datasets) + - [Custom Operators](#custom-operators) - [Benefits](#benefits) +- [Notes](#notes) + - [HttpSensor (since 0.10.0)](#httpsensor-since-0100) - [Contributing](#contributing) -## Installation +## Quickstart -To install *dag-factory* run `pip install dag-factory`. It requires Python 3.6.0+ and Apache Airflow 2.0+. +The following example demonstrates how to create a simple DAG using *dag-factory*. We will be generating a DAG with three tasks, where `task_2` and `task_3` depend on `task_1`. +These tasks will be leveraging the `BashOperator` to execute simple bash commands. -## Usage +![screenshot](/img/quickstart_dag.png) -After installing *dag-factory* in your Airflow environment, there are two steps to creating DAGs. First, we need to create a YAML configuration file. For example: +1. To install *dag-factory*, run the following pip command in your Airflow environment: +```bash +pip install dag-factory +``` +2. Create a YAML configuration file called `config_file.yml` and save it within your airflow dags folder: ```yaml example_dag1: default_args: owner: 'example_owner' - start_date: 2018-01-01 # or '2 days' - end_date: 2018-01-05 retries: 1 - retry_delay_sec: 300 + start_date: '2024-01-01' schedule_interval: '0 3 * * *' - concurrency: 1 - max_active_runs: 1 - dagrun_timeout_sec: 60 - default_view: 'tree' # or 'graph', 'duration', 'gantt', 'landing_times' - orientation: 'LR' # or 'TB', 'RL', 'BT' + catchup: False description: 'this is an example dag!' 
- on_success_callback_name: print_hello - on_success_callback_file: /usr/local/airflow/dags/print_hello.py - on_failure_callback_name: print_hello - on_failure_callback_file: /usr/local/airflow/dags/print_hello.py tasks: task_1: operator: airflow.operators.bash_operator.BashOperator @@ -52,23 +60,33 @@ example_dag1: bash_command: 'echo 3' dependencies: [task_1] ``` +We are setting the execution order of the tasks by specifying the `dependencies` key. -Then in the DAGs folder in your Airflow environment you need to create a python file like this: +3. In the same folder, create a python file called `generate_dags.py`. This file is responsible for generating the DAGs from the configuration file and is a one-time setup. +You won't need to modify this file unless you want to add more configuration files or change the configuration file name. ```python -from airflow import DAG +from airflow import DAG ## by default, this is needed for the dagbag to parse this file import dagfactory +from pathlib import Path -dag_factory = dagfactory.DagFactory("/path/to/dags/config_file.yml") +config_file = Path.cwd() / "dags/config_file.yml" +dag_factory = dagfactory.DagFactory(config_file) dag_factory.clean_dags(globals()) dag_factory.generate_dags(globals()) ``` -And this DAG will be generated and ready to run in Airflow! +After a few moments, the DAG will be generated and ready to run in Airflow. Unpause the DAG in the Airflow UI and watch the tasks execute! -If you have several configuration files you can import them like this: +![screenshot](/img/quickstart_gantt.png) +Please look at the [examples](/examples) folder for more examples. + +## Features + +### Multiple Configuration Files +If you want to split your DAG configuration into multiple files, you can do so by leveraging a suffix in the configuration file name. 
```python # 'airflow' word is required for the dagbag to parse this file from dagfactory import load_yaml_dags @@ -76,8 +94,82 @@ from dagfactory import load_yaml_dags load_yaml_dags(globals_dict=globals(), suffix=['dag.yaml']) ``` -![screenshot](/img/example_dag.png) +### Dynamically Mapped Tasks +If you want to create a dynamic number of tasks, you can use the `mapped_tasks` key in the configuration file. The `mapped_tasks` key is a list of dictionaries, where each dictionary represents a task. + +```yaml +... + tasks: + request: + operator: airflow.operators.python.PythonOperator + python_callable_name: example_task_mapping + python_callable_file: /usr/local/airflow/dags/expand_tasks.py # this file should contain the python callable + process: + operator: airflow.operators.python_operator.PythonOperator + python_callable_name: expand_task + python_callable_file: /usr/local/airflow/dags/expand_tasks.py + partial: + op_kwargs: + test_id: "test" + expand: + op_args: + request.output + dependencies: [request] +``` +![mapped_tasks_example.png](img/mapped_tasks_example.png) + +### Datasets +**dag-factory** supports scheduling DAGs via [Apache Airflow Datasets](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html). + +To leverage, you need to specify the `Dataset` in the `outlets` key in the configuration file. The `outlets` key is a list of strings that represent the dataset locations. +In the `schedule` key of the consumer dag, you can set the `Dataset` you would like to schedule against. The key is a list of strings that represent the dataset locations. +The consumer dag will run when all the datasets are available. 
+```yaml +producer_dag: + default_args: + owner: "example_owner" + retries: 1 + start_date: '2024-01-01' + description: "Example DAG producer simple datasets" + schedule_interval: "0 5 * * *" + tasks: + task_1: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 1" + outlets: [ 's3://bucket_example/raw/dataset1.json' ] + task_2: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 2" + dependencies: [ task_1 ] + outlets: [ 's3://bucket_example/raw/dataset2.json' ] +consumer_dag: + default_args: + owner: "example_owner" + retries: 1 + start_date: '2024-01-01' + description: "Example DAG consumer simple datasets" + schedule: [ 's3://bucket_example/raw/dataset1.json', 's3://bucket_example/raw/dataset2.json' ] + tasks: + task_1: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 'consumer datasets'" +``` +![datasets_example.png](img/datasets_example.png) + +### Custom Operators +**dag-factory** supports using custom operators. To leverage, set the path to the custom operator within the `operator` key in the configuration file. You can add any additional parameters that the custom operator requires. + +```yaml +... 
+ tasks: + begin: + operator: airflow.operators.dummy_operator.DummyOperator + make_bread_1: + operator: customized.operators.breakfast_operators.MakeBreadOperator + bread_type: 'Sourdough' +``` +![custom_operators.png](img/custom_operators.png) ## Notes ### HttpSensor (since 0.10.0) @@ -88,23 +180,23 @@ The following example shows `response_check` logic in a python file: ```yaml task_2: - operator: airflow.sensors.http_sensor.HttpSensor - http_conn_id: 'test-http' - method: 'GET' - response_check_name: check_sensor - response_check_file: /path/to/example1/http_conn.py - dependencies: [task_1] + operator: airflow.sensors.http_sensor.HttpSensor + http_conn_id: 'test-http' + method: 'GET' + response_check_name: check_sensor + response_check_file: /path/to/example1/http_conn.py + dependencies: [task_1] ``` The `response_check` logic can also be provided as a lambda: ```yaml task_2: - operator: airflow.sensors.http_sensor.HttpSensor - http_conn_id: 'test-http' - method: 'GET' - response_check_lambda: 'lambda response: "ok" in reponse.text' - dependencies: [task_1] + operator: airflow.sensors.http_sensor.HttpSensor + http_conn_id: 'test-http' + method: 'GET' + response_check_lambda: 'lambda response: "ok" in response.text' + dependencies: [task_1] ``` ## Benefits diff --git a/dagfactory/__init__.py b/dagfactory/__init__.py index 1c51dcc5..4803e708 100644 --- a/dagfactory/__init__.py +++ b/dagfactory/__init__.py @@ -1,2 +1,3 @@ """Modules and methods to export for easier access""" + from .dagfactory import DagFactory, load_yaml_dags diff --git a/dagfactory/__version__.py b/dagfactory/__version__.py index bc907cce..9af95997 100644 --- a/dagfactory/__version__.py +++ b/dagfactory/__version__.py @@ -1,2 +1,3 @@ """Module contains the version of dag-factory""" + __version__ = "0.19.0" diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index be45ef3a..689b3e8d 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -1,4 +1,5 @@ """Module contains 
code for generating tasks and constructing a DAG""" + # pylint: disable=ungrouped-imports import os import re @@ -197,34 +198,26 @@ def get_dag_params(self) -> Dict[str, Any]: if utils.check_dict_key(dag_params["default_args"], "sla_miss_callback"): if isinstance(dag_params["default_args"]["sla_miss_callback"], str): - dag_params["default_args"][ - "sla_miss_callback" - ]: Callable = import_string( - dag_params["default_args"]["sla_miss_callback"] + dag_params["default_args"]["sla_miss_callback"]: Callable = ( + import_string(dag_params["default_args"]["sla_miss_callback"]) ) if utils.check_dict_key(dag_params["default_args"], "on_success_callback"): if isinstance(dag_params["default_args"]["on_success_callback"], str): - dag_params["default_args"][ - "on_success_callback" - ]: Callable = import_string( - dag_params["default_args"]["on_success_callback"] + dag_params["default_args"]["on_success_callback"]: Callable = ( + import_string(dag_params["default_args"]["on_success_callback"]) ) if utils.check_dict_key(dag_params["default_args"], "on_failure_callback"): if isinstance(dag_params["default_args"]["on_failure_callback"], str): - dag_params["default_args"][ - "on_failure_callback" - ]: Callable = import_string( - dag_params["default_args"]["on_failure_callback"] + dag_params["default_args"]["on_failure_callback"]: Callable = ( + import_string(dag_params["default_args"]["on_failure_callback"]) ) if utils.check_dict_key(dag_params["default_args"], "on_retry_callback"): if isinstance(dag_params["default_args"]["on_retry_callback"], str): - dag_params["default_args"][ - "on_retry_callback" - ]: Callable = import_string( - dag_params["default_args"]["on_retry_callback"] + dag_params["default_args"]["on_retry_callback"]: Callable = ( + import_string(dag_params["default_args"]["on_retry_callback"]) ) if utils.check_dict_key(dag_params, "sla_miss_callback"): @@ -351,11 +344,11 @@ def make_task(operator: str, task_params: Dict[str, Any]) -> BaseOperator: " 
python_callable_file: !!python/name:my_module.my_func" ) if not task_params.get("python_callable"): - task_params[ - "python_callable" - ]: Callable = utils.get_python_callable( - task_params["python_callable_name"], - task_params["python_callable_file"], + task_params["python_callable"]: Callable = ( + utils.get_python_callable( + task_params["python_callable_name"], + task_params["python_callable_file"], + ) ) # remove dag-factory specific parameters # Airflow 2.0 doesn't allow these to be passed to operator @@ -419,10 +412,10 @@ def make_task(operator: str, task_params: Dict[str, Any]) -> BaseOperator: del task_params["response_check_name"] del task_params["response_check_file"] else: - task_params[ - "response_check" - ]: Callable = utils.get_python_callable_lambda( - task_params["response_check_lambda"] + task_params["response_check"]: Callable = ( + utils.get_python_callable_lambda( + task_params["response_check_lambda"] + ) ) # remove dag-factory specific parameters # Airflow 2.0 doesn't allow these to be passed to operator @@ -669,18 +662,18 @@ def set_dependencies( group_id = conf["task_group"].group_id name = f"{group_id}.{name}" if conf.get("dependencies"): - source: Union[ - BaseOperator, "TaskGroup" - ] = tasks_and_task_groups_instances[name] + source: Union[BaseOperator, "TaskGroup"] = ( + tasks_and_task_groups_instances[name] + ) for dep in conf["dependencies"]: if tasks_and_task_groups_config[dep].get("task_group"): group_id = tasks_and_task_groups_config[dep][ "task_group" ].group_id dep = f"{group_id}.{dep}" - dep: Union[ - BaseOperator, "TaskGroup" - ] = tasks_and_task_groups_instances[dep] + dep: Union[BaseOperator, "TaskGroup"] = ( + tasks_and_task_groups_instances[dep] + ) source.set_upstream(dep) @staticmethod diff --git a/dagfactory/dagfactory.py b/dagfactory/dagfactory.py index 0ae5e9de..9833500b 100644 --- a/dagfactory/dagfactory.py +++ b/dagfactory/dagfactory.py @@ -1,4 +1,5 @@ """Module contains code for loading a DagFactory config and 
generating DAGs""" + import logging import os from itertools import chain diff --git a/dagfactory/utils.py b/dagfactory/utils.py index d42b3d9f..29695fd7 100644 --- a/dagfactory/utils.py +++ b/dagfactory/utils.py @@ -1,4 +1,5 @@ """Module contains various utilities used by dag-factory""" + import ast import importlib.util import os @@ -212,9 +213,7 @@ def check_template_searchpath(template_searchpath: Union[str, List[str]]) -> boo return False -def get_expand_partial_kwargs( - task_params: Dict[str, Any] -) -> Tuple[ +def get_expand_partial_kwargs(task_params: Dict[str, Any]) -> Tuple[ Dict[str, Any], Dict[str, Union[Dict[str, Any], Any]], Dict[str, Union[Dict[str, Any], Any]], diff --git a/examples/expand_tasks.py b/examples/expand_tasks.py index a1be8da4..8e4aa00a 100644 --- a/examples/expand_tasks.py +++ b/examples/expand_tasks.py @@ -5,4 +5,4 @@ def example_task_mapping(): def expand_task(x, test_id): print(test_id) print(x) - return [x] \ No newline at end of file + return [x] diff --git a/img/custom_operators.png b/img/custom_operators.png new file mode 100644 index 00000000..17e4fafd Binary files /dev/null and b/img/custom_operators.png differ diff --git a/img/datasets_example.png b/img/datasets_example.png new file mode 100644 index 00000000..f0485f09 Binary files /dev/null and b/img/datasets_example.png differ diff --git a/img/example_dag.png b/img/example_dag.png deleted file mode 100644 index fea4c09a..00000000 Binary files a/img/example_dag.png and /dev/null differ diff --git a/img/mapped_tasks_example.png b/img/mapped_tasks_example.png new file mode 100644 index 00000000..e1a497d2 Binary files /dev/null and b/img/mapped_tasks_example.png differ diff --git a/img/quickstart_dag.png b/img/quickstart_dag.png new file mode 100644 index 00000000..bb106c24 Binary files /dev/null and b/img/quickstart_dag.png differ diff --git a/img/quickstart_gantt.png b/img/quickstart_gantt.png new file mode 100644 index 00000000..c715148f Binary files /dev/null and 
b/img/quickstart_gantt.png differ diff --git a/setup.py b/setup.py index 3e767e5b..0d69d3c9 100644 --- a/setup.py +++ b/setup.py @@ -15,10 +15,10 @@ NAME = "dag-factory" PKG_NAME = "dagfactory" DESCRIPTION = "Dynamically build Airflow DAGs from YAML files" -URL = "https://github.com/ajbosco/dag-factory" -EMAIL = "adam@boscarino.me" +URL = "https://github.com/astronomer/dag-factory" +EMAIL = "humans@astronomer.io" AUTHOR = "Adam Boscarino" -REQUIRES_PYTHON = ">=3.7.0" +REQUIRES_PYTHON = ">=3.8.0" VERSION = None here = os.path.abspath(os.path.dirname(__file__)) diff --git a/tests/test_dagbuilder.py b/tests/test_dagbuilder.py index 1426c883..119f8060 100644 --- a/tests/test_dagbuilder.py +++ b/tests/test_dagbuilder.py @@ -142,20 +142,14 @@ "request": { "operator": "airflow.operators.python_operator.PythonOperator", "python_callable_name": "example_task_mapping", - "python_callable_file": os.path.realpath(__file__) + "python_callable_file": os.path.realpath(__file__), }, "process_1": { "operator": "airflow.operators.python_operator.PythonOperator", "python_callable_name": "expand_task", "python_callable_file": os.path.realpath(__file__), - "partial": { - "op_kwargs": { - "test_id": "test" - } - }, - "expand": { - "op_args": "request.output" - } + "partial": {"op_kwargs": {"test_id": "test"}}, + "expand": {"op_args": "request.output"}, }, }, } @@ -617,10 +611,7 @@ def test_make_timetable(): if version.parse(AIRFLOW_VERSION) >= version.parse("2.0.0"): td = dagbuilder.DagBuilder("test_dag", DAG_CONFIG, DEFAULT_CONFIG) timetable = "airflow.timetables.interval.CronDataIntervalTimetable" - timetable_params = { - "cron": "0 8,16 * * 1-5", - "timezone": "UTC" - } + timetable_params = {"cron": "0 8,16 * * 1-5", "timezone": "UTC"} actual = td.make_timetable(timetable, timetable_params) assert actual.periodic assert actual.can_run @@ -633,22 +624,31 @@ def test_make_dag_with_callback(): def test_get_dag_params_with_template_searchpath(): from dagfactory import utils - td = 
dagbuilder.DagBuilder("test_dag", {"template_searchpath": ["./sql"]}, DEFAULT_CONFIG) + + td = dagbuilder.DagBuilder( + "test_dag", {"template_searchpath": ["./sql"]}, DEFAULT_CONFIG + ) error_message = "template_searchpath must be absolute paths" with pytest.raises(Exception, match=error_message): td.get_dag_params() - td = dagbuilder.DagBuilder("test_dag", {"template_searchpath": ["/sql"]}, DEFAULT_CONFIG) + td = dagbuilder.DagBuilder( + "test_dag", {"template_searchpath": ["/sql"]}, DEFAULT_CONFIG + ) error_message = "template_searchpath must be existing paths" with pytest.raises(Exception, match=error_message): td.get_dag_params() - - td = dagbuilder.DagBuilder("test_dag", {"template_searchpath": "./sql"}, DEFAULT_CONFIG) + + td = dagbuilder.DagBuilder( + "test_dag", {"template_searchpath": "./sql"}, DEFAULT_CONFIG + ) error_message = "template_searchpath must be absolute paths" with pytest.raises(Exception, match=error_message): td.get_dag_params() - td = dagbuilder.DagBuilder("test_dag", {"template_searchpath": "/sql"}, DEFAULT_CONFIG) + td = dagbuilder.DagBuilder( + "test_dag", {"template_searchpath": "/sql"}, DEFAULT_CONFIG + ) error_message = "template_searchpath must be existing paths" with pytest.raises(Exception, match=error_message): td.get_dag_params() @@ -659,31 +659,40 @@ def test_get_dag_params_with_template_searchpath(): def test_get_dag_params_with_render_template_as_native_obj(): - td = dagbuilder.DagBuilder("test_dag", {"render_template_as_native_obj": "true"}, DEFAULT_CONFIG) + td = dagbuilder.DagBuilder( + "test_dag", {"render_template_as_native_obj": "true"}, DEFAULT_CONFIG + ) error_message = "render_template_as_native_obj should be bool type!" 
with pytest.raises(Exception, match=error_message): td.get_dag_params() false = lambda x: print(x) - td = dagbuilder.DagBuilder("test_dag", {"render_template_as_native_obj": false}, DEFAULT_CONFIG) + td = dagbuilder.DagBuilder( + "test_dag", {"render_template_as_native_obj": false}, DEFAULT_CONFIG + ) error_message = "render_template_as_native_obj should be bool type!" with pytest.raises(Exception, match=error_message): td.get_dag_params() def test_make_task_with_duplicated_partial_kwargs(): - td = dagbuilder.DagBuilder("test_dag", DAG_CONFIG_DYNAMIC_TASK_MAPPING, DEFAULT_CONFIG) + td = dagbuilder.DagBuilder( + "test_dag", DAG_CONFIG_DYNAMIC_TASK_MAPPING, DEFAULT_CONFIG + ) operator = "airflow.operators.bash_operator.BashOperator" - task_params = {"task_id": "task_bash", - "bash_command": "echo 2", - "partial": {"bash_command": "echo 4"} - } + task_params = { + "task_id": "task_bash", + "bash_command": "echo 2", + "partial": {"bash_command": "echo 4"}, + } with pytest.raises(Exception): td.make_task(operator, task_params) def test_dynamic_task_mapping(): - td = dagbuilder.DagBuilder("test_dag", DAG_CONFIG_DYNAMIC_TASK_MAPPING, DEFAULT_CONFIG) + td = dagbuilder.DagBuilder( + "test_dag", DAG_CONFIG_DYNAMIC_TASK_MAPPING, DEFAULT_CONFIG + ) if version.parse(AIRFLOW_VERSION) < version.parse("2.3.0"): error_message = "Dynamic task mapping available only in Airflow >= 2.3.0" with pytest.raises(Exception, match=error_message): @@ -694,14 +703,8 @@ def test_dynamic_task_mapping(): "task_id": "process", "python_callable_name": "expand_task", "python_callable_file": os.path.realpath(__file__), - "partial": { - "op_kwargs": { - "test_id": "test" - } - }, - "expand": { - "op_args": "request.output" - } + "partial": {"op_kwargs": {"test_id": "test"}}, + "expand": {"op_args": "request.output"}, } actual = td.make_task(operator, task_params) assert isinstance(actual, MappedOperator) @@ -709,16 +712,27 @@ def test_dynamic_task_mapping(): 
@patch("dagfactory.dagbuilder.PythonOperator", new=MockPythonOperator) def test_replace_expand_string_with_xcom(): - td = dagbuilder.DagBuilder("test_dag", DAG_CONFIG_DYNAMIC_TASK_MAPPING, DEFAULT_CONFIG) + td = dagbuilder.DagBuilder( + "test_dag", DAG_CONFIG_DYNAMIC_TASK_MAPPING, DEFAULT_CONFIG + ) if version.parse(AIRFLOW_VERSION) < version.parse("2.3.0"): with pytest.raises(Exception): td.build() else: from airflow.models.xcom_arg import XComArg + task_conf_output = {"expand": {"key_1": "task_1.output"}} task_conf_xcomarg = {"expand": {"key_1": "XcomArg(task_1)"}} tasks_dict = {"task_1": MockPythonOperator()} - updated_task_conf_output = dagbuilder.DagBuilder.replace_expand_values(task_conf_output, tasks_dict) - updated_task_conf_xcomarg = dagbuilder.DagBuilder.replace_expand_values(task_conf_xcomarg, tasks_dict) - assert updated_task_conf_output["expand"]["key_1"] == XComArg(tasks_dict["task_1"]) - assert updated_task_conf_xcomarg["expand"]["key_1"] == XComArg(tasks_dict["task_1"]) + updated_task_conf_output = dagbuilder.DagBuilder.replace_expand_values( + task_conf_output, tasks_dict + ) + updated_task_conf_xcomarg = dagbuilder.DagBuilder.replace_expand_values( + task_conf_xcomarg, tasks_dict + ) + assert updated_task_conf_output["expand"]["key_1"] == XComArg( + tasks_dict["task_1"] + ) + assert updated_task_conf_xcomarg["expand"]["key_1"] == XComArg( + tasks_dict["task_1"] + ) diff --git a/tests/test_dagfactory.py b/tests/test_dagfactory.py index 542e4605..2a715e42 100644 --- a/tests/test_dagfactory.py +++ b/tests/test_dagfactory.py @@ -185,7 +185,7 @@ def test_load_config_valid(): }, }, "example_dag4": { - "vars": {'arg1': 'hello', 'arg2': 'hello world'}, + "vars": {"arg1": "hello", "arg2": "hello world"}, "tasks": { "task_1": { "operator": "airflow.operators.bash_operator.BashOperator", @@ -262,7 +262,7 @@ def test_get_dag_configs(): }, }, "example_dag4": { - "vars": {'arg1': 'hello', 'arg2': 'hello world'}, + "vars": {"arg1": "hello", "arg2": "hello 
world"}, "tasks": { "task_1": { "operator": "airflow.operators.bash_operator.BashOperator", @@ -434,7 +434,7 @@ def test_set_callback_after_loading_config(): def test_load_yaml_dags_fail(): with pytest.raises(Exception): load_yaml_dags( - globals_dict= globals(), + globals_dict=globals(), dags_folder="tests/fixtures", suffix=["invalid_yaml.yml"], ) @@ -442,7 +442,7 @@ def test_load_yaml_dags_fail(): def test_load_yaml_dags_succeed(): load_yaml_dags( - globals_dict= globals(), + globals_dict=globals(), dags_folder="tests/fixtures", suffix=["dag_factory_variables_as_arguments.yml"], ) diff --git a/tests/test_utils.py b/tests/test_utils.py index dc3e079a..2f2792b3 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -211,13 +211,15 @@ def test_get_expand_partial_kwargs_with_expand_and_partial(): task_params = { "task_id": "my_task", "expand": {"key_1": "value_1"}, - "partial": {"key_2": {"nested_key_1": "nested_value_1"}} + "partial": {"key_2": {"nested_key_1": "nested_value_1"}}, } expected_expand_kwargs = {"key_1": "value_1"} expected_partial_kwargs = {"key_2": {"nested_key_1": "nested_value_1"}} expected_task_params = {"task_id": "my_task"} - result_task_params, result_expand_kwargs, result_partial_kwargs = utils.get_expand_partial_kwargs(task_params) + result_task_params, result_expand_kwargs, result_partial_kwargs = ( + utils.get_expand_partial_kwargs(task_params) + ) assert result_expand_kwargs == expected_expand_kwargs assert result_partial_kwargs == expected_partial_kwargs assert result_task_params == expected_task_params @@ -226,12 +228,12 @@ def test_get_expand_partial_kwargs_with_expand_and_partial(): def test_get_expand_partial_kwargs_without_partial(): task_params = { "task_id": "task2", - "expand": {"param1": "value1", "param2": "value2"} + "expand": {"param1": "value1", "param2": "value2"}, } expected_result = ( {"task_id": "task2"}, {"param1": "value1", "param2": "value2"}, - {} + {}, ) assert utils.get_expand_partial_kwargs(task_params) == 
expected_result @@ -249,19 +251,23 @@ def test_is_partial_duplicated(): except Exception as e: assert str(e) == "Duplicated partial kwarg! It's already in task_params." + def test_open_and_filter_yaml_config_datasets(): - datasets_names = ['dataset_custom_1', 'dataset_custom_2'] - file_path = 'examples/datasets/example_config_datasets.yml' + datasets_names = ["dataset_custom_1", "dataset_custom_2"] + file_path = "examples/datasets/example_config_datasets.yml" actual = utils.get_datasets_uri_yaml_file(file_path, datasets_names) - expected = ['s3://bucket-cjmm/raw/dataset_custom_1', 's3://bucket-cjmm/raw/dataset_custom_2'] - + expected = [ + "s3://bucket-cjmm/raw/dataset_custom_1", + "s3://bucket-cjmm/raw/dataset_custom_2", + ] + assert actual == expected + def test_open_and_filter_yaml_config_datasets_file_notfound(): - datasets_names = ['dataset_custom_1', 'dataset_custom_2'] - file_path = 'examples/datasets/not_found_example_config_datasets.yml' + datasets_names = ["dataset_custom_1", "dataset_custom_2"] + file_path = "examples/datasets/not_found_example_config_datasets.yml" with pytest.raises(Exception): utils.get_datasets_uri_yaml_file(file_path, datasets_names) - diff --git a/tox.ini b/tox.ini index e5f1870f..375ac351 100644 --- a/tox.ini +++ b/tox.ini @@ -1,46 +1,15 @@ [tox] envlist = - py37-airflow{1108,2} py38-airflow{1108,2} py39-airflow{1108,2} py310-airflow{1108,2} [gh-actions] python = - 3.7: py37-airflow{1108,2} 3.8: py38-airflow{1108,2} 3.9: py39-airflow{1108,2} 3.10: py310-airflow{1108,2} -[testenv:py37-airflow1108] -deps = - pytest - pytest-cov - apache-airflow[kubernetes] >=1.10.8, <2.0.0 - SQLAlchemy==1.3.23 - Flask-SQLAlchemy==2.4.4 - wtforms<=2.3.3 - markupsafe>=1.1.1,<2.1.0 -setenv = - AIRFLOW__CORE__SQL_ALCHEMY_CONN = sqlite:///airflow1108.db -commands = - airflow initdb - pytest --cov=dagfactory tests -p no:warnings --verbose --color=yes --cov-report=xml - -[testenv:py37-airflow2] -deps = - pytest - pytest-cov - 
apache-airflow[http,cncf.kubernetes] >=2.0.0 - SQLAlchemy==1.3.23 - Flask-SQLAlchemy==2.4.4 - markupsafe>=1.1.1,<2.1.0 -setenv = - AIRFLOW__CORE__SQL_ALCHEMY_CONN = sqlite:////tmp/airflow.db -commands = - airflow db init - pytest --cov=dagfactory tests -p no:warnings --verbose --color=yes --cov-report=xml - [testenv:py38-airflow1108] deps = pytest