diff --git a/docs/configuration/schedule.md b/docs/configuration/schedule.md new file mode 100644 index 00000000..a2e6ccee --- /dev/null +++ b/docs/configuration/schedule.md @@ -0,0 +1,92 @@ +# Scheduling + +- *Released in version: 0.23.0* + +DAG-Factory offers flexible scheduling options so your DAGs can run on time or based on data availability. Whether you're triggering DAGs on a cron schedule, a time delta, or when an upstream asset is ready, you can configure it easily using the `schedule` field in your YAML. + +Below are the supported scheduling types, each with consistent structure and examples to help you get started. + +## How to Use + +- Every schedule block must specify a type, such as `cron`, `timedelta`, `relativedelta`, `timetable`, or `assets`. +- The actual configuration goes under the `value` key. +- Only one schedule type should be defined per DAG. + +## Example Overview + +| Type | Description | Use Case Example | +|-----------------|---------------------------------------|-------------------------------------| +| `cron` | Run based on a cron string | Every day at midnight | +| `timedelta` | Fixed intervals between runs | Every 6 hours | +| `relativedelta` | Calendar-aware schedule (e.g. months) | Every 1st of the month | +| `timetable` | Advanced Airflow timetables | Custom trigger logic | +| `assets` | Trigger based on asset readiness | When data `X` and `Y` are available | +| `datasets` | Trigger based on datasets readiness | When data `X` and `Y` are available | + +## Schema Options + +### 1. Cron-Based Schedule + +```yaml title="Cron Schedule" +--8<-- "tests/schedule/cron.yml" +``` + +Or, + +```yaml title="Cron Schedule" +--8<-- "tests/schedule/cron_dict.yml" +``` + +### 2. Timedelta Schedule + +```yaml title="Timedelta Schedule" +--8<-- "tests/schedule/timedelta.yml" +``` + +### 3. RelativeDelta Schedule + +```yaml title="Relativedelta Schedule" +--8<-- "tests/schedule/relativedelta.yml" +``` + +### 4. 
Timetable (Advanced Scheduling) + +```yaml title="Timetable Schedule" +--8<-- "tests/schedule/timetable.yml" +``` + +### 5. Asset-Based Triggering + +#### OR (default when list is provided) + +```yaml title="OR Condition" +--8<-- "tests/schedule/list_asset.yml" +``` + +```yaml title="OR Condition" +--8<-- "tests/schedule/or_asset.yml" +``` + +#### AND (explicit composition) + +```yaml title="AND Condition" +--8<-- "tests/schedule/and_asset.yml" +``` + +#### Nested And Or Condition + +```yaml title="Nested AND OR Condition" +--8<-- "tests/schedule/nested_asset.yml" +``` + +#### With Watchers + +```yaml title="Asset with watcher" +--8<-- "tests/schedule/asset_with_watcher.yml" +``` + +### 6. Datasets-Based Triggering + +```yaml +schedule: [ 's3://bucket_example/raw/dataset1.json', 's3://bucket_example/raw/dataset2.json' ] +``` diff --git a/mkdocs.yml b/mkdocs.yml index 27c30700..aa03a9af 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -52,6 +52,7 @@ nav: - configuration/configuring_workflows.md - configuration/environment_variables.md - configuration/defaults.md + - configuration/schedule.md - Features: - features/dynamic_tasks.md - features/datasets.md diff --git a/tests/schedule/and_asset.yml b/tests/schedule/and_asset.yml new file mode 100644 index 00000000..bdd02b28 --- /dev/null +++ b/tests/schedule/and_asset.yml @@ -0,0 +1,10 @@ +schedule: + type: assets + value: + and: + - uri: s3://dag1/output_1.txt + extra: + hi: bye + - uri: s3://dag2/output_1.txt + extra: + hi: bye diff --git a/tests/schedule/asset_with_watcher.yml b/tests/schedule/asset_with_watcher.yml new file mode 100644 index 00000000..5c91991b --- /dev/null +++ b/tests/schedule/asset_with_watcher.yml @@ -0,0 +1,13 @@ +schedule: + type: asset + value: + - uri: s3://dag1/output_1.txt + extra: + hi: bye + watchers: + - callable: airflow.sdk.AssetWatcher + name: test_asset_watcher + trigger: + callable: airflow.providers.standard.triggers.file.FileDeleteTrigger + params: + filepath: "/temp/file.txt" diff 
--git a/tests/schedule/cron.yml b/tests/schedule/cron.yml new file mode 100644 index 00000000..99bd853d --- /dev/null +++ b/tests/schedule/cron.yml @@ -0,0 +1 @@ +schedule: '@daily' diff --git a/tests/schedule/cron_dict.yml b/tests/schedule/cron_dict.yml new file mode 100644 index 00000000..396ce7fa --- /dev/null +++ b/tests/schedule/cron_dict.yml @@ -0,0 +1,3 @@ +schedule: + type: cron + value: "0 0 * * *" diff --git a/tests/schedule/list_asset.yml b/tests/schedule/list_asset.yml new file mode 100644 index 00000000..b63b8d46 --- /dev/null +++ b/tests/schedule/list_asset.yml @@ -0,0 +1,9 @@ +schedule: + type: assets + value: + - uri: s3://dag1/output_1.txt + extra: + hi: bye + - uri: s3://dag2/output_1.txt + extra: + hi: bye diff --git a/tests/schedule/nested_asset.yml b/tests/schedule/nested_asset.yml new file mode 100644 index 00000000..d645e89e --- /dev/null +++ b/tests/schedule/nested_asset.yml @@ -0,0 +1,14 @@ +schedule: + type: assets + value: + or: + - and: + - uri: s3://dag1/output_1.txt + extra: + hi: bye + - uri: s3://dag2/output_1.txt + extra: + hi: bye + - uri: s3://dag3/output_3.txt + extra: + hi: bye diff --git a/tests/schedule/or_asset.yml b/tests/schedule/or_asset.yml new file mode 100644 index 00000000..064160a8 --- /dev/null +++ b/tests/schedule/or_asset.yml @@ -0,0 +1,10 @@ +schedule: + type: assets + value: + or: + - uri: s3://dag1/output_1.txt + extra: + hi: bye + - uri: s3://dag2/output_1.txt + extra: + hi: bye diff --git a/tests/schedule/relativedelta.yml b/tests/schedule/relativedelta.yml new file mode 100644 index 00000000..6aa19588 --- /dev/null +++ b/tests/schedule/relativedelta.yml @@ -0,0 +1,4 @@ +schedule: + type: relativedelta + value: + month: 1 diff --git a/tests/schedule/timedelta.yml b/tests/schedule/timedelta.yml new file mode 100644 index 00000000..bb3182d1 --- /dev/null +++ b/tests/schedule/timedelta.yml @@ -0,0 +1,4 @@ +schedule: + type: timedelta + value: + seconds: 30 diff --git a/tests/schedule/timetable.yml 
b/tests/schedule/timetable.yml new file mode 100644 index 00000000..ddd4c568 --- /dev/null +++ b/tests/schedule/timetable.yml @@ -0,0 +1,7 @@ +schedule: + type: timetable + value: + callable: airflow.timetables.trigger.CronTriggerTimetable + params: + cron: "* * * * *" + timezone: UTC diff --git a/tests/test_dagbuilder.py b/tests/test_dagbuilder.py index a20cf588..9ee4c39e 100644 --- a/tests/test_dagbuilder.py +++ b/tests/test_dagbuilder.py @@ -23,6 +23,7 @@ get_schedule_key, get_sql_sensor_path, one_hour_ago, + read_yml, ) try: @@ -62,6 +63,7 @@ from airflow.models import MappedOperator here = Path(__file__).parent +schedule_path = here / "schedule" PROJECT_ROOT_PATH = str(here.parent) UTC = pendulum.timezone("UTC") @@ -1161,15 +1163,8 @@ class TestSchedule: def test_asset_schedule_list_of_assets(self): from airflow.sdk import Asset - yaml_str = """ - - uri: s3://dag1/output_1.txt - extra: - hi: bye - - uri: s3://dag2/output_1.txt - extra: - hi: bye - """ - data = yaml.safe_load(yaml_str) + schedule_data = read_yml(schedule_path / "list_asset.yml") + data = schedule_data["schedule"]["value"] parsed_schedule = DagBuilder._asset_schedule(data) expected = [ @@ -1193,16 +1188,8 @@ def test_asset_schedule_list_of_assets(self): def test_asset_schedule_with_and_operator(self): from airflow.sdk import Asset, AssetAll - yaml_str = """ - and: - - uri: s3://dag1/output_1.txt - extra: - hi: bye - - uri: s3://dag2/output_1.txt - extra: - hi: bye - """ - data = yaml.safe_load(yaml_str) + schedule_data = read_yml(schedule_path / "and_asset.yml") + data = schedule_data["schedule"]["value"] parsed_schedule = DagBuilder._asset_schedule(data) expected = AssetAll( @@ -1226,16 +1213,9 @@ def test_asset_schedule_with_and_operator(self): def test_asset_schedule_with_or_operator(self): from airflow.sdk import Asset, AssetAny - yaml_str = """ - or: - - uri: s3://dag1/output_1.txt - extra: - hi: bye - - uri: s3://dag2/output_1.txt - extra: - hi: bye - """ - data = yaml.safe_load(yaml_str) 
+ schedule_data = read_yml(schedule_path / "or_asset.yml") + data = schedule_data["schedule"]["value"] + parsed_schedule = DagBuilder._asset_schedule(data) expected = AssetAny( @@ -1259,20 +1239,9 @@ def test_asset_schedule_with_or_operator(self): def test_asset_schedule_with_nested_operators(self): from airflow.sdk import Asset, AssetAll, AssetAny - yaml_str = """ - or: - - and: - - uri: s3://dag1/output_1.txt - extra: - hi: bye - - uri: s3://dag2/output_1.txt - extra: - hi: bye - - uri: s3://dag3/output_3.txt - extra: - hi: bye - """ - data = yaml.safe_load(yaml_str) + schedule_data = read_yml(schedule_path / "nested_asset.yml") + data = schedule_data["schedule"]["value"] + parsed_schedule = DagBuilder._asset_schedule(data) expected = AssetAny( @@ -1306,19 +1275,9 @@ def test_asset_schedule_with_watcher(self): from airflow.providers.standard.triggers.file import FileDeleteTrigger from airflow.sdk import Asset, AssetWatcher - yaml_str = """ - - uri: s3://dag1/output_1.txt - extra: - hi: bye - watchers: - - callable: airflow.sdk.AssetWatcher - name: test_asset_watcher - trigger: - callable: airflow.providers.standard.triggers.file.FileDeleteTrigger - params: - filepath: "/temp/file.txt" - """ - data = yaml.safe_load(yaml_str) + schedule_data = read_yml(schedule_path / "asset_with_watcher.yml") + data = schedule_data["schedule"]["value"] + parsed_schedule = DagBuilder._asset_schedule(data) expected = [ @@ -1335,7 +1294,7 @@ def test_asset_schedule_with_watcher(self): ], ) ] - assert parsed_schedule == expected + assert parsed_schedule.__eq__(expected) def test_resolve_schedule_cron_string(self): yaml_str = "schedule: '* * * * *'" @@ -1344,58 +1303,32 @@ def test_resolve_schedule_cron_string(self): assert schedule == "* * * * *" def test_resolve_schedule_cron_string_alias(self): - yaml_str = "schedule: '@daily'" - data = yaml.safe_load(yaml_str) + data = read_yml(schedule_path / "cron.yml") schedule = DagBuilder._resolve_schedule(data) assert schedule == "@daily" def 
test_resolve_schedule_cron_type_value(self): - yaml_str = """ - schedule: - type: cron - value: "@daily" - """ - data = yaml.safe_load(yaml_str) + data = read_yml(schedule_path / "cron_dict.yml") schedule = DagBuilder._resolve_schedule(data) - assert schedule == "@daily" + assert schedule == "0 0 * * *" def test_resolve_schedule_timetable_type(self): from airflow.timetables.trigger import CronTriggerTimetable - yaml_str = """ - schedule: - type: timetable - value: - callable: airflow.timetables.trigger.CronTriggerTimetable - params: - cron: "* * * * *" - timezone: UTC - """ - data = yaml.safe_load(yaml_str) + data = read_yml(schedule_path / "timetable.yml") schedule = DagBuilder._resolve_schedule(data) assert schedule == CronTriggerTimetable(cron="* * * * *", timezone="UTC") def test_resolve_schedule_timedelta_type(self): - yaml_str = """ - schedule: - type: timedelta - value: - seconds: 30 - """ - data = yaml.safe_load(yaml_str) + + data = read_yml(schedule_path / "timedelta.yml") schedule = DagBuilder._resolve_schedule(data) assert schedule == datetime.timedelta(seconds=30) def test_resolve_schedule_relativedelta_type(self): from dateutil.relativedelta import relativedelta - yaml_str = """ - schedule: - type: relativedelta - value: - month: 1 - """ - data = yaml.safe_load(yaml_str) + data = read_yml(schedule_path / "relativedelta.yml") schedule = DagBuilder._resolve_schedule(data) assert schedule == relativedelta(month=1) diff --git a/tests/utils.py b/tests/utils.py index 43c48449..4b2bc600 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -5,6 +5,7 @@ from datetime import datetime from typing import Any +import yaml from airflow.configuration import secrets_backend_list from airflow.exceptions import AirflowSkipException from airflow.models.dag import DAG @@ -231,3 +232,8 @@ def get_sql_sensor_path(): return "airflow.sensors.sql_sensor.SqlSensor" else: return "airflow.providers.common.sql.sensors.sql.SqlSensor" + + +def read_yml(path): + with open(path, "r") 
as file: + return yaml.safe_load(file)