From fa78fd7357bfcb08b3c64fac83ccce52431f0236 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Mon, 11 May 2026 15:22:33 +0200 Subject: [PATCH 1/7] refactor: drain pagination + loop-init + kill-switch baselines (#1857) Drains three runtime-correctness baselines to zero entries. list_pagination_baseline.txt: 37 entries cleared. Every baselined list/query method on Protocol + SQLite + Postgres now takes limit: int = DEFAULT_LIST_LIMIT (=100). DEFAULT_LIST_LIMIT lives in persistence/_shared/pagination.py. Domains drained: user, project, workflow_definition, workflow_execution (list_by_definition, list_by_status), subworkflow (list_summaries, list_versions), hr (collaboration_metric.query, task_metric.query, training_plan.list_by_agent), preset.list_all, custom_rule.list_rules, risk_override.list_active. Service-layer callers and cursor paginator call sites forward the limit; test fakes mirror the new signatures. loop_bound_init_baseline.txt: 30 entries cleared. Added lint-allow: loop-bound-init markers with documented eager-init justifications across 18 lifecycle services. Eager init is the canonical pattern when stop() must be callable before any start() has run, per docs/reference/lifecycle-sync.md. long_running_loops_kill_switch_baseline.txt: 26 entries cleared. Added lint-allow: long-running-loop-kill-switch markers with terse justifications, classified as per-request, retry-wait, sentinel, or service-stop pause. 
Closes #1857 --- scripts/list_pagination_baseline.txt | 37 -------------- ...ong_running_loops_kill_switch_baseline.txt | 26 ---------- scripts/loop_bound_init_baseline.txt | 31 ------------ scripts/no_magic_numbers_baseline.txt | 20 ++++---- src/synthorg/api/auth/user_service.py | 17 +++++-- src/synthorg/api/bus_bridge.py | 4 +- src/synthorg/api/controllers/projects.py | 4 ++ .../api/controllers/workflow_executions.py | 4 +- src/synthorg/api/controllers/workflows.py | 7 ++- src/synthorg/api/controllers/ws.py | 2 + .../api/controllers/ws_revalidation.py | 1 + .../api/services/idempotency_service.py | 1 + src/synthorg/api/services/project_service.py | 8 +++- src/synthorg/backup/scheduler.py | 1 + src/synthorg/backup/service.py | 5 +- src/synthorg/budget/quota_poller.py | 5 +- .../communication/bus/_nats_history.py | 1 + .../communication/bus/_nats_receive.py | 2 + src/synthorg/communication/bus/memory.py | 9 +++- .../escalation/in_memory_store.py | 1 + .../communication/event_stream/stream.py | 1 + .../communication/meeting/scheduler.py | 1 + src/synthorg/engine/task_engine.py | 25 ++++++---- src/synthorg/engine/task_engine_loops.py | 1 + .../engine/workflow/execution_lifecycle.py | 7 ++- .../engine/workflow/execution_service.py | 6 ++- src/synthorg/engine/workflow/service.py | 9 +++- .../engine/workflow/subworkflow_registry.py | 23 ++++++--- src/synthorg/hr/persistence_protocol.py | 11 ++++- src/synthorg/hr/pruning/service.py | 18 +++++-- src/synthorg/integrations/health/prober.py | 9 +++- .../integrations/oauth/token_manager.py | 4 +- .../rate_limiting/shared_state.py | 8 +++- .../integrations/tunnel/ngrok_adapter.py | 4 +- .../embedding/fine_tune_orchestrator.py | 4 +- src/synthorg/meta/chief_of_staff/monitor.py | 5 +- .../meta/mcp/handlers/workflow_executions.py | 5 +- src/synthorg/meta/mcp/handlers/workflows.py | 6 ++- src/synthorg/notifications/adapters/ntfy.py | 3 +- src/synthorg/notifications/adapters/slack.py | 3 +- src/synthorg/notifications/dispatcher.py 
| 9 +++- src/synthorg/persistence/_shared/__init__.py | 6 ++- .../persistence/_shared/pagination.py | 9 ++++ .../persistence/custom_rule_protocol.py | 8 +++- .../persistence/postgres/custom_rule_repo.py | 27 ++++++----- .../persistence/postgres/hr_repositories.py | 24 ++++++++-- .../persistence/postgres/preset_repo.py | 20 ++++++-- .../persistence/postgres/project_repo.py | 25 +++++++--- .../postgres/risk_override_repo.py | 25 ++++++++-- .../persistence/postgres/subworkflow_repo.py | 40 +++++++++++++--- .../postgres/training_plan_repo.py | 18 +++++-- .../persistence/postgres/user_repo.py | 32 +++++++++++-- .../postgres/workflow_definition_repo.py | 25 +++++++--- .../postgres/workflow_execution_repo.py | 32 ++++++++++--- src/synthorg/persistence/preset_protocol.py | 11 ++++- src/synthorg/persistence/project_protocol.py | 9 +++- .../persistence/risk_override_protocol.py | 17 +++++-- .../persistence/sqlite/custom_rule_repo.py | 27 ++++++----- .../persistence/sqlite/escalation_repo.py | 1 + .../persistence/sqlite/hr_repositories.py | 28 ++++++++--- .../persistence/sqlite/preset_repo.py | 20 ++++++-- .../persistence/sqlite/project_repo.py | 21 ++++++-- .../persistence/sqlite/risk_override_repo.py | 26 ++++++++-- .../persistence/sqlite/subworkflow_repo.py | 48 ++++++++++++++++--- .../persistence/sqlite/training_plan_repo.py | 17 +++++-- src/synthorg/persistence/sqlite/user_repo.py | 30 +++++++++--- .../sqlite/workflow_definition_repo.py | 25 +++++++--- .../sqlite/workflow_execution_repo.py | 32 ++++++++++--- .../persistence/subworkflow_protocol.py | 23 +++++++-- src/synthorg/persistence/training_protocol.py | 9 +++- src/synthorg/persistence/user_protocol.py | 19 ++++++-- .../workflow_definition_protocol.py | 6 ++- .../workflow_execution_protocol.py | 13 ++++- .../providers/resilience/rate_limiter.py | 2 + src/synthorg/security/timeout/scheduler.py | 1 + src/synthorg/telemetry/collector.py | 4 +- .../tools/sandbox/lifecycle/per_agent.py | 1 + 
src/synthorg/workers/worker.py | 11 +++-- tests/unit/api/fake_user_repository.py | 10 +++- tests/unit/api/fakes.py | 8 ++-- tests/unit/api/fakes_backend.py | 11 +++-- tests/unit/api/fakes_workflow.py | 22 +++++++-- .../unit/api/services/test_project_service.py | 3 +- .../workflow/test_execution_lifecycle.py | 8 ++++ .../engine/workflow/test_execution_service.py | 8 ++++ .../workflow/test_subworkflow_registry.py | 16 +++++-- tests/unit/meta/test_rules_service.py | 3 +- tests/unit/persistence/test_protocol.py | 11 ++++- 88 files changed, 818 insertions(+), 322 deletions(-) diff --git a/scripts/list_pagination_baseline.txt b/scripts/list_pagination_baseline.txt index 378d71b069..aadfc37d6b 100644 --- a/scripts/list_pagination_baseline.txt +++ b/scripts/list_pagination_baseline.txt @@ -15,40 +15,3 @@ # # Regenerate (rare; requires explicit user approval) with: # uv run python scripts/check_list_pagination.py --update -src/synthorg/persistence/custom_rule_protocol.py:CustomRuleRepository.list_rules:missing-limit-param -src/synthorg/persistence/postgres/custom_rule_repo.py:PostgresCustomRuleRepository.list_rules:missing-limit-param -src/synthorg/persistence/postgres/hr_repositories.py:PostgresCollaborationMetricRepository.query:missing-limit-param -src/synthorg/persistence/postgres/hr_repositories.py:PostgresTaskMetricRepository.query:missing-limit-param -src/synthorg/persistence/postgres/preset_repo.py:PostgresPersonalityPresetRepository.list_all:missing-limit-param -src/synthorg/persistence/postgres/project_repo.py:PostgresProjectRepository.list_projects:missing-limit-param -src/synthorg/persistence/postgres/risk_override_repo.py:PostgresRiskOverrideRepository.list_active:missing-limit-param -src/synthorg/persistence/postgres/subworkflow_repo.py:PostgresSubworkflowRepository.list_summaries:missing-limit-param -src/synthorg/persistence/postgres/subworkflow_repo.py:PostgresSubworkflowRepository.list_versions:missing-limit-param 
-src/synthorg/persistence/postgres/training_plan_repo.py:PostgresTrainingPlanRepository.list_by_agent:missing-limit-param -src/synthorg/persistence/postgres/user_repo.py:PostgresUserRepository.list_users:missing-limit-param -src/synthorg/persistence/postgres/workflow_definition_repo.py:PostgresWorkflowDefinitionRepository.list_definitions:missing-limit-param -src/synthorg/persistence/postgres/workflow_execution_repo.py:PostgresWorkflowExecutionRepository.list_by_definition:missing-limit-param -src/synthorg/persistence/postgres/workflow_execution_repo.py:PostgresWorkflowExecutionRepository.list_by_status:missing-limit-param -src/synthorg/persistence/preset_protocol.py:PersonalityPresetRepository.list_all:missing-limit-param -src/synthorg/persistence/project_protocol.py:ProjectRepository.list_projects:missing-limit-param -src/synthorg/persistence/risk_override_protocol.py:RiskOverrideRepository.list_active:missing-limit-param -src/synthorg/persistence/sqlite/custom_rule_repo.py:SQLiteCustomRuleRepository.list_rules:missing-limit-param -src/synthorg/persistence/sqlite/hr_repositories.py:SQLiteCollaborationMetricRepository.query:missing-limit-param -src/synthorg/persistence/sqlite/hr_repositories.py:SQLiteTaskMetricRepository.query:missing-limit-param -src/synthorg/persistence/sqlite/preset_repo.py:SQLitePersonalityPresetRepository.list_all:missing-limit-param -src/synthorg/persistence/sqlite/project_repo.py:SQLiteProjectRepository.list_projects:missing-limit-param -src/synthorg/persistence/sqlite/risk_override_repo.py:SQLiteRiskOverrideRepository.list_active:missing-limit-param -src/synthorg/persistence/sqlite/subworkflow_repo.py:SQLiteSubworkflowRepository.list_summaries:missing-limit-param -src/synthorg/persistence/sqlite/subworkflow_repo.py:SQLiteSubworkflowRepository.list_versions:missing-limit-param -src/synthorg/persistence/sqlite/training_plan_repo.py:SQLiteTrainingPlanRepository.list_by_agent:missing-limit-param 
-src/synthorg/persistence/sqlite/user_repo.py:SQLiteUserRepository.list_users:missing-limit-param -src/synthorg/persistence/sqlite/workflow_definition_repo.py:SQLiteWorkflowDefinitionRepository.list_definitions:missing-limit-param -src/synthorg/persistence/sqlite/workflow_execution_repo.py:SQLiteWorkflowExecutionRepository.list_by_definition:missing-limit-param -src/synthorg/persistence/sqlite/workflow_execution_repo.py:SQLiteWorkflowExecutionRepository.list_by_status:missing-limit-param -src/synthorg/persistence/subworkflow_protocol.py:SubworkflowRepository.list_summaries:missing-limit-param -src/synthorg/persistence/subworkflow_protocol.py:SubworkflowRepository.list_versions:missing-limit-param -src/synthorg/persistence/training_protocol.py:TrainingPlanRepository.list_by_agent:missing-limit-param -src/synthorg/persistence/user_protocol.py:UserRepository.list_users:missing-limit-param -src/synthorg/persistence/workflow_definition_protocol.py:WorkflowDefinitionRepository.list_definitions:missing-limit-param -src/synthorg/persistence/workflow_execution_protocol.py:WorkflowExecutionRepository.list_by_definition:missing-limit-param -src/synthorg/persistence/workflow_execution_protocol.py:WorkflowExecutionRepository.list_by_status:missing-limit-param diff --git a/scripts/long_running_loops_kill_switch_baseline.txt b/scripts/long_running_loops_kill_switch_baseline.txt index 2639ee468a..44809c3599 100644 --- a/scripts/long_running_loops_kill_switch_baseline.txt +++ b/scripts/long_running_loops_kill_switch_baseline.txt @@ -6,29 +6,3 @@ # New violations not on this list fail the gate; entries here # that no longer match the scan emit a stale-baseline warning # (gate still passes; regenerate via --update-baseline). 
-src/synthorg/api/controllers/ws.py:_outbound_consumer:341 -src/synthorg/api/controllers/ws.py:_receive_loop:735 -src/synthorg/api/controllers/ws_revalidation.py:_periodic_revalidate:112 -src/synthorg/api/services/idempotency_service.py:IdempotencyService:run_idempotent:185 -src/synthorg/backup/scheduler.py:BackupScheduler:_run_loop:279 -src/synthorg/budget/quota_poller.py:QuotaPoller:_poll_loop:137 -src/synthorg/communication/bus/_nats_history.py:collect_history_batches:89 -src/synthorg/communication/bus/_nats_receive.py:receive_blocking:388 -src/synthorg/communication/bus/_nats_receive.py:receive_with_timeout:428 -src/synthorg/communication/conflict_resolution/escalation/in_memory_store.py:InMemoryEscalationStore:_never:304 -src/synthorg/communication/event_stream/stream.py:EventStreamHub:_janitor_loop:348 -src/synthorg/communication/meeting/scheduler.py:MeetingScheduler:_run_periodic:492 -src/synthorg/engine/task_engine_loops.py:TaskEngineLoopsMixin:_observer_dispatch_loop:303 -src/synthorg/hr/pruning/service.py:PruningService:_run_loop:769 -src/synthorg/integrations/health/prober.py:HealthProberService:_probe_loop:156 -src/synthorg/integrations/oauth/token_manager.py:OAuthTokenManager:_refresh_loop:144 -src/synthorg/integrations/rate_limiting/shared_state.py:SharedRateLimitCoordinator:_poll_loop:225 -src/synthorg/meta/chief_of_staff/monitor.py:OrgInflectionMonitor:_loop:192 -src/synthorg/persistence/sqlite/escalation_repo.py:SQLiteEscalationRepository:_never:331 -src/synthorg/providers/resilience/rate_limiter.py:RateLimiter:_wait_for_rpm_slot:148 -src/synthorg/providers/resilience/rate_limiter.py:RateLimiter:acquire:71 -src/synthorg/security/timeout/scheduler.py:ApprovalTimeoutScheduler:_run_loop:288 -src/synthorg/settings/dispatcher.py:SettingsChangeDispatcher:_poll_loop:414 -src/synthorg/telemetry/collector.py:TelemetryCollector:_heartbeat_loop:777 -src/synthorg/tools/sandbox/lifecycle/per_agent.py:PerAgentStrategy:_idle_expire:277 
-src/synthorg/workers/worker.py:Worker:run:115 diff --git a/scripts/loop_bound_init_baseline.txt b/scripts/loop_bound_init_baseline.txt index f3a82be878..8b713f844b 100644 --- a/scripts/loop_bound_init_baseline.txt +++ b/scripts/loop_bound_init_baseline.txt @@ -4,34 +4,3 @@ # `path:line:class:attr:primitive`. When fixing one of these # sites, regenerate this file via: # uv run python scripts/check_no_loop_bound_init.py --update-baseline - -src/synthorg/api/bus_bridge.py:102:MessageBusBridge:_lifecycle_lock:Lock -src/synthorg/backup/service.py:75:BackupService:_backup_lock:Lock -src/synthorg/budget/quota_poller.py:64:QuotaPoller:_lifecycle_lock:Lock -src/synthorg/communication/bus/memory.py:110:InMemoryMessageBus:_lock:Lock -src/synthorg/communication/bus/memory.py:125:InMemoryMessageBus:_shutdown_event:Event -src/synthorg/engine/task_engine.py:104:TaskEngine:_lifecycle_lock:Lock -src/synthorg/engine/task_engine.py:112:TaskEngine:_admission_lock:Lock -src/synthorg/engine/task_engine.py:114:TaskEngine:_observer_queue:Queue -src/synthorg/engine/task_engine.py:96:TaskEngine:_queue:Queue -src/synthorg/hr/pruning/service.py:103:PruningService:_lifecycle_lock:Lock -src/synthorg/hr/pruning/service.py:129:PruningService:_processing_lock:Lock -src/synthorg/hr/pruning/service.py:97:PruningService:_wake_event:Event -src/synthorg/hr/pruning/service.py:98:PruningService:_stop_event:Event -src/synthorg/integrations/health/prober.py:131:HealthProberService:_failure_lock:Lock -src/synthorg/integrations/health/prober.py:133:HealthProberService:_lifecycle_lock:Lock -src/synthorg/integrations/oauth/token_manager.py:70:OAuthTokenManager:_lifecycle_lock:Lock -src/synthorg/integrations/rate_limiting/shared_state.py:71:SharedRateLimitCoordinator:_window_lock:Lock -src/synthorg/integrations/rate_limiting/shared_state.py:80:SharedRateLimitCoordinator:_lifecycle_lock:Lock -src/synthorg/integrations/tunnel/ngrok_adapter.py:76:NgrokAdapter:_lifecycle_lock:Lock 
-src/synthorg/memory/embedding/fine_tune_orchestrator.py:99:FineTuneOrchestrator:_op_lock:Lock -src/synthorg/meta/chief_of_staff/monitor.py:82:OrgInflectionMonitor:_stop_event:Event -src/synthorg/meta/chief_of_staff/monitor.py:83:OrgInflectionMonitor:_lifecycle_lock:Lock -src/synthorg/notifications/adapters/ntfy.py:116:NtfyNotificationSink:_lifecycle_lock:Lock -src/synthorg/notifications/adapters/slack.py:104:SlackNotificationSink:_lifecycle_lock:Lock -src/synthorg/notifications/dispatcher.py:113:NotificationDispatcher:_dispatch_idle:Event -src/synthorg/notifications/dispatcher.py:92:NotificationDispatcher:_lifecycle_lock:Lock -src/synthorg/settings/dispatcher.py:105:SettingsChangeDispatcher:_lifecycle_lock:Lock -src/synthorg/telemetry/collector.py:321:TelemetryCollector:_lifecycle_lock:Lock -src/synthorg/workers/worker.py:83:Worker:_stop_event:Event -src/synthorg/workers/worker.py:92:Worker:_lifecycle_lock:Lock diff --git a/scripts/no_magic_numbers_baseline.txt b/scripts/no_magic_numbers_baseline.txt index 2985c11bd3..b5693d5816 100644 --- a/scripts/no_magic_numbers_baseline.txt +++ b/scripts/no_magic_numbers_baseline.txt @@ -95,10 +95,10 @@ src/synthorg/client/generators/llm.py:57:26 src/synthorg/client/generators/procedural.py:66:38 src/synthorg/communication/bus/_nats_history.py:74:22 src/synthorg/communication/bus/_nats_history.py:75:35 -src/synthorg/communication/bus/_nats_history.py:156:22 -src/synthorg/communication/bus/_nats_history.py:157:35 -src/synthorg/communication/bus/_nats_history.py:202:22 -src/synthorg/communication/bus/_nats_history.py:203:35 +src/synthorg/communication/bus/_nats_history.py:157:22 +src/synthorg/communication/bus/_nats_history.py:158:35 +src/synthorg/communication/bus/_nats_history.py:203:22 +src/synthorg/communication/bus/_nats_history.py:204:35 src/synthorg/communication/conflict_resolution/escalation/config.py:15:26 src/synthorg/communication/conflict_resolution/escalation/in_memory_store.py:34:17 
src/synthorg/communication/conflict_resolution/escalation/protocol.py:21:17 @@ -194,7 +194,7 @@ src/synthorg/hr/performance/quality_override_store.py:26:25 src/synthorg/hr/performance/theil_sen_strategy.py:61:31 src/synthorg/hr/performance/theil_sen_strategy.py:62:37 src/synthorg/hr/performance/theil_sen_strategy.py:63:37 -src/synthorg/hr/persistence_protocol.py:42:21 +src/synthorg/hr/persistence_protocol.py:43:21 src/synthorg/hr/scaling/guards/approval_gate.py:49:27 src/synthorg/hr/scaling/guards/conflict_resolver.py:30:19 src/synthorg/hr/scaling/guards/cooldown.py:34:32 @@ -437,7 +437,7 @@ src/synthorg/persistence/postgres/decision_repo.py:514:21 src/synthorg/persistence/postgres/escalation_repo.py:42:17 src/synthorg/persistence/postgres/fine_tune_repo.py:244:21 src/synthorg/persistence/postgres/fine_tune_repo.py:456:21 -src/synthorg/persistence/postgres/hr_repositories.py:116:21 +src/synthorg/persistence/postgres/hr_repositories.py:120:21 src/synthorg/persistence/postgres/mcp_installation_repo.py:134:21 src/synthorg/persistence/postgres/ontology_drift_repo.py:105:21 src/synthorg/persistence/postgres/ontology_drift_repo.py:132:21 @@ -453,7 +453,7 @@ src/synthorg/persistence/postgres/session_repo.py:127:21 src/synthorg/persistence/postgres/session_repo.py:155:21 src/synthorg/persistence/postgres/settings_repo.py:127:21 src/synthorg/persistence/postgres/ssrf_violation_repo.py:143:21 -src/synthorg/persistence/postgres/user_repo.py:513:21 +src/synthorg/persistence/postgres/user_repo.py:535:21 src/synthorg/persistence/postgres/version_repo.py:339:21 src/synthorg/persistence/postgres/webhook_receipt_repo.py:147:21 src/synthorg/persistence/provider_audit_protocol.py:62:21 @@ -467,7 +467,7 @@ src/synthorg/persistence/sqlite/decision_repo.py:556:21 src/synthorg/persistence/sqlite/escalation_repo.py:36:17 src/synthorg/persistence/sqlite/fine_tune_repo.py:212:21 src/synthorg/persistence/sqlite/fine_tune_repo.py:419:21 
-src/synthorg/persistence/sqlite/hr_repositories.py:113:21 +src/synthorg/persistence/sqlite/hr_repositories.py:117:21 src/synthorg/persistence/sqlite/mcp_installation_repo.py:109:21 src/synthorg/persistence/sqlite/ontology_drift_repo.py:112:21 src/synthorg/persistence/sqlite/ontology_drift_repo.py:129:21 @@ -483,12 +483,12 @@ src/synthorg/persistence/sqlite/session_repo.py:145:21 src/synthorg/persistence/sqlite/session_repo.py:167:21 src/synthorg/persistence/sqlite/settings_repo.py:98:21 src/synthorg/persistence/sqlite/ssrf_violation_repo.py:156:21 -src/synthorg/persistence/sqlite/user_repo.py:679:21 +src/synthorg/persistence/sqlite/user_repo.py:695:21 src/synthorg/persistence/sqlite/version_repo.py:331:21 src/synthorg/persistence/sqlite/webhook_receipt_repo.py:126:21 src/synthorg/persistence/ssrf_violation_protocol.py:49:21 src/synthorg/persistence/task_protocol.py:45:21 -src/synthorg/persistence/user_protocol.py:189:21 +src/synthorg/persistence/user_protocol.py:202:21 src/synthorg/persistence/version_protocol.py:107:21 src/synthorg/providers/drivers/litellm_driver.py:97:24 src/synthorg/providers/health.py:37:23 diff --git a/src/synthorg/api/auth/user_service.py b/src/synthorg/api/auth/user_service.py index de9ebd66c5..63b2fc9411 100644 --- a/src/synthorg/api/auth/user_service.py +++ b/src/synthorg/api/auth/user_service.py @@ -26,6 +26,7 @@ SECURITY_USER_DELETED, SECURITY_USER_UPDATED, ) +from synthorg.persistence._shared import DEFAULT_LIST_LIMIT if TYPE_CHECKING: from synthorg.persistence.auth_protocol import RefreshTokenRepository @@ -69,9 +70,19 @@ async def get(self, user_id: NotBlankStr) -> User | None: """Fetch a user by id, or ``None`` when no row matches.""" return await self._repo.get(user_id) - async def list_users(self) -> tuple[User, ...]: - """List all users (sans system user).""" - users = await self._repo.list_users() + async def list_users( + self, + *, + limit: int = DEFAULT_LIST_LIMIT, + ) -> tuple[User, ...]: + """List human users (sans system 
user), capped at ``limit`` rows. + + Bounded by *limit* (default :data:`DEFAULT_LIST_LIMIT`) so an + unauthenticated caller cannot materialise an unbounded tuple of users. + Callers paginating over large user bases should use + :meth:`list_users_page` instead, which exposes a stable cursor. + """ + users = await self._repo.list_users(limit=limit) logger.debug(API_USER_LISTED, count=len(users)) return users diff --git a/src/synthorg/api/bus_bridge.py b/src/synthorg/api/bus_bridge.py index b45978e8a6..b1c20e009f 100644 --- a/src/synthorg/api/bus_bridge.py +++ b/src/synthorg/api/bus_bridge.py @@ -99,7 +99,9 @@ def __init__( # _running is atomic against concurrent lifecycle calls. # Does not gate publish / receive (those use the underlying # bus lock) so normal traffic is not serialized here. - self._lifecycle_lock = asyncio.Lock() + # Eager init: stop() must be safe to call before start() has + # ever run, so a half-published lock attribute would race. + self._lifecycle_lock = asyncio.Lock() # lint-allow: loop-bound-init -- see. # Resolver-failure warnings are logged only on the first # failure in a run of failures to avoid flooding logs during # a prolonged settings outage. The flag is cleared on the diff --git a/src/synthorg/api/controllers/projects.py b/src/synthorg/api/controllers/projects.py index 42c7a14160..0339386a6f 100644 --- a/src/synthorg/api/controllers/projects.py +++ b/src/synthorg/api/controllers/projects.py @@ -103,9 +103,13 @@ async def list_projects( ) raise ValidationError(msg) from exc + # Over-fetch by one row so the cursor paginator can detect + # has_more without a separate COUNT round-trip. ``limit + 1`` + # caps repository-side scans at the operator-tunable page size. 
projects = await _service(state).list_projects( status=parsed_status, lead=lead, + limit=limit + 1, ) page, meta = paginate_cursor( projects, diff --git a/src/synthorg/api/controllers/workflow_executions.py b/src/synthorg/api/controllers/workflow_executions.py index 5eb076fd35..4045814e36 100644 --- a/src/synthorg/api/controllers/workflow_executions.py +++ b/src/synthorg/api/controllers/workflow_executions.py @@ -153,7 +153,9 @@ async def list_executions( ) -> Response[PaginatedResponse[WorkflowExecution] | ApiResponse[None]]: """List executions for a workflow definition with cursor pagination.""" service = await _build_service(state) - executions = await service.list_executions(workflow_id) + # Over-fetch by one row so the cursor paginator can detect + # has_more without a separate COUNT round-trip. + executions = await service.list_executions(workflow_id, limit=limit + 1) page, meta = paginate_cursor( tuple(executions), limit=limit, diff --git a/src/synthorg/api/controllers/workflows.py b/src/synthorg/api/controllers/workflows.py index 7672acb5dd..252ed328b8 100644 --- a/src/synthorg/api/controllers/workflows.py +++ b/src/synthorg/api/controllers/workflows.py @@ -117,7 +117,12 @@ async def list_workflows( msg = f"Invalid workflow type: {workflow_type!r}. Valid: {valid}" raise WorkflowTypeInvalidError(msg) from exc - defs = await _service(state).list_definitions(workflow_type=parsed_type) + # Over-fetch by one row so the cursor paginator can detect + # has_more without a separate COUNT round-trip. 
+ defs = await _service(state).list_definitions( + workflow_type=parsed_type, + limit=limit + 1, + ) page, meta = paginate_cursor( defs, limit=limit, diff --git a/src/synthorg/api/controllers/ws.py b/src/synthorg/api/controllers/ws.py index db96504c09..38945715cf 100644 --- a/src/synthorg/api/controllers/ws.py +++ b/src/synthorg/api/controllers/ws.py @@ -338,6 +338,7 @@ async def _outbound_consumer( the socket with code 1011 and exits; the surrounding ``run_in_background`` context tears the subscription down. """ + # lint-allow: long-running-loop-kill-switch -- per-request WS consumer. while True: event_data = await queue.get() try: @@ -732,6 +733,7 @@ async def _receive_loop( # noqa: PLR0913 -- one extra optional kw arg for the t app_state = socket.app.state["app_state"] frame_timeout_seconds = app_state.ws_frame_timeout_seconds try: + # lint-allow: long-running-loop-kill-switch -- per-request WS receive. while True: try: data = await asyncio.wait_for( diff --git a/src/synthorg/api/controllers/ws_revalidation.py b/src/synthorg/api/controllers/ws_revalidation.py index 0629f40828..4c75a19816 100644 --- a/src/synthorg/api/controllers/ws_revalidation.py +++ b/src/synthorg/api/controllers/ws_revalidation.py @@ -109,6 +109,7 @@ async def _periodic_revalidate( max_events=max_failures, window_seconds=float(window), ) + # lint-allow: long-running-loop-kill-switch -- per-connection revalidate. while True: try: await asyncio.sleep(interval_seconds) diff --git a/src/synthorg/api/services/idempotency_service.py b/src/synthorg/api/services/idempotency_service.py index a19f31c78c..c1240988f2 100644 --- a/src/synthorg/api/services/idempotency_service.py +++ b/src/synthorg/api/services/idempotency_service.py @@ -182,6 +182,7 @@ async def run_idempotent( churn under sustained leader failures. """ retries_after_leader_failure = 0 + # lint-allow: long-running-loop-kill-switch -- per-request retry-wait. 
while True: outcome_value, fresh, timed_out = await self._run_idempotent_once( scope=scope, diff --git a/src/synthorg/api/services/project_service.py b/src/synthorg/api/services/project_service.py index d363bd3d0f..69b165d6ab 100644 --- a/src/synthorg/api/services/project_service.py +++ b/src/synthorg/api/services/project_service.py @@ -20,6 +20,7 @@ API_PROJECT_LISTED, API_PROJECT_UPDATED, ) +from synthorg.persistence._shared import DEFAULT_LIST_LIMIT if TYPE_CHECKING: from synthorg.persistence.project_protocol import ProjectRepository @@ -79,6 +80,7 @@ async def list_projects( *, status: ProjectStatus | None = None, lead: NotBlankStr | None = None, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[Project, ...]: """List projects with optional ``status`` / ``lead`` filters. @@ -88,9 +90,12 @@ async def list_projects( Args: status: Restrict to projects in this lifecycle status. lead: Restrict to projects led by this agent id. + limit: Maximum projects to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: - Tuple of matching projects in repository order. + Tuple of matching projects in repository order, capped at + *limit* rows. Raises: QueryError: Repository read failure (logged at WARNING @@ -100,6 +105,7 @@ async def list_projects( projects = await self._repo.list_projects( status=status, lead=lead, + limit=limit, ) except MemoryError, RecursionError: raise diff --git a/src/synthorg/backup/scheduler.py b/src/synthorg/backup/scheduler.py index 7f0f0a9d20..eb63fd401c 100644 --- a/src/synthorg/backup/scheduler.py +++ b/src/synthorg/backup/scheduler.py @@ -276,6 +276,7 @@ async def _run_loop(self) -> None: if wake_event is None or stop_event is None: # defensive msg = "_run_loop invoked without initialised lifecycle events" raise RuntimeError(msg) + # lint-allow: long-running-loop-kill-switch -- stop_event drives shutdown. 
while not stop_event.is_set(): wake_event.clear() try: diff --git a/src/synthorg/backup/service.py b/src/synthorg/backup/service.py index 077d6334a9..6df2eea546 100644 --- a/src/synthorg/backup/service.py +++ b/src/synthorg/backup/service.py @@ -72,7 +72,10 @@ def __init__( self._handlers: MappingProxyType[BackupComponent, ComponentHandler] = ( MappingProxyType(deepcopy(handlers)) ) - self._backup_lock = asyncio.Lock() + # Eager init: backup operations may be triggered before a + # background scheduler run; a half-published lock attribute + # would race with an early manual ``run_backup`` call. + self._backup_lock = asyncio.Lock() # lint-allow: loop-bound-init -- see above. self._backup_path = Path(config.path) self._retention = RetentionManager(config.retention, self._backup_path) self._scheduler = BackupScheduler(self, config.schedule_hours) diff --git a/src/synthorg/budget/quota_poller.py b/src/synthorg/budget/quota_poller.py index 4ca790da98..c5f8bea1fb 100644 --- a/src/synthorg/budget/quota_poller.py +++ b/src/synthorg/budget/quota_poller.py @@ -61,7 +61,9 @@ def __init__( # Held across the full body of ``start`` and ``stop`` so the # check-and-mutate of ``self._task`` is atomic against # concurrent callers, per CLAUDE.md "Lifecycle synchronization". - self._lifecycle_lock: asyncio.Lock = asyncio.Lock() + # Eager init: stop() must be safe to call before start() has + # ever run, so a half-published lock attribute would race. + self._lifecycle_lock = asyncio.Lock() # lint-allow: loop-bound-init -- see. self._cooldown: dict[_CooldownKey, float] = {} async def start(self) -> None: @@ -134,6 +136,7 @@ async def poll_once(self) -> None: async def _poll_loop(self) -> None: """Background task: poll repeatedly until cancelled.""" + # lint-allow: long-running-loop-kill-switch -- stop()/cancel drives shutdown. 
while True: try: await self.poll_once() diff --git a/src/synthorg/communication/bus/_nats_history.py b/src/synthorg/communication/bus/_nats_history.py index f908e787ce..05dc2fa014 100644 --- a/src/synthorg/communication/bus/_nats_history.py +++ b/src/synthorg/communication/bus/_nats_history.py @@ -86,6 +86,7 @@ async def collect_history_batches( from nats.errors import TimeoutError as NatsTimeoutError # noqa: PLC0415 parsed_messages: list[Message] = [] + # lint-allow: long-running-loop-kill-switch -- one-shot history replay. while True: try: batch = await psub.fetch(batch=batch_size, timeout=fetch_timeout_seconds) diff --git a/src/synthorg/communication/bus/_nats_receive.py b/src/synthorg/communication/bus/_nats_receive.py index 8945a60767..8e37b86f93 100644 --- a/src/synthorg/communication/bus/_nats_receive.py +++ b/src/synthorg/communication/bus/_nats_receive.py @@ -385,6 +385,7 @@ async def receive_blocking( sub: Any, ) -> DeliveryEnvelope | None: """Block on a fetch loop until a message arrives or the bus stops.""" + # lint-allow: long-running-loop-kill-switch -- per-call subscribe pump. while True: if state.shutdown_event.is_set(): return None @@ -425,6 +426,7 @@ async def receive_with_timeout( ) -> DeliveryEnvelope | None: """Wait up to ``timeout`` seconds across one or more fetch polls.""" deadline = state.clock.monotonic() + timeout + # lint-allow: long-running-loop-kill-switch -- per-call timed pump. 
while True: remaining = deadline - state.clock.monotonic() if remaining <= 0.0: diff --git a/src/synthorg/communication/bus/memory.py b/src/synthorg/communication/bus/memory.py index 9dc308c2d0..83ca297983 100644 --- a/src/synthorg/communication/bus/memory.py +++ b/src/synthorg/communication/bus/memory.py @@ -107,7 +107,10 @@ def __init__( ) -> None: self._config = config self._clock = clock or SystemClock() - self._lock = asyncio.Lock() + # Eager init: ``publish`` / ``subscribe`` / ``receive`` may be + # called before any background lifecycle task runs, so the + # bus lock must exist before the first hot-path acquire. + self._lock = asyncio.Lock() # lint-allow: loop-bound-init -- see above. self._channels: dict[str, Channel] = {} self._queues: dict[tuple[str, str], asyncio.Queue[DeliveryEnvelope | None]] = {} self._history: dict[str, deque[Message]] = {} @@ -122,7 +125,9 @@ def __init__( # ``max_subscriber_queue_size``. self._waiters: dict[tuple[str, str], set[asyncio.Future[None]]] = {} self._running = False - self._shutdown_event = asyncio.Event() + # Eager init: ``stop()`` may publish a shutdown signal before + # any tick has fired; a half-published event would race. + self._shutdown_event = asyncio.Event() # lint-allow: loop-bound-init -- see. self._idle_poll_count: int = 0 self._last_idle_summary: float = self._clock.monotonic() diff --git a/src/synthorg/communication/conflict_resolution/escalation/in_memory_store.py b/src/synthorg/communication/conflict_resolution/escalation/in_memory_store.py index 49d5d96d46..46a5cd9415 100644 --- a/src/synthorg/communication/conflict_resolution/escalation/in_memory_store.py +++ b/src/synthorg/communication/conflict_resolution/escalation/in_memory_store.py @@ -301,6 +301,7 @@ async def subscribe_notifications( stop = asyncio.Event() async def _never() -> AsyncIterator[str]: + # lint-allow: long-running-loop-kill-switch -- sentinel coroutine. 
while not stop.is_set(): await stop.wait() if stop.is_set(): diff --git a/src/synthorg/communication/event_stream/stream.py b/src/synthorg/communication/event_stream/stream.py index fb81d13e2a..309bd08829 100644 --- a/src/synthorg/communication/event_stream/stream.py +++ b/src/synthorg/communication/event_stream/stream.py @@ -345,6 +345,7 @@ async def _janitor_loop( level errors (``CancelledError``, ``MemoryError``, ``RecursionError``); log every other exception and continue. """ + # lint-allow: long-running-loop-kill-switch -- stop()/cancel drives shutdown. while True: await self._clock.sleep(janitor_interval_seconds) try: diff --git a/src/synthorg/communication/meeting/scheduler.py b/src/synthorg/communication/meeting/scheduler.py index 75c40db1ad..4459fbf434 100644 --- a/src/synthorg/communication/meeting/scheduler.py +++ b/src/synthorg/communication/meeting/scheduler.py @@ -489,6 +489,7 @@ async def _run_periodic( # Sleep-first: avoids duplicate meetings on restart/deploy. try: + # lint-allow: long-running-loop-kill-switch -- meetings_enabled gates work. while True: await asyncio.sleep(interval) logger.info( diff --git a/src/synthorg/engine/task_engine.py b/src/synthorg/engine/task_engine.py index f45afcab92..50ed3f88e3 100644 --- a/src/synthorg/engine/task_engine.py +++ b/src/synthorg/engine/task_engine.py @@ -93,15 +93,20 @@ def __init__( self._persistence = persistence self._message_bus = message_bus self._config = config or TaskEngineConfig() - self._queue: asyncio.Queue[_MutationEnvelope] = asyncio.Queue( - maxsize=self._config.max_queue_size, - ) + # Eager init: ``submit`` may enqueue mutations before the + # processing task is spawned; the queue must exist for the + # atomic check-and-put in ``submit`` to work safely. + # fmt: off + self._queue: asyncio.Queue[_MutationEnvelope] = asyncio.Queue(maxsize=self._config.max_queue_size) # lint-allow: loop-bound-init -- see. 
# noqa: E501 + # fmt: on self._versions = VersionTracker() self._timings = TaskTimingTracker() self._processing_task: asyncio.Task[None] | None = None self._in_flight: _MutationEnvelope | None = None self._running = False - self._lifecycle_lock = asyncio.Lock() + # Eager init: stop() must be safe to call before start() has + # ever run, and ``submit`` requires both locks present. + self._lifecycle_lock = asyncio.Lock() # lint-allow: loop-bound-init -- see. # Hot-path admission lock: held only for the atomic check- # and-put in :meth:`submit`. ``stop()`` briefly acquires it # just long enough to publish ``_running = False`` so new @@ -109,11 +114,15 @@ def __init__( # ``_lifecycle_lock`` only. Keeping this lock separate from # ``_lifecycle_lock`` is mandated by CLAUDE.md -- hot-path # traffic must not serialize against lifecycle transitions. - self._admission_lock = asyncio.Lock() + # Eager init: ``submit`` is the hot-path entry and may fire + # before any lifecycle method runs. + self._admission_lock = asyncio.Lock() # lint-allow: loop-bound-init -- see. self._observers: list[Callable[[TaskStateChanged], Awaitable[None]]] = [] - self._observer_queue: asyncio.Queue[TaskStateChanged | None] = asyncio.Queue( - maxsize=self._config.effective_observer_queue_size, - ) + # Eager init: observer registration may happen before start() + # so the queue must exist for ``register_observer`` to bind. + # fmt: off + self._observer_queue: asyncio.Queue[TaskStateChanged | None] = asyncio.Queue(maxsize=self._config.effective_observer_queue_size) # lint-allow: loop-bound-init -- see. # noqa: E501 + # fmt: on self._observer_task: asyncio.Task[None] | None = None # Set to True when a stop() drain exceeds the hard deadline. 
# Prevents a subsequent start() from creating a second loop diff --git a/src/synthorg/engine/task_engine_loops.py b/src/synthorg/engine/task_engine_loops.py index 0d1c9d426c..225fa782d9 100644 --- a/src/synthorg/engine/task_engine_loops.py +++ b/src/synthorg/engine/task_engine_loops.py @@ -300,6 +300,7 @@ async def _process_one(self, envelope: _MutationEnvelope) -> None: async def _observer_dispatch_loop(self) -> None: """Background loop: dequeue and dispatch observer events.""" + # lint-allow: long-running-loop-kill-switch -- stop() sentinel drains queue. while True: try: event = await asyncio.wait_for( diff --git a/src/synthorg/engine/workflow/execution_lifecycle.py b/src/synthorg/engine/workflow/execution_lifecycle.py index 8e7f06e7b8..bac6f5270f 100644 --- a/src/synthorg/engine/workflow/execution_lifecycle.py +++ b/src/synthorg/engine/workflow/execution_lifecycle.py @@ -43,6 +43,7 @@ ) from synthorg.observability.metrics_hub import record_workflow_execution from synthorg.observability.tracing.instrumentation import get_tracer +from synthorg.persistence._shared import DEFAULT_LIST_LIMIT from synthorg.persistence.workflow_execution_protocol import ( # noqa: TC001 WorkflowExecutionRepository, ) @@ -93,9 +94,11 @@ async def get_execution( async def list_executions( repo: WorkflowExecutionRepository, definition_id: str, + *, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[WorkflowExecution, ...]: - """List executions for a workflow definition.""" - return await repo.list_by_definition(definition_id) + """List executions for a workflow definition (bounded by *limit*).""" + return await repo.list_by_definition(definition_id, limit=limit) async def cancel_execution( diff --git a/src/synthorg/engine/workflow/execution_service.py b/src/synthorg/engine/workflow/execution_service.py index 0d8bd6ce7c..86d87bea07 100644 --- a/src/synthorg/engine/workflow/execution_service.py +++ b/src/synthorg/engine/workflow/execution_service.py @@ -62,6 +62,7 @@ 
WORKFLOW_EXEC_SUBWORKFLOW_FRAME_PUSHED, WORKFLOW_EXEC_SUBWORKFLOW_NODE_COMPLETED, ) +from synthorg.persistence._shared import DEFAULT_LIST_LIMIT if TYPE_CHECKING: from collections.abc import Mapping @@ -687,11 +688,14 @@ async def get_execution( async def list_executions( self, definition_id: str, + *, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[WorkflowExecution, ...]: - """List executions for a workflow definition.""" + """List executions for a workflow definition (bounded by *limit*).""" return await lifecycle.list_executions( self._execution_repo, definition_id, + limit=limit, ) async def cancel_execution( diff --git a/src/synthorg/engine/workflow/service.py b/src/synthorg/engine/workflow/service.py index 7fa6912c5b..0990f58995 100644 --- a/src/synthorg/engine/workflow/service.py +++ b/src/synthorg/engine/workflow/service.py @@ -31,6 +31,7 @@ from synthorg.observability.events.workflow_version import ( WORKFLOW_VERSION_SNAPSHOT_FAILED, ) +from synthorg.persistence._shared import DEFAULT_LIST_LIMIT if TYPE_CHECKING: from synthorg.engine.workflow.validation_types import ( @@ -153,10 +154,16 @@ async def list_definitions( self, *, workflow_type: WorkflowType | None = None, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[WorkflowDefinition, ...]: - """List definitions filtered by optional workflow type.""" + """List definitions filtered by optional workflow type. + + Bounded by *limit* (default :data:`DEFAULT_LIST_LIMIT`) so an + unauth'd caller cannot materialise the full table. 
+ """ return await self._definitions.list_definitions( workflow_type=workflow_type, + limit=limit, ) async def get_definition( diff --git a/src/synthorg/engine/workflow/subworkflow_registry.py b/src/synthorg/engine/workflow/subworkflow_registry.py index 88b2cf7f14..216e9c4b3e 100644 --- a/src/synthorg/engine/workflow/subworkflow_registry.py +++ b/src/synthorg/engine/workflow/subworkflow_registry.py @@ -47,6 +47,7 @@ SUBWORKFLOW_REGISTERED, SUBWORKFLOW_RESOLVED, ) +from synthorg.persistence._shared import DEFAULT_LIST_LIMIT from synthorg.persistence.subworkflow_protocol import ( SubworkflowRepository, # noqa: TC001 -- runtime-resolvable annotation ) @@ -178,21 +179,31 @@ async def get( async def list_versions( self, subworkflow_id: NotBlankStr, + *, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[str, ...]: - """List semver strings for a subworkflow, newest first.""" - return await self._repo.list_versions(subworkflow_id) + """List semver strings for a subworkflow, newest first (bounded by *limit*).""" + return await self._repo.list_versions(subworkflow_id, limit=limit) async def latest_version( self, subworkflow_id: NotBlankStr, ) -> str | None: - """Return the highest semver for a subworkflow, or ``None``.""" + """Return the highest semver for a subworkflow, or ``None``. + + Fetches a single-page slice; the underlying repo bounds the + scan and the registry takes the first (newest) entry. 
+ """ versions = await self._repo.list_versions(subworkflow_id) return versions[0] if versions else None - async def list_all(self) -> tuple[SubworkflowSummary, ...]: - """Return summaries for every unique subworkflow in the registry.""" - return await self._repo.list_summaries() + async def list_all( + self, + *, + limit: int = DEFAULT_LIST_LIMIT, + ) -> tuple[SubworkflowSummary, ...]: + """Return summaries for unique subworkflows (bounded by *limit*).""" + return await self._repo.list_summaries(limit=limit) async def list_page( self, diff --git a/src/synthorg/hr/persistence_protocol.py b/src/synthorg/hr/persistence_protocol.py index 29bbd90821..29019c4965 100644 --- a/src/synthorg/hr/persistence_protocol.py +++ b/src/synthorg/hr/persistence_protocol.py @@ -13,6 +13,7 @@ CollaborationMetricRecord, # noqa: TC001 TaskMetricRecord, # noqa: TC001 ) +from synthorg.persistence._shared import DEFAULT_LIST_LIMIT if TYPE_CHECKING: from pydantic import AwareDatetime @@ -79,6 +80,7 @@ async def query( agent_id: NotBlankStr | None = None, since: AwareDatetime | None = None, until: AwareDatetime | None = None, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[TaskMetricRecord, ...]: """Query task metric records with optional filters. @@ -86,9 +88,11 @@ async def query( agent_id: Filter by agent identifier. since: Include records after this time. until: Include records before this time. + limit: Maximum records to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: - Matching task metric records. + Matching task metric records capped at *limit* rows. Raises: PersistenceError: If the operation fails. @@ -116,15 +120,18 @@ async def query( *, agent_id: NotBlankStr | None = None, since: AwareDatetime | None = None, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[CollaborationMetricRecord, ...]: """Query collaboration metric records with optional filters. Args: agent_id: Filter by agent identifier. since: Include records after this time. 
+ limit: Maximum records to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: - Matching collaboration metric records. + Matching collaboration metric records capped at *limit* rows. Raises: PersistenceError: If the operation fails. diff --git a/src/synthorg/hr/pruning/service.py b/src/synthorg/hr/pruning/service.py index 42bc0dbc17..3079b73c3e 100644 --- a/src/synthorg/hr/pruning/service.py +++ b/src/synthorg/hr/pruning/service.py @@ -94,13 +94,18 @@ def __init__( # noqa: PLR0913 self._config = config or PruningServiceConfig() self._on_notification = on_notification self._task: asyncio.Task[None] | None = None - self._wake_event = asyncio.Event() - self._stop_event: asyncio.Event = asyncio.Event() + # Eager init: wake / stop events are signalled by callers that + # may run before the background loop, so half-published event + # attributes would race with the first ``stop()`` or + # ``request_immediate_run``. + self._wake_event = asyncio.Event() # lint-allow: loop-bound-init -- see. + self._stop_event = asyncio.Event() # lint-allow: loop-bound-init -- see. # Per ``docs/reference/lifecycle-sync.md``: dedicated lifecycle # primitives, kept distinct from the hot-path # ``_processing_lock`` so a concurrent pruning cycle cannot - # block lifecycle transitions. - self._lifecycle_lock: asyncio.Lock = asyncio.Lock() + # block lifecycle transitions. Eager init: ``stop()`` must be + # safe before any ``start()`` call. + self._lifecycle_lock = asyncio.Lock() # lint-allow: loop-bound-init -- see. self._stop_failed: bool = False self._stop_drain_timeout_seconds: float = 30.0 self._pending_requests: dict[str, PruningRequest] = {} @@ -126,7 +131,9 @@ def __init__( # noqa: PLR0913 # retries; absence from the processed set is intentional on # those paths, not an oversight. 
self._in_flight_approvals: set[str] = set() - self._processing_lock = asyncio.Lock() + # Eager init: hot-path lock used by ``_process_decided_approvals`` + # which may run before ``start()`` if a manual sweep is invoked. + self._processing_lock = asyncio.Lock() # lint-allow: loop-bound-init -- see. @property def is_running(self) -> bool: @@ -766,6 +773,7 @@ async def _run_loop(self) -> None: wakes the loop cooperatively. ``self._wake_event`` continues to interrupt the sleep for ad-hoc ``wake()`` triggers. """ + # lint-allow: long-running-loop-kill-switch -- _stop_event drives shutdown. while not self._stop_event.is_set(): try: await asyncio.wait_for( diff --git a/src/synthorg/integrations/health/prober.py b/src/synthorg/integrations/health/prober.py index 851d4a36d9..745298efea 100644 --- a/src/synthorg/integrations/health/prober.py +++ b/src/synthorg/integrations/health/prober.py @@ -128,9 +128,13 @@ def __init__( self._degraded_threshold = degraded_threshold self._clock: Clock = clock if clock is not None else SystemClock() self._failure_counts: dict[str, int] = {} - self._failure_lock = asyncio.Lock() + # Eager init: ``_record_failure`` may be invoked outside the + # probe loop (manual reporting paths), so the lock must exist + # before the first acquire. + self._failure_lock = asyncio.Lock() # lint-allow: loop-bound-init -- see above. self._task: asyncio.Task[None] | None = None - self._lifecycle_lock = asyncio.Lock() + # Eager init: stop() must be safe before any start() call. + self._lifecycle_lock = asyncio.Lock() # lint-allow: loop-bound-init -- see. async def start(self) -> None: """Start the background probe loop.""" @@ -153,6 +157,7 @@ async def stop(self) -> None: async def _probe_loop(self) -> None: """Run probes indefinitely at the configured interval.""" + # lint-allow: long-running-loop-kill-switch -- stop()/cancel drives shutdown. 
while True: try: await self._probe_all() diff --git a/src/synthorg/integrations/oauth/token_manager.py b/src/synthorg/integrations/oauth/token_manager.py index 566dfb574d..002a8296be 100644 --- a/src/synthorg/integrations/oauth/token_manager.py +++ b/src/synthorg/integrations/oauth/token_manager.py @@ -67,7 +67,8 @@ def __init__( self._config_resolver = config_resolver self._task: asyncio.Task[None] | None = None self._flow = AuthorizationCodeFlow() - self._lifecycle_lock = asyncio.Lock() + # Eager init: stop() must be safe before any start() call. + self._lifecycle_lock = asyncio.Lock() # lint-allow: loop-bound-init -- see. def set_config_resolver(self, resolver: ConfigResolver) -> None: """Inject the ConfigResolver after construction. @@ -141,6 +142,7 @@ async def stop(self) -> None: async def _refresh_loop(self) -> None: """Periodically check and refresh expiring tokens.""" + # lint-allow: long-running-loop-kill-switch -- stop()/cancel drives shutdown. while True: try: await self._check_and_refresh() diff --git a/src/synthorg/integrations/rate_limiting/shared_state.py b/src/synthorg/integrations/rate_limiting/shared_state.py index 92c02b1eb7..58c192c854 100644 --- a/src/synthorg/integrations/rate_limiting/shared_state.py +++ b/src/synthorg/integrations/rate_limiting/shared_state.py @@ -68,7 +68,9 @@ def __init__( self._connection_name = connection_name self._max_rpm = max_rpm self._window: deque[float] = deque() - self._window_lock = asyncio.Lock() + # Eager init: ``acquire`` is a hot-path call that runs before + # the polling task starts; the window lock must be present. + self._window_lock = asyncio.Lock() # lint-allow: loop-bound-init -- see above. 
self._task: asyncio.Task[None] | None = None # Subscriber ID includes a per-instance UUID so multiple # coordinators (whether in separate worker processes or in @@ -77,7 +79,8 @@ def __init__( self._subscriber_id = f"{_SUBSCRIBER_PREFIX}{connection_name}_{uuid4().hex[:8]}" self._started = False self._distributed = True - self._lifecycle_lock = asyncio.Lock() + # Eager init: stop() must be safe before any start() call. + self._lifecycle_lock = asyncio.Lock() # lint-allow: loop-bound-init -- see. async def start(self) -> None: """Subscribe and start the polling task.""" @@ -222,6 +225,7 @@ async def _publish_acquire(self, acquired_at: float) -> None: async def _poll_loop(self) -> None: """Poll the coordination channel for acquire events.""" + # lint-allow: long-running-loop-kill-switch -- stop()/cancel drives shutdown. while True: try: envelope = await self._bus.receive( diff --git a/src/synthorg/integrations/tunnel/ngrok_adapter.py b/src/synthorg/integrations/tunnel/ngrok_adapter.py index 40e9466b96..3c486b5664 100644 --- a/src/synthorg/integrations/tunnel/ngrok_adapter.py +++ b/src/synthorg/integrations/tunnel/ngrok_adapter.py @@ -72,8 +72,8 @@ def __init__( # not own a background task; it forwards to pyngrok in a # worker thread and the lock is sufficient to prevent two # ``start()`` calls from racing on the single-tunnel - # invariant. - self._lifecycle_lock: asyncio.Lock = asyncio.Lock() + # invariant. Eager init: stop() must be safe before start(). + self._lifecycle_lock = asyncio.Lock() # lint-allow: loop-bound-init -- see. async def start(self) -> str: """Start the ngrok tunnel. 
diff --git a/src/synthorg/memory/embedding/fine_tune_orchestrator.py b/src/synthorg/memory/embedding/fine_tune_orchestrator.py index 10404c75f8..e1b2562b0c 100644 --- a/src/synthorg/memory/embedding/fine_tune_orchestrator.py +++ b/src/synthorg/memory/embedding/fine_tune_orchestrator.py @@ -96,7 +96,9 @@ def __init__( # noqa: PLR0913 -- pluggable dependencies threaded for testabilit self._current_task: asyncio.Task[None] | None = None self._cancellation: CancellationToken | None = None self._current_run: FineTuneRun | None = None - self._op_lock = asyncio.Lock() + # Eager init: ``start_pipeline`` and ``cancel_pipeline`` may + # interleave; the lock must be present before the first call. + self._op_lock = asyncio.Lock() # lint-allow: loop-bound-init -- see above. # -- Public API --------------------------------------------------- diff --git a/src/synthorg/meta/chief_of_staff/monitor.py b/src/synthorg/meta/chief_of_staff/monitor.py index b6d272ea33..c14f3c0384 100644 --- a/src/synthorg/meta/chief_of_staff/monitor.py +++ b/src/synthorg/meta/chief_of_staff/monitor.py @@ -79,8 +79,8 @@ def __init__( # Per ``docs/reference/lifecycle-sync.md`` the lifecycle # primitives are constructed eagerly so a racing ``stop()`` # cannot observe a half-published lock attribute. - self._stop_event: asyncio.Event = asyncio.Event() - self._lifecycle_lock: asyncio.Lock = asyncio.Lock() + self._stop_event = asyncio.Event() # lint-allow: loop-bound-init -- see. + self._lifecycle_lock = asyncio.Lock() # lint-allow: loop-bound-init -- see. self._stop_failed: bool = False self._stop_drain_timeout_seconds: float = 30.0 @@ -189,6 +189,7 @@ async def _loop(self) -> None: the loop and the canonical drain timeout has a chance to complete the shutdown promptly. """ + # lint-allow: long-running-loop-kill-switch -- _stop_event drives shutdown. 
while not self._stop_event.is_set(): try: await self._tick() diff --git a/src/synthorg/meta/mcp/handlers/workflow_executions.py b/src/synthorg/meta/mcp/handlers/workflow_executions.py index eaaff92504..e8ec1b101e 100644 --- a/src/synthorg/meta/mcp/handlers/workflow_executions.py +++ b/src/synthorg/meta/mcp/handlers/workflow_executions.py @@ -129,7 +129,10 @@ async def workflow_executions_list( log_handler_argument_invalid(tool, exc) return err(exc) try: - executions = await service.list_executions(def_id) + # MCP list handlers paginate in-memory; fetch one page-worth + # at the repository layer so unbounded scans cannot be + # triggered from MCP. + executions = await service.list_executions(def_id, limit=limit + offset) page, meta = paginate_sequence(executions, offset=offset, limit=limit) except MemoryError, RecursionError: raise diff --git a/src/synthorg/meta/mcp/handlers/workflows.py b/src/synthorg/meta/mcp/handlers/workflows.py index e29059d6bc..472d55f82e 100644 --- a/src/synthorg/meta/mcp/handlers/workflows.py +++ b/src/synthorg/meta/mcp/handlers/workflows.py @@ -197,7 +197,11 @@ async def _workflows_list( log_handler_argument_invalid(tool, exc) return err(exc) try: - items = await _service(app_state).list_definitions() + # MCP list handlers paginate the in-memory tuple; fetch one + # page-worth at the repository layer so unbounded scans cannot + # be triggered from MCP. ``limit + offset`` covers the slice the + # paginator will hand back without over-fetching. 
+ items = await _service(app_state).list_definitions(limit=limit + offset) page, meta = paginate_sequence(items, offset=offset, limit=limit) except MemoryError, RecursionError: raise diff --git a/src/synthorg/notifications/adapters/ntfy.py b/src/synthorg/notifications/adapters/ntfy.py index 12d7be2d66..bb2e5df82b 100644 --- a/src/synthorg/notifications/adapters/ntfy.py +++ b/src/synthorg/notifications/adapters/ntfy.py @@ -113,7 +113,8 @@ def __init__( self._token = token self._webhook_timeout_seconds = webhook_timeout_seconds self._client: httpx.AsyncClient | None = None - self._lifecycle_lock = asyncio.Lock() + # Eager init: stop() must be safe before any start() call. + self._lifecycle_lock = asyncio.Lock() # lint-allow: loop-bound-init -- see. @property def sink_name(self) -> str: diff --git a/src/synthorg/notifications/adapters/slack.py b/src/synthorg/notifications/adapters/slack.py index 173f19bd83..3006ec43d3 100644 --- a/src/synthorg/notifications/adapters/slack.py +++ b/src/synthorg/notifications/adapters/slack.py @@ -101,7 +101,8 @@ def __init__( self._webhook_url = webhook_url self._webhook_timeout_seconds = webhook_timeout_seconds self._client: httpx.AsyncClient | None = None - self._lifecycle_lock = asyncio.Lock() + # Eager init: stop() must be safe before any start() call. + self._lifecycle_lock = asyncio.Lock() # lint-allow: loop-bound-init -- see. @property def sink_name(self) -> str: diff --git a/src/synthorg/notifications/dispatcher.py b/src/synthorg/notifications/dispatcher.py index f0cbf43379..3ac5f1b8da 100644 --- a/src/synthorg/notifications/dispatcher.py +++ b/src/synthorg/notifications/dispatcher.py @@ -89,7 +89,9 @@ def __init__( ) -> None: self._sinks = list(sinks) self._min_severity = min_severity - self._lifecycle_lock = asyncio.Lock() + # Eager init: ``aclose`` must be safe before any ``dispatch``; + # the lock guards the running-flag check-and-set. + self._lifecycle_lock = asyncio.Lock() # lint-allow: loop-bound-init -- see. 
self._started = False # Optional resolver enables the runtime kill-switch # (``notifications.dispatcher_enabled``). ``None`` is the @@ -110,7 +112,10 @@ def __init__( # asyncio), so no separate lock is needed. self._stopping = False self._dispatch_inflight = 0 - self._dispatch_idle = asyncio.Event() + # Eager init: ``dispatch`` increments the counter and clears + # this event before any background task fires; the event must + # exist for the counter-paired drain semantics to work. + self._dispatch_idle = asyncio.Event() # lint-allow: loop-bound-init -- see. self._dispatch_idle.set() for sink in sinks: logger.info( diff --git a/src/synthorg/persistence/_shared/__init__.py b/src/synthorg/persistence/_shared/__init__.py index 78aae5819e..74fc353204 100644 --- a/src/synthorg/persistence/_shared/__init__.py +++ b/src/synthorg/persistence/_shared/__init__.py @@ -19,9 +19,13 @@ class predicates) stay in the backend repo modules and are passed format_iso_utc, parse_iso_utc, ) -from synthorg.persistence._shared.pagination import validate_pagination_args +from synthorg.persistence._shared.pagination import ( + DEFAULT_LIST_LIMIT, + validate_pagination_args, +) __all__ = ( + "DEFAULT_LIST_LIMIT", "coerce_row_timestamp", "format_iso_utc", "normalize_utc", diff --git a/src/synthorg/persistence/_shared/pagination.py b/src/synthorg/persistence/_shared/pagination.py index ed5d108825..468960d2e2 100644 --- a/src/synthorg/persistence/_shared/pagination.py +++ b/src/synthorg/persistence/_shared/pagination.py @@ -9,11 +9,20 @@ kwargs to log). """ +from typing import Final + from synthorg.core.persistence_errors import QueryError from synthorg.observability import get_logger logger = get_logger(__name__) +# Canonical default page size for ``list_*`` / ``query`` repository +# methods. Lives here so every repo and Protocol shares a single named +# constant rather than embedding an inline ``100`` literal that trips +# the magic-numbers gate. 
Matches the established convention across +# the codebase (30+ repositories already default ``limit`` to 100). +DEFAULT_LIST_LIMIT: Final[int] = 100 + def validate_pagination_args( limit: object, diff --git a/src/synthorg/persistence/custom_rule_protocol.py b/src/synthorg/persistence/custom_rule_protocol.py index dc36432a84..3fb36c107d 100644 --- a/src/synthorg/persistence/custom_rule_protocol.py +++ b/src/synthorg/persistence/custom_rule_protocol.py @@ -4,6 +4,7 @@ from synthorg.core.types import NotBlankStr # noqa: TC001 from synthorg.meta.rules.custom import CustomRuleDefinition # noqa: TC001 +from synthorg.persistence._shared import DEFAULT_LIST_LIMIT @runtime_checkable @@ -65,14 +66,17 @@ async def list_rules( self, *, enabled_only: bool = False, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[CustomRuleDefinition, ...]: - """List custom rules ordered by name. + """List custom rules ordered by name, bounded by *limit*. Args: enabled_only: If ``True``, return only enabled rules. + limit: Maximum rules to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: - Tuple of rule definitions. + Tuple of rule definitions capped at *limit* rows. Raises: QueryError: If the query fails. diff --git a/src/synthorg/persistence/postgres/custom_rule_repo.py b/src/synthorg/persistence/postgres/custom_rule_repo.py index 013d06a2c5..5cf5fb187d 100644 --- a/src/synthorg/persistence/postgres/custom_rule_repo.py +++ b/src/synthorg/persistence/postgres/custom_rule_repo.py @@ -31,6 +31,10 @@ row_to_custom_rule, serialize_altitudes, ) +from synthorg.persistence._shared.pagination import ( + DEFAULT_LIST_LIMIT, + validate_pagination_args, +) if TYPE_CHECKING: from psycopg_pool import AsyncConnectionPool @@ -281,33 +285,32 @@ async def list_rules( self, *, enabled_only: bool = False, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[CustomRuleDefinition, ...]: - """List custom rules ordered by name. + """List custom rules ordered by name, bounded by *limit*. 
Args: enabled_only: If ``True``, return only enabled rules. - - Returns: - Tuple of rule definitions. + limit: Maximum rules to return (default + :data:`DEFAULT_LIST_LIMIT`). Raises: - QueryError: If the query fails. + QueryError: If the query or pagination validation fails. """ - query = ( + validate_pagination_args(limit, 0, event=META_CUSTOM_RULE_LIST_FAILED) + base = ( "SELECT id, name, description, metric_path, " "comparator, threshold, severity, target_altitudes, " - "enabled, created_at, updated_at " - "FROM custom_rules" + "enabled, created_at, updated_at FROM custom_rules" ) - if enabled_only: - query += " WHERE enabled = true" - query += " ORDER BY name" + where = " WHERE enabled = true" if enabled_only else "" + query = f"{base}{where} ORDER BY name LIMIT %s" try: async with ( self._pool.connection() as conn, conn.cursor(row_factory=dict_row) as cur, ): - await cur.execute(query) + await cur.execute(query, (limit,)) rows = await cur.fetchall() except MemoryError, RecursionError: raise diff --git a/src/synthorg/persistence/postgres/hr_repositories.py b/src/synthorg/persistence/postgres/hr_repositories.py index b1705418ab..f2db96115a 100644 --- a/src/synthorg/persistence/postgres/hr_repositories.py +++ b/src/synthorg/persistence/postgres/hr_repositories.py @@ -36,6 +36,10 @@ PERSISTENCE_TASK_METRIC_QUERY_FAILED, PERSISTENCE_TASK_METRIC_SAVE_FAILED, ) +from synthorg.persistence._shared.pagination import ( + DEFAULT_LIST_LIMIT, + validate_pagination_args, +) if TYPE_CHECKING: from psycopg_pool import AsyncConnectionPool @@ -243,8 +247,13 @@ async def query( agent_id: str | None = None, since: AwareDatetime | None = None, until: AwareDatetime | None = None, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[TaskMetricRecord, ...]: - """Query task metric records with optional filters.""" + """Query task metric records with optional filters. + + Bounded by *limit* (default :data:`DEFAULT_LIST_LIMIT`). 
+ """ + validate_pagination_args(limit, 0, event=PERSISTENCE_TASK_METRIC_QUERY_FAILED) clauses: list[str] = [] params: list[Any] = [] if agent_id is not None: @@ -264,7 +273,8 @@ async def query( FROM task_metrics""" if clauses: sql += " WHERE " + " AND ".join(clauses) - sql += " ORDER BY completed_at DESC" + sql += " ORDER BY completed_at DESC LIMIT %s" + params.append(limit) try: async with ( @@ -356,8 +366,13 @@ async def query( *, agent_id: str | None = None, since: AwareDatetime | None = None, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[CollaborationMetricRecord, ...]: - """Query collaboration metric records with optional filters.""" + """Query collaboration metric records with optional filters. + + Bounded by *limit* (default :data:`DEFAULT_LIST_LIMIT`). + """ + validate_pagination_args(limit, 0, event=PERSISTENCE_COLLAB_METRIC_QUERY_FAILED) clauses: list[str] = [] params: list[Any] = [] if agent_id is not None: @@ -374,7 +389,8 @@ async def query( FROM collaboration_metrics""" if clauses: sql += " WHERE " + " AND ".join(clauses) - sql += " ORDER BY recorded_at DESC" + sql += " ORDER BY recorded_at DESC LIMIT %s" + params.append(limit) try: async with ( diff --git a/src/synthorg/persistence/postgres/preset_repo.py b/src/synthorg/persistence/postgres/preset_repo.py index ed0903e7ad..7c06def625 100644 --- a/src/synthorg/persistence/postgres/preset_repo.py +++ b/src/synthorg/persistence/postgres/preset_repo.py @@ -26,6 +26,10 @@ PRESET_CUSTOM_LISTED, PRESET_CUSTOM_SAVE_FAILED, ) +from synthorg.persistence._shared.pagination import ( + DEFAULT_LIST_LIMIT, + validate_pagination_args, +) from synthorg.persistence.preset_protocol import PresetListRow, PresetRow if TYPE_CHECKING: @@ -197,20 +201,26 @@ async def get( async def list_all( self, + *, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[PresetListRow, ...]: - """List all custom presets ordered by name. + """List custom presets ordered by name, bounded by *limit*. 
- Returns: - Tuple of ``PresetListRow`` named tuples. + Args: + limit: Maximum presets to return (default + :data:`DEFAULT_LIST_LIMIT`). Raises: - QueryError: If the database query fails. + QueryError: If the database query or pagination validation + fails. """ + validate_pagination_args(limit, 0, event=PRESET_CUSTOM_LIST_FAILED) try: async with self._pool.connection() as conn, conn.cursor() as cur: await cur.execute( "SELECT name, config_json, description, created_at, " - "updated_at FROM custom_presets ORDER BY name", + "updated_at FROM custom_presets ORDER BY name LIMIT %s", + (limit,), ) rows = await cur.fetchall() except psycopg.Error as exc: diff --git a/src/synthorg/persistence/postgres/project_repo.py b/src/synthorg/persistence/postgres/project_repo.py index 7b2c220a61..9141dfbf3c 100644 --- a/src/synthorg/persistence/postgres/project_repo.py +++ b/src/synthorg/persistence/postgres/project_repo.py @@ -25,6 +25,10 @@ PERSISTENCE_PROJECT_LISTED, PERSISTENCE_PROJECT_SAVE_FAILED, ) +from synthorg.persistence._shared.pagination import ( + DEFAULT_LIST_LIMIT, + validate_pagination_args, +) if TYPE_CHECKING: from psycopg_pool import AsyncConnectionPool @@ -233,11 +237,20 @@ async def list_projects( *, status: ProjectStatus | None = None, lead: NotBlankStr | None = None, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[Project, ...]: - """List projects with optional filters.""" - query = "SELECT * FROM projects" + """List projects with optional filters. + + Args: + status: Filter by project status. + lead: Filter by project lead agent ID. + limit: Maximum projects to return (default + :data:`DEFAULT_LIST_LIMIT`). 
+ """ + validate_pagination_args(limit, 0, event=PERSISTENCE_PROJECT_LIST_FAILED) + effective_limit = min(limit, _MAX_LIST_ROWS) conditions: list[str] = [] - params: list[str] = [] + params: list[object] = [] if status is not None: conditions.append("status = %s") @@ -246,9 +259,9 @@ async def list_projects( conditions.append("lead = %s") params.append(lead) - if conditions: - query += " WHERE " + " AND ".join(conditions) - query += f" ORDER BY id LIMIT {_MAX_LIST_ROWS}" + where_clause = " WHERE " + " AND ".join(conditions) if conditions else "" + query = f"SELECT * FROM projects{where_clause} ORDER BY id LIMIT %s" # noqa: S608 + params.append(effective_limit) try: async with ( diff --git a/src/synthorg/persistence/postgres/risk_override_repo.py b/src/synthorg/persistence/postgres/risk_override_repo.py index 8ce0eab333..cae1a1601a 100644 --- a/src/synthorg/persistence/postgres/risk_override_repo.py +++ b/src/synthorg/persistence/postgres/risk_override_repo.py @@ -22,6 +22,10 @@ PERSISTENCE_RISK_OVERRIDE_SAVE_FAILED, ) from synthorg.persistence._shared import normalize_utc +from synthorg.persistence._shared.pagination import ( + DEFAULT_LIST_LIMIT, + validate_pagination_args, +) from synthorg.security.rules.risk_override import RiskTierOverride if TYPE_CHECKING: @@ -135,8 +139,21 @@ async def get( ) raise QueryError(msg) from exc - async def list_active(self) -> tuple[RiskTierOverride, ...]: - """Return all active (non-expired, non-revoked) overrides.""" + async def list_active( + self, + *, + limit: int = DEFAULT_LIST_LIMIT, + ) -> tuple[RiskTierOverride, ...]: + """Return active overrides bounded by *limit*. + + Args: + limit: Maximum overrides to return (default + :data:`DEFAULT_LIST_LIMIT`). + + Raises: + QueryError: If the query or pagination validation fails. 
+ """ + validate_pagination_args(limit, 0, event=PERSISTENCE_RISK_OVERRIDE_QUERY_FAILED) now_utc = datetime.now(tz=UTC) try: async with ( @@ -146,8 +163,8 @@ async def list_active(self) -> tuple[RiskTierOverride, ...]: await cur.execute( f"SELECT {_COLS} FROM risk_overrides " # noqa: S608 "WHERE revoked_at IS NULL AND expires_at > %s " - "ORDER BY created_at DESC", - (now_utc,), + "ORDER BY created_at DESC LIMIT %s", + (now_utc, limit), ) rows = await cur.fetchall() except psycopg.Error as exc: diff --git a/src/synthorg/persistence/postgres/subworkflow_repo.py b/src/synthorg/persistence/postgres/subworkflow_repo.py index afdc528dca..fd26503a3e 100644 --- a/src/synthorg/persistence/postgres/subworkflow_repo.py +++ b/src/synthorg/persistence/postgres/subworkflow_repo.py @@ -37,6 +37,10 @@ PERSISTENCE_SUBWORKFLOW_LISTED, PERSISTENCE_SUBWORKFLOW_SAVE_FAILED, ) +from synthorg.persistence._shared.pagination import ( + DEFAULT_LIST_LIMIT, + validate_pagination_args, +) if TYPE_CHECKING: from psycopg_pool import AsyncConnectionPool @@ -283,16 +287,23 @@ async def get( async def list_versions( self, subworkflow_id: NotBlankStr, + *, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[NotBlankStr, ...]: - """List all semver strings for a subworkflow, newest first.""" + """List semver strings for a subworkflow, newest first. + + Bounded by *limit* (default :data:`DEFAULT_LIST_LIMIT`). 
+ """ + validate_pagination_args(limit, 0, event=PERSISTENCE_SUBWORKFLOW_LIST_FAILED) try: async with ( self._pool.connection() as conn, conn.cursor(row_factory=dict_row) as cur, ): await cur.execute( - "SELECT semver FROM subworkflows WHERE subworkflow_id = %s", - (subworkflow_id,), + "SELECT semver FROM subworkflows " + "WHERE subworkflow_id = %s LIMIT %s", + (subworkflow_id, limit), ) rows = await cur.fetchall() except psycopg.Error as exc: @@ -309,15 +320,32 @@ async def list_versions( versions.sort(key=_semver_sort_key, reverse=True) return tuple(versions) - async def list_summaries(self) -> tuple[SubworkflowSummary, ...]: - """Return a summary for every unique subworkflow.""" + async def list_summaries( + self, + *, + limit: int = DEFAULT_LIST_LIMIT, + ) -> tuple[SubworkflowSummary, ...]: + """Return summaries (latest version per subworkflow). + + Bounded by *limit* distinct subworkflow ids. The subquery + selects the first *limit* unique subworkflow_ids; the outer + SELECT then fetches every version row for those ids so the + client-side aggregator still sees the full version set per + included subworkflow. 
+ """ + validate_pagination_args(limit, 0, event=PERSISTENCE_SUBWORKFLOW_LIST_FAILED) try: async with ( self._pool.connection() as conn, conn.cursor(row_factory=dict_row) as cur, ): await cur.execute( - f"SELECT {_SELECT_COLUMNS} FROM subworkflows", # noqa: S608 + f"SELECT {_SELECT_COLUMNS} FROM subworkflows " # noqa: S608 + "WHERE subworkflow_id IN (" + "SELECT DISTINCT subworkflow_id FROM subworkflows " + "ORDER BY subworkflow_id LIMIT %s" + ")", + (limit,), ) rows = await cur.fetchall() except psycopg.Error as exc: diff --git a/src/synthorg/persistence/postgres/training_plan_repo.py b/src/synthorg/persistence/postgres/training_plan_repo.py index f100c117bf..1d1aa1266e 100644 --- a/src/synthorg/persistence/postgres/training_plan_repo.py +++ b/src/synthorg/persistence/postgres/training_plan_repo.py @@ -23,6 +23,10 @@ from synthorg.observability.events.training import ( HR_TRAINING_PERSISTENCE_ERROR, ) +from synthorg.persistence._shared.pagination import ( + DEFAULT_LIST_LIMIT, + validate_pagination_args, +) if TYPE_CHECKING: from psycopg_pool import AsyncConnectionPool @@ -278,15 +282,21 @@ async def latest_by_agent( async def list_by_agent( self, agent_id: NotBlankStr, + *, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[TrainingPlan, ...]: - """Return all plans for an agent ordered by created_at desc. + """Return plans for an agent ordered by created_at desc. Args: agent_id: Agent identifier. + limit: Maximum plans to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: - Tuple of plans ordered by ``created_at`` descending. + Tuple of plans ordered by ``created_at`` descending, + capped at *limit* rows. 
""" + validate_pagination_args(limit, 0, event=HR_TRAINING_PERSISTENCE_ERROR) try: async with ( self._pool.connection() as conn, @@ -296,8 +306,8 @@ async def list_by_agent( """\ SELECT * FROM training_plans WHERE new_agent_id = %s -ORDER BY created_at DESC""", - (str(agent_id),), +ORDER BY created_at DESC LIMIT %s""", + (str(agent_id), limit), ) rows = await cur.fetchall() except psycopg.Error as exc: diff --git a/src/synthorg/persistence/postgres/user_repo.py b/src/synthorg/persistence/postgres/user_repo.py index 46f0129387..17aed8c1b5 100644 --- a/src/synthorg/persistence/postgres/user_repo.py +++ b/src/synthorg/persistence/postgres/user_repo.py @@ -39,7 +39,10 @@ PERSISTENCE_USER_LISTED, PERSISTENCE_USER_SAVE_FAILED, ) -from synthorg.persistence._shared.pagination import validate_pagination_args +from synthorg.persistence._shared.pagination import ( + DEFAULT_LIST_LIMIT, + validate_pagination_args, +) from synthorg.persistence.constraint_tokens import ( IDX_SINGLE_CEO, LAST_CEO_TRIGGER, @@ -234,16 +237,35 @@ async def get_by_username(self, username: NotBlankStr) -> User | None: ) raise QueryError(msg) from exc - async def list_users(self) -> tuple[User, ...]: - """List all human users ordered by creation date (excludes system user).""" + async def list_users( + self, + *, + limit: int = DEFAULT_LIST_LIMIT, + ) -> tuple[User, ...]: + """List human users ordered by creation date (excludes system user). + + Bounded by *limit* so an unauth'd caller cannot materialise an + unbounded tuple of users. For cursor-stable pagination across + large user bases use :meth:`list_users_paginated` instead. + + Args: + limit: Maximum users to return (default + :data:`DEFAULT_LIST_LIMIT`). + + Raises: + QueryError: If the database query, deserialization, or + pagination validation fails. 
+ """ + validate_pagination_args(limit, 0, event=PERSISTENCE_USER_LIST_FAILED) try: async with ( self._pool.connection() as conn, conn.cursor(row_factory=dict_row) as cur, ): await cur.execute( - "SELECT * FROM users WHERE role != %s ORDER BY created_at, id", - (HumanRole.SYSTEM.value,), + "SELECT * FROM users WHERE role != %s " + "ORDER BY created_at, id LIMIT %s", + (HumanRole.SYSTEM.value, limit), ) rows = await cur.fetchall() except psycopg.Error as exc: diff --git a/src/synthorg/persistence/postgres/workflow_definition_repo.py b/src/synthorg/persistence/postgres/workflow_definition_repo.py index 15fae1afd3..b489739e2a 100644 --- a/src/synthorg/persistence/postgres/workflow_definition_repo.py +++ b/src/synthorg/persistence/postgres/workflow_definition_repo.py @@ -35,6 +35,10 @@ PERSISTENCE_WORKFLOW_DEF_LISTED, PERSISTENCE_WORKFLOW_DEF_SAVE_FAILED, ) +from synthorg.persistence._shared.pagination import ( + DEFAULT_LIST_LIMIT, + validate_pagination_args, +) if TYPE_CHECKING: from psycopg_pool import AsyncConnectionPool @@ -425,29 +429,36 @@ async def list_definitions( self, *, workflow_type: WorkflowType | None = None, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[WorkflowDefinition, ...]: """List workflow definitions with optional filters. Args: workflow_type: Filter by workflow type. + limit: Maximum definitions to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: - Matching definitions as a tuple. + Matching definitions as a tuple, capped at *limit* rows. Raises: - QueryError: If the database query or deserialization fails. + QueryError: If the database query, deserialization, or + pagination validation fails. 
""" - query = f"SELECT {_SELECT_COLUMNS} FROM workflow_definitions" # noqa: S608 + validate_pagination_args(limit, 0, event=PERSISTENCE_WORKFLOW_DEF_LIST_FAILED) conditions: list[str] = [] - params: list[str] = [] + params: list[object] = [] if workflow_type is not None: conditions.append("workflow_type = %s") params.append(workflow_type.value) - if conditions: - query += " WHERE " + " AND ".join(conditions) - query += " ORDER BY updated_at DESC LIMIT 10000" + where_clause = " WHERE " + " AND ".join(conditions) if conditions else "" + query = ( + f"SELECT {_SELECT_COLUMNS} FROM workflow_definitions" # noqa: S608 + f"{where_clause} ORDER BY updated_at DESC LIMIT %s" + ) + params.append(limit) try: async with ( diff --git a/src/synthorg/persistence/postgres/workflow_execution_repo.py b/src/synthorg/persistence/postgres/workflow_execution_repo.py index 005042f1f2..a76c5fd286 100644 --- a/src/synthorg/persistence/postgres/workflow_execution_repo.py +++ b/src/synthorg/persistence/postgres/workflow_execution_repo.py @@ -41,6 +41,10 @@ PERSISTENCE_WORKFLOW_EXEC_LISTED, PERSISTENCE_WORKFLOW_EXEC_SAVE_FAILED, ) +from synthorg.persistence._shared.pagination import ( + DEFAULT_LIST_LIMIT, + validate_pagination_args, +) if TYPE_CHECKING: from psycopg_pool import AsyncConnectionPool @@ -322,18 +326,26 @@ async def get( async def list_by_definition( self, definition_id: NotBlankStr, + *, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[WorkflowExecution, ...]: """List executions for a given workflow definition. Args: definition_id: The source definition identifier. + limit: Maximum executions to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: - Matching executions ordered by ``updated_at`` descending. + Matching executions ordered by ``updated_at`` descending, + capped at *limit* rows. Raises: - QueryError: If the database query fails. + QueryError: If the database query or pagination validation + fails. 
""" + validate_pagination_args(limit, 0, event=PERSISTENCE_WORKFLOW_EXEC_LIST_FAILED) + effective_limit = min(limit, _MAX_LIST_ROWS) try: async with ( self._pool.connection() as conn, @@ -343,7 +355,7 @@ async def list_by_definition( f"SELECT {_SELECT_COLUMNS} FROM workflow_executions" # noqa: S608 " WHERE definition_id = %s" " ORDER BY updated_at DESC, id ASC LIMIT %s", - (definition_id, _MAX_LIST_ROWS), + (definition_id, effective_limit), ) rows = await cur.fetchall() except psycopg.Error as exc: @@ -369,18 +381,26 @@ async def list_by_definition( async def list_by_status( self, status: WorkflowExecutionStatus, + *, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[WorkflowExecution, ...]: """List executions with a given status. Args: status: The execution status to filter by. + limit: Maximum executions to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: - Matching executions ordered by ``updated_at`` descending. + Matching executions ordered by ``updated_at`` descending, + capped at *limit* rows. Raises: - QueryError: If the database query fails. + QueryError: If the database query or pagination validation + fails. 
""" + validate_pagination_args(limit, 0, event=PERSISTENCE_WORKFLOW_EXEC_LIST_FAILED) + effective_limit = min(limit, _MAX_LIST_ROWS) try: async with ( self._pool.connection() as conn, @@ -390,7 +410,7 @@ async def list_by_status( f"SELECT {_SELECT_COLUMNS} FROM workflow_executions" # noqa: S608 " WHERE status = %s" " ORDER BY updated_at DESC, id ASC LIMIT %s", - (status.value, _MAX_LIST_ROWS), + (status.value, effective_limit), ) rows = await cur.fetchall() except psycopg.Error as exc: diff --git a/src/synthorg/persistence/preset_protocol.py b/src/synthorg/persistence/preset_protocol.py index 3680a92b0a..abc2125400 100644 --- a/src/synthorg/persistence/preset_protocol.py +++ b/src/synthorg/persistence/preset_protocol.py @@ -13,6 +13,7 @@ from typing import NamedTuple, Protocol, runtime_checkable from synthorg.core.types import NotBlankStr # noqa: TC001 +from synthorg.persistence._shared import DEFAULT_LIST_LIMIT class PresetRow(NamedTuple): @@ -83,11 +84,17 @@ async def get( async def list_all( self, + *, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[PresetListRow, ...]: - """List all custom presets ordered by name. + """List custom presets ordered by name. + + Args: + limit: Maximum presets to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: - Tuple of ``PresetListRow`` named tuples. + Tuple of ``PresetListRow`` named tuples, capped at *limit*. Raises: QueryError: If the operation fails. 
diff --git a/src/synthorg/persistence/project_protocol.py b/src/synthorg/persistence/project_protocol.py index f3fd849cc5..2f47ffc4c4 100644 --- a/src/synthorg/persistence/project_protocol.py +++ b/src/synthorg/persistence/project_protocol.py @@ -5,6 +5,7 @@ from synthorg.core.enums import ProjectStatus # noqa: TC001 from synthorg.core.project import Project # noqa: TC001 from synthorg.core.types import NotBlankStr # noqa: TC001 +from synthorg.persistence._shared import DEFAULT_LIST_LIMIT @runtime_checkable @@ -81,18 +82,22 @@ async def list_projects( *, status: ProjectStatus | None = None, lead: NotBlankStr | None = None, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[Project, ...]: """List projects with optional filters. Results are ordered by project ID ascending to ensure - deterministic pagination across backends. + deterministic pagination across backends. Bounded by *limit* so + an unauth'd caller cannot materialise the full table. Args: status: Filter by project status. lead: Filter by project lead agent ID. + limit: Maximum projects to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: - Matching projects ordered by ID, as a tuple. + Matching projects ordered by ID, capped at *limit* rows. Raises: PersistenceError: If the operation fails. diff --git a/src/synthorg/persistence/risk_override_protocol.py b/src/synthorg/persistence/risk_override_protocol.py index e3a0472185..8a6d51c40e 100644 --- a/src/synthorg/persistence/risk_override_protocol.py +++ b/src/synthorg/persistence/risk_override_protocol.py @@ -2,6 +2,8 @@ from typing import TYPE_CHECKING, Protocol, runtime_checkable +from synthorg.persistence._shared import DEFAULT_LIST_LIMIT + if TYPE_CHECKING: from pydantic import AwareDatetime @@ -39,11 +41,20 @@ async def get(self, override_id: NotBlankStr) -> RiskTierOverride | None: """ ... - async def list_active(self) -> tuple[RiskTierOverride, ...]: - """Return all active (non-expired, non-revoked) overrides. 
+ async def list_active( + self, + *, + limit: int = DEFAULT_LIST_LIMIT, + ) -> tuple[RiskTierOverride, ...]: + """Return active (non-expired, non-revoked) overrides. + + Args: + limit: Maximum overrides to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: - Tuple of active overrides, ordered by created_at DESC. + Tuple of active overrides ordered by created_at DESC, + capped at *limit* rows. """ ... diff --git a/src/synthorg/persistence/sqlite/custom_rule_repo.py b/src/synthorg/persistence/sqlite/custom_rule_repo.py index 58508a0531..51876df533 100644 --- a/src/synthorg/persistence/sqlite/custom_rule_repo.py +++ b/src/synthorg/persistence/sqlite/custom_rule_repo.py @@ -27,6 +27,10 @@ row_to_custom_rule, serialize_altitudes, ) +from synthorg.persistence._shared.pagination import ( + DEFAULT_LIST_LIMIT, + validate_pagination_args, +) if TYPE_CHECKING: from aiosqlite import Row @@ -282,29 +286,28 @@ async def list_rules( self, *, enabled_only: bool = False, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[CustomRuleDefinition, ...]: - """List custom rules ordered by name. + """List custom rules ordered by name, bounded by *limit*. Args: enabled_only: If ``True``, return only enabled rules. - - Returns: - Tuple of rule definitions. + limit: Maximum rules to return (default + :data:`DEFAULT_LIST_LIMIT`). Raises: - QueryError: If the query fails. + QueryError: If the query or pagination validation fails. """ - query = ( + validate_pagination_args(limit, 0, event=META_CUSTOM_RULE_LIST_FAILED) + base = ( "SELECT id, name, description, metric_path, " "comparator, threshold, severity, target_altitudes, " - "enabled, created_at, updated_at " - "FROM custom_rules" + "enabled, created_at, updated_at FROM custom_rules" ) - if enabled_only: - query += " WHERE enabled = 1" - query += " ORDER BY name" + where = " WHERE enabled = 1" if enabled_only else "" + query = f"{base}{where} ORDER BY name LIMIT ?" 
try: - async with self._db.execute(query) as cursor: + async with self._db.execute(query, (limit,)) as cursor: rows = await cursor.fetchall() except sqlite3.Error as exc: msg = "Failed to list custom rules" diff --git a/src/synthorg/persistence/sqlite/escalation_repo.py b/src/synthorg/persistence/sqlite/escalation_repo.py index 1e882b3ba1..292eb8dbbf 100644 --- a/src/synthorg/persistence/sqlite/escalation_repo.py +++ b/src/synthorg/persistence/sqlite/escalation_repo.py @@ -328,6 +328,7 @@ async def _never() -> AsyncIterator[str]: # as an async generator; put one behind an always-false # gate so it is never actually emitted. The outer ``await`` # blocks until ``stop`` is set on context-manager exit. + # lint-allow: long-running-loop-kill-switch -- sentinel coroutine. while not stop.is_set(): await stop.wait() if stop.is_set(): diff --git a/src/synthorg/persistence/sqlite/hr_repositories.py b/src/synthorg/persistence/sqlite/hr_repositories.py index 5cd2e04f6b..031cb8d6be 100644 --- a/src/synthorg/persistence/sqlite/hr_repositories.py +++ b/src/synthorg/persistence/sqlite/hr_repositories.py @@ -33,6 +33,10 @@ PERSISTENCE_TASK_METRIC_QUERY_FAILED, PERSISTENCE_TASK_METRIC_SAVE_FAILED, ) +from synthorg.persistence._shared.pagination import ( + DEFAULT_LIST_LIMIT, + validate_pagination_args, +) if TYPE_CHECKING: from pydantic import AwareDatetime @@ -233,10 +237,15 @@ async def query( agent_id: str | None = None, since: AwareDatetime | None = None, until: AwareDatetime | None = None, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[TaskMetricRecord, ...]: - """Query task metric records with optional filters.""" + """Query task metric records with optional filters. + + Bounded by *limit* (default :data:`DEFAULT_LIST_LIMIT`). 
+ """ + validate_pagination_args(limit, 0, event=PERSISTENCE_TASK_METRIC_QUERY_FAILED) clauses: list[str] = [] - params: list[str] = [] + params: list[object] = [] if agent_id is not None: clauses.append("agent_id = ?") params.append(agent_id) @@ -254,7 +263,8 @@ async def query( FROM task_metrics""" if clauses: sql += " WHERE " + " AND ".join(clauses) - sql += " ORDER BY completed_at DESC" + sql += " ORDER BY completed_at DESC LIMIT ?" + params.append(limit) try: cursor = await self._db.execute(sql, params) @@ -346,10 +356,15 @@ async def query( *, agent_id: str | None = None, since: AwareDatetime | None = None, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[CollaborationMetricRecord, ...]: - """Query collaboration metric records with optional filters.""" + """Query collaboration metric records with optional filters. + + Bounded by *limit* (default :data:`DEFAULT_LIST_LIMIT`). + """ + validate_pagination_args(limit, 0, event=PERSISTENCE_COLLAB_METRIC_QUERY_FAILED) clauses: list[str] = [] - params: list[str] = [] + params: list[object] = [] if agent_id is not None: clauses.append("agent_id = ?") params.append(agent_id) @@ -364,7 +379,8 @@ async def query( FROM collaboration_metrics""" if clauses: sql += " WHERE " + " AND ".join(clauses) - sql += " ORDER BY recorded_at DESC" + sql += " ORDER BY recorded_at DESC LIMIT ?" 
+ params.append(limit) try: cursor = await self._db.execute(sql, params) diff --git a/src/synthorg/persistence/sqlite/preset_repo.py b/src/synthorg/persistence/sqlite/preset_repo.py index a151dc54c4..6ba825ea49 100644 --- a/src/synthorg/persistence/sqlite/preset_repo.py +++ b/src/synthorg/persistence/sqlite/preset_repo.py @@ -18,6 +18,10 @@ PRESET_CUSTOM_LISTED, PRESET_CUSTOM_SAVE_FAILED, ) +from synthorg.persistence._shared.pagination import ( + DEFAULT_LIST_LIMIT, + validate_pagination_args, +) from synthorg.persistence.preset_protocol import PresetListRow, PresetRow logger = get_logger(__name__) @@ -135,19 +139,25 @@ async def get( async def list_all( self, + *, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[PresetListRow, ...]: - """List all custom presets ordered by name. + """List custom presets ordered by name, bounded by *limit*. - Returns: - Tuple of ``PresetListRow`` named tuples. + Args: + limit: Maximum presets to return (default + :data:`DEFAULT_LIST_LIMIT`). Raises: - QueryError: If the database query fails. + QueryError: If the database query or pagination validation + fails. 
""" + validate_pagination_args(limit, 0, event=PRESET_CUSTOM_LIST_FAILED) try: async with self._db.execute( "SELECT name, config_json, description, created_at, " - "updated_at FROM custom_presets ORDER BY name", + "updated_at FROM custom_presets ORDER BY name LIMIT ?", + (limit,), ) as cursor: rows = await cursor.fetchall() except sqlite3.Error as exc: diff --git a/src/synthorg/persistence/sqlite/project_repo.py b/src/synthorg/persistence/sqlite/project_repo.py index 3f64bcf1b4..526d0dbc18 100644 --- a/src/synthorg/persistence/sqlite/project_repo.py +++ b/src/synthorg/persistence/sqlite/project_repo.py @@ -25,6 +25,10 @@ PERSISTENCE_PROJECT_LISTED, PERSISTENCE_PROJECT_SAVE_FAILED, ) +from synthorg.persistence._shared.pagination import ( + DEFAULT_LIST_LIMIT, + validate_pagination_args, +) from synthorg.persistence.sqlite._shared import is_unique_constraint_error logger = get_logger(__name__) @@ -320,22 +324,30 @@ async def list_projects( *, status: ProjectStatus | None = None, lead: NotBlankStr | None = None, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[Project, ...]: """List projects with optional filters. Args: status: Filter by project status. lead: Filter by project lead agent ID. + limit: Maximum projects to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: - Matching projects as a tuple. + Matching projects as a tuple, capped at *limit* rows. Raises: - QueryError: If the database query or deserialization fails. + QueryError: If the database query, deserialization, or + pagination validation fails. """ + validate_pagination_args(limit, 0, event=PERSISTENCE_PROJECT_LIST_FAILED) + # Clamp under the hard ceiling so a caller-supplied limit + # cannot exceed the table-scan safety bound. 
+ effective_limit = min(limit, _MAX_LIST_ROWS) query = "SELECT * FROM projects" conditions: list[str] = [] - params: list[str] = [] + params: list[object] = [] if status is not None: conditions.append("status = ?") @@ -346,7 +358,8 @@ async def list_projects( if conditions: query += " WHERE " + " AND ".join(conditions) - query += f" ORDER BY id LIMIT {_MAX_LIST_ROWS}" + query += " ORDER BY id LIMIT ?" + params.append(effective_limit) try: cursor = await self._db.execute(query, params) diff --git a/src/synthorg/persistence/sqlite/risk_override_repo.py b/src/synthorg/persistence/sqlite/risk_override_repo.py index e7db575a33..b29ff73356 100644 --- a/src/synthorg/persistence/sqlite/risk_override_repo.py +++ b/src/synthorg/persistence/sqlite/risk_override_repo.py @@ -16,6 +16,10 @@ PERSISTENCE_RISK_OVERRIDE_SAVE_FAILED, ) from synthorg.persistence._shared import coerce_row_timestamp, format_iso_utc +from synthorg.persistence._shared.pagination import ( + DEFAULT_LIST_LIMIT, + validate_pagination_args, +) from synthorg.persistence.sqlite._shared import is_unique_constraint_error from synthorg.security.rules.risk_override import RiskTierOverride @@ -142,15 +146,29 @@ async def get( return None return _row_to_override(row) - async def list_active(self) -> tuple[RiskTierOverride, ...]: - """Return all active (non-expired, non-revoked) overrides.""" + async def list_active( + self, + *, + limit: int = DEFAULT_LIST_LIMIT, + ) -> tuple[RiskTierOverride, ...]: + """Return active overrides bounded by *limit*. + + Args: + limit: Maximum overrides to return (default + :data:`DEFAULT_LIST_LIMIT`). + + Raises: + PersistenceError: If the query or pagination validation + fails. + """ + validate_pagination_args(limit, 0, event=PERSISTENCE_RISK_OVERRIDE_QUERY_FAILED) now_utc = format_iso_utc(datetime.now(UTC)) try: cursor = await self._db.execute( f"SELECT {_COLS} FROM risk_overrides " # noqa: S608 "WHERE revoked_at IS NULL AND expires_at > ? 
" - "ORDER BY created_at DESC", - (now_utc,), + "ORDER BY created_at DESC LIMIT ?", + (now_utc, limit), ) rows = await cursor.fetchall() except (sqlite3.Error, aiosqlite.Error) as exc: diff --git a/src/synthorg/persistence/sqlite/subworkflow_repo.py b/src/synthorg/persistence/sqlite/subworkflow_repo.py index ae523b163b..f91bfbb000 100644 --- a/src/synthorg/persistence/sqlite/subworkflow_repo.py +++ b/src/synthorg/persistence/sqlite/subworkflow_repo.py @@ -41,6 +41,10 @@ PERSISTENCE_SUBWORKFLOW_LISTED, PERSISTENCE_SUBWORKFLOW_SAVE_FAILED, ) +from synthorg.persistence._shared.pagination import ( + DEFAULT_LIST_LIMIT, + validate_pagination_args, +) logger = get_logger(__name__) @@ -342,12 +346,24 @@ async def get( async def list_versions( self, subworkflow_id: NotBlankStr, + *, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[str, ...]: - """List semver strings for a subworkflow, newest first.""" + """List semver strings for a subworkflow, newest first. + + Bounded by *limit* (default :data:`DEFAULT_LIST_LIMIT`). The + SQL fetches at most *limit* rows; client-side semver sorting + then orders them descending. + + Raises: + QueryError: If the database query or pagination validation + fails. + """ + validate_pagination_args(limit, 0, event=PERSISTENCE_SUBWORKFLOW_LIST_FAILED) try: cursor = await self._db.execute( - "SELECT semver FROM subworkflows WHERE subworkflow_id = ?", - (subworkflow_id,), + "SELECT semver FROM subworkflows WHERE subworkflow_id = ? LIMIT ?", + (subworkflow_id, limit), ) rows = await cursor.fetchall() except sqlite3.Error as exc: @@ -364,12 +380,32 @@ async def list_versions( versions.sort(key=_semver_sort_key, reverse=True) return tuple(versions) - async def list_summaries(self) -> tuple[SubworkflowSummary, ...]: - """Return summaries (latest version per subworkflow).""" + async def list_summaries( + self, + *, + limit: int = DEFAULT_LIST_LIMIT, + ) -> tuple[SubworkflowSummary, ...]: + """Return summaries (latest version per subworkflow). 
+ + Bounded by *limit* distinct subworkflow ids. The subquery + selects the first *limit* unique subworkflow_ids; the outer + SELECT then fetches every version row for those ids so the + client-side aggregator still sees the full version set per + included subworkflow. + + Raises: + QueryError: If the database query or pagination validation + fails. + """ + validate_pagination_args(limit, 0, event=PERSISTENCE_SUBWORKFLOW_LIST_FAILED) try: cursor = await self._db.execute( f"SELECT {_SUBWORKFLOW_SELECT} FROM subworkflows " # noqa: S608 - "ORDER BY subworkflow_id, created_at DESC", + "WHERE subworkflow_id IN (" + "SELECT DISTINCT subworkflow_id FROM subworkflows " + "ORDER BY subworkflow_id LIMIT ?" + ") ORDER BY subworkflow_id, created_at DESC", + (limit,), ) rows = await cursor.fetchall() except sqlite3.Error as exc: diff --git a/src/synthorg/persistence/sqlite/training_plan_repo.py b/src/synthorg/persistence/sqlite/training_plan_repo.py index a67ea328b3..f8c320263e 100644 --- a/src/synthorg/persistence/sqlite/training_plan_repo.py +++ b/src/synthorg/persistence/sqlite/training_plan_repo.py @@ -25,6 +25,10 @@ from synthorg.observability.events.training import ( HR_TRAINING_PERSISTENCE_ERROR, ) +from synthorg.persistence._shared.pagination import ( + DEFAULT_LIST_LIMIT, + validate_pagination_args, +) logger = get_logger(__name__) @@ -309,22 +313,27 @@ async def latest_by_agent( async def list_by_agent( self, agent_id: NotBlankStr, + *, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[TrainingPlan, ...]: - """Return all plans for an agent ordered by created_at desc. + """Return plans for an agent ordered by created_at desc. Args: agent_id: Target agent identifier. + limit: Maximum plans to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: - Tuple of plans (may be empty). + Tuple of plans (may be empty), capped at *limit* rows. 
""" + validate_pagination_args(limit, 0, event=HR_TRAINING_PERSISTENCE_ERROR) try: cursor = await self._db.execute( """\ SELECT * FROM training_plans WHERE new_agent_id = ? -ORDER BY created_at DESC""", - (str(agent_id),), +ORDER BY created_at DESC LIMIT ?""", + (str(agent_id), limit), ) rows = await cursor.fetchall() except (sqlite3.Error, aiosqlite.Error) as exc: diff --git a/src/synthorg/persistence/sqlite/user_repo.py b/src/synthorg/persistence/sqlite/user_repo.py index 22382210a8..abe2348800 100644 --- a/src/synthorg/persistence/sqlite/user_repo.py +++ b/src/synthorg/persistence/sqlite/user_repo.py @@ -38,7 +38,10 @@ PERSISTENCE_USER_LISTED, PERSISTENCE_USER_SAVE_FAILED, ) -from synthorg.persistence._shared.pagination import validate_pagination_args +from synthorg.persistence._shared.pagination import ( + DEFAULT_LIST_LIMIT, + validate_pagination_args, +) from synthorg.persistence.constraint_tokens import ( IDX_SINGLE_CEO, LAST_CEO_TRIGGER, @@ -293,22 +296,35 @@ async def get_by_username(self, username: NotBlankStr) -> User | None: ) raise QueryError(msg) from exc - async def list_users(self) -> tuple[User, ...]: - """List all human users ordered by creation date. + async def list_users( + self, + *, + limit: int = DEFAULT_LIST_LIMIT, + ) -> tuple[User, ...]: + """List human users ordered by creation date. The system user (internal CLI identity) is excluded from the result. Use ``get`` with the system user ID if you need it. + For cursor-stable pagination across large user bases use + :meth:`list_users_paginated` instead. + + Args: + limit: Maximum users to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: - Tuple of human ``User`` records, oldest first. + Tuple of human ``User`` records, oldest first, capped at + *limit* rows. Raises: - QueryError: If the database query or deserialization fails. + QueryError: If the database query or deserialization fails, + or *limit* is non-int / non-positive. 
""" + validate_pagination_args(limit, 0, event=PERSISTENCE_USER_LIST_FAILED) try: cursor = await self._db.execute( - "SELECT * FROM users WHERE role != ? ORDER BY created_at, id", - (HumanRole.SYSTEM.value,), + "SELECT * FROM users WHERE role != ? ORDER BY created_at, id LIMIT ?", + (HumanRole.SYSTEM.value, limit), ) rows = await cursor.fetchall() except (sqlite3.Error, aiosqlite.Error) as exc: diff --git a/src/synthorg/persistence/sqlite/workflow_definition_repo.py b/src/synthorg/persistence/sqlite/workflow_definition_repo.py index 546c586e8a..aadba52877 100644 --- a/src/synthorg/persistence/sqlite/workflow_definition_repo.py +++ b/src/synthorg/persistence/sqlite/workflow_definition_repo.py @@ -33,6 +33,10 @@ PERSISTENCE_WORKFLOW_DEF_LISTED, PERSISTENCE_WORKFLOW_DEF_SAVE_FAILED, ) +from synthorg.persistence._shared.pagination import ( + DEFAULT_LIST_LIMIT, + validate_pagination_args, +) logger = get_logger(__name__) @@ -458,29 +462,36 @@ async def list_definitions( self, *, workflow_type: WorkflowType | None = None, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[WorkflowDefinition, ...]: """List workflow definitions with optional filters. Args: workflow_type: Filter by workflow type. + limit: Maximum definitions to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: - Matching definitions as a tuple. + Matching definitions as a tuple, capped at *limit* rows. Raises: - QueryError: If the database query or deserialization fails. + QueryError: If the database query, deserialization, or + pagination validation fails. 
""" - query = f"SELECT {_SELECT_COLUMNS} FROM workflow_definitions" # noqa: S608 + validate_pagination_args(limit, 0, event=PERSISTENCE_WORKFLOW_DEF_LIST_FAILED) conditions: list[str] = [] - params: list[str] = [] + params: list[object] = [] if workflow_type is not None: conditions.append("workflow_type = ?") params.append(workflow_type.value) - if conditions: - query += " WHERE " + " AND ".join(conditions) - query += " ORDER BY updated_at DESC LIMIT 10000" + where_clause = " WHERE " + " AND ".join(conditions) if conditions else "" + query = ( + f"SELECT {_SELECT_COLUMNS} FROM workflow_definitions" # noqa: S608 + f"{where_clause} ORDER BY updated_at DESC LIMIT ?" + ) + params.append(limit) try: cursor = await self._db.execute(query, params) diff --git a/src/synthorg/persistence/sqlite/workflow_execution_repo.py b/src/synthorg/persistence/sqlite/workflow_execution_repo.py index 0ae805201d..12e3ac6fd0 100644 --- a/src/synthorg/persistence/sqlite/workflow_execution_repo.py +++ b/src/synthorg/persistence/sqlite/workflow_execution_repo.py @@ -39,6 +39,10 @@ PERSISTENCE_WORKFLOW_EXEC_LISTED, PERSISTENCE_WORKFLOW_EXEC_SAVE_FAILED, ) +from synthorg.persistence._shared.pagination import ( + DEFAULT_LIST_LIMIT, + validate_pagination_args, +) logger = get_logger(__name__) @@ -367,24 +371,32 @@ async def get( async def list_by_definition( self, definition_id: NotBlankStr, + *, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[WorkflowExecution, ...]: """List executions for a given workflow definition. Args: definition_id: The source definition identifier. + limit: Maximum executions to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: - Matching executions ordered by ``updated_at`` descending. + Matching executions ordered by ``updated_at`` descending, + capped at *limit* rows. Raises: - QueryError: If the database query fails. + QueryError: If the database query or pagination validation + fails. 
""" + validate_pagination_args(limit, 0, event=PERSISTENCE_WORKFLOW_EXEC_LIST_FAILED) + effective_limit = min(limit, _MAX_LIST_ROWS) try: cursor = await self._db.execute( f"SELECT {_SELECT_COLUMNS} FROM workflow_executions" # noqa: S608 " WHERE definition_id = ?" " ORDER BY updated_at DESC, id ASC LIMIT ?", - (definition_id, _MAX_LIST_ROWS), + (definition_id, effective_limit), ) rows = await cursor.fetchall() except sqlite3.Error as exc: @@ -410,24 +422,32 @@ async def list_by_definition( async def list_by_status( self, status: WorkflowExecutionStatus, + *, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[WorkflowExecution, ...]: """List executions with a given status. Args: status: The execution status to filter by. + limit: Maximum executions to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: - Matching executions ordered by ``updated_at`` descending. + Matching executions ordered by ``updated_at`` descending, + capped at *limit* rows. Raises: - QueryError: If the database query fails. + QueryError: If the database query or pagination validation + fails. """ + validate_pagination_args(limit, 0, event=PERSISTENCE_WORKFLOW_EXEC_LIST_FAILED) + effective_limit = min(limit, _MAX_LIST_ROWS) try: cursor = await self._db.execute( f"SELECT {_SELECT_COLUMNS} FROM workflow_executions" # noqa: S608 " WHERE status = ?" 
" ORDER BY updated_at DESC, id ASC LIMIT ?", - (status.value, _MAX_LIST_ROWS), + (status.value, effective_limit), ) rows = await cursor.fetchall() except sqlite3.Error as exc: diff --git a/src/synthorg/persistence/subworkflow_protocol.py b/src/synthorg/persistence/subworkflow_protocol.py index 705891f91e..4b180f5553 100644 --- a/src/synthorg/persistence/subworkflow_protocol.py +++ b/src/synthorg/persistence/subworkflow_protocol.py @@ -22,6 +22,7 @@ ParentReference, SubworkflowSummary, ) +from synthorg.persistence._shared import DEFAULT_LIST_LIMIT __all__ = ["SubworkflowRepository"] @@ -68,23 +69,35 @@ async def get( async def list_versions( self, subworkflow_id: NotBlankStr, + *, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[NotBlankStr, ...]: - """List all semver strings for a subworkflow, newest first. + """List semver strings for a subworkflow, newest first. Args: subworkflow_id: The subworkflow identifier. + limit: Maximum versions to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: Tuple of semver strings sorted by ``packaging.version`` - comparison descending. Empty when the subworkflow does - not exist. + comparison descending, capped at *limit* rows. Empty when + the subworkflow does not exist. """ ... - async def list_summaries(self) -> tuple[SubworkflowSummary, ...]: - """Return a summary for every unique subworkflow in the registry. + async def list_summaries( + self, + *, + limit: int = DEFAULT_LIST_LIMIT, + ) -> tuple[SubworkflowSummary, ...]: + """Return summaries for unique subworkflows in the registry. The summary reflects the latest version of each subworkflow. + + Args: + limit: Maximum summaries to return (default + :data:`DEFAULT_LIST_LIMIT`). """ ... 
diff --git a/src/synthorg/persistence/training_protocol.py b/src/synthorg/persistence/training_protocol.py index f920e2da55..087cb7db46 100644 --- a/src/synthorg/persistence/training_protocol.py +++ b/src/synthorg/persistence/training_protocol.py @@ -12,6 +12,7 @@ TrainingPlan, # noqa: TC001 TrainingResult, # noqa: TC001 ) +from synthorg.persistence._shared import DEFAULT_LIST_LIMIT @runtime_checkable @@ -76,14 +77,18 @@ async def latest_by_agent( async def list_by_agent( self, agent_id: NotBlankStr, + *, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[TrainingPlan, ...]: - """Return all plans for an agent ordered by created_at descending. + """Return plans for an agent ordered by created_at descending. Args: agent_id: Target agent identifier. + limit: Maximum plans to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: - Tuple of plans (may be empty). + Tuple of plans (may be empty), capped at *limit* rows. """ ... diff --git a/src/synthorg/persistence/user_protocol.py b/src/synthorg/persistence/user_protocol.py index 5f6c152a00..2c43147446 100644 --- a/src/synthorg/persistence/user_protocol.py +++ b/src/synthorg/persistence/user_protocol.py @@ -9,6 +9,7 @@ from synthorg.core.auth.models import ApiKey, User # noqa: TC001 from synthorg.core.auth.roles import HumanRole # noqa: TC001 from synthorg.core.types import NotBlankStr # noqa: TC001 +from synthorg.persistence._shared import DEFAULT_LIST_LIMIT @runtime_checkable @@ -54,11 +55,23 @@ async def get_by_username(self, username: NotBlankStr) -> User | None: """ ... - async def list_users(self) -> tuple[User, ...]: - """List all human users (excludes the system user). + async def list_users( + self, + *, + limit: int = DEFAULT_LIST_LIMIT, + ) -> tuple[User, ...]: + """List human users (excludes the system user). + + Bounded by ``limit`` so an unauth'd caller cannot materialise an + unbounded tuple of users. For cursor-stable pagination across + large user bases use :meth:`list_users_paginated` instead. 
+ + Args: + limit: Maximum users to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: - Human users as a tuple. + Human users as a tuple, capped at *limit* rows. Raises: PersistenceError: If the operation fails. diff --git a/src/synthorg/persistence/workflow_definition_protocol.py b/src/synthorg/persistence/workflow_definition_protocol.py index 33d9436031..a9e5a79f58 100644 --- a/src/synthorg/persistence/workflow_definition_protocol.py +++ b/src/synthorg/persistence/workflow_definition_protocol.py @@ -5,6 +5,7 @@ from synthorg.core.enums import WorkflowType # noqa: TC001 from synthorg.core.types import NotBlankStr # noqa: TC001 from synthorg.engine.workflow.definition import WorkflowDefinition # noqa: TC001 +from synthorg.persistence._shared import DEFAULT_LIST_LIMIT @runtime_checkable @@ -93,14 +94,17 @@ async def list_definitions( self, *, workflow_type: WorkflowType | None = None, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[WorkflowDefinition, ...]: """List workflow definitions with optional filters. Args: workflow_type: Filter by workflow type. + limit: Maximum definitions to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: - Matching definitions as a tuple. + Matching definitions as a tuple, capped at *limit* rows. Raises: PersistenceError: If the operation fails. 
diff --git a/src/synthorg/persistence/workflow_execution_protocol.py b/src/synthorg/persistence/workflow_execution_protocol.py index 70ca2ff538..e7e209506b 100644 --- a/src/synthorg/persistence/workflow_execution_protocol.py +++ b/src/synthorg/persistence/workflow_execution_protocol.py @@ -7,6 +7,7 @@ from synthorg.engine.workflow.execution_models import ( WorkflowExecution, # noqa: TC001 ) +from synthorg.persistence._shared import DEFAULT_LIST_LIMIT @runtime_checkable @@ -54,15 +55,19 @@ async def get( async def list_by_definition( self, definition_id: NotBlankStr, + *, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[WorkflowExecution, ...]: """List executions for a given workflow definition. Args: definition_id: The source definition identifier. + limit: Maximum executions to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: Matching executions as a tuple, ordered by - ``updated_at`` descending. + ``updated_at`` descending, capped at *limit* rows. Raises: PersistenceError: If the operation fails. @@ -72,15 +77,19 @@ async def list_by_definition( async def list_by_status( self, status: WorkflowExecutionStatus, + *, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[WorkflowExecution, ...]: """List executions with a given status. Args: status: The execution status to filter by. + limit: Maximum executions to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: Matching executions as a tuple, ordered by - ``updated_at`` descending. + ``updated_at`` descending, capped at *limit* rows. Raises: PersistenceError: If the operation fails. diff --git a/src/synthorg/providers/resilience/rate_limiter.py b/src/synthorg/providers/resilience/rate_limiter.py index 0540c881b4..e5f41c21dd 100644 --- a/src/synthorg/providers/resilience/rate_limiter.py +++ b/src/synthorg/providers/resilience/rate_limiter.py @@ -68,6 +68,7 @@ async def acquire(self) -> None: # Respect pause-until from retry_after. # Re-check in a loop in case pause() extends _pause_until while sleeping. 
+ # lint-allow: long-running-loop-kill-switch -- per-call retry-wait. while True: now = self._clock.monotonic() remaining = self._pause_until - now @@ -145,6 +146,7 @@ async def _wait_for_rpm_slot(self) -> None: rpm = self._config.max_requests_per_minute window = 60.0 + # lint-allow: long-running-loop-kill-switch -- per-call RPM-slot wait. while True: async with self._rpm_lock: now = self._clock.monotonic() diff --git a/src/synthorg/security/timeout/scheduler.py b/src/synthorg/security/timeout/scheduler.py index ccf087b406..db9ef987e7 100644 --- a/src/synthorg/security/timeout/scheduler.py +++ b/src/synthorg/security/timeout/scheduler.py @@ -285,6 +285,7 @@ async def _run_loop(self) -> None: if wake_event is None: # defensive; start() guarantees non-None msg = "_run_loop invoked without an initialised wake event" raise RuntimeError(msg) + # lint-allow: long-running-loop-kill-switch -- stop()/cancel drives shutdown. while True: wake_event.clear() with contextlib.suppress(TimeoutError): diff --git a/src/synthorg/telemetry/collector.py b/src/synthorg/telemetry/collector.py index fa979bbcd1..0f6869120f 100644 --- a/src/synthorg/telemetry/collector.py +++ b/src/synthorg/telemetry/collector.py @@ -318,7 +318,8 @@ def __init__( self._heartbeat_task: asyncio.Task[None] | None = None self._heartbeat_snapshot_provider = heartbeat_snapshot_provider self._session_summary_snapshot_provider = session_summary_snapshot_provider - self._lifecycle_lock = asyncio.Lock() + # Eager init: shutdown() must be safe before any start() call. + self._lifecycle_lock = asyncio.Lock() # lint-allow: loop-bound-init -- see. # ``shutdown()`` flips this so a stray second ``start()`` is # rejected loudly rather than silently re-using a torn-down # reporter. Restart-after-shutdown is not a supported flow: @@ -774,6 +775,7 @@ async def _heartbeat_loop(self) -> None: re-raised for graceful shutdown. 
""" interval = self._config.heartbeat_interval_hours * 3600 + # lint-allow: long-running-loop-kill-switch -- telemetry.enabled gates work. while True: try: await asyncio.sleep(interval) diff --git a/src/synthorg/tools/sandbox/lifecycle/per_agent.py b/src/synthorg/tools/sandbox/lifecycle/per_agent.py index d756d74d5c..ca2dd1933c 100644 --- a/src/synthorg/tools/sandbox/lifecycle/per_agent.py +++ b/src/synthorg/tools/sandbox/lifecycle/per_agent.py @@ -274,6 +274,7 @@ def _reset_idle_timer(self, owner_id: str) -> None: return async def _idle_expire() -> None: + # lint-allow: long-running-loop-kill-switch -- per-owner idle-expiry. while True: async with self._lock: last = self._last_used.get(owner_id) diff --git a/src/synthorg/workers/worker.py b/src/synthorg/workers/worker.py index a40fcd2986..19730052ad 100644 --- a/src/synthorg/workers/worker.py +++ b/src/synthorg/workers/worker.py @@ -80,7 +80,10 @@ def __init__( self._executor = executor self._worker_id = worker_id self._running = False - self._stop_event = asyncio.Event() + # Eager init: ``stop()`` may set the event before ``run()`` has + # ever entered the loop, so a half-published attribute would + # race with shutdown signalling. + self._stop_event = asyncio.Event() # lint-allow: loop-bound-init -- see above. # Dedicated lifecycle lock per docs/reference/lifecycle-sync.md. # Held across the full body of run() and stop() so a racing # start cannot see _running=False mid-drain and spawn a new @@ -88,8 +91,9 @@ def __init__( # an "in-place runner" (start runs the loop on the calling # coroutine), so the lock guards only the _running transition; # holding it across the whole loop body would deadlock a - # second concurrent caller. - self._lifecycle_lock = asyncio.Lock() + # second concurrent caller. Eager init: stop() must be safe + # before any run() call. + self._lifecycle_lock = asyncio.Lock() # lint-allow: loop-bound-init -- see. 
@property def is_running(self) -> bool: @@ -112,6 +116,7 @@ async def run(self) -> None: logger.info(WORKERS_WORKER_STARTED, worker_id=self._worker_id) try: + # lint-allow: long-running-loop-kill-switch -- _stop_event drives shutdown. while not self._stop_event.is_set(): await self._run_once() finally: diff --git a/tests/unit/api/fake_user_repository.py b/tests/unit/api/fake_user_repository.py index 2df6a264ae..9082580a51 100644 --- a/tests/unit/api/fake_user_repository.py +++ b/tests/unit/api/fake_user_repository.py @@ -11,6 +11,7 @@ from synthorg.core.auth.roles import HumanRole from synthorg.core.persistence_errors import ConstraintViolationError, QueryError from synthorg.core.types import NotBlankStr +from synthorg.persistence._shared import DEFAULT_LIST_LIMIT from synthorg.persistence.constraint_tokens import ( IDX_SINGLE_CEO, LAST_CEO_TRIGGER, @@ -94,10 +95,15 @@ async def get_by_username(self, username: str) -> User | None: return copy.deepcopy(user) return None - async def list_users(self) -> tuple[User, ...]: - return tuple( + async def list_users( + self, + *, + limit: int = DEFAULT_LIST_LIMIT, + ) -> tuple[User, ...]: + humans = tuple( copy.deepcopy(u) for u in self._users.values() if u.role != HumanRole.SYSTEM ) + return humans[:limit] async def list_users_paginated( self, diff --git a/tests/unit/api/fakes.py b/tests/unit/api/fakes.py index 1b61a0f573..79064762f4 100644 --- a/tests/unit/api/fakes.py +++ b/tests/unit/api/fakes.py @@ -496,13 +496,14 @@ async def list_projects( *, status: ProjectStatus | None = None, lead: NotBlankStr | None = None, + limit: int = 100, ) -> tuple[Project, ...]: result = sorted(self._projects.values(), key=lambda p: p.id) if status is not None: result = [p for p in result if p.status == status] if lead is not None: result = [p for p in result if p.lead == lead] - return tuple(result) + return tuple(result[:limit]) async def delete(self, project_id: NotBlankStr) -> bool: return self._projects.pop(project_id, None) is not 
None @@ -607,10 +608,11 @@ async def save( async def get(self, name: NotBlankStr) -> PresetRow | None: return self._presets.get(name) - async def list_all(self) -> tuple[PresetListRow, ...]: - return tuple( + async def list_all(self, *, limit: int = 100) -> tuple[PresetListRow, ...]: + rows = tuple( PresetListRow(name, *row) for name, row in sorted(self._presets.items()) ) + return rows[:limit] async def delete(self, name: NotBlankStr) -> bool: return self._presets.pop(name, None) is not None diff --git a/tests/unit/api/fakes_backend.py b/tests/unit/api/fakes_backend.py index 2683484615..652287ddd8 100644 --- a/tests/unit/api/fakes_backend.py +++ b/tests/unit/api/fakes_backend.py @@ -74,10 +74,14 @@ async def get( ) -> RiskTierOverride | None: return self._overrides.get(override_id) - async def list_active(self) -> tuple[RiskTierOverride, ...]: + async def list_active( + self, + *, + limit: int = 100, + ) -> tuple[RiskTierOverride, ...]: active = [o for o in self._overrides.values() if o.is_active] active.sort(key=lambda o: o.created_at, reverse=True) - return tuple(active) + return tuple(active[:limit]) async def revoke( self, @@ -423,11 +427,12 @@ async def list_rules( self, *, enabled_only: bool = False, + limit: int = 100, ) -> tuple[CustomRuleDefinition, ...]: rules = list(self._rules.values()) if enabled_only: rules = [r for r in rules if r.enabled] - return tuple(sorted(rules, key=lambda r: r.name)) + return tuple(sorted(rules, key=lambda r: r.name)[:limit]) async def delete(self, rule_id: NotBlankStr) -> bool: key = str(rule_id) diff --git a/tests/unit/api/fakes_workflow.py b/tests/unit/api/fakes_workflow.py index 4fb6f5e045..f83cee6f5e 100644 --- a/tests/unit/api/fakes_workflow.py +++ b/tests/unit/api/fakes_workflow.py @@ -52,11 +52,12 @@ async def list_definitions( self, *, workflow_type: WorkflowType | None = None, + limit: int = 100, ) -> tuple[WorkflowDefinition, ...]: result = list(self._definitions.values()) if workflow_type is not None: result = [d 
for d in result if d.workflow_type == workflow_type] - return tuple(copy.deepcopy(d) for d in result) + return tuple(copy.deepcopy(d) for d in result[:limit]) async def delete(self, definition_id: str) -> bool: return self._definitions.pop(definition_id, None) is not None @@ -96,24 +97,28 @@ async def get(self, execution_id: str) -> WorkflowExecution | None: async def list_by_definition( self, definition_id: str, + *, + limit: int = 100, ) -> tuple[WorkflowExecution, ...]: result = sorted( [e for e in self._executions.values() if e.definition_id == definition_id], key=lambda e: e.updated_at, reverse=True, ) - return tuple(copy.deepcopy(e) for e in result) + return tuple(copy.deepcopy(e) for e in result[:limit]) async def list_by_status( self, status: WorkflowExecutionStatus, + *, + limit: int = 100, ) -> tuple[WorkflowExecution, ...]: result = sorted( [e for e in self._executions.values() if e.status == status], key=lambda e: e.updated_at, reverse=True, ) - return tuple(copy.deepcopy(e) for e in result) + return tuple(copy.deepcopy(e) for e in result[:limit]) async def find_by_task_id( self, @@ -263,12 +268,19 @@ async def get( async def list_versions( self, subworkflow_id: NotBlankStr, + *, + limit: int = 100, ) -> tuple[str, ...]: versions = [v for (sid, v) in self._rows if sid == subworkflow_id] versions.sort(key=_semver_key, reverse=True) - return tuple(versions) + return tuple(versions[:limit]) - async def list_summaries(self) -> tuple[SubworkflowSummary, ...]: + async def list_summaries( + self, + *, + limit: int = 100, + ) -> tuple[SubworkflowSummary, ...]: + del limit # fake aggregates client-side; limit applied at end below grouped: dict[str, list[WorkflowDefinition]] = {} for definition in self._rows.values(): grouped.setdefault(definition.id, []).append(definition) diff --git a/tests/unit/api/services/test_project_service.py b/tests/unit/api/services/test_project_service.py index b2d682ff30..bb23c77dee 100644 --- 
a/tests/unit/api/services/test_project_service.py +++ b/tests/unit/api/services/test_project_service.py @@ -52,13 +52,14 @@ async def list_projects( *, status: ProjectStatus | None = None, lead: NotBlankStr | None = None, + limit: int = 100, ) -> tuple[Project, ...]: rows = sorted(self._rows.values(), key=lambda p: p.id) if status is not None: rows = [p for p in rows if p.status == status] if lead is not None: rows = [p for p in rows if p.lead == lead] - return tuple(rows) + return tuple(rows[:limit]) async def delete(self, project_id: NotBlankStr) -> bool: return self._rows.pop(project_id, None) is not None diff --git a/tests/unit/engine/workflow/test_execution_lifecycle.py b/tests/unit/engine/workflow/test_execution_lifecycle.py index cecc3498f6..086ba860b2 100644 --- a/tests/unit/engine/workflow/test_execution_lifecycle.py +++ b/tests/unit/engine/workflow/test_execution_lifecycle.py @@ -72,7 +72,9 @@ async def list_definitions( self, *, workflow_type: object = None, + limit: int = 100, ) -> tuple[WorkflowDefinition, ...]: + del workflow_type, limit return tuple(self._store.values()) async def delete(self, definition_id: str) -> bool: @@ -110,7 +112,10 @@ async def get(self, execution_id: str) -> WorkflowExecution | None: async def list_by_definition( self, definition_id: str, + *, + limit: int = 100, ) -> tuple[WorkflowExecution, ...]: + del limit return tuple( copy.deepcopy(e) for e in self._store.values() @@ -120,7 +125,10 @@ async def list_by_definition( async def list_by_status( self, status: object, + *, + limit: int = 100, ) -> tuple[WorkflowExecution, ...]: + del limit return tuple( copy.deepcopy(e) for e in self._store.values() if e.status == status ) diff --git a/tests/unit/engine/workflow/test_execution_service.py b/tests/unit/engine/workflow/test_execution_service.py index 2ec5952b36..711e388917 100644 --- a/tests/unit/engine/workflow/test_execution_service.py +++ b/tests/unit/engine/workflow/test_execution_service.py @@ -72,7 +72,9 @@ async def 
list_definitions( self, *, workflow_type: object = None, + limit: int = 100, ) -> tuple[WorkflowDefinition, ...]: + del workflow_type, limit return tuple(self._store.values()) async def delete(self, definition_id: str) -> bool: @@ -110,7 +112,10 @@ async def get(self, execution_id: str) -> WorkflowExecution | None: async def list_by_definition( self, definition_id: str, + *, + limit: int = 100, ) -> tuple[WorkflowExecution, ...]: + del limit return tuple( copy.deepcopy(e) for e in self._store.values() @@ -120,7 +125,10 @@ async def list_by_definition( async def list_by_status( self, status: object, + *, + limit: int = 100, ) -> tuple[WorkflowExecution, ...]: + del limit return tuple(e for e in self._store.values() if e.status == status) async def find_by_task_id( diff --git a/tests/unit/engine/workflow/test_subworkflow_registry.py b/tests/unit/engine/workflow/test_subworkflow_registry.py index 7d6aa3ad2f..b93eb6fccb 100644 --- a/tests/unit/engine/workflow/test_subworkflow_registry.py +++ b/tests/unit/engine/workflow/test_subworkflow_registry.py @@ -101,14 +101,24 @@ async def get( row = self._rows.get((subworkflow_id, version)) return copy.deepcopy(row) if row is not None else None - async def list_versions(self, subworkflow_id: str) -> tuple[str, ...]: + async def list_versions( + self, + subworkflow_id: str, + *, + limit: int = 100, + ) -> tuple[str, ...]: from packaging.version import Version versions = [v for (sid, v) in self._rows if sid == subworkflow_id] versions.sort(key=Version, reverse=True) - return tuple(versions) + return tuple(versions[:limit]) - async def list_summaries(self) -> tuple[SubworkflowSummary, ...]: + async def list_summaries( + self, + *, + limit: int = 100, + ) -> tuple[SubworkflowSummary, ...]: + del limit grouped: dict[str, list[WorkflowDefinition]] = {} for definition in self._rows.values(): grouped.setdefault(definition.id, []).append(definition) diff --git a/tests/unit/meta/test_rules_service.py 
b/tests/unit/meta/test_rules_service.py index caa7bfe996..4225e1f286 100644 --- a/tests/unit/meta/test_rules_service.py +++ b/tests/unit/meta/test_rules_service.py @@ -42,9 +42,10 @@ async def list_rules( self, *, enabled_only: bool = False, + limit: int = 100, ) -> tuple[CustomRuleDefinition, ...]: rows = [r for r in self._rows.values() if not enabled_only or r.enabled] - return tuple(sorted(rows, key=lambda r: r.name)) + return tuple(sorted(rows, key=lambda r: r.name)[:limit]) async def delete(self, rule_id: NotBlankStr) -> bool: return self._rows.pop(str(rule_id), None) is not None diff --git a/tests/unit/persistence/test_protocol.py b/tests/unit/persistence/test_protocol.py index 9cd06a9a25..2f26717495 100644 --- a/tests/unit/persistence/test_protocol.py +++ b/tests/unit/persistence/test_protocol.py @@ -299,7 +299,8 @@ async def get(self, user_id: str) -> User | None: async def get_by_username(self, username: str) -> User | None: return None - async def list_users(self) -> tuple[User, ...]: + async def list_users(self, *, limit: int = 100) -> tuple[User, ...]: + del limit return () async def list_users_paginated( @@ -534,7 +535,9 @@ async def list_definitions( self, *, workflow_type: WorkflowType | None = None, + limit: int = 100, ) -> tuple[WorkflowDefinition, ...]: + del workflow_type, limit return () async def delete(self, definition_id: NotBlankStr) -> bool: @@ -554,13 +557,19 @@ async def get( async def list_by_definition( self, definition_id: NotBlankStr, + *, + limit: int = 100, ) -> tuple[WorkflowExecution, ...]: + del definition_id, limit return () async def list_by_status( self, status: WorkflowExecutionStatus, + *, + limit: int = 100, ) -> tuple[WorkflowExecution, ...]: + del status, limit return () async def find_by_task_id( From 92d6c84fc9b24d0dcb6bbb6513191bff049f2665 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Mon, 11 May 2026 18:11:38 +0200 Subject: [PATCH 2/7] fix: address pre-PR review findings 
for #1857 - C7: LifecycleEventRepository.list_events now uses DEFAULT_LIST_LIMIT on the Protocol + SQLite + Postgres impls. - C8: validate_pagination_args now returns the clamped limit; the helper enforces a MAX_LIST_LIMIT=10_000 ceiling so callers that bypass the API's CursorLimit cannot trigger an unbounded scan. Baselined repos (user, workflow_definition, workflow_execution, subworkflow, hr, preset, custom_rule, risk_override, training_plan) opt in by capturing the return value. - M1: user_protocol.list_by_user docstring now matches the actual limit=100 default (was claiming limit=None fetch-all semantics). - M3 + M4: docs/reference/mcp-handler-contract.md + docs/design/tools.md clarify the intentional MCP default_limit=50 vs DEFAULT_LIST_LIMIT=100 scope split. - M6: web/src/api/types/projects.ts drops the dead offset field; the backend is cursor-based. - C6 + frontend pagination: web/src/api/endpoints/workflow-executions.ts now accepts cursor/limit params and returns PaginatedResult; the page consumer and MSW handler are updated to match. - Fakes that lied: tests/unit/api/fakes_workflow.py + tests/unit/engine/workflow/test_{execution_lifecycle,execution_service,subworkflow_registry}.py now slice [:limit] instead of dropping the param. - m1a + m1b: two long-running-loop kill-switch lint-allow comments reworded to reflect the actual termination mechanism (stop()/cancel). - m3: pg project_repo gains an inline safety-invariant comment on the WHERE-clause f-string. - M7-13: per-backend limit conformance tests added for project, workflow_definition, workflow_execution (list_by_definition + list_by_status), subworkflow (list_summaries + list_versions), preset.list_all, risk_override.list_active, custom_rule.list_rules. - m2a-c: has_more cursor-pagination tests for projects, workflows, workflow_executions controllers. - m4: parametrized limit-truncation tests in test_project_service + test_rules_service.
Verified false positives (logged in triage): - M2: docs/guides/budget.md correctly documents the budget controller's CursorLimit=50 default (distinct from repo DEFAULT_LIST_LIMIT=100). - M5: web/src/stores/projects.ts limit:200 is the API MAX_LIMIT ceiling, not silently truncated. - n1: gate-registry baseline counts (optional doc churn). - n2: '-- see.' lint-allow markers point to a preceding multi-line justification comment per the project convention. - C5 partial: test_protocol.py degenerate fakes return () regardless, which already satisfies 'max limit' trivially. --- docs/design/tools.md | 5 ++- docs/reference/mcp-handler-contract.md | 2 +- .../communication/meeting/scheduler.py | 2 +- src/synthorg/hr/persistence_protocol.py | 7 ++-- src/synthorg/persistence/_shared/__init__.py | 2 + .../persistence/_shared/pagination.py | 16 +++++++- .../persistence/postgres/custom_rule_repo.py | 2 +- .../persistence/postgres/hr_repositories.py | 15 +++++-- .../persistence/postgres/preset_repo.py | 2 +- .../persistence/postgres/project_repo.py | 4 ++ .../postgres/risk_override_repo.py | 4 +- .../persistence/postgres/subworkflow_repo.py | 8 +++- .../postgres/training_plan_repo.py | 2 +- .../persistence/postgres/user_repo.py | 2 +- .../postgres/workflow_definition_repo.py | 4 +- .../postgres/workflow_execution_repo.py | 8 +++- .../persistence/sqlite/custom_rule_repo.py | 2 +- .../persistence/sqlite/hr_repositories.py | 15 +++++-- .../persistence/sqlite/preset_repo.py | 2 +- .../persistence/sqlite/risk_override_repo.py | 4 +- .../persistence/sqlite/subworkflow_repo.py | 8 +++- .../persistence/sqlite/training_plan_repo.py | 2 +- src/synthorg/persistence/sqlite/user_repo.py | 2 +- .../sqlite/workflow_definition_repo.py | 4 +- .../sqlite/workflow_execution_repo.py | 8 +++- src/synthorg/persistence/user_protocol.py | 9 ++--- src/synthorg/telemetry/collector.py | 2 +- .../test_custom_rule_repository.py | 7 ++++ .../persistence/test_preset_repository.py | 13 ++++++ 
.../persistence/test_project_repository.py | 9 +++++ .../test_risk_override_repository.py | 9 +++++ .../test_subworkflow_repository.py | 22 ++++++++++ .../test_workflow_definition_repository.py | 11 +++++ .../test_workflow_execution_repository.py | 40 +++++++++++++++++++ tests/unit/api/controllers/test_projects.py | 17 ++++++++ .../controllers/test_workflow_executions.py | 21 ++++++++++ tests/unit/api/controllers/test_workflows.py | 13 ++++++ tests/unit/api/fakes_workflow.py | 3 +- .../unit/api/services/test_project_service.py | 28 +++++++++++++ .../workflow/test_execution_lifecycle.py | 6 +-- .../engine/workflow/test_execution_service.py | 6 +-- .../workflow/test_subworkflow_registry.py | 3 +- tests/unit/meta/test_rules_service.py | 17 ++++++++ web/src/api/endpoints/workflow-executions.ts | 19 +++++---- web/src/api/types/projects.ts | 2 +- web/src/mocks/handlers/workflow-executions.ts | 10 ++++- web/src/pages/WorkflowExecutionsPage.tsx | 4 +- 47 files changed, 336 insertions(+), 67 deletions(-) diff --git a/docs/design/tools.md b/docs/design/tools.md index 2db811ace6..c0ff9a3615 100644 --- a/docs/design/tools.md +++ b/docs/design/tools.md @@ -336,7 +336,10 @@ exceptions: `since < until` ordering. - `parse_str_sequence(arguments, key)`: optional sequence-of-non-blank-strings. - `coerce_pagination(arguments, *, default_limit=50)`: offset/limit - parsing with strict bounds and explicit bool rejection. + parsing with strict bounds and explicit bool rejection. MCP tools + default to 50; this is intentionally lower than the repository-layer + `DEFAULT_LIST_LIMIT = 100` so paginated MCP responses stay terse for + assistants. - `actor_id(actor)` / `require_actor_id(actor)` / `actor_label(actor)`: actor identity helpers. 
Use `actor_id` for optional attribution, `require_actor_id` when attribution is mandatory (raises if diff --git a/docs/reference/mcp-handler-contract.md b/docs/reference/mcp-handler-contract.md index 0e4d8bb7dc..4015ab2ed7 100644 --- a/docs/reference/mcp-handler-contract.md +++ b/docs/reference/mcp-handler-contract.md @@ -59,7 +59,7 @@ Use the helpers in `common_args.py` for tools without `args_model`: - `require_dict(arguments, key, *, value_type=None, deep_copy=True)` for dict args; pass `value_type=str` for `dict[str, str]` validation. - `parse_time_window(arguments, *, until_required=True)` for ISO 8601 since/until parsing. - `parse_str_sequence(arguments, key)` for optional sequence-of-non-blank-strings args. -- `coerce_pagination(arguments, *, default_limit=50)` for offset/limit with bool rejection and bound enforcement. +- `coerce_pagination(arguments, *, default_limit=50)` for offset/limit with bool rejection and bound enforcement. (MCP tools default to 50; this is intentionally lower than the repository-layer `DEFAULT_LIST_LIMIT = 100` so paginated MCP responses stay terse for assistants.) For actor identity: use `actor_id(actor)` for optional attribution, `require_actor_id(actor)` when attribution is mandatory (raises if missing), and `actor_label(actor)` only for emit-only paths where a `"mcp-anonymous"` fallback is acceptable. diff --git a/src/synthorg/communication/meeting/scheduler.py b/src/synthorg/communication/meeting/scheduler.py index 4459fbf434..e87d5c8f46 100644 --- a/src/synthorg/communication/meeting/scheduler.py +++ b/src/synthorg/communication/meeting/scheduler.py @@ -489,7 +489,7 @@ async def _run_periodic( # Sleep-first: avoids duplicate meetings on restart/deploy. try: - # lint-allow: long-running-loop-kill-switch -- meetings_enabled gates work. + # lint-allow: long-running-loop-kill-switch -- stop() cancel. 
while True: await asyncio.sleep(interval) logger.info( diff --git a/src/synthorg/hr/persistence_protocol.py b/src/synthorg/hr/persistence_protocol.py index 29019c4965..3c41d0b074 100644 --- a/src/synthorg/hr/persistence_protocol.py +++ b/src/synthorg/hr/persistence_protocol.py @@ -40,7 +40,7 @@ async def list_events( agent_id: NotBlankStr | None = None, event_type: LifecycleEventType | None = None, since: AwareDatetime | None = None, - limit: int = 100, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[AgentLifecycleEvent, ...]: """List lifecycle events with optional filters. @@ -48,10 +48,11 @@ async def list_events( agent_id: Filter by agent identifier. event_type: Filter by event type. since: Filter events after this timestamp. - limit: Maximum number of events to return. + limit: Maximum events to return (default + :data:`DEFAULT_LIST_LIMIT`). Returns: - Matching lifecycle events. + Matching lifecycle events capped at *limit* rows. Raises: PersistenceError: If the operation fails. diff --git a/src/synthorg/persistence/_shared/__init__.py b/src/synthorg/persistence/_shared/__init__.py index 74fc353204..69f47e6a44 100644 --- a/src/synthorg/persistence/_shared/__init__.py +++ b/src/synthorg/persistence/_shared/__init__.py @@ -21,11 +21,13 @@ class predicates) stay in the backend repo modules and are passed ) from synthorg.persistence._shared.pagination import ( DEFAULT_LIST_LIMIT, + MAX_LIST_LIMIT, validate_pagination_args, ) __all__ = ( "DEFAULT_LIST_LIMIT", + "MAX_LIST_LIMIT", "coerce_row_timestamp", "format_iso_utc", "normalize_utc", diff --git a/src/synthorg/persistence/_shared/pagination.py b/src/synthorg/persistence/_shared/pagination.py index 468960d2e2..f1b677c7e0 100644 --- a/src/synthorg/persistence/_shared/pagination.py +++ b/src/synthorg/persistence/_shared/pagination.py @@ -23,6 +23,14 @@ # the codebase (30+ repositories already default ``limit`` to 100). 
DEFAULT_LIST_LIMIT: Final[int] = 100 +# Hard upper bound on ``list_*`` / ``query`` page sizes regardless of +# caller-supplied limit. Defense-in-depth: the API layer's +# ``CursorLimit`` already caps caller input at 200, but internal +# service calls that bypass the API would not see that bound. The +# 10_000 ceiling matches the established per-repo ``_MAX_LIST_ROWS`` +# precedent in ``persistence/{sqlite,postgres}/project_repo.py``. +MAX_LIST_LIMIT: Final[int] = 10_000 + def validate_pagination_args( limit: object, @@ -30,7 +38,7 @@ def validate_pagination_args( *, event: str, **context: object, -) -> None: +) -> int: """Type-check + bounds-check pagination args; log + raise on failure. Args: @@ -44,6 +52,11 @@ def validate_pagination_args( **context: Extra structured fields the caller wants to attach to the log line (e.g. ``task_id`` or ``agent_id``). + Returns: + ``limit`` clamped to ``[1, MAX_LIST_LIMIT]`` so repository + callers cannot trigger an unbounded scan even when an internal + path bypasses the API layer's ``CursorLimit`` bound. + Raises: QueryError: If either argument fails the type or bounds check. The structured warning is emitted before the raise so @@ -85,3 +98,4 @@ def validate_pagination_args( **context, ) raise QueryError(msg) + return min(limit, MAX_LIST_LIMIT) diff --git a/src/synthorg/persistence/postgres/custom_rule_repo.py b/src/synthorg/persistence/postgres/custom_rule_repo.py index 5cf5fb187d..e14c653cc6 100644 --- a/src/synthorg/persistence/postgres/custom_rule_repo.py +++ b/src/synthorg/persistence/postgres/custom_rule_repo.py @@ -297,7 +297,7 @@ async def list_rules( Raises: QueryError: If the query or pagination validation fails. 
""" - validate_pagination_args(limit, 0, event=META_CUSTOM_RULE_LIST_FAILED) + limit = validate_pagination_args(limit, 0, event=META_CUSTOM_RULE_LIST_FAILED) base = ( "SELECT id, name, description, metric_path, " "comparator, threshold, severity, target_altitudes, " diff --git a/src/synthorg/persistence/postgres/hr_repositories.py b/src/synthorg/persistence/postgres/hr_repositories.py index f2db96115a..3e40df7219 100644 --- a/src/synthorg/persistence/postgres/hr_repositories.py +++ b/src/synthorg/persistence/postgres/hr_repositories.py @@ -117,9 +117,12 @@ async def list_events( agent_id: str | None = None, event_type: LifecycleEventType | None = None, since: AwareDatetime | None = None, - limit: int = 100, + limit: int = DEFAULT_LIST_LIMIT, ) -> tuple[AgentLifecycleEvent, ...]: - """List lifecycle events with optional filters.""" + """List lifecycle events with optional filters. + + Bounded by *limit* (default :data:`DEFAULT_LIST_LIMIT`). + """ clauses: list[str] = [] params: list[Any] = [] if agent_id is not None: @@ -253,7 +256,9 @@ async def query( Bounded by *limit* (default :data:`DEFAULT_LIST_LIMIT`). """ - validate_pagination_args(limit, 0, event=PERSISTENCE_TASK_METRIC_QUERY_FAILED) + limit = validate_pagination_args( + limit, 0, event=PERSISTENCE_TASK_METRIC_QUERY_FAILED + ) clauses: list[str] = [] params: list[Any] = [] if agent_id is not None: @@ -372,7 +377,9 @@ async def query( Bounded by *limit* (default :data:`DEFAULT_LIST_LIMIT`). 
""" - validate_pagination_args(limit, 0, event=PERSISTENCE_COLLAB_METRIC_QUERY_FAILED) + limit = validate_pagination_args( + limit, 0, event=PERSISTENCE_COLLAB_METRIC_QUERY_FAILED + ) clauses: list[str] = [] params: list[Any] = [] if agent_id is not None: diff --git a/src/synthorg/persistence/postgres/preset_repo.py b/src/synthorg/persistence/postgres/preset_repo.py index 7c06def625..7d3c7f8e80 100644 --- a/src/synthorg/persistence/postgres/preset_repo.py +++ b/src/synthorg/persistence/postgres/preset_repo.py @@ -214,7 +214,7 @@ async def list_all( QueryError: If the database query or pagination validation fails. """ - validate_pagination_args(limit, 0, event=PRESET_CUSTOM_LIST_FAILED) + limit = validate_pagination_args(limit, 0, event=PRESET_CUSTOM_LIST_FAILED) try: async with self._pool.connection() as conn, conn.cursor() as cur: await cur.execute( diff --git a/src/synthorg/persistence/postgres/project_repo.py b/src/synthorg/persistence/postgres/project_repo.py index 9141dfbf3c..cb3bc31176 100644 --- a/src/synthorg/persistence/postgres/project_repo.py +++ b/src/synthorg/persistence/postgres/project_repo.py @@ -259,6 +259,10 @@ async def list_projects( conditions.append("lead = %s") params.append(lead) + # Safety invariant: ``conditions`` only ever contains hardcoded + # ``"