From 4bd5478dd944019e68903a522f2bed1259b7c69c Mon Sep 17 00:00:00 2001 From: Sparrow1029 <19390913+Sparrow1029@users.noreply.github.com> Date: Wed, 1 Oct 2025 06:24:59 -0600 Subject: [PATCH] make_async w run_in_threadpool wrapper for resolvers --- orchestrator/graphql/resolvers/helpers.py | 15 +++++++++++++++ orchestrator/graphql/resolvers/process.py | 8 +++++--- orchestrator/graphql/resolvers/product.py | 5 +++-- orchestrator/graphql/resolvers/product_block.py | 5 +++-- orchestrator/graphql/resolvers/resource_type.py | 5 +++-- orchestrator/graphql/resolvers/scheduled_tasks.py | 4 +++- orchestrator/graphql/resolvers/settings.py | 2 ++ orchestrator/graphql/resolvers/subscription.py | 8 +++++--- orchestrator/graphql/resolvers/version.py | 2 ++ orchestrator/graphql/resolvers/workflow.py | 5 +++-- 10 files changed, 44 insertions(+), 15 deletions(-) diff --git a/orchestrator/graphql/resolvers/helpers.py b/orchestrator/graphql/resolvers/helpers.py index 737b8ed4d..34550696d 100644 --- a/orchestrator/graphql/resolvers/helpers.py +++ b/orchestrator/graphql/resolvers/helpers.py @@ -1,11 +1,17 @@ from collections.abc import Sequence +from functools import wraps +from typing import Callable, Coroutine +import structlog from sqlalchemy import CompoundSelect, Select, select from sqlalchemy.orm.strategy_options import _AbstractLoad +from starlette.concurrency import run_in_threadpool from orchestrator.db import db from orchestrator.db.database import BaseModel +logger = structlog.get_logger(__name__) + def rows_from_statement( stmt: Select | CompoundSelect, @@ -19,3 +25,12 @@ def rows_from_statement( result = db.session.scalars(from_stmt) uresult = result.unique() if unique else result return uresult.all() + + +def make_async(f: Callable): # type: ignore + @wraps(f) + async def wrapper(*args, **kwargs) -> Coroutine: # type: ignore + logger.debug(f"**async, calling fn {f.__name__}") + return await run_in_threadpool(f, *args, **kwargs) + + return wrapper diff --git a/orchestrator/graphql/resolvers/process.py b/orchestrator/graphql/resolvers/process.py index 2ba43ced9..7f6556425 100644 --- a/orchestrator/graphql/resolvers/process.py +++ b/orchestrator/graphql/resolvers/process.py @@ -25,7 +25,7 @@ from orchestrator.db.sorting import Sort from orchestrator.db.sorting.process import process_sort_fields, sort_processes from orchestrator.graphql.pagination import Connection -from orchestrator.graphql.resolvers.helpers import rows_from_statement +from orchestrator.graphql.resolvers.helpers import make_async, rows_from_statement from orchestrator.graphql.schemas.process import ProcessType from orchestrator.graphql.types import GraphqlFilter, GraphqlSort, OrchestratorInfo from orchestrator.graphql.utils import ( @@ -55,7 +55,8 @@ def _enrich_process(process: ProcessTable, with_details: bool = False) -> Proces return ProcessSchema(**process_data) -async def resolve_process(info: OrchestratorInfo, process_id: UUID) -> ProcessType | None: +@make_async +def resolve_process(info: OrchestratorInfo, process_id: UUID) -> ProcessType | None: query_loaders = get_query_loaders_for_gql_fields(ProcessTable, info) stmt = select(ProcessTable).options(*query_loaders).where(ProcessTable.process_id == process_id) if process := db.session.scalar(stmt): @@ -64,7 +65,8 @@ async def resolve_process(info: OrchestratorInfo, process_id: UUID) -> ProcessTy return None -async def resolve_processes( +@make_async +def resolve_processes( info: OrchestratorInfo, filter_by: list[GraphqlFilter] | None = None, sort_by: list[GraphqlSort] | None = None, diff --git a/orchestrator/graphql/resolvers/product.py b/orchestrator/graphql/resolvers/product.py index 6db4fb19a..5f84fdc57 100644 --- a/orchestrator/graphql/resolvers/product.py +++ b/orchestrator/graphql/resolvers/product.py @@ -9,7 +9,7 @@ from orchestrator.db.sorting import Sort from orchestrator.db.sorting.product import product_sort_fields, sort_products from orchestrator.graphql.pagination import Connection -from orchestrator.graphql.resolvers.helpers import rows_from_statement +from orchestrator.graphql.resolvers.helpers import make_async, rows_from_statement from orchestrator.graphql.schemas.product import ProductType from orchestrator.graphql.types import GraphqlFilter, GraphqlSort, OrchestratorInfo from orchestrator.graphql.utils import create_resolver_error_handler, is_querying_page_data, to_graphql_result_page @@ -19,7 +19,8 @@ logger = structlog.get_logger(__name__) -async def resolve_products( +@make_async +def resolve_products( info: OrchestratorInfo, filter_by: list[GraphqlFilter] | None = None, sort_by: list[GraphqlSort] | None = None, diff --git a/orchestrator/graphql/resolvers/product_block.py b/orchestrator/graphql/resolvers/product_block.py index b4d539394..a6c90f25e 100644 --- a/orchestrator/graphql/resolvers/product_block.py +++ b/orchestrator/graphql/resolvers/product_block.py @@ -13,7 +13,7 @@ from orchestrator.db.sorting import Sort from orchestrator.db.sorting.product_block import product_block_sort_fields, sort_product_blocks from orchestrator.graphql.pagination import Connection -from orchestrator.graphql.resolvers.helpers import rows_from_statement +from orchestrator.graphql.resolvers.helpers import make_async, rows_from_statement from orchestrator.graphql.schemas.product_block import ProductBlock from orchestrator.graphql.types import GraphqlFilter, GraphqlSort, OrchestratorInfo from orchestrator.graphql.utils import create_resolver_error_handler, is_querying_page_data, to_graphql_result_page @@ -23,7 +23,8 @@ logger = structlog.get_logger(__name__) -async def resolve_product_blocks( +@make_async +def resolve_product_blocks( info: OrchestratorInfo, filter_by: list[GraphqlFilter] | None = None, sort_by: list[GraphqlSort] | None = None, diff --git a/orchestrator/graphql/resolvers/resource_type.py b/orchestrator/graphql/resolvers/resource_type.py index 4babf2547..b26579fc7 100644 --- a/orchestrator/graphql/resolvers/resource_type.py +++ b/orchestrator/graphql/resolvers/resource_type.py @@ -13,7 +13,7 @@ from orchestrator.db.sorting import Sort from orchestrator.db.sorting.resource_type import resource_type_sort_fields, sort_resource_types from orchestrator.graphql.pagination import Connection -from orchestrator.graphql.resolvers.helpers import rows_from_statement +from orchestrator.graphql.resolvers.helpers import make_async, rows_from_statement from orchestrator.graphql.schemas.resource_type import ResourceType from orchestrator.graphql.types import GraphqlFilter, GraphqlSort, OrchestratorInfo from orchestrator.graphql.utils import create_resolver_error_handler, is_querying_page_data, to_graphql_result_page @@ -23,7 +23,8 @@ logger = structlog.get_logger(__name__) -async def resolve_resource_types( +@make_async +def resolve_resource_types( info: OrchestratorInfo, filter_by: list[GraphqlFilter] | None = None, sort_by: list[GraphqlSort] | None = None, diff --git a/orchestrator/graphql/resolvers/scheduled_tasks.py b/orchestrator/graphql/resolvers/scheduled_tasks.py index 126465452..1058076ef 100644 --- a/orchestrator/graphql/resolvers/scheduled_tasks.py +++ b/orchestrator/graphql/resolvers/scheduled_tasks.py @@ -3,6 +3,7 @@ from orchestrator.db.filters import Filter from orchestrator.db.sorting import Sort from orchestrator.graphql.pagination import Connection +from orchestrator.graphql.resolvers.helpers import make_async from orchestrator.graphql.schemas.scheduled_task import ScheduledTaskGraphql from orchestrator.graphql.types import GraphqlFilter, GraphqlSort, OrchestratorInfo from orchestrator.graphql.utils import create_resolver_error_handler, to_graphql_result_page @@ -12,7 +13,8 @@ logger = structlog.get_logger(__name__) -async def resolve_scheduled_tasks( +@make_async +def resolve_scheduled_tasks( info: OrchestratorInfo, filter_by: list[GraphqlFilter] | None = None, sort_by: list[GraphqlSort] | None = None, diff --git a/orchestrator/graphql/resolvers/settings.py b/orchestrator/graphql/resolvers/settings.py index 629755f23..ba20f1678 100644 --- a/orchestrator/graphql/resolvers/settings.py +++ b/orchestrator/graphql/resolvers/settings.py @@ -4,6 +4,7 @@ from oauth2_lib.strawberry import authenticated_mutation_field from orchestrator.api.api_v1.endpoints.settings import generate_engine_status_response +from orchestrator.graphql.resolvers.helpers import make_async from orchestrator.graphql.schemas.errors import Error from orchestrator.graphql.schemas.settings import ( CACHE_FLUSH_OPTIONS, @@ -27,6 +28,7 @@ # Queries +@make_async def resolve_settings(info: OrchestratorInfo) -> StatusType: selected_fields = get_selected_fields(info) diff --git a/orchestrator/graphql/resolvers/subscription.py b/orchestrator/graphql/resolvers/subscription.py index 14bad60ee..1c79c4d71 100644 --- a/orchestrator/graphql/resolvers/subscription.py +++ b/orchestrator/graphql/resolvers/subscription.py @@ -18,6 +18,7 @@ from pydantic.alias_generators import to_camel as to_lower_camel from sqlalchemy import Select, func, select from sqlalchemy.orm import contains_eager +from starlette.concurrency import run_in_threadpool from strawberry.experimental.pydantic.conversion_types import StrawberryTypeFromPydantic from nwastdlib.asyncio import gather_nice @@ -101,7 +102,7 @@ async def format_subscription(info: OrchestratorInfo, subscription: Subscription async def resolve_subscription(info: OrchestratorInfo, id: UUID) -> SubscriptionInterface | None: stmt = select(SubscriptionTable).where(SubscriptionTable.subscription_id == id) - if subscription := db.session.scalar(stmt): + if subscription := await run_in_threadpool(db.session.scalar, stmt): return await format_subscription(info, subscription) return None @@ -141,12 +142,13 @@ async def resolve_subscriptions( stmt = filter_by_query_string(stmt, query) stmt = cast(Select, sort_subscriptions(stmt, pydantic_sort_by, _error_handler)) - total = db.session.scalar(select(func.count()).select_from(stmt.subquery())) + total = await run_in_threadpool(db.session.scalar, select(func.count()).select_from(stmt.subquery())) stmt = apply_range_to_statement(stmt, after, after + first + 1) graphql_subscriptions: list[SubscriptionInterface] = [] if is_querying_page_data(info): - subscriptions = db.session.scalars(stmt).all() + scalars = await run_in_threadpool(db.session.scalars, stmt) + subscriptions = scalars.all() graphql_subscriptions = list(await gather_nice((format_subscription(info, p) for p in subscriptions))) # type: ignore logger.info("Resolve subscriptions", filter_by=filter_by, total=total) diff --git a/orchestrator/graphql/resolvers/version.py b/orchestrator/graphql/resolvers/version.py index ce34a2fc5..34681ee69 100644 --- a/orchestrator/graphql/resolvers/version.py +++ b/orchestrator/graphql/resolvers/version.py @@ -1,6 +1,7 @@ from structlog import get_logger from orchestrator import __version__ +from orchestrator.graphql.resolvers.helpers import make_async from orchestrator.graphql.schemas.version import VersionType from orchestrator.graphql.types import OrchestratorInfo from orchestrator.graphql.utils import create_resolver_error_handler @@ -11,6 +12,7 @@ VERSIONS = [f"orchestrator-core: {__version__}"] +@make_async def resolve_version(info: OrchestratorInfo) -> VersionType | None: logger.debug("resolve_version() called") _error_handler = create_resolver_error_handler(info) diff --git a/orchestrator/graphql/resolvers/workflow.py b/orchestrator/graphql/resolvers/workflow.py index 58a63e5e3..6c7bae0c9 100644 --- a/orchestrator/graphql/resolvers/workflow.py +++ b/orchestrator/graphql/resolvers/workflow.py @@ -9,7 +9,7 @@ from orchestrator.db.sorting import Sort from orchestrator.db.sorting.workflow import sort_workflows, workflow_sort_fields from orchestrator.graphql.pagination import Connection -from orchestrator.graphql.resolvers.helpers import rows_from_statement +from orchestrator.graphql.resolvers.helpers import make_async, rows_from_statement from orchestrator.graphql.schemas.workflow import Workflow from orchestrator.graphql.types import GraphqlFilter, GraphqlSort, OrchestratorInfo from orchestrator.graphql.utils import create_resolver_error_handler, is_querying_page_data, to_graphql_result_page @@ -19,7 +19,8 @@ logger = structlog.get_logger(__name__) -async def resolve_workflows( +@make_async +def resolve_workflows( info: OrchestratorInfo, filter_by: list[GraphqlFilter] | None = None, sort_by: list[GraphqlSort] | None = None,