Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions dagfactory/dagfactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,46 @@ class DagFactory:
:param config_filepath: the filepath of the DAG factory YAML config file.
Must be absolute path to file. Cannot be used with `config`.
:type config_filepath: str
:param config: DAG factory config dictionary. Cannot be user with `config_filepath`.
:param config: DAG factory config dictionary. Cannot be used with `config_filepath`.
:type config: dict
:param default_args_config_path: The path to a file that contains the default arguments for that DAG.
:type default_args_config_path: str
:param default_args_config_dict: A dictionary of default arguments for that DAG, as an alternative to default_args_config_path.
:type default_args_config_dict: dict
"""

def __init__(
    self,
    config_filepath: Optional[str] = None,
    config: Optional[dict] = None,
    default_args_config_path: str = airflow_conf.get("core", "dags_folder"),
    default_args_config_dict: Optional[dict] = None,
) -> None:
    """
    Load the DAG config and record where the global default arguments come from.

    :raises AssertionError: if neither or both of `config_filepath` and `config` are provided.
    :raises DagFactoryException: if `default_args_config_dict` is provided together with a
        `default_args_config_path` that differs from the Airflow `dags_folder` setting.
    """
    # Handle the config(_filepath): exactly one of the two must be truthy (XOR).
    # NOTE(review): `assert` is stripped when Python runs with -O; it is kept here so that callers
    # relying on AssertionError keep working.
    assert bool(config_filepath) ^ bool(config), "Either `config_filepath` or `config` should be provided"

    if config_filepath:
        DagFactory._validate_config_filepath(config_filepath=config_filepath)
        self.config: Dict[str, Any] = self._load_dag_config(config_filepath=config_filepath)
    if config:
        self.config: Dict[str, Any] = config

    # These default args are a bit different; these are not the "default" structure that is applied to certain DAGs.
    # These are in fact the "default" default_args.
    if default_args_config_dict:
        # `default_args_config_dict` and a custom `default_args_config_path` are mutually exclusive. A path
        # that differs from the Airflow `dags_folder` default is taken to mean the caller passed one
        # explicitly. Check this first so that a rejected call does not also log the warning below.
        if default_args_config_path != airflow_conf.get("core", "dags_folder"):
            raise DagFactoryException("Cannot pass both `default_args_config_dict` and `default_args_config_path`.")

        logging.warning(
            "Manually specifying `default_args_config_dict` will override the values in the `defaults.yml` file."
        )

    # Both values are always stored; _global_default_args decides which one is used.
    self.default_args_config_path: str = default_args_config_path
    self.default_args_config_dict: Optional[dict] = default_args_config_dict

def _load_yaml_config(self, config_filepath: str) -> Dict[str, Any]:
"""For loading yaml config file, including DAG config and default args config."""

Expand All @@ -74,8 +96,14 @@ def __and(loader: yaml.FullLoader, node: yaml.Node) -> str:
config = cast_with_type(config)
return config

def _global_default_args(self) -> Optional[Dict[str, Any]]:
"""If a defaults.yml exists, use this as the global default arguments (to be applied to each DAG)."""
def _global_default_args(self):
"""
If self.default_args exists, use this as the global default_args (to be applied to each DAG). Otherwise, fall
back to the defaults.yml file.
"""
if self.default_args_config_dict:
return self.default_args_config_dict

default_args_yml = Path(self.default_args_config_path) / "defaults.yml"

if default_args_yml.exists():
Expand Down
19 changes: 19 additions & 0 deletions dev/dags/example_dag_factory_default_config_dict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import os
from pathlib import Path

# The following import is here so Airflow parses this file
# from airflow import DAG
import dagfactory

DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"  # fallback when CONFIG_ROOT_DIR is not set
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

config_file = str(CONFIG_ROOT_DIR / "example_dag_factory_default_config_dict.yml")

example_dag_factory = dagfactory.DagFactory(
config_file, default_args_config_dict={"default_args": {"start_date": "2025-01-01", "owner": "global_owner"}}
)

# Clean and generate the DAGs, handing this module's globals() to the factory
example_dag_factory.clean_dags(globals())
example_dag_factory.generate_dags(globals())
16 changes: 16 additions & 0 deletions dev/dags/example_dag_factory_default_config_dict.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
daily_etl:
schedule: "@daily"
tasks:
extract:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo extract"
transform:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo transform"
dependencies:
- extract
load:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo load"
dependencies:
- transform
27 changes: 17 additions & 10 deletions docs/configuration/defaults.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,23 @@ single DAG.

### Example of using the `default` block for dynamic DAG generation

Not only can the `default` block in a YAML file be used to define `default_args` for one or more DAGs; it can also be
used to create the skeleton of "templated" DAGs. In the example below, the `default` block is used to define not only
the `default_args` of a DAG, but also default Tasks. These Tasks provide a "template" for the DAGs defined in this file.
Each DAG (`machine_learning`, `data_science`, `artificial_intelligence`) will be defined using the values from the
`default` block, and like with `default_args`, can override these values. **This is a powerful way to use DAG Factory
to dynamically create DAGs using a single configuration.**
Not only can the `default` block in a YAML file be used to define `default_args` for one or more DAGs; it can also be
used to create the skeleton of "templated" DAGs. In the example below, the `default` block is used to define not only
the `default_args` of a DAG, but also default Tasks. These Tasks provide a "template" for the DAGs defined in this
file. Each DAG (`machine_learning`, `data_science`, `artificial_intelligence`) will be defined using the values from
the `default` block, and like with `default_args`, can override these values. **This is a powerful way to use DAG
Factory to dynamically create DAGs using a single configuration.**

```yaml title="Usage of default block in YAML"
--8<-- "dev/dags/example_dag_factory_default_config.yml"
```

### Specifying `default_args` in a `.py` file

```yaml title="Usage of default block in YAML"
--8<-- "dev/dags/example_dag_factory_default_config.yml"
```
In the `.py` file used to instantiate a DAG defined using YAML, default arguments in the form of a Python dictionary
can be set using the `default_args_config_dict` parameter of the `DagFactory` class. This mirrors the functionality of
manually specifying a `default_args_config_path` in the `DagFactory` class. The two parameters cannot be passed together.

Currently, only `default_args` can be specified using the `defaults.yml` file.
```python title="Usage of default_args_config_dict in .py file"
--8<-- "dev/dags/example_dag_factory_default_config_dict.py:13:19"
```
11 changes: 11 additions & 0 deletions tests/test_dagfactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,17 @@ def test_build_dag_with_global_default():
assert dags.get("example_dag").tasks[0].depends_on_past == True


def test_build_dag_with_global_default_dict():
    """Global default args supplied as a dict should be applied to the tasks of the built DAG."""
    global_defaults = {
        "default_args": {"start_date": "2025-01-01", "owner": "global_owner", "depends_on_past": True}
    }
    factory = dagfactory.DagFactory(config=DAG_FACTORY_CONFIG, default_args_config_dict=global_defaults)
    built_dags = factory.build_dags()

    first_task = built_dags.get("example_dag").tasks[0]
    assert first_task.depends_on_past == True


def test_load_invalid_yaml_logs_error(caplog):
caplog.set_level(logging.ERROR)
load_yaml_dags(
Expand Down
4 changes: 2 additions & 2 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading