diff --git a/CLAUDE.md b/CLAUDE.md index 83c969ecd2..97ac3bf04c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -90,7 +90,7 @@ See `web/CLAUDE.md` for the full component inventory, design token rules, and po - **Every module** with business logic MUST have: `from synthorg.observability import get_logger` then `logger = get_logger(__name__)` - **Never** use `import logging` / `logging.getLogger()` / `print()` in application code (exception: `observability/setup.py`, `observability/sinks.py`, `observability/syslog_handler.py`, `observability/http_handler.py`, and `observability/otlp_handler.py` may use stdlib `logging` and `print(..., file=sys.stderr)` for handler construction, bootstrap, and error reporting code that runs before or during logging system configuration) - **Variable name**: always `logger` (not `_logger`, not `log`) -- **Event names**: always use constants from the domain-specific module under `synthorg.observability.events` (e.g., `API_REQUEST_STARTED` from `events.api`, `TOOL_INVOKE_START` from `events.tool`, `GIT_COMMAND_START` from `events.git`, `CONTEXT_BUDGET_FILL_UPDATED`, `CONTEXT_BUDGET_COMPACTION_STARTED`, `CONTEXT_BUDGET_COMPACTION_COMPLETED`, `CONTEXT_BUDGET_COMPACTION_FAILED`, `CONTEXT_BUDGET_COMPACTION_SKIPPED`, `CONTEXT_BUDGET_COMPACTION_FALLBACK`, `CONTEXT_BUDGET_INDICATOR_INJECTED`, `CONTEXT_BUDGET_AGENT_COMPACTION_REQUESTED`, `CONTEXT_BUDGET_EPISTEMIC_MARKERS_PRESERVED` from `events.context_budget`, `BACKUP_STARTED` from `events.backup`, `SETUP_COMPLETED` from `events.setup`, `ROUTING_CANDIDATE_SELECTED` from `events.routing`, `SHIPPING_HTTP_BATCH_SENT` from `events.shipping`, `EVAL_REPORT_COMPUTED` from `events.evaluation`, `PROMPT_PROFILE_SELECTED` from `events.prompt`, `PROCEDURAL_MEMORY_START` from `events.procedural_memory`, `PERF_LLM_JUDGE_STARTED` from `events.performance`, `TASK_ENGINE_OBSERVER_FAILED` from `events.task_engine`, `TASK_ASSIGNMENT_PROJECT_FILTERED` and `TASK_ASSIGNMENT_PROJECT_NO_ELIGIBLE` from 
`events.task_assignment`, `WORKFLOW_EXEC_COMPLETED` from `events.workflow_execution`, `BLUEPRINT_INSTANTIATE_START` from `events.blueprint`, `WORKFLOW_DEF_ROLLED_BACK` from `events.workflow_definition`, `WORKFLOW_VERSION_SAVED` from `events.workflow_version`, `MEMORY_FINE_TUNE_STARTED`, `MEMORY_SELF_EDIT_TOOL_EXECUTE`, `MEMORY_SELF_EDIT_CORE_READ`, `MEMORY_SELF_EDIT_CORE_WRITE`, `MEMORY_SELF_EDIT_CORE_WRITE_REJECTED`, `MEMORY_SELF_EDIT_ARCHIVAL_SEARCH`, `MEMORY_SELF_EDIT_ARCHIVAL_WRITE`, `MEMORY_SELF_EDIT_RECALL_READ`, `MEMORY_SELF_EDIT_RECALL_WRITE`, `MEMORY_SELF_EDIT_WRITE_FAILED` from `events.memory`, `REPORTING_GENERATION_STARTED` from `events.reporting`, `RISK_BUDGET_SCORE_COMPUTED` from `events.risk_budget`, `BUDGET_PROJECT_COST_QUERIED`, `BUDGET_PROJECT_RECORDS_QUERIED`, `BUDGET_PROJECT_BUDGET_EXCEEDED`, and `BUDGET_PROJECT_ENFORCEMENT_CHECK` from `events.budget`, `LLM_STRATEGY_SYNTHESIZED` and `DISTILLATION_CAPTURED` from `events.consolidation`, `MEMORY_DIVERSITY_RERANKED`, `MEMORY_DIVERSITY_RERANK_FAILED`, and `MEMORY_REFORMULATION_ROUND` from `events.memory`, `NOTIFICATION_DISPATCHED` and `NOTIFICATION_DISPATCH_FAILED` from `events.notification`, `QUALITY_STEP_CLASSIFIED` from `events.quality`, `HEALTH_TICKET_EMITTED` from `events.health`, `TRAJECTORY_SCORING_START` from `events.trajectory`, `COORD_METRICS_AMDAHL_COMPUTED` from `events.coordination_metrics`, `COORDINATION_STARTED`, `COORDINATION_COMPLETED`, `COORDINATION_FAILED`, `COORDINATION_PHASE_STARTED`, `COORDINATION_PHASE_COMPLETED`, `COORDINATION_PHASE_FAILED`, `COORDINATION_WAVE_STARTED`, `COORDINATION_WAVE_COMPLETED`, `COORDINATION_TOPOLOGY_RESOLVED`, `COORDINATION_CLEANUP_STARTED`, `COORDINATION_CLEANUP_COMPLETED`, `COORDINATION_CLEANUP_FAILED`, `COORDINATION_WAVE_BUILT`, `COORDINATION_FACTORY_BUILT`, and `COORDINATION_ATTRIBUTION_BUILT` from `events.coordination`, `WEB_REQUEST_START` and `WEB_SSRF_BLOCKED` from `events.web`, `DB_QUERY_START` and `DB_WRITE_BLOCKED` from `events.database`, 
`TERMINAL_COMMAND_START` and `TERMINAL_COMMAND_BLOCKED` from `events.terminal`, `SUB_CONSTRAINT_RESOLVED` and `SUB_CONSTRAINT_DENIED` from `events.sub_constraint`, `VERSION_SAVED` and `VERSION_SNAPSHOT_FAILED` from `events.versioning`, `ANALYTICS_AGGREGATION_COMPUTED` and `ANALYTICS_RETRY_RATE_ALERT` from `events.analytics`, `CALL_CLASSIFICATION_COMPUTED` from `events.call_classification`, `QUOTA_THRESHOLD_ALERT` and `QUOTA_POLL_FAILED` from `events.quota`, `CONFLICT_DEBATE_EVALUATOR_FAILED` from `events.conflict`, `DELEGATION_LOOP_CIRCUIT_BACKOFF` and `DELEGATION_LOOP_CIRCUIT_PERSIST_FAILED` from `events.delegation`, `MEETING_EVENT_COOLDOWN_SKIPPED` and `MEETING_TASKS_CAPPED` from `events.meeting`, `PERSISTENCE_CIRCUIT_BREAKER_SAVED`, `PERSISTENCE_CIRCUIT_BREAKER_SAVE_FAILED`, `PERSISTENCE_CIRCUIT_BREAKER_LOADED`, `PERSISTENCE_CIRCUIT_BREAKER_LOAD_FAILED`, `PERSISTENCE_CIRCUIT_BREAKER_DELETED`, and `PERSISTENCE_CIRCUIT_BREAKER_DELETE_FAILED` from `events.persistence`, `METRICS_SCRAPE_COMPLETED`, `METRICS_SCRAPE_FAILED`, `METRICS_COLLECTOR_INITIALIZED`, `METRICS_COORDINATION_RECORDED`, `METRICS_OTLP_EXPORT_COMPLETED` and `METRICS_OTLP_FLUSHER_STOPPED` from `events.metrics`, `EXECUTION_PROJECT_VALIDATION_FAILED` from `events.execution`, `ORG_MEMORY_QUERY_START`, `ORG_MEMORY_QUERY_COMPLETE`, `ORG_MEMORY_QUERY_FAILED`, `ORG_MEMORY_WRITE_START`, `ORG_MEMORY_WRITE_COMPLETE`, `ORG_MEMORY_WRITE_DENIED`, `ORG_MEMORY_WRITE_FAILED`, `ORG_MEMORY_POLICIES_LISTED`, `ORG_MEMORY_BACKEND_CREATED`, `ORG_MEMORY_CONNECT_FAILED`, `ORG_MEMORY_DISCONNECT_FAILED`, `ORG_MEMORY_NOT_CONNECTED`, `ORG_MEMORY_ROW_PARSE_FAILED`, `ORG_MEMORY_CONFIG_INVALID`, `ORG_MEMORY_MODEL_INVALID`, `ORG_MEMORY_MVCC_PUBLISH_APPENDED`, `ORG_MEMORY_MVCC_RETRACT_APPENDED`, `ORG_MEMORY_MVCC_SNAPSHOT_AT_QUERIED`, and `ORG_MEMORY_MVCC_LOG_QUERIED` from `events.org_memory`). Each domain has its own module -- see `src/synthorg/observability/events/` for the full inventory of constants. 
Import directly: `from synthorg.observability.events.<domain> import EVENT_CONSTANT` +- **Event names**: always use constants from the domain-specific module under `synthorg.observability.events` (e.g., `API_REQUEST_STARTED` from `events.api`, `TOOL_INVOKE_START` from `events.tool`, `GIT_COMMAND_START` from `events.git`, `CONTEXT_BUDGET_FILL_UPDATED`, `CONTEXT_BUDGET_COMPACTION_STARTED`, `CONTEXT_BUDGET_COMPACTION_COMPLETED`, `CONTEXT_BUDGET_COMPACTION_FAILED`, `CONTEXT_BUDGET_COMPACTION_SKIPPED`, `CONTEXT_BUDGET_COMPACTION_FALLBACK`, `CONTEXT_BUDGET_INDICATOR_INJECTED`, `CONTEXT_BUDGET_AGENT_COMPACTION_REQUESTED`, `CONTEXT_BUDGET_EPISTEMIC_MARKERS_PRESERVED` from `events.context_budget`, `BACKUP_STARTED` from `events.backup`, `SETUP_COMPLETED` from `events.setup`, `ROUTING_CANDIDATE_SELECTED` from `events.routing`, `SHIPPING_HTTP_BATCH_SENT` from `events.shipping`, `EVAL_REPORT_COMPUTED` from `events.evaluation`, `PROMPT_PROFILE_SELECTED` from `events.prompt`, `PROCEDURAL_MEMORY_START` from `events.procedural_memory`, `PERF_LLM_JUDGE_STARTED` from `events.performance`, `TASK_ENGINE_OBSERVER_FAILED` from `events.task_engine`, `TASK_ASSIGNMENT_PROJECT_FILTERED` and `TASK_ASSIGNMENT_PROJECT_NO_ELIGIBLE` from `events.task_assignment`, `EXECUTION_SHUTDOWN_IMMEDIATE_CANCEL`, `EXECUTION_SHUTDOWN_TOOL_WAIT`, `EXECUTION_SHUTDOWN_CHECKPOINT_SAVE`, `EXECUTION_SHUTDOWN_CHECKPOINT_FAILED`, and `EXECUTION_PROJECT_VALIDATION_FAILED` from `events.execution`, `WORKFLOW_EXEC_COMPLETED` from `events.workflow_execution`, `BLUEPRINT_INSTANTIATE_START` from `events.blueprint`, `WORKFLOW_DEF_ROLLED_BACK` from `events.workflow_definition`, `WORKFLOW_VERSION_SAVED` from `events.workflow_version`, `MEMORY_FINE_TUNE_STARTED`, `MEMORY_SELF_EDIT_TOOL_EXECUTE`, `MEMORY_SELF_EDIT_CORE_READ`, `MEMORY_SELF_EDIT_CORE_WRITE`, `MEMORY_SELF_EDIT_CORE_WRITE_REJECTED`, `MEMORY_SELF_EDIT_ARCHIVAL_SEARCH`, `MEMORY_SELF_EDIT_ARCHIVAL_WRITE`, `MEMORY_SELF_EDIT_RECALL_READ`, `MEMORY_SELF_EDIT_RECALL_WRITE`, 
`MEMORY_SELF_EDIT_WRITE_FAILED` from `events.memory`, `REPORTING_GENERATION_STARTED` from `events.reporting`, `RISK_BUDGET_SCORE_COMPUTED` from `events.risk_budget`, `BUDGET_PROJECT_COST_QUERIED`, `BUDGET_PROJECT_RECORDS_QUERIED`, `BUDGET_PROJECT_BUDGET_EXCEEDED`, and `BUDGET_PROJECT_ENFORCEMENT_CHECK` from `events.budget`, `LLM_STRATEGY_SYNTHESIZED` and `DISTILLATION_CAPTURED` from `events.consolidation`, `MEMORY_DIVERSITY_RERANKED`, `MEMORY_DIVERSITY_RERANK_FAILED`, and `MEMORY_REFORMULATION_ROUND` from `events.memory`, `NOTIFICATION_DISPATCHED` and `NOTIFICATION_DISPATCH_FAILED` from `events.notification`, `QUALITY_STEP_CLASSIFIED` from `events.quality`, `HEALTH_TICKET_EMITTED` from `events.health`, `TRAJECTORY_SCORING_START` from `events.trajectory`, `COORD_METRICS_AMDAHL_COMPUTED` from `events.coordination_metrics`, `COORDINATION_STARTED`, `COORDINATION_COMPLETED`, `COORDINATION_FAILED`, `COORDINATION_PHASE_STARTED`, `COORDINATION_PHASE_COMPLETED`, `COORDINATION_PHASE_FAILED`, `COORDINATION_WAVE_STARTED`, `COORDINATION_WAVE_COMPLETED`, `COORDINATION_TOPOLOGY_RESOLVED`, `COORDINATION_CLEANUP_STARTED`, `COORDINATION_CLEANUP_COMPLETED`, `COORDINATION_CLEANUP_FAILED`, `COORDINATION_WAVE_BUILT`, `COORDINATION_FACTORY_BUILT`, and `COORDINATION_ATTRIBUTION_BUILT` from `events.coordination`, `WEB_REQUEST_START` and `WEB_SSRF_BLOCKED` from `events.web`, `DB_QUERY_START` and `DB_WRITE_BLOCKED` from `events.database`, `TERMINAL_COMMAND_START` and `TERMINAL_COMMAND_BLOCKED` from `events.terminal`, `SUB_CONSTRAINT_RESOLVED` and `SUB_CONSTRAINT_DENIED` from `events.sub_constraint`, `VERSION_SAVED` and `VERSION_SNAPSHOT_FAILED` from `events.versioning`, `ANALYTICS_AGGREGATION_COMPUTED` and `ANALYTICS_RETRY_RATE_ALERT` from `events.analytics`, `CALL_CLASSIFICATION_COMPUTED` from `events.call_classification`, `QUOTA_THRESHOLD_ALERT` and `QUOTA_POLL_FAILED` from `events.quota`, `CONFLICT_DEBATE_EVALUATOR_FAILED` from `events.conflict`, `DELEGATION_LOOP_CIRCUIT_BACKOFF` and 
`DELEGATION_LOOP_CIRCUIT_PERSIST_FAILED` from `events.delegation`, `MEETING_EVENT_COOLDOWN_SKIPPED` and `MEETING_TASKS_CAPPED` from `events.meeting`, `PERSISTENCE_CIRCUIT_BREAKER_SAVED`, `PERSISTENCE_CIRCUIT_BREAKER_SAVE_FAILED`, `PERSISTENCE_CIRCUIT_BREAKER_LOADED`, `PERSISTENCE_CIRCUIT_BREAKER_LOAD_FAILED`, `PERSISTENCE_CIRCUIT_BREAKER_DELETED`, and `PERSISTENCE_CIRCUIT_BREAKER_DELETE_FAILED` from `events.persistence`, `METRICS_SCRAPE_COMPLETED`, `METRICS_SCRAPE_FAILED`, `METRICS_COLLECTOR_INITIALIZED`, `METRICS_COORDINATION_RECORDED`, `METRICS_OTLP_EXPORT_COMPLETED` and `METRICS_OTLP_FLUSHER_STOPPED` from `events.metrics`, `ORG_MEMORY_QUERY_START`, `ORG_MEMORY_QUERY_COMPLETE`, `ORG_MEMORY_QUERY_FAILED`, `ORG_MEMORY_WRITE_START`, `ORG_MEMORY_WRITE_COMPLETE`, `ORG_MEMORY_WRITE_DENIED`, `ORG_MEMORY_WRITE_FAILED`, `ORG_MEMORY_POLICIES_LISTED`, `ORG_MEMORY_BACKEND_CREATED`, `ORG_MEMORY_CONNECT_FAILED`, `ORG_MEMORY_DISCONNECT_FAILED`, `ORG_MEMORY_NOT_CONNECTED`, `ORG_MEMORY_ROW_PARSE_FAILED`, `ORG_MEMORY_CONFIG_INVALID`, `ORG_MEMORY_MODEL_INVALID`, `ORG_MEMORY_MVCC_PUBLISH_APPENDED`, `ORG_MEMORY_MVCC_RETRACT_APPENDED`, `ORG_MEMORY_MVCC_SNAPSHOT_AT_QUERIED`, and `ORG_MEMORY_MVCC_LOG_QUERIED` from `events.org_memory`). Each domain has its own module -- see `src/synthorg/observability/events/` for the full inventory of constants. Import directly: `from synthorg.observability.events.<domain> 
import EVENT_CONSTANT` - **Structured kwargs**: always `logger.info(EVENT, key=value)` -- never `logger.info("msg %s", val)` - **All error paths** must log at WARNING or ERROR with context before raising - **All state transitions** must log at INFO diff --git a/docs/design/engine.md b/docs/design/engine.md index 1062048283..f449c6f4be 100644 --- a/docs/design/engine.md +++ b/docs/design/engine.md @@ -25,11 +25,13 @@ stateDiagram-v2 ASSIGNED --> BLOCKED : blocked ASSIGNED --> CANCELLED : cancelled ASSIGNED --> INTERRUPTED : shutdown signal + ASSIGNED --> SUSPENDED : checkpoint shutdown IN_PROGRESS --> IN_REVIEW : agent done IN_PROGRESS --> FAILED : runtime crash IN_PROGRESS --> CANCELLED : cancelled IN_PROGRESS --> INTERRUPTED : shutdown signal + IN_PROGRESS --> SUSPENDED : checkpoint shutdown IN_REVIEW --> COMPLETED : approved IN_REVIEW --> IN_PROGRESS : rework @@ -40,18 +42,22 @@ stateDiagram-v2 INTERRUPTED --> ASSIGNED : reassign on restart + SUSPENDED --> ASSIGNED : resume from checkpoint + COMPLETED --> [*] CANCELLED --> [*] ``` !!! info "Non-terminal states" - `BLOCKED`, `FAILED`, and `INTERRUPTED` are non-terminal: + `BLOCKED`, `FAILED`, `INTERRUPTED`, and `SUSPENDED` are non-terminal: - **BLOCKED** returns to `ASSIGNED` when unblocked. - **FAILED** returns to `ASSIGNED` for retry when `retry_count < max_retries` (see [Crash Recovery](#agent-crash-recovery)). - **INTERRUPTED** returns to `ASSIGNED` on restart (see [Graceful Shutdown](#graceful-shutdown-protocol)). + - **SUSPENDED** returns to `ASSIGNED` for resume from checkpoint + (see [Graceful Shutdown](#graceful-shutdown-protocol), Strategy 4). - **COMPLETED** and **CANCELLED** are the only terminal states with no outgoing transitions. @@ -140,7 +146,7 @@ Backlog | Ready | In Progress | Review | Done The `KanbanColumn` enum defines five columns that map bidirectionally to `TaskStatus` (Backlog=CREATED, Ready=ASSIGNED, In Progress=IN_PROGRESS, Review=IN_REVIEW, Done=COMPLETED). 
Off-board statuses (BLOCKED, FAILED, -INTERRUPTED, CANCELLED) map to `None`. `KanbanConfig` provides per-column +INTERRUPTED, SUSPENDED, CANCELLED) map to `None`. `KanbanConfig` provides per-column WIP limits with strict (hard-reject) or advisory (log-warning) enforcement. Column transitions are validated independently and resolved to the underlying task status transition path. @@ -751,8 +757,9 @@ async run( ``DecisionRecord.metadata`` field (best-effort; lookup failure is logged at WARNING and the decision record is still written). See ``docs/design/agents.md`` for the full design. - - `SHUTDOWN` termination: current status -> INTERRUPTED - (see [Graceful Shutdown](#graceful-shutdown-protocol)). + - `SHUTDOWN` termination: current status -> INTERRUPTED (or SUSPENDED + if the checkpoint strategy successfully checkpointed the task; + see [Graceful Shutdown](#graceful-shutdown-protocol)). - `ERROR` termination: recovery strategy is applied (default `FailAndReassignStrategy` transitions to FAILED; see [Crash Recovery](#agent-crash-recovery)). @@ -1147,15 +1154,16 @@ The engine sets a shutdown event, stops accepting new tasks, and gives in-flight agents a grace period to finish their current turn. Agents check the shutdown event at turn boundaries (between LLM calls, before tool invocations) and exit cooperatively. After the grace period, remaining agents are force-cancelled. -**All tasks terminated by shutdown -- whether they exited cooperatively or were -force-cancelled -- are marked `INTERRUPTED`** by the engine layer. +**All tasks terminated by this strategy -- whether they exited cooperatively or +were force-cancelled -- are marked `INTERRUPTED`** by the engine layer. +(Strategy 4 uses `SUSPENDED` for successfully checkpointed tasks instead; +see [Strategy 4](#strategy-4-checkpoint-and-stop).) 
```yaml graceful_shutdown: strategy: "cooperative_timeout" # cooperative_timeout, immediate, finish_tool, checkpoint - cooperative_timeout: - grace_seconds: 30 # time for agents to finish cooperatively - cleanup_seconds: 5 # time for final cleanup (persist cost records, close connections) + grace_seconds: 30 # time for agents to finish cooperatively + cleanup_seconds: 5 # time for final cleanup (persist cost records, close connections) ``` On shutdown signal: @@ -1188,25 +1196,56 @@ On shutdown signal: minimum an input-cost audit record. Streaming calls are charged only for tokens sent before disconnect. -### Future Strategies +### Strategy 2: Immediate Cancel + +All agent tasks are cancelled immediately via `task.cancel()` with no grace +period. Fastest shutdown but highest data loss -- partial tool side effects, +billed-but-lost LLM responses. Tasks are marked `INTERRUPTED`. + +```yaml +graceful_shutdown: + strategy: "immediate" + cleanup_seconds: 5 +``` + +### Strategy 3: Finish Current Tool + +Like cooperative timeout, but uses a per-tool timeout (default 60s) to allow +the current tool invocation to complete. The execution loop finishes the +current tool before checking shutdown at turn boundaries; this strategy +gives a longer window for that. Tasks that exceed the tool timeout are +force-cancelled and marked `INTERRUPTED`. -Strategy 2: Immediate Cancel -: All agent tasks are cancelled immediately via `task.cancel()`. Fastest - shutdown but highest data loss -- partial tool side effects, billed-but-lost - LLM responses. - -Strategy 3: Finish Current Tool -: Like cooperative timeout, but waits for the current tool invocation to - complete even if it exceeds the grace period. Needs per-tool timeout as a - backstop for long-running sandboxed execution. - -Strategy 4: Checkpoint and Stop -: On shutdown signal, each agent persists its full `AgentContext` snapshot and - transitions to `INTERRUPTED`. On restart, the engine loads checkpoints and - resumes execution. 
This naturally extends - [Checkpoint Recovery](#agent-crash-recovery) -- the only difference is - whether the checkpoint was written proactively (graceful shutdown) or loaded - from the last turn (crash recovery). +```yaml +graceful_shutdown: + strategy: "finish_tool" + tool_timeout_seconds: 60 + cleanup_seconds: 5 +``` + +### Strategy 4: Checkpoint and Stop + +On shutdown signal, agents checkpoint cooperatively during the grace period. +Stragglers are checkpointed via a `checkpoint_saver` callback, then cancelled. +Successfully checkpointed tasks transition to `SUSPENDED` (not `INTERRUPTED`); +failed checkpoints fall back to `INTERRUPTED`. On restart, the engine loads +checkpoints and resumes execution from the exact point of interruption. This +naturally extends [Checkpoint Recovery](#agent-crash-recovery) -- the only +difference is whether the checkpoint was written proactively (graceful +shutdown) or loaded from the last turn (crash recovery). + +!!! info "SUSPENDED vs INTERRUPTED" + `SUSPENDED` indicates the task was checkpointed before stop and can resume + from the exact point of interruption. `INTERRUPTED` indicates the task was + stopped without a checkpoint and requires full reassignment. Both are + non-terminal: `SUSPENDED -> ASSIGNED`, `INTERRUPTED -> ASSIGNED`. + +```yaml +graceful_shutdown: + strategy: "checkpoint" + grace_seconds: 30 + cleanup_seconds: 5 +``` --- diff --git a/src/synthorg/config/schema.py b/src/synthorg/config/schema.py index 0c6ae34f8a..bef9eae91a 100644 --- a/src/synthorg/config/schema.py +++ b/src/synthorg/config/schema.py @@ -1,7 +1,7 @@ """Root configuration schema and config-level Pydantic models.""" from collections import Counter -from typing import Any, ClassVar, Self +from typing import Any, ClassVar, Literal, Self from pydantic import AwareDatetime, BaseModel, ConfigDict, Field, model_validator @@ -441,16 +441,21 @@ class GracefulShutdownConfig(BaseModel): """Configuration for graceful shutdown behaviour. 
Attributes: - strategy: Shutdown strategy name (e.g. ``"cooperative_timeout"``). + strategy: Shutdown strategy name (``"cooperative_timeout"``, + ``"immediate"``, ``"finish_tool"``, or ``"checkpoint"``). grace_seconds: Seconds to wait for cooperative agent exit before force-cancelling. cleanup_seconds: Seconds allowed for cleanup callbacks (persist costs, close connections, flush logs). + tool_timeout_seconds: Per-tool timeout for the + ``"finish_tool"`` strategy (seconds). """ model_config = ConfigDict(frozen=True, allow_inf_nan=False) - strategy: NotBlankStr = Field( + strategy: Literal[ + "cooperative_timeout", "immediate", "finish_tool", "checkpoint" + ] = Field( default="cooperative_timeout", description="Shutdown strategy name", ) @@ -466,6 +471,12 @@ class GracefulShutdownConfig(BaseModel): le=60, description="Seconds allowed for cleanup callbacks", ) + tool_timeout_seconds: float = Field( + default=60.0, + gt=0, + le=300, + description="Per-tool timeout for finish_tool strategy", + ) class TaskAssignmentConfig(BaseModel): diff --git a/src/synthorg/core/enums.py b/src/synthorg/core/enums.py index 8784e4e1bd..27986bcd8f 100644 --- a/src/synthorg/core/enums.py +++ b/src/synthorg/core/enums.py @@ -231,14 +231,17 @@ class TaskStatus(StrEnum): Summary for quick reference: CREATED -> ASSIGNED - ASSIGNED -> IN_PROGRESS | BLOCKED | CANCELLED | FAILED | INTERRUPTED - IN_PROGRESS -> IN_REVIEW | BLOCKED | CANCELLED | FAILED | INTERRUPTED + ASSIGNED -> IN_PROGRESS | BLOCKED | CANCELLED | FAILED + | INTERRUPTED | SUSPENDED + IN_PROGRESS -> IN_REVIEW | BLOCKED | CANCELLED | FAILED + | INTERRUPTED | SUSPENDED IN_REVIEW -> COMPLETED | IN_PROGRESS (rework) | BLOCKED | CANCELLED BLOCKED -> ASSIGNED (unblocked) FAILED -> ASSIGNED (reassignment for retry) INTERRUPTED -> ASSIGNED (reassignment on restart) COMPLETED and CANCELLED are terminal states. - FAILED and INTERRUPTED are non-terminal (can be reassigned). 
+ FAILED, INTERRUPTED, and SUSPENDED are non-terminal (can be reassigned). + SUSPENDED -> ASSIGNED (resume from checkpoint) """ CREATED = "created" @@ -249,6 +252,7 @@ class TaskStatus(StrEnum): BLOCKED = "blocked" FAILED = "failed" INTERRUPTED = "interrupted" + SUSPENDED = "suspended" CANCELLED = "cancelled" diff --git a/src/synthorg/core/task_transitions.py b/src/synthorg/core/task_transitions.py index 400df3de17..451cedb4fa 100644 --- a/src/synthorg/core/task_transitions.py +++ b/src/synthorg/core/task_transitions.py @@ -2,18 +2,20 @@ Defines the valid state transitions for the task lifecycle, based on the Engine design page, extended with BLOCKED, CANCELLED, -FAILED, and INTERRUPTED transitions for completeness:: +FAILED, INTERRUPTED, and SUSPENDED transitions for completeness:: CREATED -> ASSIGNED - ASSIGNED -> IN_PROGRESS | BLOCKED | CANCELLED | FAILED | INTERRUPTED - IN_PROGRESS -> IN_REVIEW | BLOCKED | CANCELLED | FAILED | INTERRUPTED + ASSIGNED -> IN_PROGRESS | BLOCKED | CANCELLED | FAILED | INTERRUPTED | SUSPENDED + IN_PROGRESS -> IN_REVIEW | BLOCKED | CANCELLED | FAILED | INTERRUPTED | SUSPENDED IN_REVIEW -> COMPLETED | IN_PROGRESS (rework) | BLOCKED | CANCELLED BLOCKED -> ASSIGNED (unblocked) FAILED -> ASSIGNED (reassignment for retry) INTERRUPTED -> ASSIGNED (reassignment on restart) + SUSPENDED -> ASSIGNED (resume from checkpoint) COMPLETED and CANCELLED are terminal states with no outgoing -transitions. FAILED and INTERRUPTED are non-terminal (can be reassigned). +transitions. FAILED, INTERRUPTED, and SUSPENDED are non-terminal +(can be reassigned). 
""" from synthorg.core.enums import TaskStatus @@ -34,6 +36,7 @@ TaskStatus.CANCELLED, TaskStatus.FAILED, TaskStatus.INTERRUPTED, + TaskStatus.SUSPENDED, } ), TaskStatus.IN_PROGRESS: frozenset( @@ -43,6 +46,7 @@ TaskStatus.CANCELLED, TaskStatus.FAILED, TaskStatus.INTERRUPTED, + TaskStatus.SUSPENDED, } ), TaskStatus.IN_REVIEW: frozenset( @@ -56,6 +60,7 @@ TaskStatus.BLOCKED: frozenset({TaskStatus.ASSIGNED}), TaskStatus.FAILED: frozenset({TaskStatus.ASSIGNED}), # reassignment TaskStatus.INTERRUPTED: frozenset({TaskStatus.ASSIGNED}), # reassignment on restart + TaskStatus.SUSPENDED: frozenset({TaskStatus.ASSIGNED}), # resume from checkpoint TaskStatus.COMPLETED: frozenset(), # terminal TaskStatus.CANCELLED: frozenset(), # terminal } diff --git a/src/synthorg/engine/__init__.py b/src/synthorg/engine/__init__.py index 99adc57b50..5adaec29ac 100644 --- a/src/synthorg/engine/__init__.py +++ b/src/synthorg/engine/__init__.py @@ -167,12 +167,19 @@ ) from synthorg.engine.run_result import AgentRunResult from synthorg.engine.shutdown import ( + CheckpointSaver, CleanupCallback, CooperativeTimeoutStrategy, ShutdownManager, ShutdownResult, ShutdownStrategy, ) +from synthorg.engine.shutdown_strategies import ( + CheckpointAndStopStrategy, + FinishCurrentToolStrategy, + ImmediateCancelStrategy, + build_shutdown_strategy, +) from synthorg.engine.stagnation import ( StagnationConfig, StagnationDetector, @@ -246,9 +253,11 @@ "CancelTaskMutation", "CentralizedDispatcher", "Checkpoint", + "CheckpointAndStopStrategy", "CheckpointCallback", "CheckpointConfig", "CheckpointRecoveryStrategy", + "CheckpointSaver", "ClassificationResult", "CleanupCallback", "ContextDependentDispatcher", @@ -286,10 +295,12 @@ "ExecutionResult", "ExecutionStateError", "FailAndReassignStrategy", + "FinishCurrentToolStrategy", "Heartbeat", "HierarchicalAssignmentStrategy", "HybridLoop", "HybridLoopConfig", + "ImmediateCancelStrategy", "InMemoryResourceLock", "LlmDecompositionConfig", "LlmDecompositionStrategy", 
@@ -382,6 +393,7 @@ "add_token_usage", "build_execution_loop", "build_execution_waves", + "build_shutdown_strategy", "build_strategy_map", "build_system_prompt", "classify_execution_errors", diff --git a/src/synthorg/engine/assignment/service.py b/src/synthorg/engine/assignment/service.py index 89756b83c2..859349c1b5 100644 --- a/src/synthorg/engine/assignment/service.py +++ b/src/synthorg/engine/assignment/service.py @@ -28,11 +28,16 @@ logger = get_logger(__name__) -# Tasks in CREATED, FAILED, or INTERRUPTED can be assigned directly. -# BLOCKED tasks must first be unblocked (transition to ASSIGNED via -# the task lifecycle), so they are not directly assignable. +# Tasks in CREATED, FAILED, INTERRUPTED, or SUSPENDED can be assigned +# directly. BLOCKED tasks must first be unblocked (transition to +# ASSIGNED via the task lifecycle), so they are not directly assignable. _ASSIGNABLE_STATUSES = frozenset( - {TaskStatus.CREATED, TaskStatus.FAILED, TaskStatus.INTERRUPTED}, + { + TaskStatus.CREATED, + TaskStatus.FAILED, + TaskStatus.INTERRUPTED, + TaskStatus.SUSPENDED, + }, ) diff --git a/src/synthorg/engine/decomposition/models.py b/src/synthorg/engine/decomposition/models.py index 5ea089e5dc..da9c617c87 100644 --- a/src/synthorg/engine/decomposition/models.py +++ b/src/synthorg/engine/decomposition/models.py @@ -179,11 +179,11 @@ def _validate_plan_task_consistency(self) -> Self: class SubtaskStatusRollup(BaseModel): """Aggregated status of subtasks for a parent task. - Tracks five explicit statuses: COMPLETED, FAILED, IN_PROGRESS, - BLOCKED, and CANCELLED. Other statuses (CREATED, ASSIGNED, - IN_REVIEW, INTERRUPTED) are not individually tracked; the gap - between the sum of tracked counts and ``total`` accounts for - these. The ``derived_parent_status`` treats any such remainder + Tracks six explicit statuses: COMPLETED, FAILED, IN_PROGRESS, + BLOCKED, CANCELLED, and SUSPENDED. 
Other statuses (CREATED, + ASSIGNED, IN_REVIEW, INTERRUPTED) are not individually tracked; + the gap between the sum of tracked counts and ``total`` accounts + for these. The ``derived_parent_status`` treats any such remainder as work still pending (IN_PROGRESS). When all subtasks are in terminal states but with a mix of @@ -198,6 +198,7 @@ class SubtaskStatusRollup(BaseModel): in_progress: Count of IN_PROGRESS subtasks. blocked: Count of BLOCKED subtasks. cancelled: Count of CANCELLED subtasks. + suspended: Count of SUSPENDED subtasks. """ model_config = ConfigDict(frozen=True, allow_inf_nan=False) @@ -209,6 +210,7 @@ class SubtaskStatusRollup(BaseModel): in_progress: int = Field(ge=0, description="In-progress subtasks") blocked: int = Field(ge=0, description="Blocked subtasks") cancelled: int = Field(ge=0, description="Cancelled subtasks") + suspended: int = Field(ge=0, default=0, description="Suspended subtasks") @model_validator(mode="after") def _validate_counts(self) -> Self: @@ -219,6 +221,7 @@ def _validate_counts(self) -> Self: + self.in_progress + self.blocked + self.cancelled + + self.suspended ) if counted > self.total: msg = "Sum of status counts exceeds total" @@ -249,6 +252,9 @@ def derived_parent_status(self) -> TaskStatus: # noqa: PLR0911 if self.blocked > 0: return TaskStatus.BLOCKED + if self.suspended > 0: + return TaskStatus.SUSPENDED + # All subtasks in terminal states but mixed completed + cancelled # -- not fully completed (pure completed already handled above), # and not fully cancelled (pure cancelled already handled above). 
diff --git a/src/synthorg/engine/decomposition/rollup.py b/src/synthorg/engine/decomposition/rollup.py index 4fa815178f..405245bc73 100644 --- a/src/synthorg/engine/decomposition/rollup.py +++ b/src/synthorg/engine/decomposition/rollup.py @@ -63,6 +63,7 @@ def compute( in_progress = subtask_statuses.count(TaskStatus.IN_PROGRESS) blocked = subtask_statuses.count(TaskStatus.BLOCKED) cancelled = subtask_statuses.count(TaskStatus.CANCELLED) + suspended = subtask_statuses.count(TaskStatus.SUSPENDED) rollup = SubtaskStatusRollup( parent_task_id=parent_task_id, @@ -72,6 +73,7 @@ def compute( in_progress=in_progress, blocked=blocked, cancelled=cancelled, + suspended=suspended, ) logger.debug( diff --git a/src/synthorg/engine/shutdown.py b/src/synthorg/engine/shutdown.py index 8cf6f91829..7cf82ed5ae 100644 --- a/src/synthorg/engine/shutdown.py +++ b/src/synthorg/engine/shutdown.py @@ -1,13 +1,15 @@ """Graceful shutdown strategy and manager. Implements the Graceful Shutdown section of the Engine design page -- -cooperative timeout strategy for clean process shutdown. -When SIGINT/SIGTERM is received the framework signals -agents to exit at turn boundaries, waits a grace period, force-cancels -stragglers, and runs cleanup callbacks. The *engine* layer is responsible -for transitioning tasks to INTERRUPTED (see ``AgentEngine``). - -The ``ShutdownStrategy`` protocol is pluggable for future strategies. +pluggable shutdown strategies for clean process shutdown. +When SIGINT/SIGTERM is received the framework signals agents to exit at +turn boundaries, waits a grace period, force-cancels stragglers, and +runs cleanup callbacks. The *engine* layer is responsible for +transitioning tasks to INTERRUPTED or SUSPENDED (see ``AgentEngine``). + +Four strategies are provided: cooperative timeout, immediate cancel, +finish current tool, and checkpoint-and-stop. The +``ShutdownStrategy`` protocol is pluggable for custom strategies. 
""" import asyncio @@ -41,6 +43,9 @@ CleanupCallback = Callable[[], Coroutine[Any, Any, None]] """Async callback invoked during shutdown cleanup phase.""" +CheckpointSaver = Callable[[str], Coroutine[Any, Any, bool]] +"""Async callback: given task_id, save checkpoint. Returns True on success.""" + class ShutdownResult(BaseModel): """Outcome of a graceful shutdown sequence. @@ -49,6 +54,8 @@ class ShutdownResult(BaseModel): strategy_type: Name of the strategy that executed the shutdown. tasks_interrupted: Number of tasks that were force-cancelled. tasks_completed: Number of tasks that exited cooperatively. + tasks_suspended: Number of tasks checkpointed before stop + (checkpoint strategy only, default 0). cleanup_completed: Whether all cleanup callbacks finished within the allowed time. duration_seconds: Wall-clock duration of the entire shutdown. @@ -70,6 +77,13 @@ class ShutdownResult(BaseModel): ge=0, description="Number of tasks that exited cooperatively", ) + tasks_suspended: int = Field( + ge=0, + default=0, + description=( + "Number of tasks checkpointed before stop (checkpoint strategy only)" + ), + ) cleanup_completed: bool = Field( description="Whether all cleanup callbacks finished in time", ) @@ -172,7 +186,7 @@ async def execute_shutdown( running_tasks, ) - cleanup_completed = await self._run_cleanup(cleanup_callbacks) + cleanup_completed = await _run_cleanup(cleanup_callbacks, self._cleanup_seconds) duration = time.monotonic() - start result = ShutdownResult( @@ -225,7 +239,7 @@ async def _wait_and_cancel( if exc is not None: logger.warning( EXECUTION_SHUTDOWN_TASK_ERROR, - error=(f"Task raised during shutdown: {type(exc).__name__}"), + error=f"Task raised during shutdown: {type(exc).__name__}: {exc}", ) else: tasks_completed += 1 @@ -243,87 +257,89 @@ async def _wait_and_cancel( pending, timeout=self._CANCEL_PROPAGATION_TIMEOUT, ) - self._log_post_cancel_exceptions(cancel_done) + _log_post_cancel_exceptions(cancel_done) return tasks_completed, 
len(pending) - def _log_post_cancel_exceptions( - self, - tasks: set[asyncio.Task[Any]], - ) -> None: - """Retrieve and log exceptions from post-cancel tasks. - Retrieving the exception prevents asyncio's "Task exception was - never retrieved" warning. Non-cancelled tasks with exceptions - are logged at DEBUG. - """ - for task in tasks: - if task.cancelled(): - continue - try: - exc = task.exception() - except asyncio.InvalidStateError: +# ── Shared helpers ─────────────────────────────────────────────── + + +def _log_post_cancel_exceptions(tasks: set[asyncio.Task[Any]]) -> None: + """Retrieve and log exceptions from post-cancel tasks. + + Retrieving the exception prevents asyncio's "Task exception was + never retrieved" warning. Non-cancelled tasks with exceptions + are logged at DEBUG. + """ + for task in tasks: + if task.cancelled(): + continue + try: + exc = task.exception() + except asyncio.InvalidStateError: + logger.debug( + EXECUTION_SHUTDOWN_TASK_ERROR, + error="Failed to inspect post-cancel task: InvalidStateError", + task_name=task.get_name(), + ) + else: + if exc is not None: logger.debug( EXECUTION_SHUTDOWN_TASK_ERROR, - error="Failed to inspect post-cancel task: InvalidStateError", + error=(f"Post-cancel task exception: {type(exc).__name__}: {exc}"), task_name=task.get_name(), ) - else: - if exc is not None: - logger.debug( - EXECUTION_SHUTDOWN_TASK_ERROR, - error=( - f"Post-cancel task exception: {type(exc).__name__}: {exc}" - ), - task_name=task.get_name(), - ) - async def _run_cleanup( - self, - callbacks: Sequence[CleanupCallback], - ) -> bool: - """Run cleanup callbacks sequentially within the time budget. - Returns: - ``True`` if all callbacks completed successfully within the - time budget, ``False`` otherwise. - """ - if not callbacks: - return True +async def _run_cleanup( + callbacks: Sequence[CleanupCallback], + cleanup_seconds: float, +) -> bool: + """Run cleanup callbacks sequentially within the time budget. 
- logger.info( - EXECUTION_SHUTDOWN_CLEANUP, - callback_count=len(callbacks), - cleanup_seconds=self._cleanup_seconds, - ) + Returns: + ``True`` if all callbacks completed successfully, ``False`` + otherwise. + """ + if not callbacks: + return True - all_succeeded = True + logger.info( + EXECUTION_SHUTDOWN_CLEANUP, + callback_count=len(callbacks), + cleanup_seconds=cleanup_seconds, + ) - async def _run_all() -> None: - nonlocal all_succeeded - for i, callback in enumerate(callbacks): - try: - await callback() - except Exception: - all_succeeded = False - logger.exception( - EXECUTION_SHUTDOWN_CLEANUP_FAILED, - callback_index=i, - callback_count=len(callbacks), - ) + all_succeeded = True - try: - await asyncio.wait_for( - _run_all(), - timeout=self._cleanup_seconds, - ) - except TimeoutError: - logger.warning( - EXECUTION_SHUTDOWN_CLEANUP_TIMEOUT, - cleanup_seconds=self._cleanup_seconds, - ) - return False - return all_succeeded + async def _run_all() -> None: + nonlocal all_succeeded + for i, callback in enumerate(callbacks): + try: + await callback() + except asyncio.CancelledError: + raise + except Exception: + all_succeeded = False + logger.exception( + EXECUTION_SHUTDOWN_CLEANUP_FAILED, + callback_index=i, + callback_count=len(callbacks), + ) + + try: + await asyncio.wait_for( + _run_all(), + timeout=cleanup_seconds, + ) + except TimeoutError: + logger.warning( + EXECUTION_SHUTDOWN_CLEANUP_TIMEOUT, + cleanup_seconds=cleanup_seconds, + ) + return False + return all_succeeded class ShutdownManager: diff --git a/src/synthorg/engine/shutdown_strategies.py b/src/synthorg/engine/shutdown_strategies.py new file mode 100644 index 0000000000..46e47ce799 --- /dev/null +++ b/src/synthorg/engine/shutdown_strategies.py @@ -0,0 +1,602 @@ +"""Additional shutdown strategy implementations. + +Provides ``ImmediateCancelStrategy``, ``FinishCurrentToolStrategy``, +``CheckpointAndStopStrategy``, and the ``build_shutdown_strategy`` +factory. 
All satisfy the ``ShutdownStrategy`` protocol defined in +``synthorg.engine.shutdown``. +""" + +import asyncio +import time +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from collections.abc import Callable, Mapping, Sequence + + from synthorg.config.schema import GracefulShutdownConfig + +from synthorg.engine.shutdown import ( + CheckpointSaver, + CleanupCallback, + CooperativeTimeoutStrategy, + ShutdownResult, + ShutdownStrategy, + _log_post_cancel_exceptions, + _run_cleanup, +) +from synthorg.observability import get_logger +from synthorg.observability.events.execution import ( + EXECUTION_SHUTDOWN_CHECKPOINT_FAILED, + EXECUTION_SHUTDOWN_CHECKPOINT_SAVE, + EXECUTION_SHUTDOWN_COMPLETE, + EXECUTION_SHUTDOWN_FORCE_CANCEL, + EXECUTION_SHUTDOWN_GRACE_START, + EXECUTION_SHUTDOWN_IMMEDIATE_CANCEL, + EXECUTION_SHUTDOWN_TASK_ERROR, + EXECUTION_SHUTDOWN_TOOL_WAIT, +) + +logger = get_logger(__name__) + + +# ── Shared helpers ─────────────────────────────────────────────── + + +def _count_cooperative_exits( + done: set[asyncio.Task[Any]], +) -> tuple[int, int]: + """Count tasks that exited cooperatively and those that errored. + + Tasks that raised exceptions are logged at WARNING. + + Args: + done: Set of completed asyncio tasks. + + Returns: + Tuple of (completed_count, errored_count). + """ + completed = 0 + errored = 0 + for task in done: + if task.cancelled(): + continue + exc = task.exception() + if exc is not None: + errored += 1 + logger.warning( + EXECUTION_SHUTDOWN_TASK_ERROR, + error=(f"Task raised during shutdown: {type(exc).__name__}: {exc}"), + ) + else: + completed += 1 + return completed, errored + + +# ── Strategy implementations ──────────────────────────────────── + + +class ImmediateCancelStrategy: + """Immediate cancel shutdown strategy. + + Force-cancel all agent tasks immediately with no grace period. + Fastest shutdown but highest data loss -- partial tool side effects, + billed-but-lost LLM responses. 
+ """ + + def __init__(self, *, cleanup_seconds: float = 5.0) -> None: + if cleanup_seconds <= 0: + msg = f"cleanup_seconds must be positive, got {cleanup_seconds}" + logger.warning( + EXECUTION_SHUTDOWN_TASK_ERROR, + error=msg, + param="cleanup_seconds", + value=cleanup_seconds, + ) + raise ValueError(msg) + self._cleanup_seconds = cleanup_seconds + self._shutdown_event = asyncio.Event() + + def request_shutdown(self) -> None: + """Signal that a graceful shutdown has been requested.""" + self._shutdown_event.set() + + def is_shutting_down(self) -> bool: + """Return ``True`` when shutdown has been requested.""" + return self._shutdown_event.is_set() + + def get_strategy_type(self) -> str: + """Return the strategy identifier.""" + return "immediate" + + _CANCEL_PROPAGATION_TIMEOUT: float = 5.0 + + async def execute_shutdown( + self, + *, + running_tasks: Mapping[str, asyncio.Task[Any]], + cleanup_callbacks: Sequence[CleanupCallback], + ) -> ShutdownResult: + """Cancel all tasks immediately, then run cleanup.""" + start = time.monotonic() + self._shutdown_event.set() + + task_set = set(running_tasks.values()) + tasks_interrupted = len(task_set) + + if task_set: + logger.info( + EXECUTION_SHUTDOWN_IMMEDIATE_CANCEL, + running_tasks=tasks_interrupted, + ) + for task in task_set: + task.cancel() + cancel_done, _ = await asyncio.wait( + task_set, + timeout=self._CANCEL_PROPAGATION_TIMEOUT, + ) + _log_post_cancel_exceptions(cancel_done) + + cleanup_completed = await _run_cleanup( + cleanup_callbacks, + self._cleanup_seconds, + ) + + result = ShutdownResult( + strategy_type=self.get_strategy_type(), + tasks_interrupted=tasks_interrupted, + tasks_completed=0, + cleanup_completed=cleanup_completed, + duration_seconds=time.monotonic() - start, + ) + logger.info( + EXECUTION_SHUTDOWN_COMPLETE, + strategy=result.strategy_type, + tasks_interrupted=result.tasks_interrupted, + tasks_completed=result.tasks_completed, + cleanup_completed=result.cleanup_completed, + 
duration_seconds=result.duration_seconds, + ) + return result + + +class FinishCurrentToolStrategy: + """Finish current tool shutdown strategy. + + Like cooperative timeout, but uses a per-tool timeout (default 60s) + to allow the current tool invocation to complete. The execution + loop already finishes the current tool before checking shutdown at + turn boundaries; this strategy gives a longer window for that. + """ + + def __init__( + self, + *, + tool_timeout_seconds: float = 60.0, + cleanup_seconds: float = 5.0, + ) -> None: + if tool_timeout_seconds <= 0: + msg = f"tool_timeout_seconds must be positive, got {tool_timeout_seconds}" + logger.warning( + EXECUTION_SHUTDOWN_TASK_ERROR, + error=msg, + param="tool_timeout_seconds", + value=tool_timeout_seconds, + ) + raise ValueError(msg) + if cleanup_seconds <= 0: + msg = f"cleanup_seconds must be positive, got {cleanup_seconds}" + logger.warning( + EXECUTION_SHUTDOWN_TASK_ERROR, + error=msg, + param="cleanup_seconds", + value=cleanup_seconds, + ) + raise ValueError(msg) + self._tool_timeout_seconds = tool_timeout_seconds + self._cleanup_seconds = cleanup_seconds + self._shutdown_event = asyncio.Event() + + def request_shutdown(self) -> None: + """Signal that a graceful shutdown has been requested.""" + self._shutdown_event.set() + + def is_shutting_down(self) -> bool: + """Return ``True`` when shutdown has been requested.""" + return self._shutdown_event.is_set() + + def get_strategy_type(self) -> str: + """Return the strategy identifier.""" + return "finish_tool" + + _CANCEL_PROPAGATION_TIMEOUT: float = 5.0 + + async def execute_shutdown( + self, + *, + running_tasks: Mapping[str, asyncio.Task[Any]], + cleanup_callbacks: Sequence[CleanupCallback], + ) -> ShutdownResult: + """Wait for current tool, then cancel stragglers.""" + start = time.monotonic() + self._shutdown_event.set() + + logger.info( + EXECUTION_SHUTDOWN_TOOL_WAIT, + tool_timeout_seconds=self._tool_timeout_seconds, + running_tasks=len(running_tasks), 
+ ) + + if not running_tasks: + cleanup_completed = await _run_cleanup( + cleanup_callbacks, + self._cleanup_seconds, + ) + result = ShutdownResult( + strategy_type=self.get_strategy_type(), + tasks_interrupted=0, + tasks_completed=0, + cleanup_completed=cleanup_completed, + duration_seconds=time.monotonic() - start, + ) + logger.info( + EXECUTION_SHUTDOWN_COMPLETE, + strategy=result.strategy_type, + tasks_interrupted=0, + tasks_completed=0, + cleanup_completed=result.cleanup_completed, + duration_seconds=result.duration_seconds, + ) + return result + + task_set = set(running_tasks.values()) + done, pending = await asyncio.wait( + task_set, + timeout=self._tool_timeout_seconds, + ) + + tasks_completed, tasks_errored = _count_cooperative_exits(done) + + # Force-cancel stragglers. + if pending: + logger.warning( + EXECUTION_SHUTDOWN_FORCE_CANCEL, + pending_tasks=len(pending), + ) + for task in pending: + task.cancel() + cancel_done, _ = await asyncio.wait( + pending, + timeout=self._CANCEL_PROPAGATION_TIMEOUT, + ) + _log_post_cancel_exceptions(cancel_done) + + cleanup_completed = await _run_cleanup( + cleanup_callbacks, + self._cleanup_seconds, + ) + + result = ShutdownResult( + strategy_type=self.get_strategy_type(), + tasks_interrupted=len(pending) + tasks_errored, + tasks_completed=tasks_completed, + cleanup_completed=cleanup_completed, + duration_seconds=time.monotonic() - start, + ) + logger.info( + EXECUTION_SHUTDOWN_COMPLETE, + strategy=result.strategy_type, + tasks_interrupted=result.tasks_interrupted, + tasks_completed=result.tasks_completed, + cleanup_completed=result.cleanup_completed, + duration_seconds=result.duration_seconds, + ) + return result + + +class CheckpointAndStopStrategy: + """Checkpoint and stop shutdown strategy. + + On shutdown signal, agents checkpoint cooperatively during the + grace period. Stragglers are checkpointed via the + ``checkpoint_saver`` callback (if provided), then cancelled. 
+ Tasks that are successfully checkpointed are reported as + ``tasks_suspended``; those that fail checkpoint or have no saver + are reported as ``tasks_interrupted``. + """ + + _CANCEL_PROPAGATION_TIMEOUT: float = 5.0 + _CHECKPOINT_TIMEOUT: float = 30.0 + + def __init__( + self, + *, + grace_seconds: float = 30.0, + cleanup_seconds: float = 5.0, + checkpoint_saver: CheckpointSaver | None = None, + ) -> None: + if grace_seconds <= 0: + msg = f"grace_seconds must be positive, got {grace_seconds}" + logger.warning( + EXECUTION_SHUTDOWN_TASK_ERROR, + error=msg, + param="grace_seconds", + value=grace_seconds, + ) + raise ValueError(msg) + if cleanup_seconds <= 0: + msg = f"cleanup_seconds must be positive, got {cleanup_seconds}" + logger.warning( + EXECUTION_SHUTDOWN_TASK_ERROR, + error=msg, + param="cleanup_seconds", + value=cleanup_seconds, + ) + raise ValueError(msg) + self._grace_seconds = grace_seconds + self._cleanup_seconds = cleanup_seconds + self._checkpoint_saver = checkpoint_saver + self._shutdown_event = asyncio.Event() + + def request_shutdown(self) -> None: + """Signal that a graceful shutdown has been requested.""" + self._shutdown_event.set() + + def is_shutting_down(self) -> bool: + """Return ``True`` when shutdown has been requested.""" + return self._shutdown_event.is_set() + + def get_strategy_type(self) -> str: + """Return the strategy identifier.""" + return "checkpoint" + + async def execute_shutdown( + self, + *, + running_tasks: Mapping[str, asyncio.Task[Any]], + cleanup_callbacks: Sequence[CleanupCallback], + ) -> ShutdownResult: + """Checkpoint tasks, then stop.""" + start = time.monotonic() + self._shutdown_event.set() + + logger.info( + EXECUTION_SHUTDOWN_GRACE_START, + grace_seconds=self._grace_seconds, + running_tasks=len(running_tasks), + ) + + if not running_tasks: + cleanup_completed = await _run_cleanup( + cleanup_callbacks, + self._cleanup_seconds, + ) + result = ShutdownResult( + strategy_type=self.get_strategy_type(), + 
tasks_interrupted=0, + tasks_completed=0, + tasks_suspended=0, + cleanup_completed=cleanup_completed, + duration_seconds=time.monotonic() - start, + ) + logger.info( + EXECUTION_SHUTDOWN_COMPLETE, + strategy=result.strategy_type, + tasks_suspended=0, + tasks_interrupted=0, + cleanup_completed=result.cleanup_completed, + duration_seconds=result.duration_seconds, + ) + return result + + task_set = set(running_tasks.values()) + done, pending = await asyncio.wait( + task_set, + timeout=self._grace_seconds, + ) + + # Cooperative exits counted as suspended; errored tasks + # are counted as interrupted (they need attention on restart). + tasks_suspended, tasks_errored = _count_cooperative_exits(done) + + # Checkpoint and cancel stragglers. + ( + straggler_suspended, + tasks_interrupted, + ) = await self._checkpoint_and_cancel_pending( + pending, + running_tasks, + ) + tasks_suspended += straggler_suspended + tasks_interrupted += tasks_errored + + cleanup_completed = await _run_cleanup( + cleanup_callbacks, + self._cleanup_seconds, + ) + + result = ShutdownResult( + strategy_type=self.get_strategy_type(), + tasks_interrupted=tasks_interrupted, + tasks_completed=0, + tasks_suspended=tasks_suspended, + cleanup_completed=cleanup_completed, + duration_seconds=time.monotonic() - start, + ) + logger.info( + EXECUTION_SHUTDOWN_COMPLETE, + strategy=result.strategy_type, + tasks_suspended=result.tasks_suspended, + tasks_interrupted=result.tasks_interrupted, + cleanup_completed=result.cleanup_completed, + duration_seconds=result.duration_seconds, + ) + return result + + async def _checkpoint_and_cancel_pending( + self, + pending: set[asyncio.Task[Any]], + running_tasks: Mapping[str, asyncio.Task[Any]], + ) -> tuple[int, int]: + """Checkpoint straggler tasks concurrently, then cancel. + + Uses ``asyncio.TaskGroup`` to fan out checkpoint attempts + for all stragglers in parallel. + + Returns: + Tuple of (tasks_suspended, tasks_interrupted). 
+ """ + if not pending: + return 0, 0 + + task_to_id = {t: tid for tid, t in running_tasks.items()} + tasks_suspended = 0 + tasks_interrupted = 0 + + # Identify tasks with valid IDs vs unknown. + checkpointable: list[tuple[asyncio.Task[Any], str]] = [] + for task in pending: + task_id = task_to_id.get(task) + if task_id is None: + logger.warning( + EXECUTION_SHUTDOWN_TASK_ERROR, + error="Task not found in reverse map during checkpoint", + ) + tasks_interrupted += 1 + task.cancel() + else: + checkpointable.append((task, task_id)) + + # Fan out checkpoint attempts concurrently. + if checkpointable: + + async def _checkpoint_one(tid: str) -> bool: + return await self._try_checkpoint(tid) + + async with asyncio.TaskGroup() as tg: + checkpoint_tasks = [ + tg.create_task(_checkpoint_one(tid)) for _, tid in checkpointable + ] + + for (task, _), ct in zip( + checkpointable, + checkpoint_tasks, + strict=True, + ): + saved = ct.result() + if saved: + tasks_suspended += 1 + else: + tasks_interrupted += 1 + task.cancel() + + # Wait for cancellation to propagate. + cancel_done, _ = await asyncio.wait( + pending, + timeout=self._CANCEL_PROPAGATION_TIMEOUT, + ) + _log_post_cancel_exceptions(cancel_done) + + return tasks_suspended, tasks_interrupted + + async def _try_checkpoint(self, task_id: str) -> bool: + """Attempt to save a checkpoint for the given task. + + The saver call is bounded by ``_CHECKPOINT_TIMEOUT`` to + prevent hangs from blocking shutdown indefinitely. + + Returns: + ``True`` if checkpoint was saved, ``False`` otherwise. 
+ """ + if self._checkpoint_saver is None: + return False + try: + saved = await asyncio.wait_for( + self._checkpoint_saver(task_id), + timeout=self._CHECKPOINT_TIMEOUT, + ) + except TimeoutError: + logger.warning( + EXECUTION_SHUTDOWN_CHECKPOINT_FAILED, + task_id=task_id, + reason="checkpoint timed out", + ) + return False + except MemoryError, RecursionError: + raise + except Exception as exc: + logger.exception( + EXECUTION_SHUTDOWN_CHECKPOINT_FAILED, + task_id=task_id, + error_type=type(exc).__name__, + ) + return False + if saved: + logger.info( + EXECUTION_SHUTDOWN_CHECKPOINT_SAVE, + task_id=task_id, + ) + else: + logger.warning( + EXECUTION_SHUTDOWN_CHECKPOINT_FAILED, + task_id=task_id, + reason="saver returned False", + ) + return saved + + +# ── Factory ────────────────────────────────────────────────────── + + +def build_shutdown_strategy( + config: GracefulShutdownConfig, + *, + checkpoint_saver: CheckpointSaver | None = None, +) -> ShutdownStrategy: + """Build a shutdown strategy from configuration. + + Args: + config: Shutdown configuration with strategy name and params. + checkpoint_saver: Optional checkpoint callback for the + ``"checkpoint"`` strategy. + + Returns: + Configured shutdown strategy instance. + + Raises: + ValueError: If ``config.strategy`` is not a known strategy + name. 
+ """ + strategies: dict[str, Callable[[], ShutdownStrategy]] = { + "cooperative_timeout": lambda: CooperativeTimeoutStrategy( + grace_seconds=config.grace_seconds, + cleanup_seconds=config.cleanup_seconds, + ), + "immediate": lambda: ImmediateCancelStrategy( + cleanup_seconds=config.cleanup_seconds, + ), + "finish_tool": lambda: FinishCurrentToolStrategy( + tool_timeout_seconds=config.tool_timeout_seconds, + cleanup_seconds=config.cleanup_seconds, + ), + "checkpoint": lambda: CheckpointAndStopStrategy( + grace_seconds=config.grace_seconds, + cleanup_seconds=config.cleanup_seconds, + checkpoint_saver=checkpoint_saver, + ), + } + + builder = strategies.get(config.strategy) + if builder is None: + msg = ( + f"Unknown shutdown strategy: {config.strategy!r}. " + f"Known strategies: {sorted(strategies)}" + ) + logger.warning( + EXECUTION_SHUTDOWN_TASK_ERROR, + error=msg, + strategy=config.strategy, + known_strategies=sorted(strategies), + ) + raise ValueError(msg) + + return builder() diff --git a/src/synthorg/engine/workflow/kanban_columns.py b/src/synthorg/engine/workflow/kanban_columns.py index 7cde349fdc..926f945936 100644 --- a/src/synthorg/engine/workflow/kanban_columns.py +++ b/src/synthorg/engine/workflow/kanban_columns.py @@ -51,8 +51,8 @@ class KanbanColumn(StrEnum): ) ) -# Off-board statuses (BLOCKED, FAILED, INTERRUPTED, CANCELLED) map to -# None -- temporarily or permanently removed from the board. +# Off-board statuses (BLOCKED, FAILED, INTERRUPTED, SUSPENDED, CANCELLED) +# map to None -- temporarily or permanently removed from the board. 
STATUS_TO_COLUMN: MappingProxyType[TaskStatus, KanbanColumn | None] = MappingProxyType( { TaskStatus.CREATED: KanbanColumn.BACKLOG, @@ -63,6 +63,7 @@ class KanbanColumn(StrEnum): TaskStatus.BLOCKED: None, TaskStatus.FAILED: None, TaskStatus.INTERRUPTED: None, + TaskStatus.SUSPENDED: None, TaskStatus.CANCELLED: None, } ) diff --git a/src/synthorg/observability/events/execution.py b/src/synthorg/observability/events/execution.py index 7f5c722869..a75812d4c1 100644 --- a/src/synthorg/observability/events/execution.py +++ b/src/synthorg/observability/events/execution.py @@ -48,6 +48,12 @@ EXECUTION_SHUTDOWN_CLEANUP_FAILED: Final[str] = "execution.shutdown.cleanup.failed" EXECUTION_SHUTDOWN_CLEANUP_TIMEOUT: Final[str] = "execution.shutdown.cleanup.timeout" EXECUTION_SHUTDOWN_COMPLETE: Final[str] = "execution.shutdown.complete" +EXECUTION_SHUTDOWN_IMMEDIATE_CANCEL: Final[str] = "execution.shutdown.immediate_cancel" +EXECUTION_SHUTDOWN_TOOL_WAIT: Final[str] = "execution.shutdown.tool_wait" +EXECUTION_SHUTDOWN_CHECKPOINT_SAVE: Final[str] = "execution.shutdown.checkpoint_save" +EXECUTION_SHUTDOWN_CHECKPOINT_FAILED: Final[str] = ( + "execution.shutdown.checkpoint_failed" +) EXECUTION_LOOP_SHUTDOWN: Final[str] = "execution.loop.shutdown" EXECUTION_PLAN_CREATED: Final[str] = "execution.plan.created" diff --git a/tests/unit/core/test_enums.py b/tests/unit/core/test_enums.py index 91e2559ffc..07856bffb4 100644 --- a/tests/unit/core/test_enums.py +++ b/tests/unit/core/test_enums.py @@ -65,8 +65,8 @@ def test_proficiency_level_has_4_members(self) -> None: def test_department_name_has_9_members(self) -> None: assert len(DepartmentName) == 9 - def test_task_status_has_9_members(self) -> None: - assert len(TaskStatus) == 9 + def test_task_status_has_10_members(self) -> None: + assert len(TaskStatus) == 10 def test_task_type_has_6_members(self) -> None: assert len(TaskType) == 6 @@ -157,6 +157,7 @@ def test_task_status_values(self) -> None: assert TaskStatus.FAILED.value == "failed" 
assert TaskStatus.CANCELLED.value == "cancelled" assert TaskStatus.INTERRUPTED.value == "interrupted" + assert TaskStatus.SUSPENDED.value == "suspended" def test_task_type_values(self) -> None: assert TaskType.DEVELOPMENT.value == "development" diff --git a/tests/unit/core/test_task_transitions.py b/tests/unit/core/test_task_transitions.py index 6ceab24e21..0f39622b87 100644 --- a/tests/unit/core/test_task_transitions.py +++ b/tests/unit/core/test_task_transitions.py @@ -34,9 +34,12 @@ class TestValidTransitions: (TaskStatus.IN_REVIEW, TaskStatus.CANCELLED), (TaskStatus.ASSIGNED, TaskStatus.INTERRUPTED), (TaskStatus.IN_PROGRESS, TaskStatus.INTERRUPTED), + (TaskStatus.ASSIGNED, TaskStatus.SUSPENDED), + (TaskStatus.IN_PROGRESS, TaskStatus.SUSPENDED), (TaskStatus.BLOCKED, TaskStatus.ASSIGNED), (TaskStatus.FAILED, TaskStatus.ASSIGNED), (TaskStatus.INTERRUPTED, TaskStatus.ASSIGNED), + (TaskStatus.SUSPENDED, TaskStatus.ASSIGNED), ], ids=lambda p: p.value if isinstance(p, TaskStatus) else str(p), ) @@ -64,6 +67,10 @@ class TestInvalidTransitions: (TaskStatus.FAILED, TaskStatus.IN_PROGRESS), (TaskStatus.INTERRUPTED, TaskStatus.COMPLETED), (TaskStatus.INTERRUPTED, TaskStatus.IN_PROGRESS), + (TaskStatus.SUSPENDED, TaskStatus.COMPLETED), + (TaskStatus.SUSPENDED, TaskStatus.IN_PROGRESS), + (TaskStatus.CREATED, TaskStatus.SUSPENDED), + (TaskStatus.IN_REVIEW, TaskStatus.SUSPENDED), ], ids=lambda p: p.value if isinstance(p, TaskStatus) else str(p), ) @@ -119,6 +126,16 @@ def test_interrupted_is_non_terminal(self) -> None: """INTERRUPTED has outgoing transitions (reassignment on restart).""" assert len(VALID_TRANSITIONS[TaskStatus.INTERRUPTED]) > 0 + def test_suspended_is_non_terminal(self) -> None: + """SUSPENDED has outgoing transitions (resume from checkpoint).""" + assert len(VALID_TRANSITIONS[TaskStatus.SUSPENDED]) > 0 + + def test_suspended_has_single_outgoing(self) -> None: + """SUSPENDED can only transition to ASSIGNED (resume).""" + assert 
VALID_TRANSITIONS[TaskStatus.SUSPENDED] == frozenset( + {TaskStatus.ASSIGNED} + ) + def test_all_targets_are_valid_statuses(self) -> None: """Every target in the transition map must be a valid TaskStatus.""" for source, targets in VALID_TRANSITIONS.items(): diff --git a/tests/unit/engine/test_assignment_service.py b/tests/unit/engine/test_assignment_service.py index c435905c38..9f89030599 100644 --- a/tests/unit/engine/test_assignment_service.py +++ b/tests/unit/engine/test_assignment_service.py @@ -93,14 +93,15 @@ def test_delegates_to_strategy(self) -> None: TaskStatus.CREATED, TaskStatus.FAILED, TaskStatus.INTERRUPTED, + TaskStatus.SUSPENDED, ], - ids=["created", "failed", "interrupted"], + ids=["created", "failed", "interrupted", "suspended"], ) def test_accepts_assignable_statuses( self, status: TaskStatus, ) -> None: - """Service accepts CREATED, FAILED, and INTERRUPTED tasks.""" + """Service accepts CREATED, FAILED, INTERRUPTED, and SUSPENDED tasks.""" scorer = AgentTaskScorer() strategy = RoleBasedAssignmentStrategy(scorer) service = TaskAssignmentService(strategy) diff --git a/tests/unit/engine/test_shutdown.py b/tests/unit/engine/test_shutdown.py index 1215a29891..734bcda7fb 100644 --- a/tests/unit/engine/test_shutdown.py +++ b/tests/unit/engine/test_shutdown.py @@ -10,10 +10,18 @@ from synthorg.config.schema import GracefulShutdownConfig from synthorg.engine.shutdown import ( + CheckpointSaver, CooperativeTimeoutStrategy, ShutdownManager, ShutdownResult, ShutdownStrategy, + _log_post_cancel_exceptions, +) +from synthorg.engine.shutdown_strategies import ( + CheckpointAndStopStrategy, + FinishCurrentToolStrategy, + ImmediateCancelStrategy, + build_shutdown_strategy, ) # ── Protocol compliance ────────────────────────────────────────── @@ -408,7 +416,35 @@ def test_zero_grace_seconds_rejected(self) -> None: def test_blank_strategy_rejected(self) -> None: with pytest.raises(ValidationError): - GracefulShutdownConfig(strategy=" ") + 
GracefulShutdownConfig(strategy=" ") # type: ignore[arg-type] + + @pytest.mark.parametrize( + ("value", "expected"), + [ + (None, 60.0), + (300, 300.0), + ], + ids=["default", "at-max"], + ) + def test_tool_timeout_valid( + self, + value: float | None, + expected: float, + ) -> None: + if value is None: + config = GracefulShutdownConfig() + else: + config = GracefulShutdownConfig(tool_timeout_seconds=value) + assert config.tool_timeout_seconds == expected + + @pytest.mark.parametrize( + "value", + [0, 301], + ids=["zero", "above-max"], + ) + def test_tool_timeout_rejected(self, value: float) -> None: + with pytest.raises(ValidationError): + GracefulShutdownConfig(tool_timeout_seconds=value) # ── _log_post_cancel_exceptions ─────────────────────────────────── @@ -419,40 +455,33 @@ class TestLogPostCancelExceptions: """Extracted helper retrieves exceptions without swallowing them.""" def test_skips_cancelled_tasks(self) -> None: - strategy = CooperativeTimeoutStrategy() task = MagicMock(spec=asyncio.Task) task.cancelled.return_value = True - # Should not call task.exception() - strategy._log_post_cancel_exceptions({task}) + _log_post_cancel_exceptions({task}) task.exception.assert_not_called() def test_logs_task_exception(self) -> None: - strategy = CooperativeTimeoutStrategy() task = MagicMock(spec=asyncio.Task) task.cancelled.return_value = False task.exception.return_value = RuntimeError("boom") task.get_name.return_value = "test-task" - # Should not raise - strategy._log_post_cancel_exceptions({task}) + _log_post_cancel_exceptions({task}) task.exception.assert_called_once() def test_handles_no_exception(self) -> None: - strategy = CooperativeTimeoutStrategy() task = MagicMock(spec=asyncio.Task) task.cancelled.return_value = False task.exception.return_value = None task.get_name.return_value = "test-task" - strategy._log_post_cancel_exceptions({task}) + _log_post_cancel_exceptions({task}) task.exception.assert_called_once() def 
test_handles_invalid_state_error(self) -> None: - strategy = CooperativeTimeoutStrategy() task = MagicMock(spec=asyncio.Task) task.cancelled.return_value = False task.exception.side_effect = asyncio.InvalidStateError task.get_name.return_value = "test-task" - # Should not raise -- logs at DEBUG instead of silent pass - strategy._log_post_cancel_exceptions({task}) + _log_post_cancel_exceptions({task}) # ── Signal handler recovery ────────────────────────────────────── @@ -499,3 +528,484 @@ def test_handle_signal_threadsafe_no_loop_stderr_also_fails(self) -> None: mock_stderr.write.side_effect = OSError("stderr closed") # Should not raise even when stderr fails manager._handle_signal_threadsafe(signal.SIGINT.value, None) + + +# ── ShutdownResult.tasks_suspended ────────────────────────────── + + +@pytest.mark.unit +class TestShutdownResultSuspendedField: + """tasks_suspended field backward compatibility and validation.""" + + def test_default_tasks_suspended_is_zero(self) -> None: + result = ShutdownResult( + strategy_type="cooperative_timeout", + tasks_interrupted=0, + tasks_completed=1, + cleanup_completed=True, + duration_seconds=0.5, + ) + assert result.tasks_suspended == 0 + + def test_tasks_suspended_set(self) -> None: + result = ShutdownResult( + strategy_type="checkpoint", + tasks_interrupted=0, + tasks_completed=0, + tasks_suspended=3, + cleanup_completed=True, + duration_seconds=1.0, + ) + assert result.tasks_suspended == 3 + + def test_tasks_suspended_frozen(self) -> None: + result = ShutdownResult( + strategy_type="checkpoint", + tasks_interrupted=0, + tasks_completed=0, + tasks_suspended=1, + cleanup_completed=True, + duration_seconds=0.1, + ) + with pytest.raises(ValidationError, match="frozen"): + result.tasks_suspended = 5 # type: ignore[misc] + + +# ── ImmediateCancelStrategy ───────────────────────────────────── + + +@pytest.mark.unit +class TestImmediateCancelProtocol: + """ImmediateCancelStrategy satisfies ShutdownStrategy protocol.""" + + def 
test_is_runtime_checkable(self) -> None: + strategy = ImmediateCancelStrategy() + assert isinstance(strategy, ShutdownStrategy) + + def test_not_shutting_down_initially(self) -> None: + strategy = ImmediateCancelStrategy() + assert strategy.is_shutting_down() is False + + def test_request_sets_shutting_down(self) -> None: + strategy = ImmediateCancelStrategy() + strategy.request_shutdown() + assert strategy.is_shutting_down() is True + + def test_get_strategy_type(self) -> None: + strategy = ImmediateCancelStrategy() + assert strategy.get_strategy_type() == "immediate" + + +@pytest.mark.unit +class TestImmediateCancelExecute: + """All tasks force-cancelled immediately, no grace period.""" + + async def test_all_tasks_force_cancelled(self) -> None: + strategy = ImmediateCancelStrategy() + + async def task_a() -> None: + await asyncio.Event().wait() + + async def task_b() -> None: + await asyncio.Event().wait() + + t1 = asyncio.create_task(task_a()) + t2 = asyncio.create_task(task_b()) + result = await strategy.execute_shutdown( + running_tasks={"t1": t1, "t2": t2}, + cleanup_callbacks=[], + ) + + assert result.tasks_completed == 0 + assert result.tasks_interrupted == 2 + assert result.strategy_type == "immediate" + + async def test_no_tasks(self) -> None: + strategy = ImmediateCancelStrategy() + result = await strategy.execute_shutdown( + running_tasks={}, + cleanup_callbacks=[], + ) + assert result.tasks_completed == 0 + assert result.tasks_interrupted == 0 + + async def test_cleanup_runs(self) -> None: + strategy = ImmediateCancelStrategy(cleanup_seconds=5.0) + ran = [] + + async def cb() -> None: + ran.append("done") + + result = await strategy.execute_shutdown( + running_tasks={}, + cleanup_callbacks=[cb], + ) + assert ran == ["done"] + assert result.cleanup_completed is True + + +@pytest.mark.unit +class TestImmediateCancelValidation: + """Constructor rejects non-positive values.""" + + def test_invalid_cleanup_seconds(self) -> None: + with 
pytest.raises(ValueError, match="cleanup_seconds must be positive"): + ImmediateCancelStrategy(cleanup_seconds=0) + + def test_negative_cleanup_seconds(self) -> None: + with pytest.raises(ValueError, match="cleanup_seconds must be positive"): + ImmediateCancelStrategy(cleanup_seconds=-1.0) + + +# ── FinishCurrentToolStrategy ─────────────────────────────────── + + +@pytest.mark.unit +class TestFinishCurrentToolProtocol: + """FinishCurrentToolStrategy satisfies ShutdownStrategy protocol.""" + + def test_is_runtime_checkable(self) -> None: + strategy = FinishCurrentToolStrategy() + assert isinstance(strategy, ShutdownStrategy) + + def test_not_shutting_down_initially(self) -> None: + strategy = FinishCurrentToolStrategy() + assert strategy.is_shutting_down() is False + + def test_request_sets_shutting_down(self) -> None: + strategy = FinishCurrentToolStrategy() + strategy.request_shutdown() + assert strategy.is_shutting_down() is True + + def test_get_strategy_type(self) -> None: + strategy = FinishCurrentToolStrategy() + assert strategy.get_strategy_type() == "finish_tool" + + +@pytest.mark.unit +class TestFinishCurrentToolExecute: + """Tasks wait for tool completion with per-tool timeout.""" + + async def test_cooperative_tasks_within_timeout(self) -> None: + strategy = FinishCurrentToolStrategy(tool_timeout_seconds=5.0) + shutdown_event = strategy._shutdown_event + + async def tool_task() -> None: + await shutdown_event.wait() + + task = asyncio.create_task(tool_task()) + result = await strategy.execute_shutdown( + running_tasks={"t1": task}, + cleanup_callbacks=[], + ) + + assert result.tasks_completed == 1 + assert result.tasks_interrupted == 0 + + async def test_stragglers_cancelled_after_timeout(self) -> None: + strategy = FinishCurrentToolStrategy(tool_timeout_seconds=0.1) + + async def long_tool() -> None: + await asyncio.Event().wait() + + task = asyncio.create_task(long_tool()) + result = await strategy.execute_shutdown( + running_tasks={"t1": task}, + 
cleanup_callbacks=[], + ) + + assert result.tasks_completed == 0 + assert result.tasks_interrupted == 1 + + async def test_cleanup_runs(self) -> None: + strategy = FinishCurrentToolStrategy(cleanup_seconds=5.0) + ran = [] + + async def cb() -> None: + ran.append("done") + + result = await strategy.execute_shutdown( + running_tasks={}, + cleanup_callbacks=[cb], + ) + assert ran == ["done"] + assert result.cleanup_completed is True + + async def test_empty_tasks(self) -> None: + strategy = FinishCurrentToolStrategy() + result = await strategy.execute_shutdown( + running_tasks={}, + cleanup_callbacks=[], + ) + assert result.tasks_completed == 0 + assert result.tasks_interrupted == 0 + + +@pytest.mark.unit +class TestFinishCurrentToolValidation: + """Constructor rejects non-positive values.""" + + def test_invalid_tool_timeout(self) -> None: + with pytest.raises(ValueError, match="tool_timeout_seconds must be positive"): + FinishCurrentToolStrategy(tool_timeout_seconds=0) + + def test_invalid_cleanup_seconds(self) -> None: + with pytest.raises(ValueError, match="cleanup_seconds must be positive"): + FinishCurrentToolStrategy(cleanup_seconds=-1.0) + + +# ── CheckpointAndStopStrategy ────────────────────────────────── + + +@pytest.mark.unit +class TestCheckpointAndStopProtocol: + """CheckpointAndStopStrategy satisfies ShutdownStrategy protocol.""" + + def test_is_runtime_checkable(self) -> None: + strategy = CheckpointAndStopStrategy() + assert isinstance(strategy, ShutdownStrategy) + + def test_not_shutting_down_initially(self) -> None: + strategy = CheckpointAndStopStrategy() + assert strategy.is_shutting_down() is False + + def test_request_sets_shutting_down(self) -> None: + strategy = CheckpointAndStopStrategy() + strategy.request_shutdown() + assert strategy.is_shutting_down() is True + + def test_get_strategy_type(self) -> None: + strategy = CheckpointAndStopStrategy() + assert strategy.get_strategy_type() == "checkpoint" + + +@pytest.mark.unit +class 
TestCheckpointAndStopExecute: + """Checkpoint-based shutdown with tasks_suspended tracking.""" + + async def test_cooperative_exit_counted_as_suspended(self) -> None: + strategy = CheckpointAndStopStrategy(grace_seconds=5.0) + shutdown_event = strategy._shutdown_event + + async def cooperative_task() -> None: + await shutdown_event.wait() + + task = asyncio.create_task(cooperative_task()) + result = await strategy.execute_shutdown( + running_tasks={"t1": task}, + cleanup_callbacks=[], + ) + + assert result.tasks_suspended == 1 + assert result.tasks_completed == 0 + assert result.tasks_interrupted == 0 + + async def test_straggler_checkpointed_then_cancelled(self) -> None: + strategy = CheckpointAndStopStrategy( + grace_seconds=0.1, + checkpoint_saver=_make_saver(success=True), + ) + + async def stubborn() -> None: + await asyncio.Event().wait() + + task = asyncio.create_task(stubborn()) + result = await strategy.execute_shutdown( + running_tasks={"t1": task}, + cleanup_callbacks=[], + ) + + assert result.tasks_suspended == 1 + assert result.tasks_interrupted == 0 + + async def test_straggler_checkpoint_fails_counted_as_interrupted(self) -> None: + strategy = CheckpointAndStopStrategy( + grace_seconds=0.1, + checkpoint_saver=_make_saver(success=False), + ) + + async def stubborn() -> None: + await asyncio.Event().wait() + + task = asyncio.create_task(stubborn()) + result = await strategy.execute_shutdown( + running_tasks={"t1": task}, + cleanup_callbacks=[], + ) + + assert result.tasks_suspended == 0 + assert result.tasks_interrupted == 1 + + async def test_no_checkpoint_saver_stragglers_interrupted(self) -> None: + strategy = CheckpointAndStopStrategy(grace_seconds=0.1) + + async def stubborn() -> None: + await asyncio.Event().wait() + + task = asyncio.create_task(stubborn()) + result = await strategy.execute_shutdown( + running_tasks={"t1": task}, + cleanup_callbacks=[], + ) + + assert result.tasks_suspended == 0 + assert result.tasks_interrupted == 1 + + async 
def test_checkpoint_saver_raises_counted_as_interrupted(self) -> None: + async def failing_saver(task_id: str) -> bool: + msg = "Storage unavailable" + raise OSError(msg) + + strategy = CheckpointAndStopStrategy( + grace_seconds=0.1, + checkpoint_saver=failing_saver, + ) + + async def stubborn() -> None: + await asyncio.Event().wait() + + task = asyncio.create_task(stubborn()) + result = await strategy.execute_shutdown( + running_tasks={"t1": task}, + cleanup_callbacks=[], + ) + + assert result.tasks_suspended == 0 + assert result.tasks_interrupted == 1 + + async def test_mixed_cooperative_and_straggler(self) -> None: + saver_calls: list[str] = [] + + async def saver(task_id: str) -> bool: + saver_calls.append(task_id) + return True + + strategy = CheckpointAndStopStrategy( + grace_seconds=0.1, + checkpoint_saver=saver, + ) + shutdown_event = strategy._shutdown_event + + async def cooperative() -> None: + await shutdown_event.wait() + + async def stubborn() -> None: + await asyncio.Event().wait() + + t1 = asyncio.create_task(cooperative()) + t2 = asyncio.create_task(stubborn()) + result = await strategy.execute_shutdown( + running_tasks={"coop": t1, "stub": t2}, + cleanup_callbacks=[], + ) + + assert result.tasks_suspended == 2 + assert result.tasks_interrupted == 0 + assert "stub" in saver_calls + + async def test_cleanup_runs(self) -> None: + strategy = CheckpointAndStopStrategy(cleanup_seconds=5.0) + ran = [] + + async def cb() -> None: + ran.append("done") + + result = await strategy.execute_shutdown( + running_tasks={}, + cleanup_callbacks=[cb], + ) + assert ran == ["done"] + assert result.cleanup_completed is True + + async def test_empty_tasks(self) -> None: + strategy = CheckpointAndStopStrategy() + result = await strategy.execute_shutdown( + running_tasks={}, + cleanup_callbacks=[], + ) + assert result.tasks_suspended == 0 + assert result.tasks_interrupted == 0 + + +@pytest.mark.unit +class TestCheckpointAndStopValidation: + """Constructor rejects 
non-positive values.""" + + def test_invalid_grace_seconds(self) -> None: + with pytest.raises(ValueError, match="grace_seconds must be positive"): + CheckpointAndStopStrategy(grace_seconds=0) + + def test_invalid_cleanup_seconds(self) -> None: + with pytest.raises(ValueError, match="cleanup_seconds must be positive"): + CheckpointAndStopStrategy(cleanup_seconds=-1.0) + + +# ── build_shutdown_strategy factory ───────────────────────────── + + +@pytest.mark.unit +class TestBuildShutdownStrategy: + """Factory maps config.strategy to the correct class.""" + + def test_cooperative_timeout(self) -> None: + config = GracefulShutdownConfig(strategy="cooperative_timeout") + strategy = build_shutdown_strategy(config) + assert isinstance(strategy, CooperativeTimeoutStrategy) + + def test_immediate(self) -> None: + config = GracefulShutdownConfig(strategy="immediate") + strategy = build_shutdown_strategy(config) + assert isinstance(strategy, ImmediateCancelStrategy) + + def test_finish_tool(self) -> None: + config = GracefulShutdownConfig(strategy="finish_tool") + strategy = build_shutdown_strategy(config) + assert isinstance(strategy, FinishCurrentToolStrategy) + + def test_checkpoint(self) -> None: + config = GracefulShutdownConfig(strategy="checkpoint") + strategy = build_shutdown_strategy(config) + assert isinstance(strategy, CheckpointAndStopStrategy) + + def test_unknown_strategy_rejected(self) -> None: + with pytest.raises(ValidationError): + GracefulShutdownConfig(strategy="nonexistent") # type: ignore[arg-type] + + def test_config_params_propagate(self) -> None: + config = GracefulShutdownConfig( + strategy="cooperative_timeout", + grace_seconds=10.0, + cleanup_seconds=2.0, + ) + strategy = build_shutdown_strategy(config) + assert isinstance(strategy, CooperativeTimeoutStrategy) + assert strategy._grace_seconds == 10.0 + assert strategy._cleanup_seconds == 2.0 + + def test_tool_timeout_propagates(self) -> None: + config = GracefulShutdownConfig( + 
strategy="finish_tool", + tool_timeout_seconds=120.0, + ) + strategy = build_shutdown_strategy(config) + assert isinstance(strategy, FinishCurrentToolStrategy) + assert strategy._tool_timeout_seconds == 120.0 + + def test_checkpoint_saver_injected(self) -> None: + config = GracefulShutdownConfig(strategy="checkpoint") + saver = _make_saver(success=True) + strategy = build_shutdown_strategy(config, checkpoint_saver=saver) + assert isinstance(strategy, CheckpointAndStopStrategy) + assert strategy._checkpoint_saver is saver + + +# ── Test helpers ──────────────────────────────────────────────── + + +def _make_saver(*, success: bool) -> CheckpointSaver: + """Build a mock checkpoint saver that returns *success*.""" + + async def _saver(task_id: str) -> bool: + return success + + return _saver diff --git a/tests/unit/engine/workflow/test_kanban_columns.py b/tests/unit/engine/workflow/test_kanban_columns.py index 42ef15acf8..2d156c358e 100644 --- a/tests/unit/engine/workflow/test_kanban_columns.py +++ b/tests/unit/engine/workflow/test_kanban_columns.py @@ -79,6 +79,7 @@ def test_off_board_statuses(self) -> None: TaskStatus.BLOCKED, TaskStatus.FAILED, TaskStatus.INTERRUPTED, + TaskStatus.SUSPENDED, TaskStatus.CANCELLED, } for status in off_board: diff --git a/web/src/__tests__/components/layout/StatusBar.test.tsx b/web/src/__tests__/components/layout/StatusBar.test.tsx index 0273652928..0febc940e3 100644 --- a/web/src/__tests__/components/layout/StatusBar.test.tsx +++ b/web/src/__tests__/components/layout/StatusBar.test.tsx @@ -9,7 +9,7 @@ function makeOverview(overrides: Partial = {}): OverviewMetrics total_tasks: 0, tasks_by_status: { created: 0, assigned: 0, in_progress: 0, in_review: 0, completed: 0, - blocked: 0, failed: 0, interrupted: 0, cancelled: 0, + blocked: 0, failed: 0, interrupted: 0, suspended: 0, cancelled: 0, } as OverviewMetrics['tasks_by_status'], total_agents: 0, total_cost_usd: 0, @@ -123,7 +123,7 @@ describe('StatusBar', () => { total_tasks: 10, 
tasks_by_status: { created: 0, assigned: 0, in_progress: 0, in_review: 3, completed: 0, - blocked: 0, failed: 0, interrupted: 0, cancelled: 0, + blocked: 0, failed: 0, interrupted: 0, suspended: 0, cancelled: 0, } as OverviewMetrics['tasks_by_status'], total_agents: 5, total_cost_usd: 50, diff --git a/web/src/__tests__/hooks/useDashboardData.test.ts b/web/src/__tests__/hooks/useDashboardData.test.ts index 89864a035e..772e4679a3 100644 --- a/web/src/__tests__/hooks/useDashboardData.test.ts +++ b/web/src/__tests__/hooks/useDashboardData.test.ts @@ -67,7 +67,7 @@ describe('useDashboardData', () => { total_tasks: 10, tasks_by_status: { created: 0, assigned: 0, in_progress: 0, in_review: 0, completed: 0, - blocked: 0, failed: 0, interrupted: 0, cancelled: 0, + blocked: 0, failed: 0, interrupted: 0, suspended: 0, cancelled: 0, }, total_agents: 5, total_cost_usd: 50, budget_remaining_usd: 450, budget_used_percent: 10, diff --git a/web/src/__tests__/integration/ws-dashboard.test.tsx b/web/src/__tests__/integration/ws-dashboard.test.tsx index 0681fbb0ad..2f49eb93ac 100644 --- a/web/src/__tests__/integration/ws-dashboard.test.tsx +++ b/web/src/__tests__/integration/ws-dashboard.test.tsx @@ -24,6 +24,7 @@ describe('WS Dashboard Integration', () => { blocked: 0, failed: 0, interrupted: 0, + suspended: 0, cancelled: 0, }, total_cost_usd: 42.17, diff --git a/web/src/__tests__/pages/DashboardPage.test.tsx b/web/src/__tests__/pages/DashboardPage.test.tsx index 6ea9371e8b..5796bd255c 100644 --- a/web/src/__tests__/pages/DashboardPage.test.tsx +++ b/web/src/__tests__/pages/DashboardPage.test.tsx @@ -6,7 +6,7 @@ import type { OverviewMetrics, BudgetConfig } from '@/api/types' function makeTasksByStatus(overrides: Partial = {}): OverviewMetrics['tasks_by_status'] { return { created: 0, assigned: 0, in_progress: 0, in_review: 0, completed: 0, - blocked: 0, failed: 0, interrupted: 0, cancelled: 0, + blocked: 0, failed: 0, interrupted: 0, suspended: 0, cancelled: 0, ...overrides, } } 
diff --git a/web/src/__tests__/pages/budget/ThresholdAlerts.test.tsx b/web/src/__tests__/pages/budget/ThresholdAlerts.test.tsx index 79aac16799..5bb23c2bed 100644 --- a/web/src/__tests__/pages/budget/ThresholdAlerts.test.tsx +++ b/web/src/__tests__/pages/budget/ThresholdAlerts.test.tsx @@ -17,7 +17,7 @@ const mockOverview: OverviewMetrics = { total_tasks: 10, tasks_by_status: { created: 1, assigned: 1, in_progress: 2, in_review: 1, completed: 3, - blocked: 0, failed: 1, interrupted: 0, cancelled: 1, + blocked: 0, failed: 1, interrupted: 0, suspended: 0, cancelled: 1, }, total_agents: 5, total_cost_usd: 400, diff --git a/web/src/__tests__/stores/analytics.test.ts b/web/src/__tests__/stores/analytics.test.ts index 71b51012a2..caa1119765 100644 --- a/web/src/__tests__/stores/analytics.test.ts +++ b/web/src/__tests__/stores/analytics.test.ts @@ -6,7 +6,7 @@ vi.mock('@/api/endpoints/analytics', () => ({ total_tasks: 24, tasks_by_status: { created: 2, assigned: 3, in_progress: 8, in_review: 2, completed: 5, - blocked: 1, failed: 1, interrupted: 1, cancelled: 1, + blocked: 1, failed: 1, interrupted: 1, suspended: 0, cancelled: 1, }, total_agents: 10, total_cost_usd: 42.17, diff --git a/web/src/__tests__/utils/dashboard.property.test.ts b/web/src/__tests__/utils/dashboard.property.test.ts index c9f9191160..1ba9220c20 100644 --- a/web/src/__tests__/utils/dashboard.property.test.ts +++ b/web/src/__tests__/utils/dashboard.property.test.ts @@ -34,6 +34,7 @@ const arbOverview: fc.Arbitrary = fc.record({ blocked: fc.nat({ max: 100 }), failed: fc.nat({ max: 100 }), interrupted: fc.nat({ max: 100 }), + suspended: fc.nat({ max: 100 }), cancelled: fc.nat({ max: 100 }), }), total_agents: fc.nat({ max: 100 }), diff --git a/web/src/__tests__/utils/dashboard.test.ts b/web/src/__tests__/utils/dashboard.test.ts index be8d55db3f..f384fe4780 100644 --- a/web/src/__tests__/utils/dashboard.test.ts +++ b/web/src/__tests__/utils/dashboard.test.ts @@ -19,6 +19,7 @@ function 
makeOverview(overrides: Partial = {}): OverviewMetrics blocked: 1, failed: 1, interrupted: 1, + suspended: 0, cancelled: 1, }, total_agents: 10, diff --git a/web/src/__tests__/utils/tasks.test.ts b/web/src/__tests__/utils/tasks.test.ts index 1e6934c76d..585b3b4bf3 100644 --- a/web/src/__tests__/utils/tasks.test.ts +++ b/web/src/__tests__/utils/tasks.test.ts @@ -2,6 +2,7 @@ import { describe, expect, it } from 'vitest' import type { Priority, Task, TaskStatus } from '@/api/types' import { KANBAN_COLUMNS, + OFF_BOARD_STATUSES, STATUS_TO_COLUMN, VALID_TRANSITIONS, canTransitionTo, @@ -23,7 +24,7 @@ function makeTask(overrides: Partial = {}): Task { const ALL_STATUSES: TaskStatus[] = [ 'created', 'assigned', 'in_progress', 'in_review', 'completed', - 'blocked', 'failed', 'interrupted', 'cancelled', + 'blocked', 'failed', 'interrupted', 'suspended', 'cancelled', ] // ── getTaskStatusColor ────────────────────────────────────── @@ -38,6 +39,7 @@ describe('getTaskStatusColor', () => { ['blocked', 'danger'], ['failed', 'danger'], ['interrupted', 'warning'], + ['suspended', 'warning'], ['cancelled', 'text-secondary'], ])('maps %s to %s', (status, expected) => { expect(getTaskStatusColor(status)).toBe(expected) @@ -56,6 +58,7 @@ describe('getTaskStatusLabel', () => { ['blocked', 'Blocked'], ['failed', 'Failed'], ['interrupted', 'Interrupted'], + ['suspended', 'Suspended'], ['cancelled', 'Cancelled'], ])('maps %s to %s', (status, expected) => { expect(getTaskStatusLabel(status)).toBe(expected) @@ -95,10 +98,11 @@ describe('KANBAN_COLUMNS', () => { expect(KANBAN_COLUMNS).toHaveLength(7) }) - it('covers all 9 task statuses exactly once', () => { + it('covers all on-board task statuses exactly once', () => { + const onBoard = ALL_STATUSES.filter((s) => !OFF_BOARD_STATUSES.has(s)) const allStatuses = KANBAN_COLUMNS.flatMap((col) => col.statuses) - expect(allStatuses).toHaveLength(ALL_STATUSES.length) - for (const status of ALL_STATUSES) { + 
expect(allStatuses).toHaveLength(onBoard.length) + for (const status of onBoard) { expect(allStatuses).toContain(status) } }) @@ -112,9 +116,13 @@ describe('KANBAN_COLUMNS', () => { // ── STATUS_TO_COLUMN ──────────────────────────────────────── describe('STATUS_TO_COLUMN', () => { - it('maps every TaskStatus to a column', () => { + it('maps every on-board TaskStatus to a column', () => { for (const status of ALL_STATUSES) { - expect(STATUS_TO_COLUMN[status]).toBeDefined() + if (OFF_BOARD_STATUSES.has(status)) { + expect(STATUS_TO_COLUMN[status]).toBeNull() + } else { + expect(STATUS_TO_COLUMN[status]).toBeDefined() + } } }) @@ -164,13 +172,14 @@ describe('groupTasksByColumn', () => { } }) - it('preserves all tasks (no task lost)', () => { + it('preserves all on-board tasks (off-board excluded)', () => { const tasks = ALL_STATUSES.map((status, i) => makeTask({ id: `t${i}`, status }), ) const grouped = groupTasksByColumn(tasks) const total = Object.values(grouped).reduce((sum, arr) => sum + arr.length, 0) - expect(total).toBe(ALL_STATUSES.length) + const onBoardCount = ALL_STATUSES.filter((s) => !OFF_BOARD_STATUSES.has(s)).length + expect(total).toBe(onBoardCount) }) }) diff --git a/web/src/api/types.ts b/web/src/api/types.ts index d2d110852e..6e36cc860a 100644 --- a/web/src/api/types.ts +++ b/web/src/api/types.ts @@ -11,6 +11,7 @@ export type TaskStatus = | 'blocked' | 'failed' | 'interrupted' + | 'suspended' | 'cancelled' export type TaskType = diff --git a/web/src/components/ui/task-status-indicator.stories.tsx b/web/src/components/ui/task-status-indicator.stories.tsx index e8dc96325f..8c95b4c19c 100644 --- a/web/src/components/ui/task-status-indicator.stories.tsx +++ b/web/src/components/ui/task-status-indicator.stories.tsx @@ -15,7 +15,7 @@ type StatusStory = StoryObj const ALL_STATUSES: TaskStatus[] = [ 'created', 'assigned', 'in_progress', 'in_review', 'completed', - 'blocked', 'failed', 'interrupted', 'cancelled', + 'blocked', 'failed', 'interrupted', 
'suspended', 'cancelled', ] export const Default: StatusStory = { diff --git a/web/src/pages/budget/ThresholdAlerts.stories.tsx b/web/src/pages/budget/ThresholdAlerts.stories.tsx index 3bb3fe5cee..226b3c1221 100644 --- a/web/src/pages/budget/ThresholdAlerts.stories.tsx +++ b/web/src/pages/budget/ThresholdAlerts.stories.tsx @@ -17,7 +17,7 @@ function makeOverview(usedPercent: number): OverviewMetrics { total_tasks: 10, tasks_by_status: { created: 1, assigned: 1, in_progress: 2, in_review: 1, completed: 3, - blocked: 0, failed: 1, interrupted: 0, cancelled: 1, + blocked: 0, failed: 1, interrupted: 0, suspended: 0, cancelled: 1, }, total_agents: 5, total_cost_usd: usedPercent * 5, diff --git a/web/src/pages/dashboard/DashboardPage.stories.tsx b/web/src/pages/dashboard/DashboardPage.stories.tsx index 917a7ca353..325df7e7d6 100644 --- a/web/src/pages/dashboard/DashboardPage.stories.tsx +++ b/web/src/pages/dashboard/DashboardPage.stories.tsx @@ -8,7 +8,7 @@ const mockOverview: OverviewMetrics = { total_tasks: 24, tasks_by_status: { created: 2, assigned: 3, in_progress: 8, in_review: 2, completed: 5, - blocked: 1, failed: 1, interrupted: 1, cancelled: 1, + blocked: 1, failed: 1, interrupted: 1, suspended: 0, cancelled: 1, }, total_agents: 10, total_cost_usd: 42.17, diff --git a/web/src/utils/constants.ts b/web/src/utils/constants.ts index 62ef6f67b6..1e191b07a6 100644 --- a/web/src/utils/constants.ts +++ b/web/src/utils/constants.ts @@ -30,6 +30,7 @@ export const TASK_STATUS_ORDER: readonly TaskStatus[] = [ 'completed', 'failed', 'interrupted', + 'suspended', 'cancelled', ] as const @@ -39,12 +40,13 @@ export const TERMINAL_STATUSES: ReadonlySet = new Set([' /** Task status transitions map. 
*/ export const VALID_TRANSITIONS: Readonly> = { created: ['assigned'], - assigned: ['in_progress', 'blocked', 'cancelled', 'failed', 'interrupted'], - in_progress: ['in_review', 'blocked', 'cancelled', 'failed', 'interrupted'], + assigned: ['in_progress', 'blocked', 'cancelled', 'failed', 'interrupted', 'suspended'], + in_progress: ['in_review', 'blocked', 'cancelled', 'failed', 'interrupted', 'suspended'], in_review: ['completed', 'in_progress', 'blocked', 'cancelled'], blocked: ['assigned'], failed: ['assigned'], interrupted: ['assigned'], + suspended: ['assigned'], completed: [], cancelled: [], } diff --git a/web/src/utils/tasks.ts b/web/src/utils/tasks.ts index 0effc0bdaa..18c6ec4575 100644 --- a/web/src/utils/tasks.ts +++ b/web/src/utils/tasks.ts @@ -12,6 +12,7 @@ const TASK_STATUS_COLOR_MAP: Record = { blocked: 'Blocked', failed: 'Failed', interrupted: 'Interrupted', + suspended: 'Suspended', cancelled: 'Cancelled', } @@ -106,11 +108,17 @@ export const KANBAN_COLUMNS: readonly KanbanColumn[] = [ { id: 'terminal', label: 'Terminal', statuses: ['failed', 'interrupted', 'cancelled'], color: 'text-secondary' }, ] as const -export const STATUS_TO_COLUMN: Record = Object.fromEntries( - KANBAN_COLUMNS.flatMap((col) => - col.statuses.map((status) => [status, col.id]), +/** Off-board statuses not displayed on the Kanban board (resumable). 
*/ +export const OFF_BOARD_STATUSES: ReadonlySet = new Set(['suspended']) + +export const STATUS_TO_COLUMN: Record = { + ...Object.fromEntries( + KANBAN_COLUMNS.flatMap((col) => + col.statuses.map((status) => [status, col.id]), + ), ), -) as Record + ...Object.fromEntries([...OFF_BOARD_STATUSES].map((s) => [s, null])), +} as Record // ── Group tasks by column ─────────────────────────────────── @@ -127,7 +135,9 @@ export function groupTasksByColumn(tasks: readonly Task[]): Record = { created: ['assigned'], - assigned: ['in_progress', 'failed', 'blocked', 'cancelled', 'interrupted'], - in_progress: ['in_review', 'failed', 'cancelled', 'interrupted'], + assigned: ['in_progress', 'failed', 'blocked', 'cancelled', 'interrupted', 'suspended'], + in_progress: ['in_review', 'failed', 'cancelled', 'interrupted', 'suspended'], in_review: ['completed', 'in_progress'], completed: [], blocked: ['assigned'], failed: ['assigned'], interrupted: ['assigned'], + suspended: ['assigned'], cancelled: [], }