Skip to content

Commit

Permalink
Remove 'allow_trigger_in_future' config (apache#46663)
Browse files Browse the repository at this point in the history
A DAG run with a logical date in the future can no longer be started. This
only affects DAGs with schedule=None, which can only be triggered manually.

Instead of using a future date, you can trigger with a None logical date
whenever you want. A custom run_id can be supplied if you want it. If a
date is needed, it can be passed as a DAG param instead.
  • Loading branch information
uranusjr authored Feb 12, 2025
1 parent dea2cc9 commit 34261d0
Show file tree
Hide file tree
Showing 10 changed files with 56 additions and 61 deletions.
8 changes: 0 additions & 8 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2398,14 +2398,6 @@ scheduler:
type: boolean
example: ~
default: "True"
allow_trigger_in_future:
description: |
Allow externally triggered DagRuns for Execution Dates in the future
Only has effect if schedule is set to None in DAG
version_added: 1.10.8
type: boolean
example: ~
default: "False"
trigger_timeout_check_interval:
description: |
How often to check for expired trigger requests that have not run yet.
Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1656,7 +1656,7 @@ def _schedule_dag_run(
)
return callback_to_execute

if dag_run.logical_date > timezone.utcnow() and not dag.allow_future_exec_dates:
if dag_run.logical_date and dag_run.logical_date > timezone.utcnow():
self.log.error("Logical date is in future: %s", dag_run.logical_date)
return callback

Expand Down
5 changes: 1 addition & 4 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1054,10 +1054,7 @@ def _get_task_instances(
tis = tis.where(DagRun.logical_date >= start_date)
if task_ids is not None:
tis = tis.where(TaskInstance.ti_selector_condition(task_ids))

# This allows the allow_trigger_in_future config to take effect, rather than mandating exec_date <= UTC
if end_date or not self.allow_future_exec_dates:
end_date = end_date or timezone.utcnow()
if end_date:
tis = tis.where(DagRun.logical_date <= end_date)

if state:
Expand Down
7 changes: 2 additions & 5 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
from sqlalchemy.sql.functions import coalesce
from sqlalchemy_utils import UUIDType

from airflow import settings
from airflow.callbacks.callback_requests import DagCallbackRequest
from airflow.configuration import conf as airflow_conf
from airflow.exceptions import AirflowException, TaskNotFound
Expand Down Expand Up @@ -456,8 +455,7 @@ def get_running_dag_runs_to_examine(cls, session: Session) -> Query:
.limit(cls.DEFAULT_DAGRUNS_TO_EXAMINE)
)

if not settings.ALLOW_FUTURE_LOGICAL_DATES:
query = query.where(DagRun.logical_date <= func.now())
query = query.where(DagRun.run_after <= func.now())

return session.scalars(with_row_locks(query, of=cls, session=session, skip_locked=True))

Expand Down Expand Up @@ -542,8 +540,7 @@ def get_queued_dag_runs_to_set_running(cls, session: Session) -> Query:
.limit(cls.DEFAULT_DAGRUNS_TO_EXAMINE)
)

if not settings.ALLOW_FUTURE_LOGICAL_DATES:
query = query.where(DagRun.logical_date <= func.now())
query = query.where(DagRun.run_after <= func.now())

return session.scalars(with_row_locks(query, of=cls, session=session, skip_locked=True))

Expand Down
2 changes: 0 additions & 2 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,8 +677,6 @@ def initialize():
fallback=False,
)

ALLOW_FUTURE_LOGICAL_DATES = conf.getboolean("scheduler", "allow_trigger_in_future", fallback=False)

USE_JOB_SCHEDULE = conf.getboolean("scheduler", "use_job_schedule", fallback=True)

# By default Airflow plugins are lazily-loaded (only loaded when required). Set it to False,
Expand Down
11 changes: 6 additions & 5 deletions airflow/ti_deps/deps/runnable_exec_date_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,18 @@
class RunnableExecDateDep(BaseTIDep):
"""Determines whether a task's logical date is valid."""

NAME = "Execution Date"
NAME = "Logical Date"
IGNORABLE = True

@provide_session
def _get_dep_statuses(self, ti, session, dep_context):
logical_date = ti.get_dagrun(session).logical_date
if logical_date is None:
return

cur_date = timezone.utcnow()

