Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions benches/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
"--benchmark-columns=mean,stddev,min,max,rounds",
"--benchmark-sort=mean",
"--benchmark-min-rounds=1",
# Ignore benchmark machine info warnings that occur due to different CI runner CPU frequencies
# see https://github.com/ionelmc/pytest-benchmark/issues/255
"-W",
"ignore::pytest_benchmark.logger.PytestBenchmarkWarning",
]
+ (targets or [default_target])
+ options,
Expand Down
2 changes: 2 additions & 0 deletions src/prefect/server/database/alembic.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ prepend_sys_path = .
revision_environment = true

file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d%%(second).2d_%%(rev)s_%%(slug)s

path_separator = os
4 changes: 2 additions & 2 deletions src/prefect/server/database/configurations.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,8 +427,8 @@ async def engine(self) -> AsyncEngine:
)

engine = create_async_engine(self.connection_url, echo=self.echo, **kwargs)
sa.event.listen(engine.sync_engine, "connect", self.setup_sqlite)
sa.event.listen(engine.sync_engine, "begin", self.begin_sqlite_stmt)
event.listen(engine.sync_engine, "connect", self.setup_sqlite)
event.listen(engine.sync_engine, "begin", self.begin_sqlite_stmt)

if logfire:
logfire.instrument_sqlalchemy(engine) # pyright: ignore
Expand Down
9 changes: 4 additions & 5 deletions src/prefect/server/database/orm_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
relationship,
synonym,
)
from sqlalchemy.orm.decl_api import registry as RegistryType
from sqlalchemy.sql import roles
from sqlalchemy.sql.functions import coalesce

