Skip to content

Commit

Permalink
AIP-83 amendment: Add logic for generating run_id when logical date i…
Browse files Browse the repository at this point in the history
…s None. (apache#46616)

Co-authored-by: Tzu-ping Chung <[email protected]>
  • Loading branch information
sunank200 and uranusjr authored Feb 11, 2025
1 parent 0047a68 commit 035060d
Show file tree
Hide file tree
Showing 43 changed files with 441 additions and 168 deletions.
51 changes: 30 additions & 21 deletions airflow/api/common/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def _trigger_dag(
dag_bag: DagBag,
*,
triggered_by: DagRunTriggeredByType,
run_after: datetime,
run_id: str | None = None,
conf: dict | str | None = None,
logical_date: datetime | None = None,
Expand All @@ -54,6 +55,7 @@ def _trigger_dag(
:param dag_id: DAG ID
:param dag_bag: DAG Bag model
:param triggered_by: the entity which triggers the dag_run
:param run_after: the datetime before which dag cannot run.
:param run_id: ID of the run
:param conf: configuration
:param logical_date: logical date of the run
Expand All @@ -65,26 +67,30 @@ def _trigger_dag(
if dag is None or dag_id not in dag_bag.dags:
raise DagNotFound(f"Dag id {dag_id} not found")

logical_date = logical_date or timezone.utcnow()

if not timezone.is_localized(logical_date):
raise ValueError("The logical date should be localized")

if replace_microseconds:
logical_date = logical_date.replace(microsecond=0)

if dag.default_args and "start_date" in dag.default_args:
min_dag_start_date = dag.default_args["start_date"]
if min_dag_start_date and logical_date < min_dag_start_date:
raise ValueError(
f"Logical date [{logical_date.isoformat()}] should be >= start_date "
f"[{min_dag_start_date.isoformat()}] from DAG's default_args"
)
coerced_logical_date = timezone.coerce_datetime(logical_date)

data_interval = dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date)
run_id = run_id or dag.timetable.generate_run_id(
run_type=DagRunType.MANUAL, logical_date=coerced_logical_date, data_interval=data_interval
if logical_date:
if not timezone.is_localized(logical_date):
raise ValueError("The logical date should be localized")

if replace_microseconds:
logical_date = logical_date.replace(microsecond=0)

if dag.default_args and "start_date" in dag.default_args:
min_dag_start_date = dag.default_args["start_date"]
if min_dag_start_date and logical_date < min_dag_start_date:
raise ValueError(
f"Logical date [{logical_date.isoformat()}] should be >= start_date "
f"[{min_dag_start_date.isoformat()}] from DAG's default_args"
)
coerced_logical_date = timezone.coerce_datetime(logical_date)
data_interval = dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date)
else:
coerced_logical_date = None
data_interval = None

run_id = run_id or DagRun.generate_run_id(
run_type=DagRunType.MANUAL,
logical_date=coerced_logical_date,
run_after=timezone.coerce_datetime(run_after),
)

# This intentionally does not use 'session' in the current scope because it
Expand All @@ -102,7 +108,7 @@ def _trigger_dag(
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
run_after=data_interval.end,
run_after=run_after,
conf=run_conf,
run_type=DagRunType.MANUAL,
triggered_by=triggered_by,
Expand All @@ -120,6 +126,7 @@ def trigger_dag(
dag_id: str,
*,
triggered_by: DagRunTriggeredByType,
run_after: datetime | None = None,
run_id: str | None = None,
conf: dict | str | None = None,
logical_date: datetime | None = None,
Expand All @@ -131,6 +138,7 @@ def trigger_dag(
:param dag_id: DAG ID
:param triggered_by: the entity which triggers the dag_run
:param run_after: the datetime before which dag won't run.
:param run_id: ID of the dag_run
:param conf: configuration
:param logical_date: date of execution
Expand All @@ -147,6 +155,7 @@ def trigger_dag(
dag_id=dag_id,
dag_bag=dagbag,
run_id=run_id,
run_after=run_after or timezone.utcnow(),
conf=conf,
logical_date=logical_date,
replace_microseconds=replace_microseconds,
Expand Down
9 changes: 6 additions & 3 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,8 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
except ValidationError as err:
raise BadRequest(detail=str(err))

logical_date = pendulum.instance(post_body["logical_date"])
logical_date = pendulum.instance(post_body["logical_date"]) if post_body.get("logical_date") else None
run_after = pendulum.instance(post_body["run_after"])
run_id = post_body["run_id"]
dagrun_instance = session.scalar(
select(DagRun)
Expand All @@ -352,12 +353,14 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
end=pendulum.instance(data_interval_end),
)
else:
data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
data_interval = (
dag.timetable.infer_manual_data_interval(run_after=logical_date) if logical_date else None
)
dag_run = dag.create_dagrun(
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
run_after=data_interval.end,
run_after=run_after,
conf=post_body.get("conf"),
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.REST_API,
Expand Down
21 changes: 14 additions & 7 deletions airflow/api_connexion/schemas/dag_run_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class Meta:

run_id = auto_field(data_key="dag_run_id")
dag_id = auto_field(dump_only=True)
logical_date = auto_field(data_key="logical_date", validate=validate_istimezone)
logical_date = auto_field(data_key="logical_date", allow_none=True, validate=validate_istimezone)
run_after = auto_field(data_key="run_after", validate=validate_istimezone)
start_date = auto_field(dump_only=True)
end_date = auto_field(dump_only=True)
state = DagStateField(dump_only=True)
Expand All @@ -78,17 +79,23 @@ class Meta:

@pre_load
def autogenerate(self, data, **kwargs):
"""Auto generate run_id and logical_date if they are not provided."""
logical_date = data.get("logical_date", _MISSING)
"""Auto generate run_id and run_after if they are not provided."""
run_after = data.get("run_after", _MISSING)

# Auto-generate logical_date if missing
if logical_date is _MISSING:
data["logical_date"] = str(timezone.utcnow())
# Auto-generate run_after if missing
if run_after is _MISSING:
data["run_after"] = str(timezone.utcnow())

if "dag_run_id" not in data:
try:
if logical_date_str := data.get("logical_date"):
logical_date = timezone.parse(logical_date_str)
else:
logical_date = None
data["dag_run_id"] = DagRun.generate_run_id(
DagRunType.MANUAL, timezone.parse(data["logical_date"])
run_type=DagRunType.MANUAL,
logical_date=logical_date,
run_after=timezone.parse(data["run_after"]),
)
except (ParserError, TypeError) as err:
raise BadRequest("Incorrect datetime argument", detail=str(err))
Expand Down
8 changes: 5 additions & 3 deletions airflow/api_fastapi/core_api/datamodels/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
from datetime import datetime
from enum import Enum

import pendulum
from pydantic import AwareDatetime, Field, NonNegativeInt, model_validator

from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
from airflow.models import DagRun
from airflow.utils import timezone
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType

Expand Down Expand Up @@ -82,9 +82,11 @@ class TriggerDAGRunPostBody(StrictBaseModel):
"""Trigger DAG Run Serializer for POST body."""

dag_run_id: str | None = None
logical_date: AwareDatetime | None
data_interval_start: AwareDatetime | None = None
data_interval_end: AwareDatetime | None = None
logical_date: AwareDatetime | None
run_after: datetime = Field(default_factory=timezone.utcnow)

conf: dict = Field(default_factory=dict)
note: str | None = None

Expand All @@ -102,7 +104,7 @@ def check_data_intervals(cls, values):
def validate_dag_run_id(self):
if not self.dag_run_id:
self.dag_run_id = DagRun.generate_run_id(
DagRunType.MANUAL, self.logical_date or pendulum.now("UTC")
run_type=DagRunType.MANUAL, logical_date=self.logical_date, run_after=self.run_after
)
return self

Expand Down
16 changes: 10 additions & 6 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10501,12 +10501,6 @@ components:
- type: string
- type: 'null'
title: Dag Run Id
logical_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Logical Date
data_interval_start:
anyOf:
- type: string
Expand All @@ -10519,6 +10513,16 @@ components:
format: date-time
- type: 'null'
title: Data Interval End
logical_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Logical Date
run_after:
type: string
format: date-time
title: Run After
conf:
type: object
title: Conf
Expand Down
21 changes: 17 additions & 4 deletions airflow/api_fastapi/core_api/routes/public/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,6 @@ def trigger_dag_run(
) -> DAGRunResponse:
"""Trigger a DAG."""
dm = session.scalar(select(DagModel).where(DagModel.is_active, DagModel.dag_id == dag_id).limit(1))
now = pendulum.now("UTC")
if not dm:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with dag_id: '{dag_id}' not found")

Expand All @@ -359,6 +358,7 @@ def trigger_dag_run(

logical_date = timezone.coerce_datetime(body.logical_date)
coerced_logical_date = timezone.coerce_datetime(logical_date)
run_after = timezone.coerce_datetime(body.run_after)

try:
dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
Expand All @@ -369,13 +369,26 @@ def trigger_dag_run(
end=pendulum.instance(body.data_interval_end),
)
else:
data_interval = dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date or now)
if body.logical_date:
data_interval = dag.timetable.infer_manual_data_interval(
run_after=coerced_logical_date or run_after
)
run_after = data_interval.end
else:
data_interval = None

if body.dag_run_id:
run_id = body.dag_run_id
else:
run_id = DagRun.generate_run_id(
run_type=DagRunType.SCHEDULED, logical_date=coerced_logical_date, run_after=run_after
)

dag_run = dag.create_dagrun(
run_id=cast(str, body.dag_run_id),
run_id=run_id,
logical_date=coerced_logical_date,
data_interval=data_interval,
run_after=data_interval.end,
run_after=run_after,
conf=body.conf,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.REST_API,
Expand Down
8 changes: 3 additions & 5 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1286,7 +1286,7 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
dag.create_dagrun(
run_id=dag.timetable.generate_run_id(
run_type=DagRunType.SCHEDULED,
logical_date=dag_model.next_dagrun,
run_after=dag_model.next_dagrun,
data_interval=data_interval,
),
logical_date=dag_model.next_dagrun,
Expand Down Expand Up @@ -1394,12 +1394,10 @@ def _create_dag_runs_asset_triggered(

data_interval = dag.timetable.data_interval_for_events(logical_date, asset_events)
dag_run = dag.create_dagrun(
run_id=dag.timetable.generate_run_id(
run_id=DagRun.generate_run_id(
run_type=DagRunType.ASSET_TRIGGERED,
logical_date=logical_date,
data_interval=data_interval,
session=session,
events=asset_events,
run_after=max(logical_dates.values()),
),
logical_date=logical_date,
data_interval=data_interval,
Expand Down
8 changes: 4 additions & 4 deletions airflow/models/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ def _create_backfill_dag_run(
backfill_sort_ordinal,
session,
):
from airflow.models.dagrun import DagRun

with session.begin_nested():
should_skip_create_backfill = should_create_backfill_dag_run(
info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session
Expand All @@ -296,10 +298,8 @@ def _create_backfill_dag_run(
dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
try:
dr = dag.create_dagrun(
run_id=dag.timetable.generate_run_id(
run_type=DagRunType.BACKFILL_JOB,
logical_date=info.logical_date,
data_interval=info.data_interval,
run_id=DagRun.generate_run_id(
run_type=DagRunType.BACKFILL_JOB, logical_date=info.logical_date, run_after=info.run_after
),
logical_date=info.logical_date,
data_interval=info.data_interval,
Expand Down
6 changes: 5 additions & 1 deletion airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,11 @@ def run(
# This is _mostly_ only used in tests
dr = DagRun(
dag_id=self.dag_id,
run_id=DagRun.generate_run_id(DagRunType.MANUAL, info.logical_date),
run_id=DagRun.generate_run_id(
run_type=DagRunType.MANUAL,
logical_date=info.logical_date,
run_after=info.run_after,
),
run_type=DagRunType.MANUAL,
logical_date=info.logical_date,
data_interval=info.data_interval,
Expand Down
Loading

0 comments on commit 035060d

Please sign in to comment.