From e9f64f656941a16b1a76cfc0a72a16a4f98dc149 Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Thu, 6 Mar 2025 12:03:05 -0500 Subject: [PATCH 1/2] Added documentation for defaults, updated dagfactory to respect other default values --- CHANGELOG.md | 2 +- dagfactory/dagbuilder.py | 3 + dagfactory/dagfactory.py | 19 +++-- dev/dags/customized/callables/python.py | 16 ---- .../{callables => helpers}/__init__.py | 0 dev/dags/customized/helpers/etl.py | 8 ++ dev/dags/datasets/example_dag_datasets.yml | 10 +-- .../datasets/example_dag_datasets_outlet.yml | 2 +- .../example_dataset_condition_string.yml | 2 +- .../datasets/example_dataset_yaml_syntax.yml | 10 +-- dev/dags/example_dag_factory_default_args.yml | 23 ++++++ .../example_dag_factory_default_config.py | 17 +++++ .../example_dag_factory_default_config.yml | 36 +++++++++ dev/dags/example_task_group.yml | 1 + docs/configuration/defaults.md | 75 +++++++++++-------- docs/features/datasets.md | 2 +- tests/test_dagbuilder.py | 38 ++++++++++ tests/test_parsers.py | 39 ++++++---- tests/test_utils.py | 7 ++ 19 files changed, 228 insertions(+), 82 deletions(-) delete mode 100644 dev/dags/customized/callables/python.py rename dev/dags/customized/{callables => helpers}/__init__.py (100%) create mode 100644 dev/dags/customized/helpers/etl.py create mode 100644 dev/dags/example_dag_factory_default_args.yml create mode 100644 dev/dags/example_dag_factory_default_config.py create mode 100644 dev/dags/example_dag_factory_default_config.yml diff --git a/CHANGELOG.md b/CHANGELOG.md index 1279dc40..41c0ab44 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,7 +49,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Enable Depandabot to scan outdated Github Actions dependencies by @tatiana in #347 - Improve docs deploy job by @pankajastro in #352 - Unify how we build dagfactory by @tatiana in #353 -- Fix running make docker run when previous versions were run locally by @tatiana in #362 +- Fix 
running make docker run when previous versions were run locally by @tatiana in #362 - Install `jq` in `dev` container by @pankajastro in #363 - Dependabot GitHub actions version upgrades in #349, #350, #351 diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index a38b9868..47556385 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -142,6 +142,9 @@ def get_dag_params(self) -> Dict[str, Any]: raise DagFactoryConfigException("Failed to merge config with default config") from err dag_params["dag_id"]: str = self.dag_name + # If there are no default_args, add an empty dictionary + dag_params["default_args"] = {} if "default_args" not in dag_params else dag_params["default_args"] + if utils.check_dict_key(dag_params, "schedule_interval") and dag_params["schedule_interval"] == "None": dag_params["schedule_interval"] = None diff --git a/dagfactory/dagfactory.py b/dagfactory/dagfactory.py index a555f13f..2b63507b 100644 --- a/dagfactory/dagfactory.py +++ b/dagfactory/dagfactory.py @@ -43,6 +43,8 @@ def __init__( self.config: Dict[str, Any] = config def _global_default_args(self): + # Possible change here/clarification needed; what if defaults.yml has more than default_args? In this case, we + # are "discarding" these values, and only holding onto the default_args. This is shown in build_dags default_args_yml = Path(self.default_args_config_path) / "defaults.yml" if default_args_yml.exists(): @@ -133,11 +135,16 @@ def build_dags(self) -> Dict[str, DAG]: global_default_args = self._global_default_args() default_config: Dict[str, Any] = self.get_default_config() - if global_default_args is not None: - if "default_args" in default_config and "default_args" in global_default_args: - default_config = { - "default_args": {**global_default_args["default_args"], **default_config["default_args"]} - } + # If global_default_args is None, then default_config will remain as is. 
Otherwise, we'll (try) go ahead and + # update the default args using global_default_args + if isinstance(global_default_args, dict): + # Previously, default_config was being overwritten completely to only contain the default_args + # key-value pair. This was updated as part of issue-295 to not overwrite the entire default_config + # dictionary, and instead update the default_args key-value pair of the default_config dictionary + default_config["default_args"] = { + **global_default_args.get("default_args", {}), + **default_config.get("default_args", {}), + } dags: Dict[str, Any] = {} @@ -162,7 +169,7 @@ def build_dags(self) -> Dict[str, DAG]: def register_dags(dags: Dict[str, DAG], globals: Dict[str, Any]) -> None: """Adds `dags` to `globals` so Airflow can discover them. - :param: dags: Dict of DAGs to be registered. + :param dags: Dict of DAGs to be registered. :param globals: The globals() from the file used to generate DAGs. The dag_id must be passed into globals() for Airflow to import """ diff --git a/dev/dags/customized/callables/python.py b/dev/dags/customized/callables/python.py deleted file mode 100644 index 8c4a73ea..00000000 --- a/dev/dags/customized/callables/python.py +++ /dev/null @@ -1,16 +0,0 @@ -""" -failure.py - -Create a callable that intentionally "fails". 
- -Author: Jake Roach -Date: 2024-10-22 -""" - - -def succeeding_task(): - print("Task has executed successfully!") - - -def failing_task(): - raise Exception("Intentionally failing this Task to trigger on_failure_callback.") diff --git a/dev/dags/customized/callables/__init__.py b/dev/dags/customized/helpers/__init__.py similarity index 100% rename from dev/dags/customized/callables/__init__.py rename to dev/dags/customized/helpers/__init__.py diff --git a/dev/dags/customized/helpers/etl.py b/dev/dags/customized/helpers/etl.py new file mode 100644 index 00000000..aa7d3c08 --- /dev/null +++ b/dev/dags/customized/helpers/etl.py @@ -0,0 +1,8 @@ +def extract(): + print("extract() function has been called") + +def transform(ds_nodash): + print("transform() function has been called") + +def load(database_name, table_name): + print("load() function has been called") diff --git a/dev/dags/datasets/example_dag_datasets.yml b/dev/dags/datasets/example_dag_datasets.yml index ec14def9..64980c6f 100644 --- a/dev/dags/datasets/example_dag_datasets.yml +++ b/dev/dags/datasets/example_dag_datasets.yml @@ -66,13 +66,13 @@ example_custom_config_condition_dataset_consumer_dag: example_without_custom_config_condition_dataset_consumer_dag: description: "Example DAG consumer custom config condition datasets" schedule: - datasets: - !or - - !and - - "s3://bucket-cjmm/raw/dataset_custom_1" + datasets: + !or + - !and + - "s3://bucket-cjmm/raw/dataset_custom_1" - "s3://bucket-cjmm/raw/dataset_custom_2" - "s3://bucket-cjmm/raw/dataset_custom_3" tasks: task_1: operator: airflow.operators.bash_operator.BashOperator - bash_command: "echo 'consumer datasets'" \ No newline at end of file + bash_command: "echo 'consumer datasets'" diff --git a/dev/dags/datasets/example_dag_datasets_outlet.yml b/dev/dags/datasets/example_dag_datasets_outlet.yml index d76a08e8..758611d8 100644 --- a/dev/dags/datasets/example_dag_datasets_outlet.yml +++ b/dev/dags/datasets/example_dag_datasets_outlet.yml @@ -24,4 
+24,4 @@ consumer_dag: tasks: task_1: operator: airflow.operators.bash_operator.BashOperator - bash_command: "echo 'consumer datasets'" \ No newline at end of file + bash_command: "echo 'consumer datasets'" diff --git a/dev/dags/datasets/example_dataset_condition_string.yml b/dev/dags/datasets/example_dataset_condition_string.yml index f9cd1605..8e41f1bf 100644 --- a/dev/dags/datasets/example_dataset_condition_string.yml +++ b/dev/dags/datasets/example_dataset_condition_string.yml @@ -9,4 +9,4 @@ consumer_dag: tasks: task_1: operator: airflow.operators.bash_operator.BashOperator - bash_command: "echo 'consumer datasets'" \ No newline at end of file + bash_command: "echo 'consumer datasets'" diff --git a/dev/dags/datasets/example_dataset_yaml_syntax.yml b/dev/dags/datasets/example_dataset_yaml_syntax.yml index 51c76af0..e03c61d0 100644 --- a/dev/dags/datasets/example_dataset_yaml_syntax.yml +++ b/dev/dags/datasets/example_dataset_yaml_syntax.yml @@ -5,13 +5,13 @@ consumer_dag: start_date: '2024-01-01' description: "Example DAG consumer simple datasets" schedule: - datasets: - !or - - !and - - "s3://bucket-cjmm/raw/dataset_custom_1" + datasets: + !or + - !and + - "s3://bucket-cjmm/raw/dataset_custom_1" - "s3://bucket-cjmm/raw/dataset_custom_2" - "s3://bucket-cjmm/raw/dataset_custom_3" tasks: task_1: operator: airflow.operators.bash_operator.BashOperator - bash_command: "echo 'consumer datasets'" \ No newline at end of file + bash_command: "echo 'consumer datasets'" diff --git a/dev/dags/example_dag_factory_default_args.yml b/dev/dags/example_dag_factory_default_args.yml new file mode 100644 index 00000000..97d6a2b6 --- /dev/null +++ b/dev/dags/example_dag_factory_default_args.yml @@ -0,0 +1,23 @@ +default: + default_args: + start_date: '2024-01-01' + schedule_interval: 0 0 * * * + catchup: false + tags: + - "data engineering" + +etl: + tasks: + extract: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo extract" + transform: + operator: 
airflow.operators.bash_operator.BashOperator + bash_command: "echo transform" + dependencies: + - extract + load: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo load" + dependencies: + - transform diff --git a/dev/dags/example_dag_factory_default_config.py b/dev/dags/example_dag_factory_default_config.py new file mode 100644 index 00000000..0a13040b --- /dev/null +++ b/dev/dags/example_dag_factory_default_config.py @@ -0,0 +1,17 @@ +import os +from pathlib import Path + +# The following import is here so Airflow parses this file +# from airflow import DAG +import dagfactory + +DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/" +CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR)) + +config_file = str(CONFIG_ROOT_DIR / "example_dag_factory_default_config.yml") + +example_dag_factory = dagfactory.DagFactory(config_file) + +# Creating task dependencies +example_dag_factory.clean_dags(globals()) +example_dag_factory.generate_dags(globals()) diff --git a/dev/dags/example_dag_factory_default_config.yml b/dev/dags/example_dag_factory_default_config.yml new file mode 100644 index 00000000..6fcca5ee --- /dev/null +++ b/dev/dags/example_dag_factory_default_config.yml @@ -0,0 +1,36 @@ +default: + default_args: + start_date: '2024-01-01' + schedule_interval: 0 0 * * * + catchup: false + tags: + - dynamic + tasks: + extract: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo extract" + transform: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo transform" + dependencies: + - extract + load: + operator: airflow.operators.bash_operator.BashOperator + dependencies: + - transform + + +machine_learning: + tasks: + load: + bash_command: "echo machine_larning" + +data_science: + tasks: + load: + bash_command: "echo data_science" + +artificial_intelligence: + tasks: + load: + bash_command: "echo artificial_intelligence" diff --git a/dev/dags/example_task_group.yml 
b/dev/dags/example_task_group.yml index b5b61a24..4ac06210 100644 --- a/dev/dags/example_task_group.yml +++ b/dev/dags/example_task_group.yml @@ -7,6 +7,7 @@ default: default_view: tree max_active_runs: 1 schedule_interval: 0 1 * * * + example_task_group: description: "this dag uses task groups" task_groups: diff --git a/docs/configuration/defaults.md b/docs/configuration/defaults.md index 276eb562..f61b260e 100644 --- a/docs/configuration/defaults.md +++ b/docs/configuration/defaults.md @@ -2,52 +2,49 @@ DAG Factory allows you to define Airflow [default_args](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#default-arguments) and -additional DAG-level arguments in a `default` block. This block enables you to share common settings across all DAGs in -your YAML configuration, with the arguments automatically applied to each DAG defined in the file. +additional DAG-level arguments in a `default` block. This block enables you to share common settings and configurations +across all DAGs in your YAML configuration, with the arguments automatically applied to each DAG defined in the file. +This is one of DAG Factory's most powerful features; using defaults allows for the dynamic generation of more than a +single DAG. ## Benefits of using the default block - Consistency: Ensures uniform configurations across all tasks and DAGs. - Maintainability: Reduces duplication by centralizing common properties. - Simplicity: Makes configurations easier to read and manage. +- Dynamic Generation: Use a single default block to easily generate more than a single DAG. -### Example usage of default block +### Example usage of a default block for `default_args` -```title="Usage of default block in YAML" ---8<-- "dev/dags/example_task_group.yml" -``` - -The arguments specified in the `default` block, such as `default_args`, `default_view`, `max_active_runs`, -`schedule_interval`, and any others defined, will be applied to all the DAGs in the YAML configuration. 
- -## Multiple ways for specifying Airflow default_args - -DAG Factory offers flexibility in defining Airflow’s `default_args`. These can be specified in several ways, depending on your requirements. - -1. Specifying `default_args` in the `default` block - - As seen in the previous example, you can define shared `default_args` for all DAGs in the configuration YAML under -the `default` block. These arguments are automatically inherited by every DAG defined in the file. +#### Specifying `default_args` in the `default` block -2. Specifying `default_args` directly in a DAG configuration + Using a `default` block in a YAML file allows for those key-value pairs to be applied to each DAG that is defined in + that same file. One of the most common examples is using a `default` block to specify `default_args` for each DAG + defined in that file. These arguments are automatically inherited by every DAG defined in the file. Below is an example of this. - You can override or define specific default_args at the individual DAG level. This allows you to customize arguments -for each DAG without affecting others. + ```yaml title="Usage of default block for default_args in YAML" + --8<-- "dev/dags/example_dag_factory_default_args.yml" + ``` - Example: +#### Specifying `default_args` directly in a DAG configuration - ```title="DAG level default_args" - --8<-- "dev/dags/example_dag_factory.yml" - ``` + You can override or define specific `default_args` at the individual DAG level. This allows you to customize + arguments for each DAG without affecting others. Not only can existing `default_args` be overridden directly in a DAG + configuration, but new arguments can be added. -3. Specifying `default_args` in a shared `defaults.yml` + ```yaml + etl: + default_args: + start_date: '2024-12-31' + retries: 1 # A new default_arg was added + ... + ``` - Starting DAG Factory 0.22.0, you can also keep the `default_args` in the `defaults.yml` file. 
The configuration -from `defaults.yml` will be applied to all DAG Factory generated DAGs. +#### Specifying `default_args` in a shared `defaults.yml` - ```title="defaults.yml" - --8<-- "dev/dags/defaults.yml" - ``` + Starting DAG Factory 0.22.0, you can also keep the `default_args` in the `defaults.yml` file. The configuration + from `defaults.yml` will be applied to all DAG Factory generated DAGs. **Be careful, these will be applied to all + generated DAGs.** Given the various ways to specify `default_args`, the following precedence order is applied when arguments are duplicated: @@ -55,3 +52,19 @@ duplicated: 1. In the DAG configuration 2. In the `default` block within the workflow's YAML file 3. In the `defaults.yml` + +### Example using of default block for dynamic DAG generation + +Not only can the `default` block in a YAML file be used to define `default_args` for one or more DAGs; it can also be +used to create the skeleton of "templated" DAGs. In the example below, the `default` block is used to define not only +the `default_args` of a DAG, but also default Tasks. These Tasks provide a "template" for the DAGs defined in this file. +Each DAG (`machine_learning`, `data_science`, `artificial_intelligence`) will be defined using the values from the +`default` block, and like with `default_args`, can override these values. **This is a powerful way to use DAG Factory +to dynamically create DAGs using a single configuration.** + + +```yaml title="Usage of default block in YAML" +--8<-- "dev/dags/example_dag_factory_default_config.yml" +``` + +Currently, only `default_args` can be specified using the `defaults.yml` file. diff --git a/docs/features/datasets.md b/docs/features/datasets.md index 7092a5d1..e20edd87 100644 --- a/docs/features/datasets.md +++ b/docs/features/datasets.md @@ -56,4 +56,4 @@ The following diagrams illustrate the dataset conditions described in the exampl 2. 
Alternatively, **`s3://bucket-cjmm/raw/dataset_custom_3`** alone can satisfy the condition. ![Graph Conditional Dataset 1](../static/images/datasets/conditions/graph_conditional_dataset.png) -![Graph Conditional Dataset 2](../static/images/datasets/conditions/graph_conditional_dataset_2.png) \ No newline at end of file +![Graph Conditional Dataset 2](../static/images/datasets/conditions/graph_conditional_dataset_2.png) diff --git a/tests/test_dagbuilder.py b/tests/test_dagbuilder.py index 67ba0b3c..a3cbca47 100644 --- a/tests/test_dagbuilder.py +++ b/tests/test_dagbuilder.py @@ -156,6 +156,28 @@ }, } +DAG_CONFIG_ML = { + "tasks": { + "task_2": { + "bash_command": "echo 2", + } + } +} + +DAG_CONFIG_DEFAULT_ML = { + "schedule_interval": "0 0 * * *", + "default_args": {"start_date": "2025-01-01", "owner": "custom_owner"}, + "tasks": { + "task_1": { + "operator": "airflow.operators.bash_operator.BashOperator", + "bash_command": "echo 1", + }, + "task_2": { + "operator": "airflow.operators.bash_operator.BashOperator", + }, + }, +} + DAG_CONFIG_CALLBACKS = { "doc_md": "##here is a doc md string", "default_args": { @@ -637,6 +659,22 @@ def test_make_task_groups_empty(): assert task_groups == {} +def test_dag_config_default(): + td = dagbuilder.DagBuilder("test_dynamic_machine_learning_dag", DAG_CONFIG_ML, DAG_CONFIG_DEFAULT_ML) + dag = td.build()["dag"] + + # Validate that the default values were applied to the machine_learning DAG + assert dag.dag_id == "test_dynamic_machine_learning_dag" + assert len(dag.tasks) == 2 + + task_1 = dag.task_dict["task_1"] + assert task_1.bash_command == "echo 1" + + task_2 = dag.task_dict["task_2"] + assert task_2.bash_command == "echo 2" + + +# These functions are used to mock callbacks for the tests below def print_context_callback(context, **kwargs): print(context) diff --git a/tests/test_parsers.py b/tests/test_parsers.py index 759ecfee..5b5c2dbc 100644 --- a/tests/test_parsers.py +++ b/tests/test_parsers.py @@ -1,74 +1,83 @@ import 
ast + import pytest + from dagfactory.parsers import SafeEvalVisitor + @pytest.fixture def dataset_map(): - return { - 'dataset_custom_1': 1, - 'dataset_custom_2': 2, - 'dataset_custom_3': 3 - } + return {"dataset_custom_1": 1, "dataset_custom_2": 2, "dataset_custom_3": 3} + @pytest.fixture def visitor(dataset_map): return SafeEvalVisitor(dataset_map) + def test_evaluate(visitor): condition_string = "dataset_custom_1 & dataset_custom_2 | dataset_custom_3" - tree = ast.parse(condition_string, mode='eval') + tree = ast.parse(condition_string, mode="eval") result = visitor.evaluate(tree) expected = (1 & 2) | 3 assert result == expected + def test_visit_BinOp_and(visitor): condition_string = "dataset_custom_1 & dataset_custom_2" - tree = ast.parse(condition_string, mode='eval') + tree = ast.parse(condition_string, mode="eval") result = visitor.evaluate(tree) expected = 1 & 2 assert result == expected + def test_visit_BinOp_or(visitor): condition_string = "dataset_custom_1 | dataset_custom_3" - tree = ast.parse(condition_string, mode='eval') + tree = ast.parse(condition_string, mode="eval") result = visitor.evaluate(tree) expected = 1 | 3 assert result == expected + def test_visit_Name(visitor): condition_string = "dataset_custom_2" - tree = ast.parse(condition_string, mode='eval') + tree = ast.parse(condition_string, mode="eval") result = visitor.evaluate(tree) expected = 2 assert result == expected + def test_visit_Constant(visitor): condition_string = "42" - tree = ast.parse(condition_string, mode='eval') + tree = ast.parse(condition_string, mode="eval") result = visitor.evaluate(tree) expected = 42 assert result == expected + def test_unsupported_binary_operation(visitor): condition_string = "dataset_custom_1 + dataset_custom_2" - tree = ast.parse(condition_string, mode='eval') + tree = ast.parse(condition_string, mode="eval") with pytest.raises(ValueError): visitor.evaluate(tree) + def test_unsupported_unary_operation(visitor): condition_string = 
"+dataset_custom_1" - tree = ast.parse(condition_string, mode='eval') + tree = ast.parse(condition_string, mode="eval") with pytest.raises(ValueError): visitor.evaluate(tree) + def test_undefined_variable(visitor): condition_string = "undefined_dataset" - tree = ast.parse(condition_string, mode='eval') + tree = ast.parse(condition_string, mode="eval") with pytest.raises(NameError): visitor.evaluate(tree) + def test_unsupported_syntax(visitor): condition_string = "[1, 2, 3]" - tree = ast.parse(condition_string, mode='eval') + tree = ast.parse(condition_string, mode="eval") with pytest.raises(ValueError): - visitor.evaluate(tree) \ No newline at end of file + visitor.evaluate(tree) diff --git a/tests/test_utils.py b/tests/test_utils.py index de5a1c5d..0b13f4b8 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -259,6 +259,7 @@ def test_open_and_filter_yaml_config_datasets(): assert actual == expected + def get_datasets_map_uri_yaml_file(): datasets_names = ["dataset_custom_1", "dataset_custom_2"] file_path = "dev/dags/datasets/example_config_datasets.yml" @@ -271,21 +272,25 @@ def get_datasets_map_uri_yaml_file(): assert actual == expected + def test_valid_uri(): actual = utils.make_valid_variable_name("s3://bucket/dataset") expected = "s3___bucket_dataset" assert actual == expected + def test_uri_with_special_characters(): actual = utils.make_valid_variable_name("s3://bucket/dataset-1!@#$%^&*()") expected = "s3___bucket_dataset_1__________" assert actual == expected + def test_uri_starting_with_number(): actual = utils.make_valid_variable_name("123/bucket/dataset") expected = "_123_bucket_dataset" assert actual == expected + def test_open_and_filter_yaml_config_datasets_file_notfound(): datasets_names = ["dataset_custom_1", "dataset_custom_2"] file_path = "examples/datasets/not_found_example_config_datasets.yml" @@ -293,6 +298,7 @@ def test_open_and_filter_yaml_config_datasets_file_notfound(): with pytest.raises(Exception): 
utils.get_datasets_uri_yaml_file(file_path, datasets_names) + def test_extract_dataset_names(): expression = "((dataset_custom_1 & dataset_custom_2) | (dataset_custom_3))" expected = ["dataset_custom_1", "dataset_custom_2", "dataset_custom_3"] @@ -309,6 +315,7 @@ def test_extract_dataset_names(): result = utils.extract_dataset_names(expression) assert result == expected + def test_extract_storage_names(): expression = "s3://bucket-cjmm/raw/dataset_custom_1 & s3://bucket-cjmm/raw/dataset_custom_2" expected = ["s3://bucket-cjmm/raw/dataset_custom_1", "s3://bucket-cjmm/raw/dataset_custom_2"] From 1344a41cb61449b98b22931bdcf3cf16e3f0a1c3 Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Fri, 28 Mar 2025 10:36:47 -0400 Subject: [PATCH 2/2] Resolving conversations from Pankaj --- dagfactory/dagfactory.py | 3 +-- dev/dags/customized/helpers/etl.py | 2 ++ docs/configuration/defaults.md | 14 +++++++++----- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/dagfactory/dagfactory.py b/dagfactory/dagfactory.py index 2b63507b..7683c94b 100644 --- a/dagfactory/dagfactory.py +++ b/dagfactory/dagfactory.py @@ -43,8 +43,7 @@ def __init__( self.config: Dict[str, Any] = config def _global_default_args(self): - # Possible change here/clarification needed; what if defaults.yml has more than default_args? In this case, we - # are "discarding" these values, and only holding onto the default_args. 
This is shown in build_dags + """If a defaults.yml exists, use this as the global default arguments (to be applied to each DAG).""" default_args_yml = Path(self.default_args_config_path) / "defaults.yml" if default_args_yml.exists(): diff --git a/dev/dags/customized/helpers/etl.py b/dev/dags/customized/helpers/etl.py index aa7d3c08..b1d76897 100644 --- a/dev/dags/customized/helpers/etl.py +++ b/dev/dags/customized/helpers/etl.py @@ -1,8 +1,10 @@ def extract(): print("extract() function has been called") + def transform(ds_nodash): print("transform() function has been called") + def load(database_name, table_name): print("load() function has been called") diff --git a/docs/configuration/defaults.md b/docs/configuration/defaults.md index f61b260e..bf755141 100644 --- a/docs/configuration/defaults.md +++ b/docs/configuration/defaults.md @@ -46,12 +46,16 @@ single DAG. from `defaults.yml` will be applied to all DAG Factory generated DAGs. **Be careful, these will be applied to all generated DAGs.** -Given the various ways to specify `default_args`, the following precedence order is applied when arguments are -duplicated: + ```yaml title="defaults.yml" + --8<-- "dev/dags/defaults.yml" + ``` + + Given the various ways to specify `default_args`, the following precedence order is applied when arguments are + duplicated: -1. In the DAG configuration -2. In the `default` block within the workflow's YAML file -3. In the `defaults.yml` + 1. In the DAG configuration + 2. In the `default` block within the workflow's YAML file + 3. In the `defaults.yml` ### Example using of default block for dynamic DAG generation