Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scripts/_ghost_wiring_manifest.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,4 @@ ENFORCED SeenClaimsPruner #1966 -- constructed by workers.backend_services.build
ENFORCED WorkerHeartbeatSubscriber #1966 -- constructed by workers.backend_services.build_distributed_backend_services; surfaces worker liveness in the log pipeline
ENFORCED build_work_pipeline #1960 -- called by workers.runtime_builder._build_runtime_work_pipeline behind the provider-present switch; composes the work spine (intake -> projects -> solo/team -> coordination metrics)
ENFORCED build_chief_of_staff_proposer #1968 -- called by api.app._wire_chief_of_staff_proposer behind propose_enabled + provider switch; constructs ChiefOfStaffProposer which parks approval-gated WorkItems for the conversational interface
ENFORCED ObjectiveEntryAdapter #1964 -- built at boot by engine.pipeline.entry.boot.wire_real_objective_entry; fed by POST /objectives
13 changes: 8 additions & 5 deletions src/synthorg/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import os
import sys
from pathlib import Path
from typing import TYPE_CHECKING, Final
from typing import TYPE_CHECKING, Any, Final

from litestar import Controller, Litestar, Router
from litestar.config.compression import CompressionConfig
Expand Down Expand Up @@ -258,7 +258,7 @@ def create_app( # noqa: C901, PLR0912, PLR0913, PLR0915
task_engine: TaskEngine | None = None,
coordinator: MultiAgentCoordinator | None = None,
work_pipeline: WorkPipeline | None = None,
intake_entry_adapter: WorkEntryAdapter | None = None,
intake_entry_adapter: WorkEntryAdapter[Any] | None = None,
agent_registry: AgentRegistryService | None = None,
meeting_orchestrator: MeetingOrchestrator | None = None,
meeting_scheduler: MeetingScheduler | None = None,
Expand Down Expand Up @@ -1079,14 +1079,17 @@ async def _install_runtime_services() -> None:
# one is a logged no-op then.
if services.work_pipeline is not None:
app_state.set_work_pipeline_if_absent(services.work_pipeline)
# Bring the real client-request work-entry path online: ensure
# the configured intake project exists and attach the intake
# entry adapter. No-op for an empty company (no pipeline).
# Bring the real client-request and goal/objective work-entry
# paths online: ensure the configured default projects exist
# and attach the entry adapters. No-op for an empty company
# (no pipeline).
from synthorg.engine.pipeline.entry.boot import ( # noqa: PLC0415
wire_real_intake_entry,
wire_real_objective_entry,
)

await wire_real_intake_entry(app_state)
await wire_real_objective_entry(app_state)
_runtime_services_installed = True

startup = [*startup, _install_runtime_services]
Expand Down
2 changes: 2 additions & 0 deletions src/synthorg/api/controllers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
from synthorg.api.controllers.meta_analytics import MetaAnalyticsController
from synthorg.api.controllers.metrics import MetricsController
from synthorg.api.controllers.oauth import OAuthController
from synthorg.api.controllers.objectives import ObjectiveController
from synthorg.api.controllers.ontology import OntologyController
from synthorg.api.controllers.personalities import (
PersonalityPresetController,
Expand Down Expand Up @@ -172,6 +173,7 @@
OPTIONAL_CONTROLLERS: tuple[tuple[type[Controller], str], ...] = (
(SimulationController, "has_simulation_runtime"),
(RequestController, "has_simulation_runtime"),
(ObjectiveController, "has_objective_entry_adapter"),
)

# Integration subsystem controllers. Registered only when
Expand Down
195 changes: 195 additions & 0 deletions src/synthorg/api/controllers/objectives.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
"""Goal / objective intake endpoints at ``/objectives``.

A thin HTTP boundary over the
:class:`~synthorg.engine.pipeline.entry.objective_adapter.ObjectiveEntryAdapter`.
``POST /objectives`` mints a submission id, spawns the pipeline run
as a background task, and returns ``202 Accepted`` with the
submission id. The submission id is threaded through to the spawned
root task as its idempotency key (see
``engine/pipeline/service.py``), so callers correlate by that id.
"""

import asyncio
from typing import Any
from uuid import uuid4

from litestar import Controller, post
from litestar.datastructures import State # noqa: TC002
from pydantic import BaseModel, ConfigDict, Field

from synthorg.api.dto import ApiResponse
from synthorg.api.guards import require_read_access, require_write_access
from synthorg.api.rate_limits import per_op_rate_limit_from_policy
from synthorg.api.state import AppState # noqa: TC001
from synthorg.core.types import NotBlankStr # noqa: TC001
from synthorg.engine.pipeline.entry.objective_adapter import ObjectiveSubmission
from synthorg.observability import get_logger
from synthorg.observability.background_tasks import log_task_exceptions
from synthorg.observability.events.objectives import (
OBJECTIVE_PIPELINE_FAILED,
OBJECTIVE_SUBMISSION_RECEIVED,
)

logger = get_logger(__name__)


class SubmitObjectivePayload(BaseModel):
"""HTTP body for ``POST /objectives``.

Mirrors :class:`ObjectiveSubmission` but with no
``submission_id`` field: the server mints one so concurrent
submissions cannot collide.
"""

model_config = ConfigDict(frozen=True, allow_inf_nan=False, extra="forbid")

title: NotBlankStr = Field(description="Short human-readable objective title.")
description: NotBlankStr = Field(
description="Detailed statement of the objective.",
)
requested_by: NotBlankStr = Field(
description="Identifier of the human / service requesting the work.",
)
priority: str | None = Field(
default=None,
description="Optional priority override (Priority enum value).",
)
estimated_complexity: str | None = Field(
default=None,
description="Optional complexity override (Complexity enum value).",
)
task_type: str | None = Field(
default=None,
description="Optional task-type override (TaskType enum value).",
)
acceptance_criteria: tuple[NotBlankStr, ...] = Field(
default=(),
description="Optional acceptance criteria strings.",
)


class SubmitObjectiveAck(BaseModel):
"""``202 Accepted`` response body."""

model_config = ConfigDict(frozen=True, allow_inf_nan=False, extra="forbid")

submission_id: NotBlankStr = Field(
description=(
"Server-minted correlation id. Used as the WorkItem's"
" correlation_id and the spawned root task's idempotency"
" key, so callers can correlate this submission to the"
" spawned task once it exists."
),
)
status: NotBlankStr = Field(
description='Lifecycle marker; always ``"accepted"`` on 202.',
)


_ACCEPTED_STATUS = "accepted"


async def submit_objective_impl(
app_state: AppState,
data: SubmitObjectivePayload,
) -> ApiResponse[SubmitObjectiveAck]:
"""Spawn the pipeline run and return ``202``-shaped acknowledgement.

The controller-method shim delegates to this helper so the same
behaviour is exercised by direct calls (unit tests) and HTTP
requests (integration tests / production). Public to the package
for testability, not for external callers.
"""
submission = _build_submission(data)
adapter = app_state.objective_entry_adapter
logger.info(
OBJECTIVE_SUBMISSION_RECEIVED,
submission_id=submission.submission_id,
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
task = asyncio.create_task(_drive_pipeline(adapter, submission))
task.add_done_callback(
log_task_exceptions(
logger,
OBJECTIVE_PIPELINE_FAILED,
submission_id=submission.submission_id,
),
)
app_state.objective_background_tasks.add(task)
task.add_done_callback(app_state.objective_background_tasks.discard)
return ApiResponse(
data=SubmitObjectiveAck(
submission_id=submission.submission_id,
status=_ACCEPTED_STATUS,
)
)


class ObjectiveController(Controller):
"""Goal / objective intake endpoints."""

path = "/objectives"
tags = ("objectives",)
guards = [require_read_access] # noqa: RUF012

@post(
"/",
guards=[
require_write_access,
per_op_rate_limit_from_policy("objectives.submit", key="user"),
],
status_code=202,
)
async def submit_objective(
self,
state: State,
data: SubmitObjectivePayload,
) -> ApiResponse[SubmitObjectiveAck]:
"""Submit a goal/objective for autonomous decomposition.

The pipeline spine routes the submitted objective through
``intake -> projects -> decompose -> solo/team -> execute``;
the multi-agent coordinator handles the goal-to-subtasks
decomposition under the ``SPLITTABLE`` verdict.

Returns ``202 Accepted`` immediately with the minted
``submission_id`` so the HTTP response does not block on the
full pipeline run.
"""
return await submit_objective_impl(state.app_state, data)


def _build_submission(data: SubmitObjectivePayload) -> ObjectiveSubmission:
"""Map the HTTP payload to a typed :class:`ObjectiveSubmission`.

Optional enum fields are passed through as strings; Pydantic
coerces them against the enum members defined on
:class:`ObjectiveSubmission`, raising a validation error if a
caller supplies an unknown value.
"""
fields: dict[str, Any] = {
"submission_id": str(uuid4()),
"title": data.title,
"description": data.description,
"requested_by": data.requested_by,
"acceptance_criteria": data.acceptance_criteria,
}
if data.priority is not None:
fields["priority"] = data.priority
if data.estimated_complexity is not None:
fields["estimated_complexity"] = data.estimated_complexity
if data.task_type is not None:
fields["task_type"] = data.task_type
return ObjectiveSubmission(**fields)


async def _drive_pipeline(adapter: Any, submission: ObjectiveSubmission) -> None:
"""Run the adapter's pipeline submission and discard the result.

Wrapping the awaitable in an ``async def`` lets the controller's
background-task discipline (set-tracking + done-callback exception
routing) own the lifecycle. The terminal :class:`WorkPipelineResult`
is intentionally discarded here: callers correlate to the spawned
root task via the submission id (set as the WorkItem's
correlation_id).
"""
await adapter.submit(submission)
2 changes: 2 additions & 0 deletions src/synthorg/api/controllers/setup/agent_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,11 @@ async def _rebuild_runtime_services(app_state: AppState) -> None:
app_state.swap_work_pipeline(services.work_pipeline)
from synthorg.engine.pipeline.entry.boot import ( # noqa: PLC0415
wire_real_intake_entry,
wire_real_objective_entry,
)

await wire_real_intake_entry(app_state, hot_swap=True)
await wire_real_objective_entry(app_state, hot_swap=True)
except MemoryError, RecursionError:
raise
except RuntimeServicesBuildError:
Expand Down
2 changes: 2 additions & 0 deletions src/synthorg/api/rate_limits/policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@
"ontology.delete_entity": (10, 60),
"ontology.drift_check": (5, 60),
"ontology.update_entity": (30, 60),
# objectives
"objectives.submit": (30, 60),
# personalities
"personalities.create": (20, 60),
"personalities.delete": (10, 60),
Expand Down
Loading
Loading