4 changes: 4 additions & 0 deletions airflow/exceptions.py
@@ -149,6 +149,10 @@ def __str__(self) -> str:
return f"Ignoring DAG {self.dag_id} from {self.incoming} - also found in {self.existing}"


class AirflowDagInconsistent(AirflowException):
"""Raise when a DAG has inconsistent attributes."""


class AirflowClusterPolicyViolation(AirflowException):
"""Raise when there is a violation of a Cluster Policy in DAG definition."""

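The new exception is a plain subclass of AirflowException, so existing handlers that already catch AirflowException keep working. A minimal sketch (not part of the diff, assuming the exception carries only a message):

from airflow.exceptions import AirflowDagInconsistent, AirflowException

# The new exception behaves like any other AirflowException subclass.
try:
    raise AirflowDagInconsistent("inconsistent schedule: timetable '@once' does not match schedule_interval '@daily'")
except AirflowException as err:  # generic handlers still catch it
    print(type(err).__name__, err)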
43 changes: 42 additions & 1 deletion airflow/models/dag.py
@@ -62,7 +62,7 @@
from airflow import settings, utils
from airflow.compat.functools import cached_property
from airflow.configuration import conf
from airflow.exceptions import AirflowException, DuplicateTaskIdFound, TaskNotFound
from airflow.exceptions import AirflowDagInconsistent, AirflowException, DuplicateTaskIdFound, TaskNotFound
from airflow.models.abstractoperator import AbstractOperator
from airflow.models.base import ID_LEN, Base
from airflow.models.dagbag import DagBag
@@ -483,6 +483,47 @@ def __init__(
self._task_group = TaskGroup.create_root(self)
self.validate_schedule_and_params()

def _check_schedule_interval_matches_timetable(self) -> bool:
"""Check ``schedule_interval`` and ``timetable`` match.

This is done as a part of the DAG validation done before it's bagged, to
guard against the DAG's ``timetable`` (or ``schedule_interval``) from
being changed after it's created, e.g.

.. code-block:: python

dag1 = DAG("d1", timetable=MyTimetable())
dag1.schedule_interval = "@once"

dag2 = DAG("d2", schedule_interval="@once")
dag2.timetable = MyTimetable()

Validation is done by creating a timetable from ``schedule_interval`` and
checking that its summary matches the DAG's ``timetable``. The logic is not
bullet-proof, especially if a custom timetable does not provide a useful
``summary``, but it is the best we can do.
"""
if self.schedule_interval == self.timetable.summary:
return True
try:
timetable = create_timetable(self.schedule_interval, self.timezone)
except ValueError:
return False
return timetable.summary == self.timetable.summary

def validate(self):
"""Validate the DAG has a coherent setup.

This is called by the DAG bag before bagging the DAG.
"""
if not self._check_schedule_interval_matches_timetable():
raise AirflowDagInconsistent(
f"inconsistent schedule: timetable {self.timetable.summary!r} "
f"does not match schedule_interval {self.schedule_interval!r}",
)
self.params.validate()
self.timetable.validate()

def __repr__(self):
return f"<DAG: {self.dag_id}>"

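For illustration only (not part of the diff): a minimal sketch of how the new validate() is expected to surface a timetable mutated after DAG creation. It assumes a build of Airflow that includes this change; the dag_id is hypothetical, and OnceTimetable comes from airflow.timetables.simple, as in the tests below.

from airflow.exceptions import AirflowDagInconsistent
from airflow.models.dag import DAG
from airflow.timetables.simple import OnceTimetable

dag = DAG("example_dag", timetable=OnceTimetable())  # hypothetical dag_id
dag.schedule_interval = "@daily"  # mutated after creation, no longer matches the timetable

try:
    dag.validate()  # the DagBag calls this before bagging the DAG
except AirflowDagInconsistent as err:
    # Expected message, roughly:
    # inconsistent schedule: timetable '@once' does not match schedule_interval '@daily'
    print(err)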
13 changes: 7 additions & 6 deletions airflow/models/dagbag.py
@@ -39,6 +39,7 @@
AirflowClusterPolicyViolation,
AirflowDagCycleException,
AirflowDagDuplicatedIdException,
AirflowDagInconsistent,
AirflowTimetableInvalid,
ParamValidationError,
)
@@ -402,25 +403,25 @@ def _process_modules(self, filepath, mods, file_last_changed_on_disk):
for (dag, mod) in top_level_dags:
dag.fileloc = mod.__file__
try:
dag.timetable.validate()
# validate dag params
dag.params.validate()
dag.validate()
self.bag_dag(dag=dag, root_dag=dag)
found_dags.append(dag)
found_dags += dag.subdags
except AirflowTimetableInvalid as exception:
self.log.exception("Failed to bag_dag: %s", dag.fileloc)
self.import_errors[dag.fileloc] = f"Invalid timetable expression: {exception}"
self.file_last_changed[dag.fileloc] = file_last_changed_on_disk
except (
AirflowClusterPolicyViolation,
AirflowDagCycleException,
AirflowDagDuplicatedIdException,
AirflowClusterPolicyViolation,
AirflowDagInconsistent,
ParamValidationError,
) as exception:
self.log.exception("Failed to bag_dag: %s", dag.fileloc)
self.import_errors[dag.fileloc] = str(exception)
self.file_last_changed[dag.fileloc] = file_last_changed_on_disk
else:
found_dags.append(dag)
found_dags += dag.subdags
return found_dags

def bag_dag(self, dag, root_dag):
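Again for illustration (not part of the diff): with this change, a DAG whose timetable and schedule_interval disagree is not bagged; the error is expected to land in DagBag.import_errors instead, keyed by the DAG file's location. The dag folder path below is hypothetical.

from airflow.models.dagbag import DagBag

bag = DagBag(dag_folder="/path/to/dags")  # hypothetical folder containing the inconsistent DAG
for fileloc, message in bag.import_errors.items():
    # e.g. "inconsistent schedule: timetable '@once' does not match schedule_interval '@daily'"
    print(fileloc, message)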
26 changes: 26 additions & 0 deletions tests/models/test_dag.py
@@ -2249,6 +2249,32 @@ def return_num(num):
assert dag.params['value'] == value


@pytest.mark.parametrize("timetable", [NullTimetable(), OnceTimetable()])
def test_dag_timetable_match_schedule_interval(timetable):
dag = DAG("my-dag", timetable=timetable)
assert dag._check_schedule_interval_matches_timetable()


@pytest.mark.parametrize("schedule_interval", [None, "@once", "@daily", timedelta(days=1)])
def test_dag_schedule_interval_match_timetable(schedule_interval):
dag = DAG("my-dag", schedule_interval=schedule_interval)
assert dag._check_schedule_interval_matches_timetable()


@pytest.mark.parametrize("schedule_interval", [None, "@daily", timedelta(days=1)])
def test_dag_schedule_interval_change_after_init(schedule_interval):
dag = DAG("my-dag", timetable=OnceTimetable())
dag.schedule_interval = schedule_interval
assert not dag._check_schedule_interval_matches_timetable()


@pytest.mark.parametrize("timetable", [NullTimetable(), OnceTimetable()])
def test_dag_timetable_change_after_init(timetable):
dag = DAG("my-dag") # Default is timedelta(days=1).
dag.timetable = timetable
assert not dag._check_schedule_interval_matches_timetable()


@pytest.mark.parametrize("run_id, execution_date", [(None, datetime_tz(2020, 1, 1)), ('test-run-id', None)])
def test_set_task_instance_state(run_id, execution_date, session, dag_maker):
"""Test that set_task_instance_state updates the TaskInstance state and clear downstream failed"""