Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make list backfills endpoint use asyncio #44208

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 81 additions & 3 deletions airflow/api_fastapi/common/db/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@

from typing import TYPE_CHECKING, Literal, Sequence, overload

from airflow.utils.db import get_query_count
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from sqlalchemy.ext.asyncio import AsyncSession

from airflow.utils.db import get_query_count, get_query_count_async
from airflow.utils.session import NEW_SESSION, create_session, create_session_async, provide_session

if TYPE_CHECKING:
from sqlalchemy.orm import Session
Expand Down Expand Up @@ -53,7 +55,9 @@ def your_route(session: Annotated[Session, Depends(get_session)]):


def apply_filters_to_select(
*, base_select: Select, filters: Sequence[BaseParam | None] | None = None
*,
base_select: Select,
filters: Sequence[BaseParam | None] | None = None,
) -> Select:
if filters is None:
return base_select
Expand All @@ -65,6 +69,80 @@ def apply_filters_to_select(
return base_select


async def get_async_session() -> AsyncSession:
"""
Dependency for providing a session.

Example usage:

.. code:: python

@router.get("/your_path")
def your_route(session: Annotated[AsyncSession, Depends(get_async_session)]):
pass
"""
async with create_session_async() as session:
yield session


@overload
async def paginated_select_async(
*,
query: Select,
filters: Sequence[BaseParam] | None = None,
order_by: BaseParam | None = None,
offset: BaseParam | None = None,
limit: BaseParam | None = None,
session: AsyncSession,
return_total_entries: Literal[True] = True,
) -> tuple[Select, int]: ...


@overload
async def paginated_select_async(
*,
query: Select,
filters: Sequence[BaseParam] | None = None,
order_by: BaseParam | None = None,
offset: BaseParam | None = None,
limit: BaseParam | None = None,
session: AsyncSession,
return_total_entries: Literal[False],
) -> tuple[Select, None]: ...


async def paginated_select_async(
*,
query: Select,
filters: Sequence[BaseParam | None] | None = None,
order_by: BaseParam | None = None,
offset: BaseParam | None = None,
limit: BaseParam | None = None,
session: AsyncSession,
return_total_entries: bool = True,
) -> tuple[Select, int | None]:
query = apply_filters_to_select(
base_select=query,
filters=filters,
)

total_entries = None
if return_total_entries:
total_entries = await get_query_count_async(query, session=session)

# TODO: Re-enable when permissions are handled. Readable / writable entities,
# for instance:
# readable_dags = get_auth_manager().get_permitted_dag_ids(user=g.user)
# dags_select = dags_select.where(DagModel.dag_id.in_(readable_dags))

query = apply_filters_to_select(
base_select=query,
filters=[order_by, offset, limit],
)

return query, total_entries


@overload
def paginated_select(
*,
Expand Down
29 changes: 29 additions & 0 deletions airflow/api_fastapi/core_api/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from pydantic import BaseModel as PydanticBaseModel, ConfigDict


class BaseModel(PydanticBaseModel):
"""
Base pydantic model for REST API.

:meta private:
"""

model_config = ConfigDict(from_attributes=True)
3 changes: 2 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@

from datetime import datetime

from pydantic import BaseModel, Field, field_validator
from pydantic import Field, field_validator

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.utils.log.secrets_masker import redact


Expand Down
3 changes: 1 addition & 2 deletions airflow/api_fastapi/core_api/datamodels/backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@

from datetime import datetime

from pydantic import BaseModel

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.models.backfill import ReprocessBehavior


Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/datamodels/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations

from pydantic import BaseModel
from airflow.api_fastapi.core_api.base import BaseModel


class ConfigOption(BaseModel):
Expand Down
3 changes: 2 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

import json

from pydantic import BaseModel, Field, field_validator
from pydantic import Field, field_validator
from pydantic_core.core_schema import ValidationInfo

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.utils.log.secrets_masker import redact


Expand Down
3 changes: 2 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
from datetime import datetime
from enum import Enum

from pydantic import BaseModel, Field
from pydantic import Field

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType

Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/datamodels/dag_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations

from pydantic import BaseModel
from airflow.api_fastapi.core_api.base import BaseModel


class DAGSourceResponse(BaseModel):
Expand Down
3 changes: 1 addition & 2 deletions airflow/api_fastapi/core_api/datamodels/dag_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

from __future__ import annotations

from pydantic import BaseModel

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.utils.state import DagRunState


Expand Down
3 changes: 1 addition & 2 deletions airflow/api_fastapi/core_api/datamodels/dag_warning.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@

from datetime import datetime

from pydantic import BaseModel

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.models.dagwarning import DagWarningType


Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/datamodels/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
from pendulum.tz.timezone import FixedTimezone, Timezone
from pydantic import (
AliasGenerator,
BaseModel,
ConfigDict,
computed_field,
field_validator,
)

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.configuration import conf
from airflow.serialization.pydantic.dag import DagTagPydantic

Expand Down Expand Up @@ -144,7 +144,7 @@ def get_params(cls, params: abc.MutableMapping | None) -> dict | None:
"""Convert params attribute to dict representation."""
if params is None:
return None
return {param_name: param_val.dump() for param_name, param_val in params.items()}
return {k: v.dump() for k, v in params.items()}

# Mypy issue https://github.com/python/mypy/issues/1362
@computed_field # type: ignore[misc]
Expand Down
4 changes: 3 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/event_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

from datetime import datetime

from pydantic import BaseModel, ConfigDict, Field
from pydantic import ConfigDict, Field

from airflow.api_fastapi.core_api.base import BaseModel


class EventLogResponse(BaseModel):
Expand Down
4 changes: 3 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/import_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

from datetime import datetime

from pydantic import BaseModel, ConfigDict, Field
from pydantic import ConfigDict, Field

from airflow.api_fastapi.core_api.base import BaseModel


class ImportErrorResponse(BaseModel):
Expand Down
4 changes: 3 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

from datetime import datetime

from pydantic import BaseModel, ConfigDict
from pydantic import ConfigDict

from airflow.api_fastapi.core_api.base import BaseModel


class JobResponse(BaseModel):
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/datamodels/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations

from pydantic import BaseModel
from airflow.api_fastapi.core_api.base import BaseModel


class BaseInfoSchema(BaseModel):
Expand Down
3 changes: 2 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@

from typing import Annotated, Any

from pydantic import BaseModel, BeforeValidator, ConfigDict, field_validator
from pydantic import BeforeValidator, ConfigDict, field_validator

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.plugins_manager import AirflowPluginSource


Expand Down
4 changes: 3 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

from typing import Annotated, Callable

from pydantic import BaseModel, BeforeValidator, ConfigDict, Field
from pydantic import BeforeValidator, ConfigDict, Field

from airflow.api_fastapi.core_api.base import BaseModel


def _call_function(function: Callable[[], int]) -> int:
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/datamodels/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from __future__ import annotations

from pydantic import BaseModel
from airflow.api_fastapi.core_api.base import BaseModel


class ProviderResponse(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
from pydantic import (
AliasPath,
AwareDatetime,
BaseModel,
BeforeValidator,
ConfigDict,
Field,
NonNegativeInt,
)

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.api_fastapi.core_api.datamodels.job import JobResponse
from airflow.api_fastapi.core_api.datamodels.trigger import TriggerResponse
from airflow.utils.state import TaskInstanceState
Expand Down
3 changes: 2 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
from datetime import datetime
from typing import Any

from pydantic import BaseModel, computed_field, field_validator, model_validator
from pydantic import computed_field, field_validator, model_validator

from airflow.api_fastapi.common.types import TimeDeltaWithValidation
from airflow.api_fastapi.core_api.base import BaseModel
from airflow.models.mappedoperator import MappedOperator
from airflow.serialization.serialized_objects import SerializedBaseOperator, encode_priority_weight_strategy
from airflow.task.priority_strategy import PriorityWeightStrategy
Expand Down
4 changes: 3 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
from datetime import datetime
from typing import Annotated

from pydantic import BaseModel, BeforeValidator, ConfigDict
from pydantic import BeforeValidator, ConfigDict

from airflow.api_fastapi.core_api.base import BaseModel


class TriggerResponse(BaseModel):
Expand Down
3 changes: 2 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@

import json

from pydantic import BaseModel, ConfigDict, Field, model_validator
from pydantic import ConfigDict, Field, model_validator

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.typing_compat import Self
from airflow.utils.log.secrets_masker import redact

Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/datamodels/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations

from pydantic import BaseModel
from airflow.api_fastapi.core_api.base import BaseModel


class VersionInfo(BaseModel):
Expand Down
4 changes: 3 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
from datetime import datetime
from typing import Any

from pydantic import BaseModel, field_validator
from pydantic import field_validator

from airflow.api_fastapi.core_api.base import BaseModel


class XComResponse(BaseModel):
Expand Down
Loading