Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
22694a8
feat: flight-recorder frame store, cockpit + steering services
Aureliolo May 22, 2026
7fd903f
feat: cockpit REST controller, boot wiring, WS channel, manifest
Aureliolo May 22, 2026
9fb5b3b
feat: mission control web cockpit (Live + Flight Recorder)
Aureliolo May 22, 2026
a348f99
feat: cockpit MCP domain (read tools + guarded interventions)
Aureliolo May 22, 2026
1bb0e78
test: unique basenames for cockpit/flight-recorder service tests
Aureliolo May 22, 2026
e3952f7
fix: declare flight_recorder_frames in canonical schema (sqlite+postg…
Aureliolo May 22, 2026
e439af2
fix: address core pre-review findings (cockpit)
Aureliolo May 22, 2026
7f7ba29
fix: update tests + MCP handlers for cockpit additions
Aureliolo May 22, 2026
2fcd152
fix: drop persistence repos from ghost-wiring manifest
Aureliolo May 22, 2026
064a84f
docs: drop issue back-ref from steering docstring (no-review-origin g…
Aureliolo May 22, 2026
f5ecae9
fix: extend events + settings + currency-gate allowlists for cockpit
Aureliolo May 22, 2026
2804365
fix: close coroutine in audit_chain timeout test to stop unraisable leak
Aureliolo May 22, 2026
0b33364
fix: address pre-pr review findings (immutability, a11y, types, DRY)
Aureliolo May 22, 2026
d07e9a6
chore: regenerate DTOs after rebase merge
Aureliolo May 22, 2026
f000850
fix: babysit round 1, 25 findings across CI + reviewer feedback
Aureliolo May 22, 2026
a6f8b64
fix: drop SEC-1 taxonomy references (gate: no-review-origin-in-code)
Aureliolo May 22, 2026
729febe
fix: babysit round 4, 3 findings (coderabbit re-review on new HEAD)
Aureliolo May 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions scripts/_ghost_wiring_manifest.txt
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,7 @@ ENFORCED ToolCreationApplier #1995 -- constructed by meta/toolsmith/factory.py::
ENFORCED DynamicToolRegistry #1995 -- constructed by meta/toolsmith/factory.py::build_toolsmith; mutable live authored-tool registry read behind the static surface
ENFORCED install_dynamic_tool_layer #1995 -- called by api/app.py::_wire_toolsmith; layers the dynamic registry into the live MCP invoker so authored tools dispatch
ENFORCED build_stakes_router #1998 -- called by workers/runtime_builder._build_stakes_router_or_none when a benchmark provider is wired; injected into AgentEngine for stakes-aware tier selection before budget downgrade
ENFORCED CockpitService #1981 -- constructed in api/app.py::_wire_cockpit_services and installed on AppState; serves the mission-control live-activity snapshot (who/what + stuck/runaway)
ENFORCED FlightRecorderService #1981 -- constructed in api/app.py::_wire_cockpit_services; frame-authoritative get/seek behind the flight-recorder cockpit endpoints
ENFORCED build_steering_directive #1981 -- called in api/app.py::_wire_cockpit_services; builds the SafeDefaultSteeringDirective that delivers cockpit hint/redirect as INFO_REQUEST interrupts
ENFORCED build_flight_recorder_sink #1981 -- called in workers/runtime_builder._build_flight_recorder_sink; selects the recorder sink the boot AgentEngine records per-turn frames through
56 changes: 56 additions & 0 deletions src/synthorg/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,61 @@ def _try_wire_cost_dial(app_state: AppState) -> None:
)


def _wire_cockpit_services(app_state: AppState) -> None:
    """Build the cockpit service trio and install it on ``AppState``.

    Instantiates ``CockpitService`` (live activity), ``FlightRecorderService``
    (frame query/seek) and the steering directive, then hands all three to
    ``AppState.set_cockpit_services`` so the cockpit controllers and MCP
    tools can reach them. Returns without wiring anything unless a
    persistence backend, a task engine, and an interrupt store are all
    available.
    """
    store = app_state.interrupt_store
    # Keep the ``is not None`` test inside the ``if`` so type checkers can
    # narrow ``store`` for the steering-directive call below.
    if not (
        app_state.has_persistence
        and app_state.has_task_engine
        and store is not None
    ):
        return

    # Function-scope imports, matching the original placement.
    from synthorg.engine.cockpit import CockpitService  # noqa: PLC0415
    from synthorg.engine.flight_recording import (  # noqa: PLC0415
        FlightRecorderService,
    )
    from synthorg.engine.intervention import build_steering_directive  # noqa: PLC0415

    # One frame repository is shared by the snapshot and recorder services.
    frame_repo = app_state.persistence.flight_recorder_frames
    live_activity = CockpitService(
        app_state.task_engine,
        frame_repo,
        clock=app_state.clock,
    )
    recorder = FlightRecorderService(frame_repo)
    steering = build_steering_directive(store, clock=app_state.clock)
    app_state.set_cockpit_services(
        cockpit_service=live_activity,
        flight_recorder_service=recorder,
        steering_directive=steering,
    )


def _try_wire_cockpit(app_state: AppState) -> None:
    """Attempt cockpit wiring without letting a failure abort startup.

    Does nothing when persistence is unavailable or the cockpit service is
    already installed. ``MemoryError`` and ``RecursionError`` propagate; any
    other ``Exception`` is logged as a warning and swallowed.
    """
    if not app_state.has_persistence:
        return
    if app_state.has_cockpit_service:
        return
    try:
        _wire_cockpit_services(app_state)
    except (MemoryError, RecursionError):
        # Resource-exhaustion errors are never downgraded to a warning.
        raise
    except Exception as err:
        logger.warning(
            API_APP_STARTUP,
            service="cockpit",
            note="cockpit wiring failed; controllers will 503",
            error_type=type(err).__name__,
            error=safe_error_description(err),
        )


def _wire_environment_service(app_state: AppState) -> None:
"""Wire the per-project reproducible-environment substrate.

Expand Down Expand Up @@ -1245,6 +1300,7 @@ async def _install_runtime_services() -> None:
# tree per project under the workspace base. Persistence-less
# boots (test fixtures, dev apps with no DB) skip wiring -- the
_try_wire_cost_dial(app_state)
_try_wire_cockpit(app_state)

# service is optional and gates on ``has_project_workspace_service``.
if app_state.has_persistence and app_state.project_workspace_service is None:
Expand Down
2 changes: 2 additions & 0 deletions src/synthorg/api/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
CHANNEL_REVIEWS: Final[str] = "reviews"
CHANNEL_EVENTS: Final[str] = "events"
CHANNEL_INTERRUPTS: Final[str] = "interrupts"
CHANNEL_COCKPIT: Final[str] = "cockpit"
CHANNEL_DISSENT: Final[str] = "#dissent"
CHANNEL_WEBHOOKS: Final[str] = "#webhooks"
CHANNEL_RATELIMIT: Final[str] = "#ratelimit"
Expand All @@ -60,6 +61,7 @@
CHANNEL_REVIEWS,
CHANNEL_EVENTS,
CHANNEL_INTERRUPTS,
CHANNEL_COCKPIT,
CHANNEL_DISSENT,
CHANNEL_WEBHOOKS,
CHANNEL_RATELIMIT,
Expand Down
2 changes: 2 additions & 0 deletions src/synthorg/api/controllers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
CeremonyPolicyController,
)
from synthorg.api.controllers.clients import ClientController
from synthorg.api.controllers.cockpit import CockpitController
from synthorg.api.controllers.collaboration import CollaborationController
from synthorg.api.controllers.company import CompanyController
from synthorg.api.controllers.company_versions import (
Expand Down Expand Up @@ -168,6 +169,7 @@
MetaController,
MetaAnalyticsController,
CustomRuleController,
CockpitController,
)

# Controllers gated by their collaborator service. These do NOT live
Expand Down
261 changes: 261 additions & 0 deletions src/synthorg/api/controllers/cockpit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
"""Mission-control cockpit controller: live activity, flight recorder, intervention.

Live activity and flight-recorder reads are read-access; interventions
require write access. All endpoints 503 (via the ``AppState`` service
properties) until the cockpit services are wired after persistence
connects. Interventions are audit-logged via ``cockpit.intervention.*``.
"""

from typing import Annotated, Final

from litestar import Controller, get, post
from litestar.datastructures import State # noqa: TC002
from litestar.params import Parameter
from pydantic import BaseModel, ConfigDict, Field

from synthorg.api.cursor import decode_cursor
from synthorg.api.dto import DEFAULT_LIMIT, ApiResponse, PaginatedResponse
from synthorg.api.guards import require_read_access, require_write_access
from synthorg.api.pagination import CursorLimit, CursorParam, encode_countless_seek_meta
from synthorg.api.path_params import PathId # noqa: TC001
from synthorg.api.state import AppState # noqa: TC001
from synthorg.core.enums import InterventionKind, TaskStatus
from synthorg.core.task import Task # noqa: TC001 -- response field type
from synthorg.core.types import NotBlankStr # noqa: TC001
from synthorg.engine.cockpit import LiveActivitySnapshot # noqa: TC001
from synthorg.engine.flight_recording import ReplaySeekView # noqa: TC001
from synthorg.engine.intervention import SteeringOutcome # noqa: TC001
from synthorg.engine.prompt_safety import TAG_TASK_DATA, wrap_untrusted
from synthorg.observability import get_logger
from synthorg.observability.events.cockpit import (
COCKPIT_INTERVENTION_APPLIED,
COCKPIT_INTERVENTION_INITIATED,
)
from synthorg.persistence.flight_recorder_protocol import (
FlightRecorderFrame,
)

# Module-level logger; the intervention handlers emit their audit events on it.
logger = get_logger(__name__)

# Identity passed as ``requested_by`` on the task-engine pause/kill calls.
_OPERATOR: Final[str] = "mission-control"
# First argument to the ``config_resolver.get_float`` threshold lookups made
# by the snapshot endpoint.
_COCKPIT_NS: Final[str] = "cockpit"

#: Litestar-validated annotated form for the ``turn_index`` path param so
#: a zero or negative value is rejected at request parsing instead of leaking
#: into the repository as an invalid filter bound. ``ge=1`` matches the
#: ``FlightRecorderFrame.turn_index`` invariant.
TurnIndexPath = Annotated[
    int,
    Parameter(ge=1, description="Target turn index (1-based)"),
]


class PauseInterventionRequest(BaseModel):
    """Request body for pausing a running task (moves it to INTERRUPTED).

    The model is frozen and rejects unknown fields.
    """

    model_config = ConfigDict(extra="forbid", frozen=True, allow_inf_nan=False)

    task_id: NotBlankStr = Field(description="Task to pause")
    reason: NotBlankStr = Field(description="Operator reason for the pause")


class KillInterventionRequest(BaseModel):
    """Request body for killing (cancelling) a running task.

    The model is frozen and rejects unknown fields.
    """

    model_config = ConfigDict(extra="forbid", frozen=True, allow_inf_nan=False)

    task_id: NotBlankStr = Field(description="Task to kill")
    reason: NotBlankStr = Field(description="Operator reason for the kill")


class SteerInterventionRequest(BaseModel):
    """Request body shared by the hint and redirect interventions.

    Identifies the execution and agent to steer and carries the operator's
    free-form text. The model is frozen and rejects unknown fields.
    """

    model_config = ConfigDict(extra="forbid", frozen=True, allow_inf_nan=False)

    execution_id: NotBlankStr = Field(description="Execution to steer")
    agent_id: NotBlankStr = Field(description="Agent to steer")
    text: NotBlankStr = Field(description="Operator hint / redirect text")


class CockpitController(Controller):
    """Expose mission-control reads and operator interventions under ``/cockpit``.

    The read endpoints (live snapshot, flight-recorder frames, seek) rely on
    the controller-level ``require_read_access`` guard. Each intervention
    endpoint additionally declares ``require_write_access``. Every
    intervention logs ``COCKPIT_INTERVENTION_INITIATED`` before acting and
    ``COCKPIT_INTERVENTION_APPLIED`` once the underlying call returns.
    """

    path = "/cockpit"
    tags = ("cockpit",)
    guards = [require_read_access]  # noqa: RUF012

    @get("/snapshot")
    async def get_snapshot(self, state: State) -> ApiResponse[LiveActivitySnapshot]:
        """Return the live org-activity snapshot.

        The stuck-idle and runaway-cost thresholds are resolved from the
        config resolver on every request and forwarded to the cockpit
        service.
        """
        app_state: AppState = state.app_state
        resolver = app_state.config_resolver
        stuck_idle_minutes = await resolver.get_float(
            _COCKPIT_NS, "stuck_idle_threshold_minutes"
        )
        runaway_cost_percent = await resolver.get_float(
            _COCKPIT_NS, "runaway_cost_threshold_percent"
        )
        snapshot = await app_state.cockpit_service.get_live_snapshot(
            stuck_idle_minutes=stuck_idle_minutes,
            runaway_cost_percent=runaway_cost_percent,
        )
        return ApiResponse(data=snapshot)

    @get("/flight-recorder/{execution_id:str}/frames")
    async def get_frames(
        self,
        state: State,
        execution_id: PathId,
        cursor: CursorParam = None,
        limit: CursorLimit = DEFAULT_LIMIT,
    ) -> PaginatedResponse[FlightRecorderFrame]:
        """Return the flight-recorder scrubber timeline (newest-first, paginated).

        Uses opaque cursor pagination (``cursor`` + ``limit``) per the
        web dashboard's MANDATORY pagination contract; offset-based
        paging is gone. The underlying repo still slices on offset
        internally, but the cursor is HMAC-signed so the client treats
        it as opaque.

        A missing ``cursor`` starts at offset 0; otherwise the offset is
        decoded from the cursor with the application's cursor secret.
        """
        app_state: AppState = state.app_state
        offset = (
            0
            if cursor is None
            else decode_cursor(cursor, secret=app_state.cursor_secret)
        )
        # Fetch ``limit + 1`` so we can detect that another page follows
        # without paying a separate COUNT round-trip on the frames table.
        frames = await app_state.flight_recorder_service.get_frames(
            execution_id,
            limit=limit + 1,
            offset=offset,
        )
        meta = encode_countless_seek_meta(
            offset=offset,
            fetched_rows=len(frames),
            limit=limit,
            secret=app_state.cursor_secret,
        )
        # Drop the look-ahead row before returning the page.
        window = tuple(frames[:limit])
        return PaginatedResponse[FlightRecorderFrame](data=window, pagination=meta)

    @get("/flight-recorder/{execution_id:str}/seek/{turn_index:int}")
    async def seek_frame(
        self,
        state: State,
        execution_id: PathId,
        turn_index: TurnIndexPath,
    ) -> ApiResponse[ReplaySeekView]:
        """Reconstruct scrubber state at a target turn.

        ``turn_index`` is validated as ``>= 1`` by ``TurnIndexPath`` before
        the flight-recorder service is called.
        """
        app_state: AppState = state.app_state
        view = await app_state.flight_recorder_service.seek(execution_id, turn_index)
        return ApiResponse(data=view)

    @post("/interventions/pause", guards=[require_write_access])
    async def pause(
        self,
        state: State,
        data: PauseInterventionRequest,
    ) -> ApiResponse[Task]:
        """Pause a running task (transition to INTERRUPTED).

        Returns the task as reported by the task engine after the
        transition; the second element of the engine's result is not used.
        """
        app_state: AppState = state.app_state
        logger.info(
            COCKPIT_INTERVENTION_INITIATED,
            intervention_kind=InterventionKind.PAUSE.value,
            task_id=data.task_id,
        )
        task, _from = await app_state.task_engine.transition_task(
            data.task_id,
            TaskStatus.INTERRUPTED,
            requested_by=_OPERATOR,
            reason=data.reason,
        )
        logger.info(
            COCKPIT_INTERVENTION_APPLIED,
            intervention_kind=InterventionKind.PAUSE.value,
            task_id=data.task_id,
        )
        return ApiResponse(data=task)

    @post("/interventions/kill", guards=[require_write_access])
    async def kill(
        self,
        state: State,
        data: KillInterventionRequest,
    ) -> ApiResponse[Task]:
        """Kill a running task (cancel it).

        Returns the task as reported by the task engine after the
        cancellation; the second element of the engine's result is not used.
        """
        app_state: AppState = state.app_state
        logger.info(
            COCKPIT_INTERVENTION_INITIATED,
            intervention_kind=InterventionKind.KILL.value,
            task_id=data.task_id,
        )
        task, _prior = await app_state.task_engine.cancel_task(
            data.task_id,
            requested_by=_OPERATOR,
            reason=data.reason,
        )
        logger.info(
            COCKPIT_INTERVENTION_APPLIED,
            intervention_kind=InterventionKind.KILL.value,
            task_id=data.task_id,
        )
        return ApiResponse(data=task)

    @post("/interventions/hint", guards=[require_write_access])
    async def hint(
        self,
        state: State,
        data: SteerInterventionRequest,
    ) -> ApiResponse[SteeringOutcome]:
        """Queue a hint for a running agent."""
        return await self._steer(state, InterventionKind.HINT, data)

    @post("/interventions/redirect", guards=[require_write_access])
    async def redirect(
        self,
        state: State,
        data: SteerInterventionRequest,
    ) -> ApiResponse[SteeringOutcome]:
        """Queue a redirect for a running agent."""
        return await self._steer(state, InterventionKind.REDIRECT, data)

    async def _steer(
        self,
        state: State,
        kind: InterventionKind,
        data: SteerInterventionRequest,
    ) -> ApiResponse[SteeringOutcome]:
        """Route a hint/redirect through the steering directive.

        Wraps the operator-supplied text via :func:`wrap_untrusted` at
        the controller boundary: the agent will read this text as
        untrusted content the next time it consumes interrupts, so
        the boundary must apply the prompt-safety envelope before the
        directive persists it.

        NOTE(review): the directive is said to apply its own wrap on the
        persisted question for defence-in-depth, with double-wrapping being
        safe because the envelope is idempotent on already-tagged content.
        Neither property is visible from this module -- confirm against the
        directive and ``wrap_untrusted`` before relying on it.
        """
        app_state: AppState = state.app_state
        logger.info(
            COCKPIT_INTERVENTION_INITIATED,
            intervention_kind=kind.value,
            execution_id=data.execution_id,
            agent_id=data.agent_id,
        )
        outcome = await app_state.steering_directive.steer(
            kind=kind,
            execution_id=data.execution_id,
            agent_id=data.agent_id,
            details={"text": wrap_untrusted(TAG_TASK_DATA, data.text)},
        )
        # The applied event is logged for every returned outcome and carries
        # ``outcome.applied``.
        logger.info(
            COCKPIT_INTERVENTION_APPLIED,
            intervention_kind=kind.value,
            execution_id=data.execution_id,
            agent_id=data.agent_id,
            applied=outcome.applied,
        )
        return ApiResponse(data=outcome)
Loading
Loading