Expand Down Expand Up @@ -59,7 +60,7 @@ class Base(DeclarativeBase):
and provides ID, created, and updated columns
"""

registry: ClassVar[sa.orm.registry] = registry(
registry: ClassVar[RegistryType] = registry(
metadata=sa.schema.MetaData(
# define naming conventions for our Base class to use
# sqlalchemy will use the following templated strings
Expand Down Expand Up @@ -846,10 +847,8 @@ def job_variables(self) -> Mapped[dict[str, Any]]:
concurrency_limit_id: Mapped[Optional[uuid.UUID]] = mapped_column(
sa.ForeignKey("concurrency_limit_v2.id", ondelete="SET NULL"),
)
global_concurrency_limit: Mapped[Optional["ConcurrencyLimitV2"]] = (
sa.orm.relationship(
lazy="selectin",
)
global_concurrency_limit: Mapped[Optional["ConcurrencyLimitV2"]] = relationship(
lazy="selectin",
)
concurrency_options: Mapped[Optional[schemas.core.ConcurrencyOptions]] = (
mapped_column(
Expand Down
116 changes: 58 additions & 58 deletions src/prefect/server/orchestration/core_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ class SecureTaskConcurrencySlots(TaskRunOrchestrationRule):

async def before_transition(
self,
initial_state: states.State | None,
proposed_state: states.State | None,
initial_state: states.State[Any] | None,
proposed_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.TaskRun, core.TaskRunPolicy],
) -> None:
self._applied_limits: list[str] = []
Expand Down Expand Up @@ -352,8 +352,8 @@ async def before_transition(

async def cleanup(
self,
initial_state: states.State | None,
validated_state: states.State | None,
initial_state: states.State[Any] | None,
validated_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.TaskRun, core.TaskRunPolicy],
) -> None:
for tag in self._applied_limits:
Expand Down Expand Up @@ -417,8 +417,8 @@ class SecureFlowConcurrencySlots(FlowRunOrchestrationRule):

async def before_transition( # type: ignore
self,
initial_state: states.State | None,
proposed_state: states.State | None,
initial_state: states.State[Any] | None,
proposed_state: states.State[Any] | None,
context: FlowOrchestrationContext,
) -> None:
if not context.session or not context.run.deployment_id:
Expand Down Expand Up @@ -485,8 +485,8 @@ async def before_transition( # type: ignore

async def cleanup( # type: ignore
self,
initial_state: states.State | None,
validated_state: states.State | None,
initial_state: states.State[Any] | None,
validated_state: states.State[Any] | None,
context: FlowOrchestrationContext,
) -> None:
logger = get_logger()
Expand Down Expand Up @@ -579,8 +579,8 @@ class CacheInsertion(TaskRunOrchestrationRule):

async def before_transition(
self,
initial_state: states.State | None,
proposed_state: states.State | None,
initial_state: states.State[Any] | None,
proposed_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.TaskRun, core.TaskRunPolicy],
) -> None:
if proposed_state is None:
Expand All @@ -598,8 +598,8 @@ async def before_transition(
async def after_transition(
self,
db: PrefectDBInterface,
initial_state: states.State | None,
validated_state: states.State | None,
initial_state: states.State[Any] | None,
validated_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.TaskRun, core.TaskRunPolicy],
) -> None:
if not validated_state or not context.session:
Expand Down Expand Up @@ -631,8 +631,8 @@ class CacheRetrieval(TaskRunOrchestrationRule):
async def before_transition(
self,
db: PrefectDBInterface,
initial_state: states.State | None,
proposed_state: states.State | None,
initial_state: states.State[Any] | None,
proposed_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.TaskRun, core.TaskRunPolicy],
) -> None:
if not proposed_state:
Expand Down Expand Up @@ -679,8 +679,8 @@ class RetryFailedFlows(FlowRunOrchestrationRule):

async def before_transition(
self,
initial_state: states.State | None,
proposed_state: states.State | None,
initial_state: states.State[Any] | None,
proposed_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy],
) -> None:
if initial_state is None or proposed_state is None:
Expand Down Expand Up @@ -756,8 +756,8 @@ class RetryFailedTasks(TaskRunOrchestrationRule):

async def before_transition(
self,
initial_state: states.State | None,
proposed_state: states.State | None,
initial_state: states.State[Any] | None,
proposed_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.TaskRun, core.TaskRunPolicy],
) -> None:
if initial_state is None or proposed_state is None:
Expand Down Expand Up @@ -803,8 +803,8 @@ class EnqueueScheduledTasks(TaskRunOrchestrationRule):

async def after_transition(
self,
initial_state: states.State | None,
validated_state: states.State | None,
initial_state: states.State[Any] | None,
validated_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.TaskRun, core.TaskRunPolicy],
) -> None:
if not validated_state:
Expand Down Expand Up @@ -837,8 +837,8 @@ class RenameReruns(GenericOrchestrationRule):

async def before_transition(
self,
initial_state: states.State | None,
proposed_state: states.State | None,
initial_state: states.State[Any] | None,
proposed_state: states.State[Any] | None,
context: OrchestrationContext[
orm_models.Run, core.TaskRunPolicy | core.FlowRunPolicy
],
Expand Down Expand Up @@ -869,8 +869,8 @@ class CopyScheduledTime(

async def before_transition(
self,
initial_state: states.State | None,
proposed_state: states.State | None,
initial_state: states.State[Any] | None,
proposed_state: states.State[Any] | None,
context: OrchestrationContext[
orm_models.Run, core.TaskRunPolicy | core.FlowRunPolicy
],
Expand Down Expand Up @@ -902,8 +902,8 @@ class WaitForScheduledTime(

async def before_transition(
self,
initial_state: states.State | None,
proposed_state: states.State | None,
initial_state: states.State[Any] | None,
proposed_state: states.State[Any] | None,
context: OrchestrationContext[
orm_models.Run, core.TaskRunPolicy | core.FlowRunPolicy
],
Expand Down Expand Up @@ -940,8 +940,8 @@ class CopyTaskParametersID(TaskRunOrchestrationRule):

async def before_transition(
self,
initial_state: states.State | None,
proposed_state: states.State | None,
initial_state: states.State[Any] | None,
proposed_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.TaskRun, core.TaskRunPolicy],
) -> None:
if initial_state is None or proposed_state is None:
Expand All @@ -963,8 +963,8 @@ class HandlePausingFlows(FlowRunOrchestrationRule):

async def before_transition(
self,
initial_state: states.State | None,
proposed_state: states.State | None,
initial_state: states.State[Any] | None,
proposed_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy],
) -> None:
if proposed_state is None:
Expand Down Expand Up @@ -1010,8 +1010,8 @@ async def before_transition(

async def after_transition(
self,
initial_state: states.State | None,
validated_state: states.State | None,
initial_state: states.State[Any] | None,
validated_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy],
) -> None:
updated_policy = context.run.empirical_policy.model_dump()
Expand All @@ -1029,8 +1029,8 @@ class HandleResumingPausedFlows(FlowRunOrchestrationRule):

async def before_transition(
self,
initial_state: states.State | None,
proposed_state: states.State | None,
initial_state: states.State[Any] | None,
proposed_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy],
) -> None:
if initial_state is None or proposed_state is None:
Expand Down Expand Up @@ -1084,8 +1084,8 @@ async def before_transition(

async def after_transition(
self,
initial_state: states.State | None,
validated_state: states.State | None,
initial_state: states.State[Any] | None,
validated_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy],
) -> None:
updated_policy = context.run.empirical_policy.model_dump()
Expand All @@ -1103,8 +1103,8 @@ class UpdateFlowRunTrackerOnTasks(TaskRunOrchestrationRule):

async def after_transition(
self,
initial_state: states.State | None,
validated_state: states.State | None,
initial_state: states.State[Any] | None,
validated_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.TaskRun, core.TaskRunPolicy],
) -> None:
if context.run.flow_run_id is not None:
Expand Down Expand Up @@ -1135,8 +1135,8 @@ class HandleTaskTerminalStateTransitions(TaskRunOrchestrationRule):

async def before_transition(
self,
initial_state: states.State | None,
proposed_state: states.State | None,
initial_state: states.State[Any] | None,
proposed_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.TaskRun, core.TaskRunPolicy],
) -> None:
if initial_state is None or proposed_state is None:
Expand Down Expand Up @@ -1176,8 +1176,8 @@ async def before_transition(

async def cleanup(
self,
initial_state: states.State | None,
validated_state: states.State | None,
initial_state: states.State[Any] | None,
validated_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.TaskRun, core.TaskRunPolicy],
) -> None:
# reset run count
Expand All @@ -1200,8 +1200,8 @@ class HandleFlowTerminalStateTransitions(FlowRunOrchestrationRule):

async def before_transition(
self,
initial_state: states.State | None,
proposed_state: states.State | None,
initial_state: states.State[Any] | None,
proposed_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy],
) -> None:
if initial_state is None or proposed_state is None:
Expand Down Expand Up @@ -1255,8 +1255,8 @@ async def before_transition(

async def cleanup(
self,
initial_state: states.State | None,
validated_state: states.State | None,
initial_state: states.State[Any] | None,
validated_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy],
) -> None:
context.run.empirical_policy = core.FlowRunPolicy(**self.original_flow_policy)
Expand Down Expand Up @@ -1294,8 +1294,8 @@ class PreventPendingTransitions(GenericOrchestrationRule):

async def before_transition(
self,
initial_state: states.State | None,
proposed_state: states.State | None,
initial_state: states.State[Any] | None,
proposed_state: states.State[Any] | None,
context: OrchestrationContext[
orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
],
Expand All @@ -1317,8 +1317,8 @@ class EnsureOnlyScheduledFlowsMarkedLate(FlowRunOrchestrationRule):

async def before_transition(
self,
initial_state: states.State | None,
proposed_state: states.State | None,
initial_state: states.State[Any] | None,
proposed_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy],
) -> None:
if initial_state is None or proposed_state is None:
Expand Down Expand Up @@ -1346,8 +1346,8 @@ class PreventRunningTasksFromStoppedFlows(TaskRunOrchestrationRule):

async def before_transition(
self,
initial_state: states.State | None,
proposed_state: states.State | None,
initial_state: states.State[Any] | None,
proposed_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.TaskRun, core.TaskRunPolicy],
) -> None:
flow_run = await context.flow_run()
Expand Down Expand Up @@ -1390,8 +1390,8 @@ class EnforceCancellingToCancelledTransition(TaskRunOrchestrationRule):

async def before_transition(
self,
initial_state: states.State | None,
proposed_state: states.State | None,
initial_state: states.State[Any] | None,
proposed_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.TaskRun, core.TaskRunPolicy],
) -> None:
await self.reject_transition(
Expand Down Expand Up @@ -1422,8 +1422,8 @@ class BypassCancellingFlowRunsWithNoInfra(FlowRunOrchestrationRule):

async def before_transition(
self,
initial_state: states.State | None,
proposed_state: states.State | None,
initial_state: states.State[Any] | None,
proposed_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy],
) -> None:
if initial_state is None or proposed_state is None:
Expand Down Expand Up @@ -1468,8 +1468,8 @@ class PreventDuplicateTransitions(FlowRunOrchestrationRule):

async def before_transition(
self,
initial_state: states.State | None,
proposed_state: states.State | None,
initial_state: states.State[Any] | None,
proposed_state: states.State[Any] | None,
context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy],
) -> None:
if initial_state is None or proposed_state is None:
Expand Down
Loading
Loading