diff --git a/dagfactory/dagfactory.py b/dagfactory/dagfactory.py index 5ecfb19a..909288c0 100644 --- a/dagfactory/dagfactory.py +++ b/dagfactory/dagfactory.py @@ -31,8 +31,12 @@ class DagFactory: :param config_filepath: the filepath of the DAG factory YAML config file. Must be absolute path to file. Cannot be used with `config`. :type config_filepath: str - :param config: DAG factory config dictionary. Cannot be user with `config_filepath`. + :param config: DAG factory config dictionary. Cannot be used with `config_filepath`. :type config: dict + :param default_args_config_path: The path to the directory that contains the `defaults.yml` file with the default arguments for that DAG. + :type default_args_config_path: str + :param default_args_config_dict: A dictionary of default arguments for that DAG, as an alternative to default_args_config_path. + :type default_args_config_dict: dict """ def __init__( @@ -40,15 +44,33 @@ def __init__( config_filepath: Optional[str] = None, config: Optional[dict] = None, default_args_config_path: str = airflow_conf.get("core", "dags_folder"), + default_args_config_dict: Optional[dict] = None, ) -> None: + # Handle the config(_filepath) assert bool(config_filepath) ^ bool(config), "Either `config_filepath` or `config` should be provided" - self.default_args_config_path = default_args_config_path + if config_filepath: DagFactory._validate_config_filepath(config_filepath=config_filepath) self.config: Dict[str, Any] = self._load_dag_config(config_filepath=config_filepath) if config: self.config: Dict[str, Any] = config + # These default args are a bit different; these are not the "default" structure that is applied to certain DAGs. + # These are in fact the "default" default_args + if default_args_config_dict: + # Log a warning if the default_args_config_dict parameter is specified. If both default_args_config_dict and + # default_args_config_path are passed, we'll throw an exception. 
+ logging.warning( + "Manually specifying `default_args_config_dict` will override the values in the `defaults.yml` file." + ) + + if default_args_config_path != airflow_conf.get("core", "dags_folder"): + raise DagFactoryException("Cannot pass both `default_args_config_dict` and `default_args_config_path`.") + + # We'll still go ahead and set both values. They'll be referenced in _global_default_args. + self.default_args_config_path: str = default_args_config_path + self.default_args_config_dict: Optional[dict] = default_args_config_dict + def _load_yaml_config(self, config_filepath: str) -> Dict[str, Any]: """For loading yaml config file, including DAG config and default args config.""" @@ -74,8 +96,14 @@ def __and(loader: yaml.FullLoader, node: yaml.Node) -> str: config = cast_with_type(config) return config - def _global_default_args(self) -> Optional[Dict[str, Any]]: - """If a defaults.yml exists, use this as the global default arguments (to be applied to each DAG).""" + def _global_default_args(self): + """ + If self.default_args_config_dict is set, use this as the global default_args (to be applied to each DAG). Otherwise, fall + back to the defaults.yml file. 
+ """ + if self.default_args_config_dict: + return self.default_args_config_dict + default_args_yml = Path(self.default_args_config_path) / "defaults.yml" if default_args_yml.exists(): diff --git a/dev/dags/example_dag_factory_default_config_dict.py b/dev/dags/example_dag_factory_default_config_dict.py new file mode 100644 index 00000000..bfe7ed8f --- /dev/null +++ b/dev/dags/example_dag_factory_default_config_dict.py @@ -0,0 +1,19 @@ +import os +from pathlib import Path + +# The following import is here so Airflow parses this file +# from airflow import DAG +import dagfactory + +DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/" +CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR)) + +config_file = str(CONFIG_ROOT_DIR / "example_dag_factory_default_config_dict.yml") + +example_dag_factory = dagfactory.DagFactory( + config_file, default_args_config_dict={"default_args": {"start_date": "2025-01-01", "owner": "global_owner"}} +) + +# Creating task dependencies +example_dag_factory.clean_dags(globals()) +example_dag_factory.generate_dags(globals()) diff --git a/dev/dags/example_dag_factory_default_config_dict.yml b/dev/dags/example_dag_factory_default_config_dict.yml new file mode 100644 index 00000000..854384e5 --- /dev/null +++ b/dev/dags/example_dag_factory_default_config_dict.yml @@ -0,0 +1,16 @@ +daily_etl: + schedule: "@daily" + tasks: + extract: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo extract" + transform: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo transform" + dependencies: + - extract + load: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo load" + dependencies: + - transform diff --git a/docs/configuration/defaults.md b/docs/configuration/defaults.md index bf755141..ff539023 100644 --- a/docs/configuration/defaults.md +++ b/docs/configuration/defaults.md @@ -59,16 +59,23 @@ single DAG. 
### Example using of default block for dynamic DAG generation -Not only can the `default` block in a YAML file be used to define `default_args` for one or more DAGs; it can also be -used to create the skeleton of "templated" DAGs. In the example below, the `default` block is used to define not only -the `default_args` of a DAG, but also default Tasks. These Tasks provide a "template" for the DAGs defined in this file. -Each DAG (`machine_learning`, `data_science`, `artificial_intelligence`) will be defined using the values from the -`default` block, and like with `default_args`, can override these values. **This is a powerful way to use DAG Factory -to dynamically create DAGs using a single configuration.** + Not only can the `default` block in a YAML file be used to define `default_args` for one or more DAGs; it can also be + used to create the skeleton of "templated" DAGs. In the example below, the `default` block is used to define not only + the `default_args` of a DAG, but also default Tasks. These Tasks provide a "template" for the DAGs defined in this + file. Each DAG (`machine_learning`, `data_science`, `artificial_intelligence`) will be defined using the values from + the `default` block, and like with `default_args`, can override these values. **This is a powerful way to use DAG + Factory to dynamically create DAGs using a single configuration.** + + ```yaml title="Usage of default block in YAML" + --8<-- "dev/dags/example_dag_factory_default_config.yml" + ``` +### Specifying `default_args` in a `.py` file -```yaml title="Usage of default block in YAML" ---8<-- "dev/dags/example_dag_factory_default_config.yml" -``` + In the `.py` file used to instantiate a DAG defined using YAML, default arguments in the form of a Python dictionary can + be set using the `default_args_config_dict` parameter in the `DagFactory` class. This mirrors the functionality of + manually specifying a `default_args_config_path` in the `DagFactory` class. 
-Currently, only `default_args` can be specified using the `defaults.yml` file. + ```python title="Usage of default_args_config_dict in .py file" + --8<-- "dev/dags/example_dag_factory_default_config_dict.py:13:19" + ``` diff --git a/tests/test_dagfactory.py b/tests/test_dagfactory.py index 797f9e21..8d20ca13 100644 --- a/tests/test_dagfactory.py +++ b/tests/test_dagfactory.py @@ -490,6 +490,17 @@ def test_build_dag_with_global_default(): assert dags.get("example_dag").tasks[0].depends_on_past == True +def test_build_dag_with_global_default_dict(): + dags = dagfactory.DagFactory( + config=DAG_FACTORY_CONFIG, + default_args_config_dict={ + "default_args": {"start_date": "2025-01-01", "owner": "global_owner", "depends_on_past": True} + }, + ).build_dags() + + assert dags.get("example_dag").tasks[0].depends_on_past == True + + def test_load_invalid_yaml_logs_error(caplog): caplog.set_level(logging.ERROR) load_yaml_dags( diff --git a/uv.lock b/uv.lock index 2268a564..1d681e6f 100644 --- a/uv.lock +++ b/uv.lock @@ -959,7 +959,7 @@ dev = [ [package.metadata] requires-dist = [ - { name = "apache-airflow", specifier = ">=2.3" }, + { name = "apache-airflow", specifier = ">=2.4" }, { name = "apache-airflow-providers-cncf-kubernetes" }, { name = "apache-airflow-providers-http", specifier = ">=2.0.0" }, { name = "apache-airflow-providers-slack", marker = "extra == 'tests'" }, @@ -1058,7 +1058,7 @@ name = "exceptiongroup" version = "1.3.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "typing-extensions", marker = "python_full_version < '3.12'" }, + { name = "typing-extensions", marker = "python_full_version < '3.11'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/0b/9f/a65090624ecf468cdca03533906e7c69ed7588582240cfe7cc9e770b50eb/exceptiongroup-1.3.0.tar.gz", hash = "sha256:b241f5885f560bc56a59ee63ca4c6a8bfa46ae4ad651af316d4e81817bb9fd88", size = 29749, upload-time = "2025-05-10T17:42:51.123Z" } wheels = [