Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Enable Depandabot to scan outdated Github Actions dependencies by @tatiana in #347
- Improve docs deploy job by @pankajastro in #352
- Unify how we build dagfactory by @tatiana in #353
- Fix running make docker run when previous versions were run locally by @tatiana in #362
- Fix running make docker run when previous versions were run locally by @tatiana in #362
- Install `jq` in `dev` container by @pankajastro in #363
- Dependabot GitHub actions version upgrades in #349, #350, #351

Expand Down
3 changes: 3 additions & 0 deletions dagfactory/dagbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ def get_dag_params(self) -> Dict[str, Any]:
raise DagFactoryConfigException("Failed to merge config with default config") from err
dag_params["dag_id"]: str = self.dag_name

# If there are no default_args, add an empty dictionary
dag_params["default_args"] = {} if "default_args" not in dag_params else dag_params["default_args"]

if utils.check_dict_key(dag_params, "schedule_interval") and dag_params["schedule_interval"] == "None":
dag_params["schedule_interval"] = None

Expand Down
18 changes: 12 additions & 6 deletions dagfactory/dagfactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(
self.config: Dict[str, Any] = config

def _global_default_args(self):
"""If a defaults.yml exists, use this as the global default arguments (to be applied to each DAG)."""
default_args_yml = Path(self.default_args_config_path) / "defaults.yml"

if default_args_yml.exists():
Expand Down Expand Up @@ -133,11 +134,16 @@ def build_dags(self) -> Dict[str, DAG]:
global_default_args = self._global_default_args()
default_config: Dict[str, Any] = self.get_default_config()

if global_default_args is not None:
if "default_args" in default_config and "default_args" in global_default_args:
default_config = {
"default_args": {**global_default_args["default_args"], **default_config["default_args"]}
}
# If global_default_args is None, then default_config will remain as is. Otherwise, we'll (try) go ahead and
# update the default args using global_default_args
if isinstance(global_default_args, dict):
# Previously, default_config was being overwritten completely to only container the default_args
# key-value pair. This was updated as part of issue-295 to not overwrite the entire default_config
# dictionary, and instead update the default_args key-value pair of the default_config dictionary
default_config["default_args"] = {
**global_default_args.get("default_args", {}),
**default_config.get("default_args", {}),
}

dags: Dict[str, Any] = {}

Expand All @@ -162,7 +168,7 @@ def build_dags(self) -> Dict[str, DAG]:
def register_dags(dags: Dict[str, DAG], globals: Dict[str, Any]) -> None:
"""Adds `dags` to `globals` so Airflow can discover them.

:param: dags: Dict of DAGs to be registered.
:param dags: Dict of DAGs to be registered.
:param globals: The globals() from the file used to generate DAGs. The dag_id
must be passed into globals() for Airflow to import
"""
Expand Down
16 changes: 0 additions & 16 deletions dev/dags/customized/callables/python.py

This file was deleted.

10 changes: 10 additions & 0 deletions dev/dags/customized/helpers/etl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
def extract():
print("extract() function has been called")


def transform(ds_nodash):
print("transform() function has been called")


def load(database_name, table_name):
print("load() function has been called")
10 changes: 5 additions & 5 deletions dev/dags/datasets/example_dag_datasets.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ example_custom_config_condition_dataset_consumer_dag:
example_without_custom_config_condition_dataset_consumer_dag:
description: "Example DAG consumer custom config condition datasets"
schedule:
datasets:
!or
- !and
- "s3://bucket-cjmm/raw/dataset_custom_1"
datasets:
!or
- !and
- "s3://bucket-cjmm/raw/dataset_custom_1"
- "s3://bucket-cjmm/raw/dataset_custom_2"
- "s3://bucket-cjmm/raw/dataset_custom_3"
tasks:
task_1:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo 'consumer datasets'"
bash_command: "echo 'consumer datasets'"
2 changes: 1 addition & 1 deletion dev/dags/datasets/example_dag_datasets_outlet_inlet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ consumer_dag:
tasks:
task_1:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo 'consumer datasets'"
bash_command: "echo 'consumer datasets'"
2 changes: 1 addition & 1 deletion dev/dags/datasets/example_dataset_condition_string.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ consumer_dag:
tasks:
task_1:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo 'consumer datasets'"
bash_command: "echo 'consumer datasets'"
10 changes: 5 additions & 5 deletions dev/dags/datasets/example_dataset_yaml_syntax.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ consumer_dag:
start_date: '2024-01-01'
description: "Example DAG consumer simple datasets"
schedule:
datasets:
!or
- !and
- "s3://bucket-cjmm/raw/dataset_custom_1"
datasets:
!or
- !and
- "s3://bucket-cjmm/raw/dataset_custom_1"
- "s3://bucket-cjmm/raw/dataset_custom_2"
- "s3://bucket-cjmm/raw/dataset_custom_3"
tasks:
task_1:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo 'consumer datasets'"
bash_command: "echo 'consumer datasets'"
23 changes: 23 additions & 0 deletions dev/dags/example_dag_factory_default_args.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
default:
default_args:
start_date: '2024-01-01'
schedule_interval: 0 0 * * *
catchup: false
tags:
- "data engineering"

etl:
tasks:
extract:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo extract"
transform:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo transform"
dependencies:
- extract
load:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo load"
dependencies:
- transform
17 changes: 17 additions & 0 deletions dev/dags/example_dag_factory_default_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import os
from pathlib import Path

# The following import is here so Airflow parses this file
# from airflow import DAG
import dagfactory

DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

config_file = str(CONFIG_ROOT_DIR / "example_dag_factory_default_config.yml")

example_dag_factory = dagfactory.DagFactory(config_file)

# Creating task dependencies
example_dag_factory.clean_dags(globals())
example_dag_factory.generate_dags(globals())
36 changes: 36 additions & 0 deletions dev/dags/example_dag_factory_default_config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
default:
default_args:
start_date: '2024-01-01'
schedule_interval: 0 0 * * *
catchup: false
tags:
- dynamic
tasks:
extract:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo extract"
transform:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo transform"
dependencies:
- extract
load:
operator: airflow.operators.bash_operator.BashOperator
dependencies:
- transform


machine_learning:
tasks:
load:
bash_command: "echo machine_larning"

data_science:
tasks:
load:
bash_command: "echo data_science"

artificial_intelligence:
tasks:
load:
bash_command: "echo artificial_intelligence"
1 change: 1 addition & 0 deletions dev/dags/example_task_group.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ default:
default_view: tree
max_active_runs: 1
schedule_interval: 0 1 * * *

example_task_group:
description: "this dag uses task groups"
task_groups:
Expand Down
79 changes: 48 additions & 31 deletions docs/configuration/defaults.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,56 +2,73 @@

DAG Factory allows you to define Airflow
[default_args](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#default-arguments) and
additional DAG-level arguments in a `default` block. This block enables you to share common settings across all DAGs in
your YAML configuration, with the arguments automatically applied to each DAG defined in the file.
additional DAG-level arguments in a `default` block. This block enables you to share common settings and configurations
across all DAGs in your YAML configuration, with the arguments automatically applied to each DAG defined in the file.
This is one of DAG Factory's most powerful features; using defaults allows for the dynamic generation of more than a
single DAG.

## Benefits of using the default block

- Consistency: Ensures uniform configurations across all tasks and DAGs.
- Maintainability: Reduces duplication by centralizing common properties.
- Simplicity: Makes configurations easier to read and manage.
- Dynamic Generation: Use a single default block to easily generate more than a single DAG.

### Example usage of default block
### Example usage of a default block for `default_args`

```title="Usage of default block in YAML"
--8<-- "dev/dags/example_task_group.yml"
```
#### Specifying `default_args` in the `default` block

Using a `default` block in a YAML file allows for those key-value pairs to be applied to each DAG that is defined in
that same file. One of the most common examples is using a `default` block to specify `default_args` for each DAG
defined in that file. These arguments are automatically inherited by every DAG defined in the file. Below is an example of this.

The arguments specified in the `default` block, such as `default_args`, `default_view`, `max_active_runs`,
`schedule_interval`, and any others defined, will be applied to all the DAGs in the YAML configuration.
```yaml title="Usage of default block for default_args in YAML"
--8<-- "dev/dags/example_dag_factory_default_args.yml"
```

## Multiple ways for specifying Airflow default_args
#### Specifying `default_args` directly in a DAG configuration

DAG Factory offers flexibility in defining Airflow’s `default_args`. These can be specified in several ways, depending on your requirements.
You can override or define specific `default_args` at the individual DAG level. This allows you to customize
arguments for each DAG without affecting others. Not only can existing `default_args` be overridden directly in a DAG
configuration, but new arguments can be added.

1. Specifying `default_args` in the `default` block
```yaml
etl:
default_args:
start_date: '2024-12-31'
retries: 1 # A new default_arg was added
...
```

As seen in the previous example, you can define shared `default_args` for all DAGs in the configuration YAML under
the `default` block. These arguments are automatically inherited by every DAG defined in the file.
#### Specifying `default_args` in a shared `defaults.yml`

2. Specifying `default_args` directly in a DAG configuration
Starting DAG Factory 0.22.0, you can also keep the `default_args` in the `defaults.yml` file. The configuration
from `defaults.yml` will be applied to all DAG Factory generated DAGs. **Be careful, these will be applied to all
generated DAGs.**

You can override or define specific default_args at the individual DAG level. This allows you to customize arguments
for each DAG without affecting others.
```yaml title="defaults.yml"
--8<-- "dev/dags/defaults.yml"
```

Example:
Given the various ways to specify `default_args`, the following precedence order is applied when arguments are
duplicated:

```title="DAG level default_args"
--8<-- "dev/dags/example_dag_factory.yml"
```
1. In the DAG configuration
2. In the `default` block within the workflow's YAML file
3. In the `defaults.yml`

3. Specifying `default_args` in a shared `defaults.yml`
### Example using of default block for dynamic DAG generation

Starting DAG Factory 0.22.0, you can also keep the `default_args` in the `defaults.yml` file. The configuration
from `defaults.yml` will be applied to all DAG Factory generated DAGs.
Not only can the `default` block in a YAML file be used to define `default_args` for one or more DAGs; it can also be
used to create the skeleton of "templated" DAGs. In the example below, the `default` block is used to define not only
the `default_args` of a DAG, but also default Tasks. These Tasks provide a "template" for the DAGs defined in this file.
Each DAG (`machine_learning`, `data_science`, `artificial_intelligence`) will be defined using the values from the
`default` block, and like with `default_args`, can override these values. **This is a powerful way to use DAG Factory
to dynamically create DAGs using a single configuration.**

```title="defaults.yml"
--8<-- "dev/dags/defaults.yml"
```
Comment thread
This conversation was marked as resolved.

Given the various ways to specify `default_args`, the following precedence order is applied when arguments are
duplicated:
```yaml title="Usage of default block in YAML"
--8<-- "dev/dags/example_dag_factory_default_config.yml"
```

1. In the DAG configuration
2. In the `default` block within the workflow's YAML file
3. In the `defaults.yml`
Currently, only `default_args` can be specified using the `defaults.yml` file.
2 changes: 1 addition & 1 deletion docs/features/datasets.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,4 @@ The following diagrams illustrate the dataset conditions described in the exampl
2. Alternatively, **`s3://bucket-cjmm/raw/dataset_custom_3`** alone can satisfy the condition.

![Graph Conditional Dataset 1](../static/images/datasets/conditions/graph_conditional_dataset.png)
![Graph Conditional Dataset 2](../static/images/datasets/conditions/graph_conditional_dataset_2.png)
![Graph Conditional Dataset 2](../static/images/datasets/conditions/graph_conditional_dataset_2.png)
38 changes: 38 additions & 0 deletions tests/test_dagbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,28 @@
},
}

