diff --git a/.dockerignore b/.dockerignore index 191381ee..6b8710a7 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1 +1 @@ -.git \ No newline at end of file +.git diff --git a/CHANGELOG.md b/CHANGELOG.md index 79d0db8b..2749b1e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,7 +47,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [0.16.0] - 2022-11-13 ### Added -- Function to scan recursively for YAML DAGs +- Function to scan recursively for YAML DAGs ### Changed - Changed deprecated imports to support Airflow 2.4+ @@ -62,7 +62,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [0.13.0] - 2022-05-27 ### Added - Add support for custom `timetable` -- Add support for `python_callable_file` for `PythonSensor` +- Add support for `python_callable_file` for `PythonSensor` ## [0.12.0] - 2022-02-07 ### Added @@ -73,7 +73,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [0.11.1] - 2021-12-07 ### Added -- Add support for `access_control` in DAG params +- Add support for `access_control` in DAG params ### Fixed - Fixed tests for Airflow 1.10 by pinning `wtforms` @@ -105,7 +105,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [0.8.0] - 2021-06-09 ### Added - Support for `TaskGroups` if using Airflow 2.0 -- Separate DAG building and registering logic +- Separate DAG building and registering logic ## [0.7.2] - 2021-01-21 ### Fixed @@ -138,11 +138,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [0.4.5] - 2020-06-17 ### Fixed - Do not include DAG `tags` parameter in Airflow versions that do not support it. - + ## [0.4.4] - 2020-06-12 ### Fixed - Use correct default for `tags` parameter - + ## [0.4.3] - 2020-05-24 ### Added - `execution_timeout` parse at task level @@ -157,7 +157,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [0.4.1] - 2020-02-18 ### Fixed - Default `default_view` parameter to value from `airflow.cfg` - + ## [0.4.0] - 2020-02-12 ### Added - Support for additional DAG parameters @@ -175,7 +175,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `airflow` version ### Removed - `piplock` and `pipfile` files - + ## [0.2.1] - 2019-02-26 ### Added - Python 3+ type-annotations @@ -188,7 +188,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [0.1.1] - 2018-11-20 ### Removed -- Removed `logme` dependency +- Removed `logme` dependency ## [0.1.0] - 2018-11-20 - Initial release diff --git a/LICENSE b/LICENSE index f49a4e16..261eeb9e 100644 --- a/LICENSE +++ b/LICENSE @@ -198,4 +198,4 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and - limitations under the License. \ No newline at end of file + limitations under the License. diff --git a/MANIFEST.in b/MANIFEST.in index 785b4a10..5b3805c0 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,4 @@ include LICENSE include README.md include pyproject.toml -prune tests \ No newline at end of file +prune tests diff --git a/PRIVACY_NOTICE.md b/PRIVACY_NOTICE.md index 38d41509..f40eb5cf 100644 --- a/PRIVACY_NOTICE.md +++ b/PRIVACY_NOTICE.md @@ -1,3 +1,3 @@ # Privacy Notice -This project follows the [Privacy Policy of Astronomer](https://www.astronomer.io/privacy/) \ No newline at end of file +This project follows the [Privacy Policy of Astronomer](https://www.astronomer.io/privacy/) diff --git a/README.md b/README.md index 03a29234..6a210981 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ [![Downloads](https://pepy.tech/badge/dag-factory)](https://pepy.tech/project/dag-factory) -Welcome to *dag-factory*! *dag-factory* is a library for [Apache Airflow®](https://airflow.apache.org) to construct DAGs declaratively via configuration files. +Welcome to *dag-factory*! *dag-factory* is a library for [Apache Airflow®](https://airflow.apache.org) to construct DAGs declaratively via configuration files. The minimum requirements for **dag-factory** are: - Python 3.8.0+ @@ -25,10 +25,10 @@ For a gentle introduction, please take a look at our [Quickstart Guide](#quickst - [Notes](#notes) - [HttpSensor (since 0.10.0)](#httpsensor-since-0100) - [Contributing](#contributing) - + ## Quickstart -The following example demonstrates how to create a simple DAG using *dag-factory*. We will be generating a DAG with three tasks, where `task_2` and `task_3` depend on `task_1`. +The following example demonstrates how to create a simple DAG using *dag-factory*. We will be generating a DAG with three tasks, where `task_2` and `task_3` depend on `task_1`. These tasks will be leveraging the `BashOperator` to execute simple bash commands. ![screenshot](/img/quickstart_dag.png) @@ -63,7 +63,7 @@ example_dag1: ``` We are setting the execution order of the tasks by specifying the `dependencies` key. -3. In the same folder, create a python file called `generate_dags.py`. This file is responsible for generating the DAGs from the configuration file and is a one-time setup. +3. In the same folder, create a python file called `generate_dags.py`. This file is responsible for generating the DAGs from the configuration file and is a one-time setup. You won't need to modify this file unless you want to add more configuration files or change the configuration file name. ```python @@ -96,7 +96,7 @@ load_yaml_dags(globals_dict=globals(), suffix=['dag.yaml']) ``` ### Dynamically Mapped Tasks -If you want to create a dynamic number of tasks, you can use the `mapped_tasks` key in the configuration file. The `mapped_tasks` key is a list of dictionaries, where each dictionary represents a task. +If you want to create a dynamic number of tasks, you can use the `mapped_tasks` key in the configuration file. The `mapped_tasks` key is a list of dictionaries, where each dictionary represents a task. ```yaml ... @@ -123,7 +123,7 @@ If you want to create a dynamic number of tasks, you can use the `mapped_tasks` **dag-factory** supports scheduling DAGs via [Apache Airflow Datasets](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html). To leverage, you need to specify the `Dataset` in the `outlets` key in the configuration file. The `outlets` key is a list of strings that represent the dataset locations. -In the `schedule` key of the consumer dag, you can set the `Dataset` you would like to schedule against. The key is a list of strings that represent the dataset locations. +In the `schedule` key of the consumer dag, you can set the `Dataset` you would like to schedule against. The key is a list of strings that represent the dataset locations. The consumer dag will run when all the datasets are available. ```yaml @@ -157,7 +157,7 @@ consumer_dag: bash_command: "echo 'consumer datasets'" ``` ![datasets_example.png](img/datasets_example.png) - + ### Custom Operators **dag-factory** supports using custom operators. To leverage, set the path to the custom operator within the `operator` key in the configuration file. You can add any additional parameters that the custom operator requires. diff --git a/examples/datasets/example_config_datasets.yml b/examples/datasets/example_config_datasets.yml index b440723d..6f5914f5 100644 --- a/examples/datasets/example_config_datasets.yml +++ b/examples/datasets/example_config_datasets.yml @@ -4,4 +4,4 @@ datasets: - name: dataset_custom_2 uri: s3://bucket-cjmm/raw/dataset_custom_2 - name: dataset_custom_3 - uri: s3://bucket-cjmm/raw/dataset_custom_3 \ No newline at end of file + uri: s3://bucket-cjmm/raw/dataset_custom_3 diff --git a/examples/datasets/example_dag_datasets.py b/examples/datasets/example_dag_datasets.py index 4eb94b68..a228fdd9 100644 --- a/examples/datasets/example_dag_datasets.py +++ b/examples/datasets/example_dag_datasets.py @@ -3,10 +3,8 @@ # The following import is here so Airflow parses this file # from airflow import DAG - import dagfactory - DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/" CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR)) diff --git a/examples/datasets/example_dag_datasets.yml b/examples/datasets/example_dag_datasets.yml index d04ddeac..e9613ff5 100644 --- a/examples/datasets/example_dag_datasets.yml +++ b/examples/datasets/example_dag_datasets.yml @@ -23,7 +23,7 @@ example_simple_dataset_producer_dag: bash_command: "echo 2" dependencies: [task_1] outlets: ['s3://bucket_example/raw/dataset2.json'] - + example_simple_dataset_consumer_dag: description: "Example DAG consumer simple datasets" schedule: ['s3://bucket_example/raw/dataset1.json', 's3://bucket_example/raw/dataset2.json'] @@ -51,4 +51,4 @@ example_custom_config_dataset_consumer_dag: tasks: task_1: operator: airflow.operators.bash_operator.BashOperator - bash_command: "echo 'consumer datasets'" \ No newline at end of file + bash_command: "echo 'consumer datasets'" diff --git a/examples/example_customize_operator.py b/examples/example_customize_operator.py index 6e456638..1a7a07ff 100644 --- a/examples/example_customize_operator.py +++ b/examples/example_customize_operator.py @@ -3,10 +3,8 @@ # The following import is here so Airflow parses this file # from airflow import DAG - import dagfactory - DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/" CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR)) diff --git a/examples/example_customize_operator.yml b/examples/example_customize_operator.yml index 85e0b393..91fb8506 100644 --- a/examples/example_customize_operator.yml +++ b/examples/example_customize_operator.yml @@ -44,4 +44,3 @@ example_breadfast: - make_bread_1 - make_bread_2 - make_coffee_1 - diff --git a/examples/example_dag_factory.py b/examples/example_dag_factory.py index 84bca73c..4b2ff2d7 100644 --- a/examples/example_dag_factory.py +++ b/examples/example_dag_factory.py @@ -3,10 +3,8 @@ # The following import is here so Airflow parses this file # from airflow import DAG - import dagfactory - DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/" CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR)) diff --git a/examples/example_dynamic_task_mapping.py b/examples/example_dynamic_task_mapping.py index 4b17f9b1..abfb0881 100644 --- a/examples/example_dynamic_task_mapping.py +++ b/examples/example_dynamic_task_mapping.py @@ -3,10 +3,8 @@ # The following import is here so Airflow parses this file # from airflow import DAG - import dagfactory - DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/" CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR)) diff --git a/tests/test_dagfactory.py b/tests/test_dagfactory.py index e1574e55..4c513149 100644 --- a/tests/test_dagfactory.py +++ b/tests/test_dagfactory.py @@ -1,11 +1,11 @@ -import os import datetime import logging +import os import pytest +from airflow import __version__ as AIRFLOW_VERSION from airflow.models.variable import Variable from packaging import version -from airflow import __version__ as AIRFLOW_VERSION here = os.path.dirname(__file__) @@ -14,12 +14,8 @@ TEST_DAG_FACTORY = os.path.join(here, "fixtures/dag_factory.yml") INVALID_YAML = os.path.join(here, "fixtures/invalid_yaml.yml") INVALID_DAG_FACTORY = os.path.join(here, "fixtures/invalid_dag_factory.yml") -DAG_FACTORY_KUBERNETES_POD_OPERATOR = os.path.join( - here, "fixtures/dag_factory_kubernetes_pod_operator.yml" -) -DAG_FACTORY_VARIABLES_AS_ARGUMENTS = os.path.join( - here, "fixtures/dag_factory_variables_as_arguments.yml" -) +DAG_FACTORY_KUBERNETES_POD_OPERATOR = os.path.join(here, "fixtures/dag_factory_kubernetes_pod_operator.yml") +DAG_FACTORY_VARIABLES_AS_ARGUMENTS = os.path.join(here, "fixtures/dag_factory_variables_as_arguments.yml") DOC_MD_FIXTURE_FILE = os.path.join(here, "fixtures/mydocfile.md") DOC_MD_PYTHON_CALLABLE_FILE = os.path.join(here, "fixtures/doc_md_builder.py") @@ -361,10 +357,7 @@ def test_doc_md_callable(): td = dagfactory.DagFactory(TEST_DAG_FACTORY) td.generate_dags(globals()) expected_doc_md = globals()["example_dag3"].doc_md - assert ( - str(td.get_dag_configs()["example_dag3"]["doc_md_python_arguments"]) - == expected_doc_md - ) + assert str(td.get_dag_configs()["example_dag3"]["doc_md_python_arguments"]) == expected_doc_md def test_schedule_interval(): @@ -404,9 +397,7 @@ def test_dagfactory_dict(): def test_dagfactory_dict_and_yaml(): error_message = "Either `config_filepath` or `config` should be provided" with pytest.raises(AssertionError, match=error_message): - dagfactory.DagFactory( - config_filepath=TEST_DAG_FACTORY, config=DAG_FACTORY_CONFIG - ) + dagfactory.DagFactory(config_filepath=TEST_DAG_FACTORY, config=DAG_FACTORY_CONFIG) def test_get_dag_configs_dict(): @@ -439,7 +430,7 @@ def test_load_invalid_yaml_logs_error(caplog): dags_folder="tests/fixtures", suffix=["invalid_yaml.yml"], ) - assert caplog.messages == ['Failed to load dag from tests/fixtures/invalid_yaml.yml'] + assert caplog.messages == ["Failed to load dag from tests/fixtures/invalid_yaml.yml"] def test_load_yaml_dags_succeed(): diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 65262c71..09010771 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -1,11 +1,11 @@ from __future__ import annotations + from pathlib import Path import airflow from airflow.models.dagbag import DagBag from packaging.version import Version - EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "examples" AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore" AIRFLOW_VERSION = Version(airflow.__version__) diff --git a/tests/test_utils.py b/tests/test_utils.py index 2f2792b3..4c703333 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -6,7 +6,6 @@ from dagfactory import utils - NOW = datetime.datetime.today().replace(hour=0, minute=0, second=0, microsecond=0) CET = pendulum.timezone("Europe/Amsterdam") UTC = pendulum.timezone("UTC") @@ -128,9 +127,7 @@ def test_get_python_callable_valid(): python_callable_file = os.path.realpath(__file__) python_callable_name = "print_test" - python_callable = utils.get_python_callable( - python_callable_name, python_callable_file - ) + python_callable = utils.get_python_callable(python_callable_name, python_callable_file) assert callable(python_callable) @@ -217,9 +214,7 @@ def test_get_expand_partial_kwargs_with_expand_and_partial(): expected_partial_kwargs = {"key_2": {"nested_key_1": "nested_value_1"}} expected_task_params = {"task_id": "my_task"} - result_task_params, result_expand_kwargs, result_partial_kwargs = ( - utils.get_expand_partial_kwargs(task_params) - ) + result_task_params, result_expand_kwargs, result_partial_kwargs = utils.get_expand_partial_kwargs(task_params) assert result_expand_kwargs == expected_expand_kwargs assert result_partial_kwargs == expected_partial_kwargs assert result_task_params == expected_task_params diff --git a/tox.ini b/tox.ini index 73f6f24f..dac54836 100644 --- a/tox.ini +++ b/tox.ini @@ -11,7 +11,7 @@ python = 3.10: py310-airflow{1108,2} [testenv:py38-airflow2] -deps = +deps = pytest pytest-cov apache-airflow[http,cncf.kubernetes] >=2.0.0 @@ -27,7 +27,7 @@ commands = pytest --cov=dagfactory tests -p no:warnings --verbose --color=yes --cov-report=xml [testenv:py39-airflow2] -deps = +deps = pytest pytest-cov apache-airflow[http,cncf.kubernetes] >=2.0.0 @@ -43,7 +43,7 @@ commands = pytest --cov=dagfactory tests -p no:warnings --verbose --color=yes --cov-report=xml [testenv:py310-airflow2] -deps = +deps = pytest pytest-cov apache-airflow[http,cncf.kubernetes] >=2.0.0