Skip to content

Commit

Permalink
Migrate public endpoint Clear Task Instances to FastAPI
Browse files Browse the repository at this point in the history
  • Loading branch information
omkar-foss committed Nov 20, 2024
1 parent d43052e commit f670a0a
Show file tree
Hide file tree
Showing 12 changed files with 1,289 additions and 30 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ def get_task_instances_batch(session: Session = NEW_SESSION) -> APIResponse:
)


@mark_fastapi_migration_done
@security.requires_access_dag("PUT", DagAccessEntity.TASK_INSTANCE)
@action_logging
@provide_session
Expand Down
27 changes: 24 additions & 3 deletions airflow/api_fastapi/common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@
# under the License.
from __future__ import annotations

from datetime import timedelta
from datetime import datetime, timedelta
from enum import Enum
from typing import Annotated

from pydantic import AfterValidator, AliasGenerator, AwareDatetime, BaseModel, BeforeValidator, ConfigDict
from pydantic import (
AfterValidator,
AliasGenerator,
AwareDatetime,
BaseModel,
BeforeValidator,
ConfigDict,
)

from airflow.utils import timezone

Expand All @@ -29,7 +36,7 @@


def _validate_timedelta_field(td: timedelta | None) -> TimeDelta | None:
"""Validate the execution_timeout property."""
"""Validate the timedelta field and return it."""
if td is None:
return None
return TimeDelta(
Expand Down Expand Up @@ -59,6 +66,20 @@ class TimeDelta(BaseModel):
TimeDeltaWithValidation = Annotated[TimeDelta, BeforeValidator(_validate_timedelta_field)]


def _validate_nonnaive_datetime_field(dt: datetime | None) -> datetime | None:
"""Validate and return the datetime field."""
if dt is None:
return None
if isinstance(dt, str):
dt = datetime.fromisoformat(dt)
if not dt.tzinfo:
raise ValueError("Invalid datetime format, Naive datetime is disallowed")
return dt


DatetimeWithNonNaiveValidation = Annotated[datetime, BeforeValidator(_validate_nonnaive_datetime_field)]


class Mimetype(str, Enum):
"""Mimetype for the `Content-Type` header."""

Expand Down
57 changes: 56 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@
from __future__ import annotations

from datetime import datetime
from typing import Annotated
from typing import Annotated, Any

from pydantic import (
AliasChoices,
AliasPath,
AwareDatetime,
BaseModel,
BeforeValidator,
ConfigDict,
Field,
NonNegativeInt,
ValidationError,
model_validator,
)

from airflow.api_fastapi.common.types import DatetimeWithNonNaiveValidation
from airflow.api_fastapi.core_api.datamodels.job import JobResponse
from airflow.api_fastapi.core_api.datamodels.trigger import TriggerResponse
from airflow.utils.state import TaskInstanceState
Expand Down Expand Up @@ -150,3 +154,54 @@ class TaskInstanceHistoryCollectionResponse(BaseModel):

task_instances: list[TaskInstanceHistoryResponse]
total_entries: int


class ClearTaskInstancesBody(BaseModel):
"""Request body for Clear Task Instances endpoint."""

dry_run: bool = True
start_date: DatetimeWithNonNaiveValidation | None = None
end_date: DatetimeWithNonNaiveValidation | None = None
only_failed: bool = True
only_running: bool = False
reset_dag_runs: bool = False
task_ids: list[str] | None = None
dag_run_id: str | None = None
include_upstream: bool = False
include_downstream: bool = False
include_future: bool = False
include_past: bool = False

@model_validator(mode="before")
@classmethod
def validate_model(cls, data: Any) -> Any:
"""Validate clear task instance form."""
if data.get("only_failed") and data.get("only_running"):
raise ValidationError("only_failed and only_running both are set to True")
if data.get("start_date") and data.get("end_date"):
if data.get("start_date") > data.get("end_date"):
raise ValidationError("end_date is sooner than start_date")
if data.get("start_date") and data.get("end_date") and data.get("dag_run_id"):
raise ValidationError("Exactly one of dag_run_id or (start_date and end_date) must be provided")
if data.get("start_date") and data.get("dag_run_id"):
raise ValidationError("Exactly one of dag_run_id or start_date must be provided")
if data.get("end_date") and data.get("dag_run_id"):
raise ValidationError("Exactly one of dag_run_id or end_date must be provided")
if isinstance(data.get("task_ids"), list) and len(data.get("task_ids")) < 1:
raise ValidationError("task_ids list should have at least 1 element.")
return data


class TaskInstanceReferenceResponse(BaseModel):
"""Task Instance Reference serializer for responses."""

task_id: str
dag_run_id: str = Field(validation_alias=AliasChoices("run_id", "dagrun_id"))
dag_id: str
logical_date: datetime


class TaskInstanceReferenceCollectionResponse(BaseModel):
"""Task Instance Reference collection serializer for responses."""

task_instances: list[TaskInstanceReferenceResponse]
149 changes: 148 additions & 1 deletion airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3780,7 +3780,7 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances:
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/:
get:
tags:
- Task Instance
Expand Down Expand Up @@ -4181,6 +4181,57 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/clearTaskInstances:
post:
tags:
- Task Instance
summary: Post Clear Task Instances
description: Clear task instances.
operationId: post_clear_task_instances
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/ClearTaskInstancesBody'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/TaskInstanceReferenceCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/tasks:
get:
tags:
Expand Down Expand Up @@ -4974,6 +5025,67 @@ components:
- status
title: BaseInfoSchema
description: Base status field for metadatabase and scheduler.
ClearTaskInstancesBody:
properties:
dry_run:
type: boolean
title: Dry Run
default: true
start_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Start Date
end_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: End Date
only_failed:
type: boolean
title: Only Failed
default: true
only_running:
type: boolean
title: Only Running
default: false
reset_dag_runs:
type: boolean
title: Reset Dag Runs
default: false
task_ids:
anyOf:
- items:
type: string
type: array
- type: 'null'
title: Task Ids
dag_run_id:
anyOf:
- type: string
- type: 'null'
title: Dag Run Id
include_upstream:
type: boolean
title: Include Upstream
default: false
include_downstream:
type: boolean
title: Include Downstream
default: false
include_future:
type: boolean
title: Include Future
default: false
include_past:
type: boolean
title: Include Past
default: false
type: object
title: ClearTaskInstancesBody
description: Request body for Clear Task Instances endpoint.
Config:
properties:
sections:
Expand Down Expand Up @@ -6887,6 +6999,41 @@ components:
- executor_config
title: TaskInstanceHistoryResponse
description: TaskInstanceHistory serializer for responses.
TaskInstanceReferenceCollectionResponse:
properties:
task_instances:
items:
$ref: '#/components/schemas/TaskInstanceReferenceResponse'
type: array
title: Task Instances
type: object
required:
- task_instances
title: TaskInstanceReferenceCollectionResponse
description: Task Instance Reference collection serializer for responses.
TaskInstanceReferenceResponse:
properties:
task_id:
type: string
title: Task Id
dag_run_id:
type: string
title: Dag Run Id
dag_id:
type: string
title: Dag Id
logical_date:
type: string
format: date-time
title: Logical Date
type: object
required:
- task_id
- dag_run_id
- dag_id
- logical_date
title: TaskInstanceReferenceResponse
description: Task Instance Reference serializer for responses.
TaskInstanceResponse:
properties:
id:
Expand Down
Loading

0 comments on commit f670a0a

Please sign in to comment.