Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate public endpoint Clear Task Instances to FastAPI #44220

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ def get_task_instances_batch(session: Session = NEW_SESSION) -> APIResponse:
)


@mark_fastapi_migration_done
@security.requires_access_dag("PUT", DagAccessEntity.TASK_INSTANCE)
@action_logging
@provide_session
Expand Down
11 changes: 9 additions & 2 deletions airflow/api_fastapi/common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,14 @@
from enum import Enum
from typing import Annotated

from pydantic import AfterValidator, AliasGenerator, AwareDatetime, BaseModel, BeforeValidator, ConfigDict
from pydantic import (
AfterValidator,
AliasGenerator,
AwareDatetime,
BaseModel,
BeforeValidator,
ConfigDict,
)

from airflow.utils import timezone

Expand All @@ -29,7 +36,7 @@


def _validate_timedelta_field(td: timedelta | None) -> TimeDelta | None:
"""Validate the execution_timeout property."""
"""Validate the timedelta field and return it."""
if td is None:
return None
return TimeDelta(
Expand Down
55 changes: 54 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from __future__ import annotations

from datetime import datetime
from typing import Annotated
from typing import Annotated, Any

from pydantic import (
AliasPath,
Expand All @@ -27,6 +27,8 @@
ConfigDict,
Field,
NonNegativeInt,
ValidationError,
model_validator,
)

from airflow.api_fastapi.core_api.datamodels.job import JobResponse
Expand Down Expand Up @@ -150,3 +152,54 @@ class TaskInstanceHistoryCollectionResponse(BaseModel):

task_instances: list[TaskInstanceHistoryResponse]
total_entries: int


class ClearTaskInstancesBody(BaseModel):
"""Request body for Clear Task Instances endpoint."""

dry_run: bool = True
start_date: AwareDatetime | None = None
end_date: AwareDatetime | None = None
only_failed: bool = True
only_running: bool = False
reset_dag_runs: bool = False
task_ids: list[str] | None = None
dag_run_id: str | None = None
include_upstream: bool = False
include_downstream: bool = False
include_future: bool = False
include_past: bool = False

@model_validator(mode="before")
@classmethod
def validate_model(cls, data: Any) -> Any:
"""Validate clear task instance form."""
if data.get("only_failed") and data.get("only_running"):
raise ValidationError("only_failed and only_running both are set to True")
if data.get("start_date") and data.get("end_date"):
if data.get("start_date") > data.get("end_date"):
raise ValidationError("end_date is sooner than start_date")
if data.get("start_date") and data.get("end_date") and data.get("dag_run_id"):
raise ValidationError("Exactly one of dag_run_id or (start_date and end_date) must be provided")
if data.get("start_date") and data.get("dag_run_id"):
raise ValidationError("Exactly one of dag_run_id or start_date must be provided")
if data.get("end_date") and data.get("dag_run_id"):
raise ValidationError("Exactly one of dag_run_id or end_date must be provided")
if isinstance(data.get("task_ids"), list) and len(data.get("task_ids")) < 1:
raise ValidationError("task_ids list should have at least 1 element.")
return data


class TaskInstanceReferenceResponse(BaseModel):
"""Task Instance Reference serializer for responses."""

task_id: str
dag_run_id: str = Field(validation_alias="run_id")
dag_id: str


class TaskInstanceReferenceCollectionResponse(BaseModel):
"""Task Instance Reference collection serializer for responses."""

task_instances: list[TaskInstanceReferenceResponse]
omkar-foss marked this conversation as resolved.
Show resolved Hide resolved
total_entries: int
146 changes: 146 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4181,6 +4181,57 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/clearTaskInstances:
post:
tags:
- Task Instance
summary: Post Clear Task Instances
description: Clear task instances.
operationId: post_clear_task_instances
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/ClearTaskInstancesBody'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/TaskInstanceReferenceCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/tasks:
get:
tags:
Expand Down Expand Up @@ -4974,6 +5025,67 @@ components:
- status
title: BaseInfoSchema
description: Base status field for metadatabase and scheduler.
ClearTaskInstancesBody:
properties:
dry_run:
type: boolean
title: Dry Run
default: true
start_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Start Date
end_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: End Date
only_failed:
type: boolean
title: Only Failed
default: true
only_running:
type: boolean
title: Only Running
default: false
reset_dag_runs:
type: boolean
title: Reset Dag Runs
default: false
task_ids:
anyOf:
- items:
type: string
type: array
- type: 'null'
title: Task Ids
dag_run_id:
anyOf:
- type: string
- type: 'null'
title: Dag Run Id
include_upstream:
type: boolean
title: Include Upstream
default: false
include_downstream:
type: boolean
title: Include Downstream
default: false
include_future:
type: boolean
title: Include Future
default: false
include_past:
type: boolean
title: Include Past
default: false
type: object
title: ClearTaskInstancesBody
description: Request body for Clear Task Instances endpoint.
Config:
properties:
sections:
Expand Down Expand Up @@ -6887,6 +6999,40 @@ components:
- executor_config
title: TaskInstanceHistoryResponse
description: TaskInstanceHistory serializer for responses.
TaskInstanceReferenceCollectionResponse:
properties:
task_instances:
items:
$ref: '#/components/schemas/TaskInstanceReferenceResponse'
type: array
title: Task Instances
total_entries:
type: integer
title: Total Entries
type: object
required:
- task_instances
- total_entries
title: TaskInstanceReferenceCollectionResponse
description: Task Instance Reference collection serializer for responses.
TaskInstanceReferenceResponse:
properties:
task_id:
type: string
title: Task Id
dag_run_id:
type: string
title: Dag Run Id
dag_id:
type: string
title: Dag Id
type: object
required:
- task_id
- dag_run_id
- dag_id
title: TaskInstanceReferenceResponse
description: Task Instance Reference serializer for responses.
TaskInstanceResponse:
properties:
id:
Expand Down
Loading