Skip to content

Commit

Permalink
Handle future dates cases in Backfill (apache#46017)
Browse files Browse the repository at this point in the history
This PR fixes the below behaviors

Ensure dates which are in future with run_backwards=true are not inserted in backfilldagrun table
Raise 422 when all dates are in the future.

---------

Co-authored-by: Daniel Standish <[email protected]>
  • Loading branch information
vatsrahul1001 and dstandish authored Feb 11, 2025
1 parent 08a08d0 commit 67c4f77
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 8 deletions.
9 changes: 8 additions & 1 deletion airflow/api_fastapi/core_api/routes/public/backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
Backfill,
BackfillDagRun,
DagNoScheduleException,
InvalidBackfillDate,
InvalidBackfillDirection,
InvalidReprocessBehavior,
_create_backfill,
Expand Down Expand Up @@ -220,6 +221,7 @@ def create_backfill(
InvalidReprocessBehavior,
InvalidBackfillDirection,
DagNoScheduleException,
InvalidBackfillDate,
) as e:
raise RequestValidationError(str(e))

Expand Down Expand Up @@ -254,5 +256,10 @@ def create_backfill_dry_run(
detail=f"Could not find dag {body.dag_id}",
)

except (InvalidReprocessBehavior, InvalidBackfillDirection, DagNoScheduleException) as e:
except (
InvalidReprocessBehavior,
InvalidBackfillDirection,
DagNoScheduleException,
InvalidBackfillDate,
) as e:
raise RequestValidationError(str(e))
21 changes: 16 additions & 5 deletions airflow/models/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ class InvalidReprocessBehavior(AirflowException):
"""


class InvalidBackfillDate(AirflowException):
"""
Raised when a backfill is requested for future date.
:meta private:
"""


class ReprocessBehavior(str, Enum):
"""
Internal enum for setting reprocess behavior in a backfill.
Expand Down Expand Up @@ -204,7 +212,7 @@ def _get_dag_run_no_create_reason(dr, reprocess_behavior: ReprocessBehavior) ->
return non_create_reason


def _validate_backfill_params(dag, reverse, reprocess_behavior: ReprocessBehavior | None):
def _validate_backfill_params(dag, reverse, from_date, to_date, reprocess_behavior: ReprocessBehavior | None):
depends_on_past = any(x.depends_on_past for x in dag.tasks)
if depends_on_past:
if reverse is True:
Expand All @@ -216,6 +224,9 @@ def _validate_backfill_params(dag, reverse, reprocess_behavior: ReprocessBehavio
"DAG has tasks for which depends_on_past=True. "
"You must set reprocess behavior to reprocess completed or reprocess failed."
)
current_time = timezone.utcnow()
if from_date >= current_time and to_date >= current_time:
raise InvalidBackfillDate("Backfill cannot be executed for future dates.")


def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior, session) -> list[datetime]:
Expand All @@ -226,7 +237,7 @@ def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior, sess
if not serdag:
raise DagNotFound(f"Could not find dag {dag_id}")
dag = serdag.dag
_validate_backfill_params(dag, reverse, reprocess_behavior)
_validate_backfill_params(dag, reverse, from_date, to_date, reprocess_behavior)

no_schedule = session.scalar(
select(func.count()).where(DagModel.timetable_summary == "None", DagModel.dag_id == dag_id)
Expand Down Expand Up @@ -345,9 +356,9 @@ def _get_info_list(
):
infos = dag.iter_dagrun_infos_between(from_date, to_date)
now = timezone.utcnow()
dagrun_info_list = (x for x in infos if x.data_interval.end < now)
dagrun_info_list = [x for x in infos if x.data_interval.end < now]
if reverse:
dagrun_info_list = reversed([x for x in dag.iter_dagrun_infos_between(from_date, to_date)])
dagrun_info_list = reversed(dagrun_info_list)
return dagrun_info_list


Expand Down Expand Up @@ -387,7 +398,7 @@ def _create_backfill(
)

dag = serdag.dag
_validate_backfill_params(dag, reverse, reprocess_behavior)
_validate_backfill_params(dag, reverse, from_date, to_date, reprocess_behavior)

br = Backfill(
dag_id=dag_id,
Expand Down
70 changes: 68 additions & 2 deletions tests/api_fastapi/core_api/routes/public/test_backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
from __future__ import annotations

import os
from datetime import datetime
from datetime import datetime, timedelta
from unittest import mock

import pendulum
import pytest
from sqlalchemy import select
from sqlalchemy import and_, func, select

from airflow.models import DagBag, DagModel, DagRun
from airflow.models.backfill import Backfill, BackfillDagRun, ReprocessBehavior, _create_backfill
Expand Down Expand Up @@ -311,6 +311,72 @@ def test_create_backfill_with_depends_on_past(
== "DAG has tasks for which depends_on_past=True. You must set reprocess behavior to reprocess completed or reprocess failed."
)

@pytest.mark.parametrize(
"run_backwards",
[
(False),
(True),
],
)
def test_create_backfill_future_dates(self, session, dag_maker, test_client, run_backwards):
with dag_maker(session=session, dag_id="TEST_DAG_1", schedule="0 * * * *") as dag:
EmptyOperator(task_id="mytask")
session.query(DagModel).all()
session.commit()
from_date = timezone.utcnow() + timedelta(days=1)
to_date = timezone.utcnow() + timedelta(days=1)
max_active_runs = 5
data = {
"dag_id": dag.dag_id,
"from_date": f"{to_iso(from_date)}",
"to_date": f"{to_iso(to_date)}",
"max_active_runs": max_active_runs,
"run_backwards": run_backwards,
"dag_run_conf": {"param1": "val1", "param2": True},
}

response = test_client.post(
url="/public/backfills",
json=data,
)
assert response.status_code == 422
assert response.json().get("detail") == "Backfill cannot be executed for future dates."

@pytest.mark.parametrize(
"run_backwards",
[
(False),
(True),
],
)
def test_create_backfill_past_future_dates(self, session, dag_maker, test_client, run_backwards):
with dag_maker(session=session, dag_id="TEST_DAG_1", schedule="@daily") as dag:
EmptyOperator(task_id="mytask")
session.query(DagModel).all()
session.commit()
from_date = timezone.utcnow() - timedelta(days=2)
to_date = timezone.utcnow() + timedelta(days=1)
max_active_runs = 1
data = {
"dag_id": dag.dag_id,
"from_date": f"{to_iso(from_date)}",
"to_date": f"{to_iso(to_date)}",
"max_active_runs": max_active_runs,
"run_backwards": run_backwards,
"dag_run_conf": {"param1": "val1", "param2": True},
}

response = test_client.post(
url="/public/backfills",
json=data,
)
assert response.status_code == 200
backfill_dag_run_count = select(func.count()).where(
and_(BackfillDagRun.backfill_id == response.json()["id"], BackfillDagRun.logical_date == to_date)
)
count = session.execute(backfill_dag_run_count).scalar()
assert count == 0


class TestCreateBackfillDryRun(TestBackfillEndpoint):
@pytest.mark.parametrize(
Expand Down

0 comments on commit 67c4f77

Please sign in to comment.