92 changes: 92 additions & 0 deletions docs/configuration/schedule.md
@@ -0,0 +1,92 @@
# Scheduling

- *Released in version: 0.23.0*

DAG-Factory offers flexible scheduling options so your DAGs can run on a time-based schedule or in response to data availability. Whether you trigger DAGs on a cron schedule, at a fixed time delta, or when an upstream asset is ready, you configure it with the `schedule` field in your YAML.

Below are the supported scheduling types, each with a consistent structure and an example to help you get started.

## How to Use

- Every schedule block must specify a `type`, such as `cron`, `timedelta`, `relativedelta`, `timetable`, or `assets`.
- The actual configuration goes under the `value` key.
- Only one schedule type should be defined per DAG.
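
As a rough illustration of the rules above, the sketch below checks that a parsed schedule block has both keys and a recognised type. `check_schedule_block` and `SUPPORTED_TYPES` are hypothetical names, not part of DAG-Factory, and the check covers only the `type`/`value` form.

```python
from typing import Any

# Type names taken from the list above; the helper itself is hypothetical.
SUPPORTED_TYPES = {"cron", "timedelta", "relativedelta", "timetable", "assets"}


def check_schedule_block(schedule: Any) -> str:
    """Return the schedule type, or raise ValueError if the block is malformed."""
    if not isinstance(schedule, dict):
        raise ValueError("schedule block must be a mapping with 'type' and 'value'")
    missing = {"type", "value"} - set(schedule)
    if missing:
        raise ValueError(f"schedule block is missing keys: {sorted(missing)}")
    if schedule["type"] not in SUPPORTED_TYPES:
        raise ValueError(f"unsupported schedule type: {schedule['type']!r}")
    return schedule["type"]


# A block equivalent to the dict form of a cron schedule.
print(check_schedule_block({"type": "cron", "value": "0 0 * * *"}))
```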

## Example Overview

| Type | Description | Use Case Example |
|-----------------|---------------------------------------|-------------------------------------|
| `cron` | Run based on a cron string | Every day at midnight |
| `timedelta` | Fixed intervals between runs | Every 6 hours |
| `relativedelta` | Calendar-aware schedule (e.g. months) | Every 1st of the month |
| `timetable` | Advanced Airflow timetables | Custom trigger logic |
| `assets` | Trigger based on asset readiness | When data `X` and `Y` are available |
| `datasets` | Trigger based on dataset readiness | When data `X` and `Y` are available |

## Schema Options

### 1. Cron-Based Schedule

```yaml title="Cron Schedule"
--8<-- "tests/schedule/cron.yml"
```

Or, using the explicit `type`/`value` form:

```yaml title="Cron Schedule"
--8<-- "tests/schedule/cron_dict.yml"
```
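
Both forms carry a cron expression (Airflow's `@daily` preset is equivalent to `0 0 * * *`). The following is a minimal sketch, not DAG-Factory's actual resolver, of how the two shapes could be normalised to a single string; `cron_expression` is a hypothetical helper.

```python
from typing import Union


def cron_expression(schedule: Union[str, dict]) -> str:
    """Return the cron string from either the shorthand or the type/value form."""
    if isinstance(schedule, str):
        # Shorthand form: `schedule: '@daily'`
        return schedule
    if isinstance(schedule, dict) and schedule.get("type") == "cron":
        # Explicit form: `schedule: {type: cron, value: "0 0 * * *"}`
        return schedule["value"]
    raise ValueError("not a cron schedule")


print(cron_expression("@daily"))
print(cron_expression({"type": "cron", "value": "0 0 * * *"}))
```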

### 2. Timedelta Schedule

```yaml title="Timedelta Schedule"
--8<-- "tests/schedule/timedelta.yml"
```
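
The keys under `value` correspond to keyword arguments of Python's `datetime.timedelta`. A minimal sketch, assuming the mapping is unpacked directly into the constructor (the helper name `to_timedelta` is hypothetical):

```python
import datetime


def to_timedelta(value: dict) -> datetime.timedelta:
    """Build a timedelta from a mapping such as {'seconds': 30}."""
    # Assumption: every key is a valid timedelta keyword
    # (days, seconds, microseconds, milliseconds, minutes, hours, weeks).
    return datetime.timedelta(**value)


every_30_seconds = to_timedelta({"seconds": 30})
every_6_hours = to_timedelta({"hours": 6})
print(every_30_seconds, every_6_hours)
```

An unknown key, for example `{"months": 1}`, raises `TypeError`, because `timedelta` has no calendar-aware units.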

### 3. RelativeDelta Schedule

```yaml title="Relativedelta Schedule"
--8<-- "tests/schedule/relativedelta.yml"
```

### 4. Timetable (Advanced Scheduling)

```yaml title="Timetable Schedule"
--8<-- "tests/schedule/timetable.yml"
```

### 5. Asset-Based Triggering

#### OR (default when list is provided)

```yaml title="OR Condition"
--8<-- "tests/schedule/list_asset.yml"
```

```yaml title="OR Condition"
--8<-- "tests/schedule/or_asset.yml"
```

#### AND (explicit composition)

```yaml title="AND Condition"
--8<-- "tests/schedule/and_asset.yml"
```

#### Nested AND/OR Condition

```yaml title="Nested AND OR Condition"
--8<-- "tests/schedule/nested_asset.yml"
```
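
To make the `and`/`or` composition concrete, here is a simplified, standalone sketch that evaluates such a condition against a set of updated URIs. It assumes `and` means every child must be satisfied and `or` means at least one. `is_ready` is a hypothetical helper that ignores `extra` and does not model how Airflow actually tracks asset events.

```python
from typing import Set


def is_ready(condition: dict, updated: Set[str]) -> bool:
    """Evaluate a nested and/or asset condition against updated URIs."""
    if "and" in condition:
        return all(is_ready(child, updated) for child in condition["and"])
    if "or" in condition:
        return any(is_ready(child, updated) for child in condition["or"])
    # Leaf node: a single asset identified by its URI.
    return condition["uri"] in updated


# (dag1 AND dag2) OR dag3
nested = {
    "or": [
        {"and": [{"uri": "s3://dag1/output_1.txt"}, {"uri": "s3://dag2/output_1.txt"}]},
        {"uri": "s3://dag3/output_3.txt"},
    ]
}

print(is_ready(nested, {"s3://dag3/output_3.txt"}))
print(is_ready(nested, {"s3://dag1/output_1.txt"}))
```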

#### With Watchers

```yaml title="Asset with watcher"
--8<-- "tests/schedule/asset_with_watcher.yml"
```

### 6. Dataset-Based Triggering

```yaml
schedule: [ 's3://bucket_example/raw/dataset1.json', 's3://bucket_example/raw/dataset2.json' ]
```
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -52,6 +52,7 @@ nav:
       - configuration/configuring_workflows.md
       - configuration/environment_variables.md
       - configuration/defaults.md
+      - configuration/schedule.md
   - Features:
       - features/dynamic_tasks.md
       - features/datasets.md
10 changes: 10 additions & 0 deletions tests/schedule/and_asset.yml
@@ -0,0 +1,10 @@
schedule:
  type: assets
  value:
    and:
      - uri: s3://dag1/output_1.txt
        extra:
          hi: bye
      - uri: s3://dag2/output_1.txt
        extra:
          hi: bye
13 changes: 13 additions & 0 deletions tests/schedule/asset_with_watcher.yml
@@ -0,0 +1,13 @@
schedule:
  type: assets
  value:
    - uri: s3://dag1/output_1.txt
      extra:
        hi: bye
      watchers:
        - callable: airflow.sdk.AssetWatcher
          name: test_asset_watcher
          trigger:
            callable: airflow.providers.standard.triggers.file.FileDeleteTrigger
            params:
              filepath: "/temp/file.txt"
1 change: 1 addition & 0 deletions tests/schedule/cron.yml
@@ -0,0 +1 @@
schedule: '@daily'
3 changes: 3 additions & 0 deletions tests/schedule/cron_dict.yml
@@ -0,0 +1,3 @@
schedule:
  type: cron
  value: "0 0 * * *"
9 changes: 9 additions & 0 deletions tests/schedule/list_asset.yml
@@ -0,0 +1,9 @@
schedule:
  type: assets
  value:
    - uri: s3://dag1/output_1.txt
      extra:
        hi: bye
    - uri: s3://dag2/output_1.txt
      extra:
        hi: bye
14 changes: 14 additions & 0 deletions tests/schedule/nested_asset.yml
@@ -0,0 +1,14 @@
schedule:
  type: assets
  value:
    or:
      - and:
          - uri: s3://dag1/output_1.txt
            extra:
              hi: bye
          - uri: s3://dag2/output_1.txt
            extra:
              hi: bye
      - uri: s3://dag3/output_3.txt
        extra:
          hi: bye
10 changes: 10 additions & 0 deletions tests/schedule/or_asset.yml
@@ -0,0 +1,10 @@
schedule:
  type: assets
  value:
    or:
      - uri: s3://dag1/output_1.txt
        extra:
          hi: bye
      - uri: s3://dag2/output_1.txt
        extra:
          hi: bye
4 changes: 4 additions & 0 deletions tests/schedule/relativedelta.yml
@@ -0,0 +1,4 @@
schedule:
  type: relativedelta
  value:
    month: 1
4 changes: 4 additions & 0 deletions tests/schedule/timedelta.yml
@@ -0,0 +1,4 @@
schedule:
  type: timedelta
  value:
    seconds: 30
7 changes: 7 additions & 0 deletions tests/schedule/timetable.yml
@@ -0,0 +1,7 @@
schedule:
  type: timetable
  value:
    callable: airflow.timetables.trigger.CronTriggerTimetable
    params:
      cron: "* * * * *"
      timezone: UTC
113 changes: 23 additions & 90 deletions tests/test_dagbuilder.py
@@ -23,6 +23,7 @@
     get_schedule_key,
     get_sql_sensor_path,
     one_hour_ago,
+    read_yml,
 )

 try:
@@ -62,6 +63,7 @@
     from airflow.models import MappedOperator

 here = Path(__file__).parent
+schedule_path = here / "schedule"

 PROJECT_ROOT_PATH = str(here.parent)
 UTC = pendulum.timezone("UTC")
@@ -1161,15 +1163,8 @@ class TestSchedule:
     def test_asset_schedule_list_of_assets(self):
         from airflow.sdk import Asset

-        yaml_str = """
-        - uri: s3://dag1/output_1.txt
-          extra:
-            hi: bye
-        - uri: s3://dag2/output_1.txt
-          extra:
-            hi: bye
-        """
-        data = yaml.safe_load(yaml_str)
+        schedule_data = read_yml(schedule_path / "list_asset.yml")
+        data = schedule_data["schedule"]["value"]
         parsed_schedule = DagBuilder._asset_schedule(data)

         expected = [
@@ -1193,16 +1188,8 @@ def test_asset_schedule_list_of_assets(self):
     def test_asset_schedule_with_and_operator(self):
         from airflow.sdk import Asset, AssetAll

-        yaml_str = """
-        and:
-          - uri: s3://dag1/output_1.txt
-            extra:
-              hi: bye
-          - uri: s3://dag2/output_1.txt
-            extra:
-              hi: bye
-        """
-        data = yaml.safe_load(yaml_str)
+        schedule_data = read_yml(schedule_path / "and_asset.yml")
+        data = schedule_data["schedule"]["value"]
         parsed_schedule = DagBuilder._asset_schedule(data)

         expected = AssetAll(
@@ -1226,16 +1213,9 @@ def test_asset_schedule_with_and_operator(self):
     def test_asset_schedule_with_or_operator(self):
         from airflow.sdk import Asset, AssetAny

-        yaml_str = """
-        or:
-          - uri: s3://dag1/output_1.txt
-            extra:
-              hi: bye
-          - uri: s3://dag2/output_1.txt
-            extra:
-              hi: bye
-        """
-        data = yaml.safe_load(yaml_str)
+        schedule_data = read_yml(schedule_path / "or_asset.yml")
+        data = schedule_data["schedule"]["value"]
+
         parsed_schedule = DagBuilder._asset_schedule(data)

         expected = AssetAny(
@@ -1259,20 +1239,9 @@ def test_asset_schedule_with_or_operator(self):
     def test_asset_schedule_with_nested_operators(self):
         from airflow.sdk import Asset, AssetAll, AssetAny

-        yaml_str = """
-        or:
-          - and:
-              - uri: s3://dag1/output_1.txt
-                extra:
-                  hi: bye
-              - uri: s3://dag2/output_1.txt
-                extra:
-                  hi: bye
-          - uri: s3://dag3/output_3.txt
-            extra:
-              hi: bye
-        """
-        data = yaml.safe_load(yaml_str)
+        schedule_data = read_yml(schedule_path / "nested_asset.yml")
+        data = schedule_data["schedule"]["value"]
+
         parsed_schedule = DagBuilder._asset_schedule(data)

         expected = AssetAny(
@@ -1306,19 +1275,9 @@ def test_asset_schedule_with_watcher(self):
         from airflow.providers.standard.triggers.file import FileDeleteTrigger
         from airflow.sdk import Asset, AssetWatcher

-        yaml_str = """
-        - uri: s3://dag1/output_1.txt
-          extra:
-            hi: bye
-          watchers:
-            - callable: airflow.sdk.AssetWatcher
-              name: test_asset_watcher
-              trigger:
-                callable: airflow.providers.standard.triggers.file.FileDeleteTrigger
-                params:
-                  filepath: "/temp/file.txt"
-        """
-        data = yaml.safe_load(yaml_str)
+        schedule_data = read_yml(schedule_path / "asset_with_watcher.yml")
+        data = schedule_data["schedule"]["value"]
+
         parsed_schedule = DagBuilder._asset_schedule(data)

         expected = [
@@ -1335,7 +1294,7 @@ def test_asset_schedule_with_watcher(self):
                 ],
             )
         ]
-        assert parsed_schedule == expected
+        assert parsed_schedule.__eq__(expected)

     def test_resolve_schedule_cron_string(self):
         yaml_str = "schedule: '* * * * *'"
@@ -1344,58 +1303,32 @@ def test_resolve_schedule_cron_string(self):
         assert schedule == "* * * * *"

     def test_resolve_schedule_cron_string_alias(self):
-        yaml_str = "schedule: '@daily'"
-        data = yaml.safe_load(yaml_str)
+        data = read_yml(schedule_path / "cron.yml")
         schedule = DagBuilder._resolve_schedule(data)
         assert schedule == "@daily"

     def test_resolve_schedule_cron_type_value(self):
-        yaml_str = """
-        schedule:
-          type: cron
-          value: "@daily"
-        """
-        data = yaml.safe_load(yaml_str)
+        data = read_yml(schedule_path / "cron_dict.yml")
         schedule = DagBuilder._resolve_schedule(data)
-        assert schedule == "@daily"
+        assert schedule == "0 0 * * *"

     def test_resolve_schedule_timetable_type(self):
         from airflow.timetables.trigger import CronTriggerTimetable

-        yaml_str = """
-        schedule:
-          type: timetable
-          value:
-            callable: airflow.timetables.trigger.CronTriggerTimetable
-            params:
-              cron: "* * * * *"
-              timezone: UTC
-        """
-        data = yaml.safe_load(yaml_str)
+        data = read_yml(schedule_path / "timetable.yml")
         schedule = DagBuilder._resolve_schedule(data)
         assert schedule == CronTriggerTimetable(cron="* * * * *", timezone="UTC")

     def test_resolve_schedule_timedelta_type(self):
-        yaml_str = """
-        schedule:
-          type: timedelta
-          value:
-            seconds: 30
-        """
-        data = yaml.safe_load(yaml_str)
+
+        data = read_yml(schedule_path / "timedelta.yml")
         schedule = DagBuilder._resolve_schedule(data)
         assert schedule == datetime.timedelta(seconds=30)

     def test_resolve_schedule_relativedelta_type(self):
         from dateutil.relativedelta import relativedelta

-        yaml_str = """
-        schedule:
-          type: relativedelta
-          value:
-            month: 1
-        """
-        data = yaml.safe_load(yaml_str)
+        data = read_yml(schedule_path / "relativedelta.yml")
         schedule = DagBuilder._resolve_schedule(data)
         assert schedule == relativedelta(month=1)

6 changes: 6 additions & 0 deletions tests/utils.py
@@ -5,6 +5,7 @@
 from datetime import datetime
 from typing import Any

+import yaml
 from airflow.configuration import secrets_backend_list
 from airflow.exceptions import AirflowSkipException
 from airflow.models.dag import DAG
@@ -231,3 +232,8 @@ def get_sql_sensor_path():
         return "airflow.sensors.sql_sensor.SqlSensor"
     else:
         return "airflow.providers.common.sql.sensors.sql.SqlSensor"
+
+
+def read_yml(path):
+    with open(path, "r") as file:
+        return yaml.safe_load(file)