diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index 95fa9e3276545..a1d714e7bc432 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -149,6 +149,10 @@ def __str__(self) -> str:
         return f"Ignoring DAG {self.dag_id} from {self.incoming} - also found in {self.existing}"
 
 
+class AirflowDagInconsistent(AirflowException):
+    """Raise when a DAG has inconsistent attributes."""
+
+
 class AirflowClusterPolicyViolation(AirflowException):
     """Raise when there is a violation of a Cluster Policy in DAG definition."""
 
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 83860ba59146f..fd5c48688420a 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -62,7 +62,7 @@
 from airflow import settings, utils
 from airflow.compat.functools import cached_property
 from airflow.configuration import conf
-from airflow.exceptions import AirflowException, DuplicateTaskIdFound, TaskNotFound
+from airflow.exceptions import AirflowDagInconsistent, AirflowException, DuplicateTaskIdFound, TaskNotFound
 from airflow.models.abstractoperator import AbstractOperator
 from airflow.models.base import ID_LEN, Base
 from airflow.models.dagbag import DagBag
@@ -483,6 +483,47 @@ def __init__(
         self._task_group = TaskGroup.create_root(self)
         self.validate_schedule_and_params()
 
+    def _check_schedule_interval_matches_timetable(self) -> bool:
+        """Check ``schedule_interval`` and ``timetable`` match.
+
+        This is done as a part of the DAG validation done before it's bagged, to
+        guard against the DAG's ``timetable`` (or ``schedule_interval``) being
+        changed after it's created, e.g.
+
+        .. code-block:: python
+
+            dag1 = DAG("d1", timetable=MyTimetable())
+            dag1.schedule_interval = "@once"
+
+            dag2 = DAG("d2", schedule_interval="@once")
+            dag2.timetable = MyTimetable()
+
+        Validation is done by creating a timetable and checking that its summary
+        matches ``schedule_interval``. The logic is not bullet-proof, especially if
+        a custom timetable does not provide a useful ``summary``. But this is the
+        best we can do.
+        """
+        if self.schedule_interval == self.timetable.summary:
+            return True
+        try:
+            timetable = create_timetable(self.schedule_interval, self.timezone)
+        except ValueError:
+            return False
+        return timetable.summary == self.timetable.summary
+
+    def validate(self):
+        """Validate the DAG has a coherent setup.
+
+        This is called by the DAG bag before bagging the DAG.
+        """
+        if not self._check_schedule_interval_matches_timetable():
+            raise AirflowDagInconsistent(
+                f"inconsistent schedule: timetable {self.timetable.summary!r} "
+                f"does not match schedule_interval {self.schedule_interval!r}",
+            )
+        self.params.validate()
+        self.timetable.validate()
+
     def __repr__(self):
         return f"<DAG: {self.dag_id}>"
 
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 3673ce095ea16..c0ef0941b6dbf 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -39,6 +39,7 @@
     AirflowClusterPolicyViolation,
     AirflowDagCycleException,
     AirflowDagDuplicatedIdException,
+    AirflowDagInconsistent,
     AirflowTimetableInvalid,
     ParamValidationError,
 )
@@ -402,25 +403,25 @@ def _process_modules(self, filepath, mods, file_last_changed_on_disk):
         for (dag, mod) in top_level_dags:
             dag.fileloc = mod.__file__
             try:
-                dag.timetable.validate()
-                # validate dag params
-                dag.params.validate()
+                dag.validate()
                 self.bag_dag(dag=dag, root_dag=dag)
-                found_dags.append(dag)
-                found_dags += dag.subdags
             except AirflowTimetableInvalid as exception:
                 self.log.exception("Failed to bag_dag: %s", dag.fileloc)
                 self.import_errors[dag.fileloc] = f"Invalid timetable expression: {exception}"
                 self.file_last_changed[dag.fileloc] = file_last_changed_on_disk
             except (
+                AirflowClusterPolicyViolation,
                 AirflowDagCycleException,
                 AirflowDagDuplicatedIdException,
-                AirflowClusterPolicyViolation,
+                AirflowDagInconsistent,
                 ParamValidationError,
             ) as exception:
                 self.log.exception("Failed to bag_dag: %s", dag.fileloc)
                 self.import_errors[dag.fileloc] = str(exception)
                 self.file_last_changed[dag.fileloc] = file_last_changed_on_disk
+            else:
+                found_dags.append(dag)
+                found_dags += dag.subdags
         return found_dags
 
     def bag_dag(self, dag, root_dag):
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 6cd8ea660f7fc..306f519e29dc6 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -2249,6 +2249,32 @@ def return_num(num):
     assert dag.params['value'] == value
 
 
+@pytest.mark.parametrize("timetable", [NullTimetable(), OnceTimetable()])
+def test_dag_timetable_match_schedule_interval(timetable):
+    dag = DAG("my-dag", timetable=timetable)
+    assert dag._check_schedule_interval_matches_timetable()
+
+
+@pytest.mark.parametrize("schedule_interval", [None, "@once", "@daily", timedelta(days=1)])
+def test_dag_schedule_interval_match_timetable(schedule_interval):
+    dag = DAG("my-dag", schedule_interval=schedule_interval)
+    assert dag._check_schedule_interval_matches_timetable()
+
+
+@pytest.mark.parametrize("schedule_interval", [None, "@daily", timedelta(days=1)])
+def test_dag_schedule_interval_change_after_init(schedule_interval):
+    dag = DAG("my-dag", timetable=OnceTimetable())
+    dag.schedule_interval = schedule_interval
+    assert not dag._check_schedule_interval_matches_timetable()
+
+
+@pytest.mark.parametrize("timetable", [NullTimetable(), OnceTimetable()])
+def test_dag_timetable_change_after_init(timetable):
+    dag = DAG("my-dag")  # Default is timedelta(days=1).
+    dag.timetable = timetable
+    assert not dag._check_schedule_interval_matches_timetable()
+
+
 @pytest.mark.parametrize("run_id, execution_date", [(None, datetime_tz(2020, 1, 1)), ('test-run-id', None)])
 def test_set_task_instance_state(run_id, execution_date, session, dag_maker):
     """Test that set_task_instance_state updates the TaskInstance state and clear downstream failed"""