DAG_CONFIG_ML = {
"tasks": {
"task_2": {
"bash_command": "echo 2",
}
}
}

DAG_CONFIG_DEFAULT_ML = {
"schedule_interval": "0 0 * * *",
"default_args": {"start_date": "2025-01-01", "owner": "custom_owner"},
"tasks": {
"task_1": {
"operator": "airflow.operators.bash_operator.BashOperator",
"bash_command": "echo 1",
},
"task_2": {
"operator": "airflow.operators.bash_operator.BashOperator",
},
},
}

DAG_CONFIG_CALLBACKS = {
"doc_md": "##here is a doc md string",
"default_args": {
Expand Down Expand Up @@ -637,6 +659,22 @@ def test_make_task_groups_empty():
assert task_groups == {}


def test_dag_config_default():
td = dagbuilder.DagBuilder("test_dynamic_machine_learning_dag", DAG_CONFIG_ML, DAG_CONFIG_DEFAULT_ML)
dag = td.build()["dag"]

# Validate that the default values were applied to the machine_learning DAG
assert dag.dag_id == "test_dynamic_machine_learning_dag"
assert len(dag.tasks) == 2

task_1 = dag.task_dict["task_1"]
assert task_1.bash_command == "echo 1"

task_2 = dag.task_dict["task_2"]
assert task_2.bash_command == "echo 2"


# These functions are used to mock callbacks for the tests below
def print_context_callback(context, **kwargs):
print(context)

Expand Down
Loading