# don't consider runs that are executed in the future unless
# specified by config and schedule is None
logical_date = ti.get_dagrun(session).logical_date
if logical_date > cur_date and not ti.task.dag.allow_future_exec_dates:
if logical_date > cur_date:
yield self._failing_status(
reason=(
f"Logical date {logical_date.isoformat()} is in the future "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,6 @@ In the UI, it appears as if Airflow is running your tasks a day **late**
waiting than the queue slots. Thus there can be cases where low priority tasks will be scheduled before high priority tasks if they share the same batch.
For more read about that you can reference `this GitHub discussion <https://github.com/apache/airflow/discussions/28809>`__.


Triggering DAG with Future Date
-------------------------------

If you want to use 'external trigger' to run future-dated data intervals, set ``allow_trigger_in_future = True`` in ``scheduler`` section in ``airflow.cfg``.
This only has effect if your DAG is defined with ``schedule=None``.
When set to ``False`` (the default value), if you manually trigger a run with future-dated data intervals,
the scheduler will not execute it until its ``data_interval_start`` is in the past.

.. _scheduler:ha:

Running More Than One Scheduler
Expand Down
31 changes: 31 additions & 0 deletions newsfragments/46663.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
Removed configuration ``scheduler.allow_trigger_in_future``.

A DAG run with a logical date in the future can no longer be started. This only affects DAGs with ``schedule=None``.

Instead of using a future date, you can trigger with ``logical_date=None``. A custom ``run_id`` can be supplied if desired. If a date is needed, it can be passed as a DAG param instead.

Property ``allow_future_exec_dates`` on the DAG class has also been removed.


* Types of change

* [ ] Dag changes
* [x] Config changes
* [ ] API changes
* [ ] CLI changes
* [ ] Behaviour changes
* [ ] Plugin changes
* [ ] Dependency changes
* [x] Code interface changes

* Migration rules needed

* ruff

* AIR302

* [ ] property ``airflow.models.dag.DAG.allow_future_exec_dates``

* ``airflow config lint``

* [ ] ``scheduler.allow_trigger_in_future``
4 changes: 0 additions & 4 deletions task_sdk/src/airflow/sdk/definitions/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -651,10 +651,6 @@ def owner(self) -> str:
"""
return ", ".join({t.owner for t in self.tasks})

@property
def allow_future_exec_dates(self) -> bool:
return settings.ALLOW_FUTURE_LOGICAL_DATES and not self.timetable.can_be_scheduled

def resolve_template_files(self):
for t in self.tasks:
# TODO: TaskSDK: move this on to BaseOperator and remove the check?
Expand Down
38 changes: 15 additions & 23 deletions tests/ti_deps/deps/test_runnable_exec_date_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
# under the License.
from __future__ import annotations

from unittest.mock import Mock, patch
from unittest.mock import Mock

import pytest
import time_machine

from airflow import settings
from airflow.models import DagRun, TaskInstance
from airflow.ti_deps.deps.runnable_exec_date_dep import RunnableExecDateDep
from airflow.utils.timezone import datetime
Expand All @@ -40,39 +39,32 @@ def clean_db(session):

@time_machine.travel("2016-11-01")
@pytest.mark.parametrize(
"allow_trigger_in_future,schedule,logical_date,is_met",
"logical_date, is_met",
[
(True, "@daily", datetime(2016, 11, 3), False),
(False, None, datetime(2016, 11, 3), False),
(False, "@daily", datetime(2016, 11, 3), False),
(False, "@daily", datetime(2016, 11, 1), True),
(False, None, datetime(2016, 11, 1), True),
(datetime(2016, 11, 3), False),
(datetime(2016, 11, 1), True),
],
)
def test_logical_date_dep(
dag_maker,
session,
create_dummy_dag,
allow_trigger_in_future,
schedule,
logical_date,
is_met,
):
"""
If the dag's logical date is in the future but (allow_trigger_in_future=False or not schedule)
this dep should fail
If the dag's logical date is in the future, this dep should fail
"""
with patch.object(settings, "ALLOW_FUTURE_LOGICAL_DATES", allow_trigger_in_future):
create_dummy_dag(
"test_localtaskjob_heartbeat",
start_date=datetime(2015, 1, 1),
end_date=datetime(2016, 11, 5),
schedule=schedule,
with_dagrun_type=DagRunType.MANUAL,
session=session,
)
(ti,) = dag_maker.create_dagrun(run_id="scheduled", logical_date=logical_date).task_instances
assert RunnableExecDateDep().is_met(ti=ti) == is_met
create_dummy_dag(
"test_localtaskjob_heartbeat",
start_date=datetime(2015, 1, 1),
end_date=datetime(2016, 11, 5),
schedule=None,
with_dagrun_type=DagRunType.MANUAL,
session=session,
)
(ti,) = dag_maker.create_dagrun(run_id="scheduled", logical_date=logical_date).task_instances
assert RunnableExecDateDep().is_met(ti=ti) == is_met


@time_machine.travel("2016-01-01")
Expand Down

0 comments on commit 34261d0

Please sign in to comment.