Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dagfactory/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from .dagfactory import DagFactory, load_yaml_dags

__version__ = "0.23.0a4"
__version__ = "0.23.0a5"
__all__ = [
"DagFactory",
"load_yaml_dags",
Expand Down
96 changes: 90 additions & 6 deletions dagfactory/dagbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
import warnings
from copy import deepcopy
from datetime import datetime, timedelta
from functools import partial
from functools import partial, reduce
from typing import Any, Callable, Dict, List, Tuple, Union

from airflow import DAG, configuration
from airflow.models import BaseOperator, Variable
from airflow.utils.module_loading import import_string
from dateutil.relativedelta import relativedelta
from packaging import version

from dagfactory.constants import AIRFLOW3_MAJOR_VERSION
Expand Down Expand Up @@ -729,6 +730,93 @@ def process_file_with_datasets(file: str, datasets_conditions: str) -> Any:
datasets_uri = utils.get_datasets_uri_yaml_file(file, list(dataset_map.keys()))
return [Dataset(uri) for uri in datasets_uri]

@staticmethod
def _init_watchers(watchers_data):
    """Build watcher instances from a list of watcher configuration entries.

    Each entry is read as a mapping with:
      - ``callable``: dotted path of the watcher class (required key).
      - ``name``: passed to the watcher as ``name`` (``None`` when absent).
      - ``trigger``: optional mapping holding the trigger's dotted-path
        ``callable`` and its keyword ``params`` (defaults to no params).

    Returns the instantiated watchers in the same order as the input.
    """
    from dagfactory.utils import _import_from_string

    def _build_watcher(spec):
        # Resolve the watcher class first, then the trigger, mirroring the
        # order in which import errors would surface for a bad entry.
        watcher_cls = _import_from_string(spec["callable"])
        trigger_spec = spec.get("trigger", {})
        trigger_cls = _import_from_string(trigger_spec.get("callable"))
        trigger_kwargs = trigger_spec.get("params", {})
        return watcher_cls(name=spec.get("name"), trigger=trigger_cls(**trigger_kwargs))

    return [_build_watcher(spec) for spec in watchers_data]

@staticmethod
def _init_asset(asset_dict: dict):
    """Initialize an Asset from its configuration dictionary.

    The mapping's entries are forwarded to ``Asset`` as keyword arguments.
    When a ``watchers`` entry is present it is first converted into watcher
    objects via ``DagBuilder._init_watchers``.

    The input mapping is left untouched: the conversion happens on a shallow
    copy, so the same configuration can safely be processed more than once.
    """
    # Imported lazily, as in the original implementation.
    from airflow.sdk import Asset

    # Copy before rewriting "watchers"; mutating the caller's dict would leave
    # already-built watcher objects in the config for any later call.
    asset_kwargs = {**asset_dict}
    if "watchers" in asset_kwargs:
        asset_kwargs["watchers"] = DagBuilder._init_watchers(asset_kwargs["watchers"])
    return Asset(**asset_kwargs)

@staticmethod
def _combine_assets(assets, op: str):
"""Combine a list of Asset objects using logical operators."""
if op == "or":
return reduce(lambda a, b: a | b, assets)
elif op == "and":
return reduce(lambda a, b: a & b, assets)
else:
raise ValueError(f"Unknown operator: {op}")

@staticmethod
def _asset_schedule(value):
"""Recursively parse and construct assets or combinations of assets."""
if isinstance(value, dict):
if "or" in value:
assets = [DagBuilder._asset_schedule(item) for item in value["or"]]
return DagBuilder._combine_assets(assets, "or")
elif "and" in value:
assets = [DagBuilder._asset_schedule(item) for item in value["and"]]
return DagBuilder._combine_assets(assets, "and")
elif "uri" in value:
return DagBuilder._init_asset(value)
else:
raise ValueError(f"Invalid asset entry: {value}")
elif isinstance(value, list):
return [DagBuilder._init_asset(asset) for asset in value]
else:
raise TypeError(f"Unexpected data type: {type(value)}")

@staticmethod
def _resolve_schedule(dag_params):
schedule = dag_params.get("schedule")
Comment thread
pankajastro marked this conversation as resolved.
Outdated
if schedule is None:
return None

if isinstance(schedule, str) and schedule.strip().lower() == "none":
return None

# Case 1: If schedule is a string, return it directly
if isinstance(schedule, str):
return schedule

# Case 2: If schedule is a dictionary
if isinstance(schedule, dict):
schedule_type = schedule.get("type")
value = schedule.get("value")

dispatch = {
"cron": lambda v: v,
"timetable": lambda v: DagBuilder.make_timetable(v.get("callable"), v.get("params", {})),
"timedelta": lambda v: timedelta(**v),
"relativedelta": lambda v: relativedelta(**v),
"assets": lambda v: DagBuilder._asset_schedule(v),
}

try:
handler = dispatch[schedule_type]
except KeyError:
raise DagFactoryException(f"Schedule type {schedule_type} is not supported.")

return handler(value)

raise DagFactoryException(f"Unexpected value for schedule: {schedule}")

@staticmethod
def configure_schedule(dag_params: Dict[str, Any], dag_kwargs: Dict[str, Any]) -> None:
"""
Expand Down Expand Up @@ -774,11 +862,7 @@ def configure_schedule(dag_params: Dict[str, Any], dag_kwargs: Dict[str, Any]) -
if has_datasets_attr:
schedule.pop("datasets")
else:
schedule = dag_params.get("schedule")
if schedule.strip().lower() == "none":
dag_kwargs["schedule"] = None
else:
dag_kwargs["schedule"] = schedule
dag_kwargs["schedule"] = DagBuilder._resolve_schedule(dag_params)

# pylint: disable=too-many-locals
def build(self) -> Dict[str, Union[str, DAG]]:
Expand Down
8 changes: 8 additions & 0 deletions dagfactory/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import sys
import types
from datetime import date, datetime, timedelta
from importlib import import_module
from pathlib import Path
from typing import Any, AnyStr, Dict, List, Match, Optional, Pattern, Tuple, Union

Expand All @@ -18,6 +19,13 @@
from dagfactory.exceptions import DagFactoryException


def _import_from_string(dotted_path: str):
"""Import a class or function from a dotted path string."""
module_path, _, attr = dotted_path.rpartition(".")
module = import_module(module_path)
return getattr(module, attr)


def get_datetime(date_value: Union[str, datetime, date], timezone: str = "UTC") -> datetime:
"""
Takes value from DAG config and generates valid datetime. Defaults to
Expand Down
Loading