From 91e75fada83913bdfeeb9b457cd9fdaa53fec9e8 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Thu, 21 May 2026 01:06:19 +0200 Subject: [PATCH 1/8] feat: governed external API/data access tool (#1991) --- src/synthorg/api/approval_store.py | 50 ++ src/synthorg/api/controllers/connections.py | 13 + src/synthorg/approval/protocol.py | 15 + src/synthorg/core/approval.py | 7 + src/synthorg/core/enums.py | 2 + src/synthorg/engine/_security_factory.py | 44 ++ src/synthorg/engine/agent_engine.py | 3 + src/synthorg/engine/agent_engine_factories.py | 10 + .../integrations/connections/catalog.py | 14 +- .../integrations/connections/models.py | 4 + .../health/checks/generic_http.py | 133 +----- .../observability/events/external_api.py | 12 + src/synthorg/persistence/approval_protocol.py | 31 +- .../persistence/postgres/approval_repo.py | 61 ++- .../persistence/postgres/connection_repo.py | 10 +- ...521000001_external_api_governed_access.sql | 19 + src/synthorg/persistence/postgres/schema.sql | 2 + .../persistence/sqlite/approval_repo.py | 76 +++- .../persistence/sqlite/connection_repo.py | 10 +- ...521000001_external_api_governed_access.sql | 21 + src/synthorg/persistence/sqlite/schema.sql | 4 + src/synthorg/security/action_type_mapping.py | 1 + src/synthorg/security/action_types.py | 1 + src/synthorg/security/risk_scorer.py | 3 + .../security/rules/risk_classifier.py | 1 + .../security/timeout/risk_tier_classifier.py | 1 + src/synthorg/settings/definitions/__init__.py | 2 + .../settings/definitions/external_api.py | 97 ++++ src/synthorg/settings/enums.py | 1 + src/synthorg/tools/_dns_pinning.py | 144 ++++++ src/synthorg/tools/external_api/__init__.py | 9 + src/synthorg/tools/external_api/_args.py | 97 ++++ .../tools/external_api/_credentials.py | 62 +++ src/synthorg/tools/external_api/_runtime.py | 28 ++ src/synthorg/tools/external_api/_signature.py | 80 ++++ src/synthorg/tools/external_api/errors.py | 86 ++++ 
.../tools/external_api/external_api_tool.py | 427 ++++++++++++++++++ .../tools/external_api/httpx_provider.py | 95 ++++ src/synthorg/tools/external_api/provider.py | 71 +++ .../tools/external_api/provider_factory.py | 37 ++ src/synthorg/tools/permissions.py | 1 + src/synthorg/workers/runtime_builder.py | 77 +++- .../persistence/test_approval_repository.py | 78 ++++ .../test_connection_repositories.py | 18 + tests/e2e/test_external_api_governance_e2e.py | 231 ++++++++++ tests/unit/api/test_approval_store.py | 51 +++ tests/unit/core/test_enums.py | 5 +- tests/unit/hr/test_registry_autonomy.py | 3 + tests/unit/security/test_action_types.py | 3 +- tests/unit/tools/external_api/test_errors.py | 93 ++++ .../external_api/test_external_api_tool.py | 380 ++++++++++++++++ .../tools/external_api/test_httpx_provider.py | 123 +++++ .../unit/tools/external_api/test_signature.py | 62 +++ web/src/__tests__/helpers/factories.ts | 1 + web/src/__tests__/stores/connections.test.ts | 3 + web/src/api/types/enum-values.gen.ts | 2 + web/src/api/types/openapi.gen.ts | 14 +- web/src/mocks/handlers/approvals.ts | 1 + web/src/mocks/handlers/connections.ts | 1 + .../pages/approvals/ApprovalCard.stories.tsx | 1 + .../ApprovalDetailDrawer.stories.tsx | 1 + .../approvals/ApprovalTimeline.stories.tsx | 1 + .../connections/ConnectionCard.stories.tsx | 1 + .../ConnectionFormModal.stories.tsx | 1 + .../pages/connections/ConnectionFormModal.tsx | 19 + .../mcp-catalog/McpInstallWizard.stories.tsx | 1 + .../pages/oauth-apps/OauthAppCard.stories.tsx | 1 + web/src/pages/settings/utils.ts | 1 + web/src/stores/approvals.ts | 1 + web/src/utils/constants.ts | 2 + 70 files changed, 2807 insertions(+), 154 deletions(-) create mode 100644 src/synthorg/observability/events/external_api.py create mode 100644 src/synthorg/persistence/postgres/revisions/20260521000001_external_api_governed_access.sql create mode 100644 src/synthorg/persistence/sqlite/revisions/20260521000001_external_api_governed_access.sql create 
mode 100644 src/synthorg/settings/definitions/external_api.py create mode 100644 src/synthorg/tools/_dns_pinning.py create mode 100644 src/synthorg/tools/external_api/__init__.py create mode 100644 src/synthorg/tools/external_api/_args.py create mode 100644 src/synthorg/tools/external_api/_credentials.py create mode 100644 src/synthorg/tools/external_api/_runtime.py create mode 100644 src/synthorg/tools/external_api/_signature.py create mode 100644 src/synthorg/tools/external_api/errors.py create mode 100644 src/synthorg/tools/external_api/external_api_tool.py create mode 100644 src/synthorg/tools/external_api/httpx_provider.py create mode 100644 src/synthorg/tools/external_api/provider.py create mode 100644 src/synthorg/tools/external_api/provider_factory.py create mode 100644 tests/e2e/test_external_api_governance_e2e.py create mode 100644 tests/unit/tools/external_api/test_errors.py create mode 100644 tests/unit/tools/external_api/test_external_api_tool.py create mode 100644 tests/unit/tools/external_api/test_httpx_provider.py create mode 100644 tests/unit/tools/external_api/test_signature.py diff --git a/src/synthorg/api/approval_store.py b/src/synthorg/api/approval_store.py index 4b5b8806b9..95cd6832eb 100644 --- a/src/synthorg/api/approval_store.py +++ b/src/synthorg/api/approval_store.py @@ -730,6 +730,56 @@ async def save_if_pending( self._items[item.id] = item return item + async def consume_if_approved( + self, + approval_id: NotBlankStr, + ) -> ApprovalItem | None: + """Atomically mark an APPROVED one-shot grant as consumed. + + Stamps ``consumed_at`` (read through the store clock) iff the + approval is currently APPROVED and not already consumed, so a + single grant authorises exactly one action. The authoritative + compare-and-set runs in the repository when one is configured; + the in-memory cache is updated only after the CAS wins. + + Args: + approval_id: The approval id to consume. 
+ + Returns: + The consumed item on success, or ``None`` when the approval + is missing, not APPROVED, already consumed, or the CAS lost a + concurrent race. + """ + async with self._lock: + current = self._items.get(approval_id) + if current is None and self._repo is not None: + current = await self._repo.get(approval_id) + if current is not None: + self._items[current.id] = current + if current is None: + return None + current = await self._check_expiration_locked(current) + if ( + current.status != ApprovalStatus.APPROVED + or current.consumed_at is not None + ): + return None + consumed_at = self._clock.now() + if self._repo is not None: + won = await self._repo.consume_if_approved( + approval_id, + consumed_at=consumed_at, + ) + if not won: + # The backend rejected the CAS (concurrent consume or + # state drift); drop the stale cache entry so the next + # reader reloads committed truth. + self._items.pop(approval_id, None) + return None + consumed = current.model_copy(update={"consumed_at": consumed_at}) + self._items[approval_id] = consumed + return consumed + async def _invalidate_cache(self, approval_id: str) -> None: """Evict a cache entry, acquiring the lock first. diff --git a/src/synthorg/api/controllers/connections.py b/src/synthorg/api/controllers/connections.py index ab77930823..bb76771521 100644 --- a/src/synthorg/api/controllers/connections.py +++ b/src/synthorg/api/controllers/connections.py @@ -104,6 +104,9 @@ class CreateConnectionRequest(BaseModel): # ``None`` falls back to the global ``integrations.webhook_receipt_retention_days`` # setting; ``0`` opts this connection out of the sweep entirely. webhook_receipt_retention_days: int | None = Field(default=None, ge=0) + # Marks the connection sensitive so the governed external-access + # tool routes every call against it (read or write) to approval. 
+ sensitive: bool = False @field_validator("name") @classmethod @@ -137,6 +140,9 @@ class UpdateConnectionRequest(BaseModel): # an int sets the override, omitting the field keeps the existing # stored value (handled via ``model_fields_set`` below). webhook_receipt_retention_days: int | None = Field(default=None, ge=0) + # Omitting keeps the stored value; setting true/false toggles whether + # external-access calls against this connection require approval. + sensitive: bool | None = None class ConnectionsController(Controller): @@ -242,6 +248,7 @@ async def create_connection( metadata=metadata_copy, health_check_enabled=data.health_check_enabled, webhook_receipt_retention_days=data.webhook_receipt_retention_days, + sensitive=data.sensitive, ) except DuplicateConnectionError as exc: logger.warning( @@ -329,6 +336,11 @@ async def update_connection( if "webhook_receipt_retention_days" in data.model_fields_set else _UNSET ) + sensitive: bool | _UnsetType = ( + bool(data.sensitive) + if "sensitive" in data.model_fields_set and data.sensitive is not None + else _UNSET + ) catalog = state["app_state"].connection_catalog try: conn = await catalog.update( @@ -337,6 +349,7 @@ async def update_connection( metadata=metadata, health_check_enabled=health_check_enabled, webhook_receipt_retention_days=webhook_receipt_retention_days, + sensitive=sensitive, ) except ConnectionNotFoundError as exc: logger.warning( diff --git a/src/synthorg/approval/protocol.py b/src/synthorg/approval/protocol.py index b5c863b735..37e3f03e28 100644 --- a/src/synthorg/approval/protocol.py +++ b/src/synthorg/approval/protocol.py @@ -75,6 +75,21 @@ async def save_if_pending( """Conditionally update an approval item if it is still pending.""" ... + async def consume_if_approved( + self, + approval_id: NotBlankStr, + ) -> ApprovalItem | None: + """Atomically mark an APPROVED one-shot grant as consumed. 
+ + Stamps ``consumed_at`` iff the approval is currently APPROVED and + not already consumed, so a single grant authorises exactly one + action (the governed external-access tool calls this before + egress). Returns the consumed item on success, or ``None`` when + the approval is missing, not APPROVED, already consumed, or a + concurrent consume won the race. + """ + ... + @runtime_checkable class SyncResettableApprovalStore(Protocol): diff --git a/src/synthorg/core/approval.py b/src/synthorg/core/approval.py index 99080e81d3..b3610963b8 100644 --- a/src/synthorg/core/approval.py +++ b/src/synthorg/core/approval.py @@ -49,6 +49,12 @@ class ApprovalItem(BaseModel): Defaults to ``REVIEW_GATE``; the two park producers (SecOps escalation and the ``request_human_approval`` tool) set ``PARKED_CONTEXT``. + consumed_at: When an APPROVED one-shot grant was spent. ``None`` + until consumed. The governed external-access tool sets this + via an atomic compare-and-set (``consume_if_approved``) + before egress so the same approval cannot authorise a second + call; the approval keeps ``status == APPROVED`` because + consumption is orthogonal to the decision lifecycle. metadata: Additional key-value metadata. 
""" @@ -68,6 +74,7 @@ class ApprovalItem(BaseModel): decided_by: NotBlankStr | None = None decision_reason: NotBlankStr | None = None task_id: NotBlankStr | None = None + consumed_at: AwareDatetime | None = None evidence_package: EvidencePackage | None = Field( default=None, description="Structured evidence for HITL approval", diff --git a/src/synthorg/core/enums.py b/src/synthorg/core/enums.py index 06198222d0..74a6111d42 100644 --- a/src/synthorg/core/enums.py +++ b/src/synthorg/core/enums.py @@ -474,6 +474,7 @@ class ToolCategory(StrEnum): ONTOLOGY = "ontology" MCP = "mcp" BROWSER = "browser" + EXTERNAL_DATA = "external_data" OTHER = "other" @@ -581,6 +582,7 @@ class ActionType(StrEnum): BROWSER_DIFF = "browser:diff" BROWSER_ACCESSIBILITY_SCAN = "browser:accessibility_scan" BROWSER_SPEC = "browser:spec" + EXTERNAL_DATA_REQUEST = "external_data:request" class MergeOrder(StrEnum): diff --git a/src/synthorg/engine/_security_factory.py b/src/synthorg/engine/_security_factory.py index 1d1c0f97db..159a2ec716 100644 --- a/src/synthorg/engine/_security_factory.py +++ b/src/synthorg/engine/_security_factory.py @@ -41,6 +41,7 @@ from synthorg.providers.routing.resolver import ModelResolver from synthorg.security.autonomy.models import EffectiveAutonomy from synthorg.security.protocol import SecurityInterceptionStrategy + from synthorg.tools.external_api._runtime import ExternalApiRuntime from synthorg.tools.registry import ToolRegistry logger = get_logger(__name__) @@ -279,3 +280,46 @@ def registry_with_approval_tool( ) existing = list(tool_registry.all_tools()) return _ToolRegistry([*existing, approval_tool]) + + +def registry_with_external_api_tool( # noqa: PLR0913 -- run-scoped wiring inputs + tool_registry: ToolRegistry, + runtime: ExternalApiRuntime | None, + approval_store: ApprovalStoreProtocol | None, + identity: AgentIdentity, + task_id: str | None = None, + effective_autonomy: EffectiveAutonomy | None = None, +) -> ToolRegistry: + """Add the governed 
external-access tool when its runtime is wired. + + Returns the registry unchanged when no runtime bundle is present (the + feature is disabled or no connection catalog is configured) or when no + approval store is available (sensitive calls could not be gated). The + tool is run-scoped: it binds the run's identity, task, and effective + autonomy alongside the boot-scoped catalog / provider / policy. + """ + if runtime is None or approval_store is None: + return tool_registry + + from synthorg.tools.external_api.external_api_tool import ( # noqa: PLC0415 + ExternalApiTool, + ) + from synthorg.tools.registry import ( # noqa: PLC0415 + ToolRegistry as _ToolRegistry, + ) + + external_api_tool = ExternalApiTool( + connection_catalog=runtime.connection_catalog, + approval_store=approval_store, + provider=runtime.provider, + agent_id=str(identity.id), + task_id=task_id, + network_policy=runtime.network_policy, + effective_autonomy=effective_autonomy, + risk_classifier=DefaultRiskTierClassifier(), + max_response_bytes=runtime.max_response_bytes, + timeout_seconds=runtime.timeout_seconds, + default_max_rpm=runtime.default_max_rpm, + ) + existing = list(tool_registry.all_tools()) + return _ToolRegistry([*existing, external_api_tool]) diff --git a/src/synthorg/engine/agent_engine.py b/src/synthorg/engine/agent_engine.py index ca17916f2d..61aa7bc1ef 100644 --- a/src/synthorg/engine/agent_engine.py +++ b/src/synthorg/engine/agent_engine.py @@ -110,6 +110,7 @@ from synthorg.security.config import SecurityConfig from synthorg.security.trust.service import TrustService from synthorg.settings.resolver import ConfigResolver + from synthorg.tools.external_api._runtime import ExternalApiRuntime from synthorg.tools.invocation_tracker import ToolInvocationTracker from synthorg.tools.protocol import ToolInvokerProtocol from synthorg.tools.registry import ToolRegistry @@ -198,6 +199,7 @@ def __init__( # noqa: PLR0913, PLR0915 event_stream_hub: EventStreamHub | None = None, 
interrupt_store: InterruptStore | None = None, approval_interrupt_timeout_seconds: float | None = None, + external_api_runtime: ExternalApiRuntime | None = None, clock: Clock | None = None, ) -> None: self._agent_middleware_chain = agent_middleware_chain @@ -217,6 +219,7 @@ def __init__( # noqa: PLR0913, PLR0915 self._provider_configs = provider_configs self._model_resolver = model_resolver self._approval_store = approval_store + self._external_api_runtime = external_api_runtime self._parked_context_repo = parked_context_repo self._cost_forecast_repo = cost_forecast_repo # The boot path constructs one ApprovalGate (backed by the diff --git a/src/synthorg/engine/agent_engine_factories.py b/src/synthorg/engine/agent_engine_factories.py index 67751b6bf6..8e9b91fe6b 100644 --- a/src/synthorg/engine/agent_engine_factories.py +++ b/src/synthorg/engine/agent_engine_factories.py @@ -5,6 +5,7 @@ from synthorg.engine._security_factory import ( make_security_interceptor, registry_with_approval_tool, + registry_with_external_api_tool, ) from synthorg.engine.approval_gate import ApprovalGate from synthorg.engine.loop_selector import ( @@ -40,6 +41,7 @@ class AgentEngineFactoriesMixin: """Mixin providing approval-gate, loop, and tool-invoker factories.""" _approval_store: Any + _external_api_runtime: Any _parked_context_repo: Any _event_stream_hub: Any _interrupt_store: Any @@ -243,6 +245,14 @@ def _make_tool_invoker( identity, task_id=task_id, ) + registry = registry_with_external_api_tool( + registry, + self._external_api_runtime, + self._approval_store, + identity, + task_id=task_id, + effective_autonomy=effective_autonomy, + ) if self._memory_injection_strategy is not None: from synthorg.memory.tools import ( # noqa: PLC0415 registry_with_memory_tools, diff --git a/src/synthorg/integrations/connections/catalog.py b/src/synthorg/integrations/connections/catalog.py index 2ae7d0d343..c293d65270 100644 --- a/src/synthorg/integrations/connections/catalog.py +++ 
b/src/synthorg/integrations/connections/catalog.py @@ -191,6 +191,7 @@ def _build_connection( # noqa: PLR0913 metadata: dict[str, str] | None, health_check_enabled: bool, webhook_receipt_retention_days: int | None, + sensitive: bool = False, ) -> Connection: """Build and validate the ``Connection`` model BEFORE secret writes. @@ -214,6 +215,7 @@ def _build_connection( # noqa: PLR0913 health_check_enabled=health_check_enabled, metadata=metadata or {}, webhook_receipt_retention_days=webhook_receipt_retention_days, + sensitive=sensitive, created_at=now, updated_at=now, ) @@ -315,6 +317,7 @@ async def create( # noqa: PLR0913 metadata: dict[str, str] | None = None, health_check_enabled: bool = True, webhook_receipt_retention_days: int | None = None, + sensitive: bool = False, ) -> Connection: """Create a new connection. @@ -333,6 +336,9 @@ async def create( # noqa: PLR0913 for the webhook-receipt retention window (days). ``None`` falls back to the global default; ``0`` opts out of the cleanup sweep entirely. + sensitive: Marks the connection sensitive so the governed + external-access tool routes every call against it to + human approval. Returns: The persisted connection. @@ -364,6 +370,7 @@ async def create( # noqa: PLR0913 metadata=metadata, health_check_enabled=health_check_enabled, webhook_receipt_retention_days=webhook_receipt_retention_days, + sensitive=sensitive, ) await self._store_secret(secret_id, credentials, connection_name=name) await self._persist_connection_with_cleanup( @@ -457,6 +464,7 @@ def _build_update_candidate( metadata: dict[str, str] | None | _UnsetType, health_check_enabled: bool | None | _UnsetType, webhook_receipt_retention_days: int | None | _UnsetType, + sensitive: bool | _UnsetType, ) -> dict[str, object]: """Compose the PATCH candidate dict, normalising explicit nulls. @@ -490,9 +498,11 @@ def _build_update_candidate( # per-connection override and falls back to the global # default. Pass through verbatim. 
candidate["webhook_receipt_retention_days"] = webhook_receipt_retention_days + if sensitive is not _UNSET: + candidate["sensitive"] = sensitive return candidate - async def update( + async def update( # noqa: PLR0913 -- one kwarg per independently-patchable field self, name: str, *, @@ -500,6 +510,7 @@ async def update( metadata: dict[str, str] | None | _UnsetType = _UNSET, health_check_enabled: bool | None | _UnsetType = _UNSET, webhook_receipt_retention_days: int | None | _UnsetType = _UNSET, + sensitive: bool | _UnsetType = _UNSET, ) -> Connection: """Update a connection's mutable fields. @@ -526,6 +537,7 @@ async def update( metadata=metadata, health_check_enabled=health_check_enabled, webhook_receipt_retention_days=webhook_receipt_retention_days, + sensitive=sensitive, ) except MemoryError, RecursionError: raise diff --git a/src/synthorg/integrations/connections/models.py b/src/synthorg/integrations/connections/models.py index e8fd2e410c..114fe728f6 100644 --- a/src/synthorg/integrations/connections/models.py +++ b/src/synthorg/integrations/connections/models.py @@ -101,6 +101,9 @@ class Connection(BaseModel): webhook-receipt retention window (days). ``None`` falls back to ``integrations.webhook_receipt_retention_days``; ``0`` disables sweeping for this connection's receipts. + sensitive: Marks the connection as sensitive so the governed + external-access tool routes every call against it (read or + write) to human approval, not just write methods. created_at: Creation timestamp. updated_at: Last modification timestamp. 
""" @@ -119,6 +122,7 @@ class Connection(BaseModel): health_check_enabled: bool = True health_status: ConnectionStatus = ConnectionStatus.UNKNOWN last_health_check_at: AwareDatetime | None = None + sensitive: bool = False metadata: dict[str, str] = Field(default_factory=dict) webhook_receipt_retention_days: WebhookRetentionDays = Field( default=None, diff --git a/src/synthorg/integrations/health/checks/generic_http.py b/src/synthorg/integrations/health/checks/generic_http.py index c982586b37..be57d1e804 100644 --- a/src/synthorg/integrations/health/checks/generic_http.py +++ b/src/synthorg/integrations/health/checks/generic_http.py @@ -1,10 +1,8 @@ """Generic HTTP health check.""" -import ssl from datetime import UTC, datetime -from typing import TYPE_CHECKING, Final, cast +from typing import Final -import httpcore import httpx from synthorg.core.clock import Clock, SystemClock @@ -18,17 +16,13 @@ HEALTH_CHECK_FAILED, HEALTH_CHECK_PASSED, ) +from synthorg.tools._dns_pinning import PinnedDnsTransport from synthorg.tools.network_validator import ( DnsValidationOk, NetworkPolicy, validate_url_host, ) -if TYPE_CHECKING: - from collections.abc import AsyncIterable, AsyncIterator, Iterable - - from httpcore._backends.base import SOCKET_OPTION - logger = get_logger(__name__) _TIMEOUT: Final[float] = 10.0 @@ -37,127 +31,6 @@ _NOT_IMPLEMENTED: Final[int] = 501 -class _PinnedDnsBackend(httpcore.AsyncNetworkBackend): - """httpcore network backend that pins a hostname to a validated IP. - - Closes the DNS-rebinding TOCTOU window between - :func:`validate_url_host` and the actual TCP connect: the backend - intercepts ``connect_tcp`` and substitutes the validated IP for the - request's hostname before delegating to the inner backend. Because - httpcore passes ``server_hostname`` to ``start_tls`` separately from - the ``host`` arg of ``connect_tcp``, the TLS SNI and certificate - verification still use the original hostname -- no custom SSL - context required. 
- """ - - def __init__( - self, - inner: httpcore.AsyncNetworkBackend, - *, - hostname: str, - ip: str, - ) -> None: - self._inner = inner - self._hostname = hostname.lower() - self._ip = ip - - async def connect_tcp( - self, - host: str, - port: int, - timeout: float | None = None, # noqa: ASYNC109 -- AsyncNetworkBackend interface - local_address: str | None = None, - socket_options: Iterable[SOCKET_OPTION] | None = None, - ) -> httpcore.AsyncNetworkStream: - target = self._ip if host.lower() == self._hostname else host - return await self._inner.connect_tcp( - target, - port, - timeout=timeout, - local_address=local_address, - socket_options=socket_options, - ) - - async def connect_unix_socket( - self, - path: str, - timeout: float | None = None, # noqa: ASYNC109 -- AsyncNetworkBackend interface - socket_options: Iterable[SOCKET_OPTION] | None = None, - ) -> httpcore.AsyncNetworkStream: - return await self._inner.connect_unix_socket( - path, - timeout=timeout, - socket_options=socket_options, - ) - - async def sleep(self, seconds: float) -> None: - await self._inner.sleep(seconds) - - -class _PinnedDnsTransport(httpx.AsyncBaseTransport): - """httpx transport whose underlying pool uses a hostname-pinned backend. - - Constructed only when there is a hostname-to-IP pinning to apply. - Calls without a matching hostname fall through to the inner backend - unchanged, so this transport is safe to use on any URL. 
- """ - - def __init__(self, *, hostname: str, ip: str) -> None: - self._pool = httpcore.AsyncConnectionPool( - ssl_context=ssl.create_default_context(), - network_backend=_PinnedDnsBackend( - httpcore.AnyIOBackend(), - hostname=hostname, - ip=ip, - ), - ) - - async def handle_async_request(self, request: httpx.Request) -> httpx.Response: - if not isinstance(request.stream, httpx.AsyncByteStream): - msg = "Pinned-DNS transport requires an async byte stream" - raise TypeError(msg) - req = httpcore.Request( - method=request.method, - url=httpcore.URL( - scheme=request.url.raw_scheme, - host=request.url.raw_host, - port=request.url.port, - target=request.url.raw_path, - ), - headers=request.headers.raw, - content=request.stream, - extensions=request.extensions, - ) - resp = await self._pool.handle_async_request(req) - return httpx.Response( - status_code=resp.status, - headers=resp.headers, - stream=_PinnedDnsResponseStream( - cast("AsyncIterable[bytes]", resp.stream), - ), - extensions=resp.extensions, - ) - - async def aclose(self) -> None: - await self._pool.aclose() - - -class _PinnedDnsResponseStream(httpx.AsyncByteStream): - """Forwarding wrapper that adapts an httpcore async stream to httpx.""" - - def __init__(self, inner: AsyncIterable[bytes]) -> None: - self._inner = inner - - async def __aiter__(self) -> AsyncIterator[bytes]: - async for part in self._inner: - yield part - - async def aclose(self) -> None: - aclose = getattr(self._inner, "aclose", None) - if aclose is not None: - await aclose() - - class GenericHttpHealthCheck: """Health check via HTTP HEAD to the connection's base URL. @@ -227,7 +100,7 @@ async def check(self, connection: Connection) -> HealthReport: # behaviour minus the rebinding window. 
transport: httpx.AsyncBaseTransport | None = None if validation.resolved_ips: - transport = _PinnedDnsTransport( + transport = PinnedDnsTransport( hostname=validation.hostname, ip=validation.resolved_ips[0], ) diff --git a/src/synthorg/observability/events/external_api.py b/src/synthorg/observability/events/external_api.py new file mode 100644 index 0000000000..0f48cf8b95 --- /dev/null +++ b/src/synthorg/observability/events/external_api.py @@ -0,0 +1,12 @@ +"""Event constants for the governed external-access tool.""" + +from typing import Final + +EXTERNAL_API_CALL_STARTED: Final[str] = "external_api.call.started" +EXTERNAL_API_CALL_SUCCEEDED: Final[str] = "external_api.call.succeeded" +EXTERNAL_API_CALL_FAILED: Final[str] = "external_api.call.failed" +EXTERNAL_API_APPROVAL_REQUIRED: Final[str] = "external_api.approval.required" +EXTERNAL_API_APPROVAL_CONSUMED: Final[str] = "external_api.approval.consumed" +EXTERNAL_API_SIGNATURE_MISMATCH: Final[str] = "external_api.signature.mismatch" +EXTERNAL_API_EGRESS_BLOCKED: Final[str] = "external_api.egress.blocked" +EXTERNAL_API_RATE_LIMITED: Final[str] = "external_api.rate_limited" diff --git a/src/synthorg/persistence/approval_protocol.py b/src/synthorg/persistence/approval_protocol.py index 0a632d9da4..d2800ab04b 100644 --- a/src/synthorg/persistence/approval_protocol.py +++ b/src/synthorg/persistence/approval_protocol.py @@ -26,6 +26,7 @@ if TYPE_CHECKING: from collections.abc import Sequence + from datetime import datetime from typing_extensions import TypedDict @@ -59,7 +60,8 @@ class ApprovalRepository( Composes :class:`StatefulRepository` + :class:`FilteredQueryRepository` (ADR-0001). Bespoke per D7: :meth:`save_many` and :meth:`expire_if_pending` are performance optimisations for bulk operations; :meth:`get_many` is a - batch-fetch optimisation. + batch-fetch optimisation; :meth:`consume_if_approved` enforces the one-shot + domain invariant for governed external-access grants. 
All methods are async; non-recoverable errors (``MemoryError``, ``RecursionError``) propagate to callers. Constraint violations @@ -128,6 +130,33 @@ async def transition_if( """ ... + async def consume_if_approved( + self, + approval_id: NotBlankStr, + *, + consumed_at: datetime, + ) -> bool: + """Atomic CAS: mark an APPROVED grant as consumed (D7 bespoke). + + Sets ``consumed_at`` iff the row is currently ``approved`` and not + already consumed, enforcing the one-shot domain invariant for + governed external-access approvals: a single grant authorises + exactly one egress. Returns ``True`` iff this call won the race; + ``False`` on replay (already consumed), state mismatch (not + approved), or missing row. + + Args: + approval_id: The approval id to consume. + consumed_at: Aware UTC timestamp to stamp on success. + + Returns: + ``True`` iff the grant was consumed by this call. + + Raises: + QueryError: On database errors. + """ + ... + async def expire_if_pending( self, ids: Sequence[NotBlankStr] ) -> tuple[NotBlankStr, ...]: diff --git a/src/synthorg/persistence/postgres/approval_repo.py b/src/synthorg/persistence/postgres/approval_repo.py index 2659b08874..c55be36ea7 100644 --- a/src/synthorg/persistence/postgres/approval_repo.py +++ b/src/synthorg/persistence/postgres/approval_repo.py @@ -11,6 +11,7 @@ structurally. 
""" +from datetime import datetime # noqa: TC003 -- runtime param type from typing import TYPE_CHECKING, Any import psycopg @@ -45,12 +46,12 @@ _SELECT_COLS = ( "id, action_type, title, description, requested_by, risk_level, " "source, status, created_at, expires_at, decided_at, decided_by, " - "decision_reason, task_id, evidence_package, metadata" + "decision_reason, task_id, evidence_package, metadata, consumed_at" ) _APPROVALS_UPSERT_SQL = f""" INSERT INTO approvals ({_SELECT_COLS}) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (id) DO UPDATE SET action_type = EXCLUDED.action_type, title = EXCLUDED.title, @@ -65,7 +66,8 @@ decision_reason = EXCLUDED.decision_reason, task_id = EXCLUDED.task_id, evidence_package = EXCLUDED.evidence_package, - metadata = EXCLUDED.metadata + metadata = EXCLUDED.metadata, + consumed_at = EXCLUDED.consumed_at """ # noqa: S608 -- column list is compile-time constant @@ -113,6 +115,11 @@ def _row_to_item(row: dict[str, Any]) -> ApprovalItem: if row["decided_at"] is not None else None ) + consumed_at = ( + coerce_row_timestamp(row["consumed_at"]) + if row["consumed_at"] is not None + else None + ) return ApprovalItem( id=str(row["id"]), action_type=str(row["action_type"]), @@ -134,6 +141,7 @@ def _row_to_item(row: dict[str, Any]) -> ApprovalItem: else None ), task_id=(str(row["task_id"]) if row["task_id"] is not None else None), + consumed_at=consumed_at, evidence_package=evidence_package, metadata=metadata_raw, ) @@ -195,6 +203,7 @@ async def save(self, item: ApprovalItem) -> None: item.task_id, evidence_json, Jsonb(item.metadata), + item.consumed_at, ) try: async with self._pool.connection() as conn, conn.cursor() as cur: @@ -267,6 +276,7 @@ async def save_many(self, items: Sequence[ApprovalItem]) -> None: item.task_id, evidence_json, Jsonb(item.metadata), + item.consumed_at, ), ) try: @@ -622,6 +632,51 @@ async def 
transition_if( raise QueryError(msg) from exc return updated + async def consume_if_approved( + self, + approval_id: NotBlankStr, + *, + consumed_at: datetime, + ) -> bool: + """Atomic compare-and-set: mark an APPROVED grant as consumed. + + Sets ``consumed_at`` iff the row is currently ``approved`` and not + already consumed, so a one-shot approval can authorise exactly one + action. Returns ``True`` iff this call won the race (rowcount == 1); + ``False`` on replay (already consumed), state mismatch (not + approved), or missing row. + + Args: + approval_id: The approval id. + consumed_at: Aware UTC timestamp to stamp on success. + + Returns: + ``True`` iff the grant was consumed by this call. + + Raises: + QueryError: On database errors. + """ + sql = ( + "UPDATE approvals SET consumed_at = %s " + "WHERE id = %s AND status = %s AND consumed_at IS NULL" + ) + params = (consumed_at, approval_id, ApprovalStatus.APPROVED.value) + try: + async with self._pool.connection() as conn, conn.cursor() as cur: + await cur.execute(sql, params) + consumed = cur.rowcount > 0 + await conn.commit() + except psycopg.Error as exc: + msg = f"Failed to consume approval {approval_id!r}" + logger.warning( + API_APPROVAL_REPO_FAILED, + approval_id=approval_id, + error_type=type(exc).__name__, + error=safe_error_description(exc), + ) + raise QueryError(msg) from exc + return consumed + async def delete(self, approval_id: NotBlankStr) -> bool: """Delete an approval item; returns True when a row was removed. 
diff --git a/src/synthorg/persistence/postgres/connection_repo.py b/src/synthorg/persistence/postgres/connection_repo.py index 5ae509f7af..6f82b5ac80 100644 --- a/src/synthorg/persistence/postgres/connection_repo.py +++ b/src/synthorg/persistence/postgres/connection_repo.py @@ -52,7 +52,7 @@ "name, connection_type, auth_method, base_url, secret_refs_json, " "rate_limit_rpm, rate_limit_concurrent, health_check_enabled, " "health_status, last_health_check_at, metadata_json, " - "webhook_receipt_retention_days, created_at, updated_at" + "webhook_receipt_retention_days, sensitive, created_at, updated_at" ) @@ -73,6 +73,7 @@ def _row_to_connection(row: dict[str, Any]) -> Connection: ) last_health_check_at = row.get("last_health_check_at") retention = row.get("webhook_receipt_retention_days") + sensitive = bool(row.get("sensitive", False)) return Connection( name=NotBlankStr(row["name"]), connection_type=ConnectionType(row["connection_type"]), @@ -87,6 +88,7 @@ def _row_to_connection(row: dict[str, Any]) -> Connection: ), metadata=metadata, webhook_receipt_retention_days=safe_int(retention, default=None), + sensitive=sensitive, created_at=coerce_row_timestamp(row["created_at"]), updated_at=coerce_row_timestamp(row["updated_at"]), ) @@ -131,6 +133,7 @@ async def save(self, connection: Connection) -> None: ), Jsonb(connection.metadata), connection.webhook_receipt_retention_days, + connection.sensitive, normalize_utc(connection.created_at), normalize_utc(connection.updated_at), ) @@ -143,10 +146,10 @@ async def save(self, connection: Connection) -> None: secret_refs_json, rate_limit_rpm, rate_limit_concurrent, health_check_enabled, health_status, last_health_check_at, metadata_json, - webhook_receipt_retention_days, + webhook_receipt_retention_days, sensitive, created_at, updated_at ) VALUES ( - %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ) ON CONFLICT (name) DO UPDATE SET connection_type = 
EXCLUDED.connection_type, @@ -161,6 +164,7 @@ async def save(self, connection: Connection) -> None: metadata_json = EXCLUDED.metadata_json, webhook_receipt_retention_days = EXCLUDED.webhook_receipt_retention_days, + sensitive = EXCLUDED.sensitive, updated_at = EXCLUDED.updated_at """, params, diff --git a/src/synthorg/persistence/postgres/revisions/20260521000001_external_api_governed_access.sql b/src/synthorg/persistence/postgres/revisions/20260521000001_external_api_governed_access.sql new file mode 100644 index 0000000000..06dd109059 --- /dev/null +++ b/src/synthorg/persistence/postgres/revisions/20260521000001_external_api_governed_access.sql @@ -0,0 +1,19 @@ +-- depends: 20260519000001_conversational_intake 20260519000001_project_workspaces + +-- Governed external API/data access (#1991). +-- +-- connections.sensitive: marks a connection so the governed +-- external-access tool routes every call against it (read or write) to +-- human approval, not only write methods. Existing rows default to +-- FALSE (non-sensitive). +-- +-- approvals.consumed_at: records when an APPROVED one-shot grant was +-- spent. The external-access tool sets it via an atomic compare-and-set +-- (consume_if_approved) before egress so the same approval cannot +-- authorise a second call. NULL until consumed; the row keeps +-- status='approved' because consumption is orthogonal to the +-- approve/reject/expire decision lifecycle. 
+ +ALTER TABLE connections ADD COLUMN sensitive BOOLEAN NOT NULL DEFAULT FALSE; + +ALTER TABLE approvals ADD COLUMN consumed_at TIMESTAMPTZ; diff --git a/src/synthorg/persistence/postgres/schema.sql b/src/synthorg/persistence/postgres/schema.sql index a6f1bb1589..e6b40963aa 100644 --- a/src/synthorg/persistence/postgres/schema.sql +++ b/src/synthorg/persistence/postgres/schema.sql @@ -972,6 +972,7 @@ CREATE TABLE connections ( webhook_receipt_retention_days IS NULL OR webhook_receipt_retention_days >= 0 ), + sensitive BOOLEAN NOT NULL DEFAULT FALSE, created_at TIMESTAMPTZ NOT NULL, updated_at TIMESTAMPTZ NOT NULL ); @@ -1209,6 +1210,7 @@ CREATE TABLE approvals ( task_id TEXT REFERENCES tasks(id), evidence_package JSONB, metadata JSONB NOT NULL DEFAULT '{}'::jsonb, + consumed_at TIMESTAMPTZ, CHECK ( (decided_at IS NULL AND decided_by IS NULL) OR (decided_at IS NOT NULL AND decided_by IS NOT NULL) diff --git a/src/synthorg/persistence/sqlite/approval_repo.py b/src/synthorg/persistence/sqlite/approval_repo.py index edd6d0c292..7a91056e34 100644 --- a/src/synthorg/persistence/sqlite/approval_repo.py +++ b/src/synthorg/persistence/sqlite/approval_repo.py @@ -2,6 +2,7 @@ import json import sqlite3 +from datetime import datetime # noqa: TC003 -- runtime param type from typing import TYPE_CHECKING import aiosqlite @@ -40,8 +41,8 @@ id, action_type, title, description, requested_by, risk_level, source, status, created_at, expires_at, decided_at, decided_by, decision_reason, - task_id, evidence_package, metadata - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + task_id, evidence_package, metadata, consumed_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
ON CONFLICT(id) DO UPDATE SET action_type = excluded.action_type, title = excluded.title, @@ -56,7 +57,8 @@ decision_reason = excluded.decision_reason, task_id = excluded.task_id, evidence_package = excluded.evidence_package, - metadata = excluded.metadata + metadata = excluded.metadata, + consumed_at = excluded.consumed_at """ @@ -140,6 +142,11 @@ def _row_to_item(row: Row) -> ApprovalItem: else None ), task_id=(str(row["task_id"]) if row["task_id"] is not None else None), + consumed_at=( + coerce_row_timestamp(row["consumed_at"]) + if row["consumed_at"] is not None + else None + ), evidence_package=( EvidencePackage.model_validate_json(str(row["evidence_package"])) if row["evidence_package"] is not None @@ -226,6 +233,7 @@ async def save(self, item: ApprovalItem) -> None: item.task_id, evidence_json, json.dumps(item.metadata), + format_iso_utc(item.consumed_at) if item.consumed_at else None, ) async with self._write_context(): try: @@ -292,6 +300,7 @@ async def save_many(self, items: Sequence[ApprovalItem]) -> None: item.task_id, evidence_json, json.dumps(item.metadata), + format_iso_utc(item.consumed_at) if item.consumed_at else None, ), ) async with self._write_context(): @@ -398,7 +407,7 @@ async def get(self, approval_id: NotBlankStr) -> ApprovalItem | None: SELECT id, action_type, title, description, requested_by, risk_level, source, status, created_at, expires_at, decided_at, decided_by, decision_reason, - task_id, evidence_package, metadata + task_id, evidence_package, metadata, consumed_at FROM approvals WHERE id = ? """ try: @@ -435,7 +444,7 @@ async def get_many(self, ids: Sequence[NotBlankStr]) -> tuple[ApprovalItem, ...] 
SELECT id, action_type, title, description, requested_by, risk_level, source, status, created_at, expires_at, decided_at, decided_by, decision_reason, - task_id, evidence_package, metadata + task_id, evidence_package, metadata, consumed_at FROM approvals WHERE id IN ({placeholders}) """ # noqa: S608 -- placeholders is a closed-set "?,?,..." pattern try: @@ -487,7 +496,7 @@ async def list_items( SELECT id, action_type, title, description, requested_by, risk_level, source, status, created_at, expires_at, decided_at, decided_by, decision_reason, - task_id, evidence_package, metadata + task_id, evidence_package, metadata, consumed_at FROM approvals ORDER BY created_at DESC, id DESC LIMIT ? OFFSET ? @@ -556,7 +565,7 @@ async def query( SELECT id, action_type, title, description, requested_by, risk_level, source, status, created_at, expires_at, decided_at, decided_by, decision_reason, - task_id, evidence_package, metadata + task_id, evidence_package, metadata, consumed_at FROM approvals WHERE {where} ORDER BY created_at DESC, id DESC LIMIT ? OFFSET ? @@ -671,6 +680,59 @@ async def transition_if( raise QueryError(msg) from exc return cursor.rowcount > 0 + async def consume_if_approved( + self, + approval_id: NotBlankStr, + *, + consumed_at: datetime, + ) -> bool: + """Atomic compare-and-set: mark an APPROVED grant as consumed. + + Sets ``consumed_at`` iff the row is currently ``approved`` and not + already consumed, so a one-shot approval can authorise exactly one + action. Returns ``True`` iff this call won the race (rowcount == 1); + ``False`` on replay (already consumed), state mismatch (not + approved), or missing row. + + Args: + approval_id: The approval id. + consumed_at: Aware UTC timestamp to stamp on success. + + Returns: + ``True`` iff the grant was consumed by this call. + + Raises: + QueryError: On database errors. + """ + sql = ( + "UPDATE approvals SET consumed_at = ? " + "WHERE id = ? AND status = ? 
AND consumed_at IS NULL" + ) + params = ( + format_iso_utc(consumed_at), + approval_id, + ApprovalStatus.APPROVED.value, + ) + async with self._write_context(): + try: + cursor = await self._db.execute(sql, params) + await self._db.commit() + except (sqlite3.Error, aiosqlite.Error) as exc: + await _safe_rollback( + self._db, + operation="consume_if_approved", + approval_id=approval_id, + ) + msg = f"Failed to consume approval {approval_id!r}" + logger.warning( + API_APPROVAL_REPO_FAILED, + approval_id=approval_id, + error_type=type(exc).__name__, + error=safe_error_description(exc), + ) + raise QueryError(msg) from exc + return cursor.rowcount > 0 + async def delete(self, approval_id: NotBlankStr) -> bool: """Delete an approval item by ID. diff --git a/src/synthorg/persistence/sqlite/connection_repo.py b/src/synthorg/persistence/sqlite/connection_repo.py index 6ee144ed27..15fd2c49d6 100644 --- a/src/synthorg/persistence/sqlite/connection_repo.py +++ b/src/synthorg/persistence/sqlite/connection_repo.py @@ -51,7 +51,7 @@ "name, connection_type, auth_method, base_url, secret_refs_json, " "rate_limit_rpm, rate_limit_concurrent, health_check_enabled, " "health_status, last_health_check_at, metadata_json, " - "webhook_receipt_retention_days, created_at, updated_at" + "webhook_receipt_retention_days, sensitive, created_at, updated_at" ) @@ -70,6 +70,7 @@ def _row_to_connection(row: aiosqlite.Row | tuple[Any, ...]) -> Connection: last_health_check_at, metadata_json, webhook_receipt_retention_days, + sensitive, created_at, updated_at, ) = row @@ -102,6 +103,7 @@ def _row_to_connection(row: aiosqlite.Row | tuple[Any, ...]) -> Connection: if webhook_receipt_retention_days is not None else None ), + sensitive=bool(sensitive), created_at=coerce_row_timestamp(created_at), updated_at=coerce_row_timestamp(updated_at), ) @@ -152,9 +154,9 @@ async def save(self, connection: Connection) -> None: secret_refs_json, rate_limit_rpm, rate_limit_concurrent, health_check_enabled, 
health_status, last_health_check_at, metadata_json, - webhook_receipt_retention_days, + webhook_receipt_retention_days, sensitive, created_at, updated_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(name) DO UPDATE SET connection_type = excluded.connection_type, auth_method = excluded.auth_method, @@ -168,6 +170,7 @@ async def save(self, connection: Connection) -> None: metadata_json = excluded.metadata_json, webhook_receipt_retention_days = excluded.webhook_receipt_retention_days, + sensitive = excluded.sensitive, updated_at = excluded.updated_at """, ( @@ -183,6 +186,7 @@ async def save(self, connection: Connection) -> None: last_health_check_at_iso, metadata_json, connection.webhook_receipt_retention_days, + 1 if connection.sensitive else 0, created_at_iso, updated_at_iso, ), diff --git a/src/synthorg/persistence/sqlite/revisions/20260521000001_external_api_governed_access.sql b/src/synthorg/persistence/sqlite/revisions/20260521000001_external_api_governed_access.sql new file mode 100644 index 0000000000..de3af82402 --- /dev/null +++ b/src/synthorg/persistence/sqlite/revisions/20260521000001_external_api_governed_access.sql @@ -0,0 +1,21 @@ +-- depends: 20260519000001_conversational_intake 20260519000001_project_workspaces + +-- Governed external API/data access (#1991). +-- +-- connections.sensitive: marks a connection so the governed +-- external-access tool routes every call against it (read or write) to +-- human approval, not only write methods. Existing rows default to 0 +-- (non-sensitive). +-- +-- approvals.consumed_at: records when an APPROVED one-shot grant was +-- spent. The external-access tool sets it via an atomic compare-and-set +-- (consume_if_approved) before egress so the same approval cannot +-- authorise a second call. NULL until consumed; the row keeps +-- status='approved' because consumption is orthogonal to the +-- approve/reject/expire decision lifecycle. 
+ +ALTER TABLE connections ADD COLUMN sensitive INTEGER NOT NULL DEFAULT 0 + CHECK (sensitive IN (0, 1)); + +ALTER TABLE approvals ADD COLUMN consumed_at TEXT + CHECK (consumed_at IS NULL OR consumed_at LIKE '%+00:00' OR consumed_at LIKE '%Z'); diff --git a/src/synthorg/persistence/sqlite/schema.sql b/src/synthorg/persistence/sqlite/schema.sql index 64a46c6a06..763003c1fc 100644 --- a/src/synthorg/persistence/sqlite/schema.sql +++ b/src/synthorg/persistence/sqlite/schema.sql @@ -967,6 +967,7 @@ CREATE TABLE connections ( webhook_receipt_retention_days IS NULL OR webhook_receipt_retention_days >= 0 ), + sensitive INTEGER NOT NULL DEFAULT 0 CHECK(sensitive IN (0, 1)), created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); @@ -1141,6 +1142,9 @@ CREATE TABLE approvals ( task_id TEXT CONSTRAINT fk_approvals_task_id REFERENCES tasks(id), evidence_package TEXT, metadata TEXT NOT NULL DEFAULT '{}', + consumed_at TEXT CHECK( + consumed_at IS NULL OR consumed_at LIKE '%+00:00' OR consumed_at LIKE '%Z' + ), CHECK( (decided_at IS NULL AND decided_by IS NULL) OR (decided_at IS NOT NULL AND decided_by IS NOT NULL) diff --git a/src/synthorg/security/action_type_mapping.py b/src/synthorg/security/action_type_mapping.py index 05a4ba8ec2..9dd1993dc5 100644 --- a/src/synthorg/security/action_type_mapping.py +++ b/src/synthorg/security/action_type_mapping.py @@ -28,6 +28,7 @@ ToolCategory.ONTOLOGY: ActionType.MEMORY_READ, ToolCategory.MCP: ActionType.CODE_WRITE, ToolCategory.BROWSER: ActionType.BROWSER_NAVIGATE, + ToolCategory.EXTERNAL_DATA: ActionType.EXTERNAL_DATA_REQUEST, ToolCategory.OTHER: ActionType.CODE_READ, } ) diff --git a/src/synthorg/security/action_types.py b/src/synthorg/security/action_types.py index 32364f0f84..1225f32eaa 100644 --- a/src/synthorg/security/action_types.py +++ b/src/synthorg/security/action_types.py @@ -34,6 +34,7 @@ class ActionTypeCategory(StrEnum): ARCH = "arch" MEMORY = "memory" BROWSER = "browser" + EXTERNAL_DATA = "external_data" def 
_build_category_map() -> dict[str, frozenset[str]]: diff --git a/src/synthorg/security/risk_scorer.py b/src/synthorg/security/risk_scorer.py index a3cc4d9aeb..be851d6c41 100644 --- a/src/synthorg/security/risk_scorer.py +++ b/src/synthorg/security/risk_scorer.py @@ -199,6 +199,9 @@ def score(self, action_type: str) -> RiskScore: ActionType.CODE_DELETE: _HIGH_SCORE, ActionType.VCS_PUSH: _HIGH_SCORE, ActionType.COMMS_EXTERNAL: _HIGH_SCORE, + # External API/data access reaches a third party with brokered + # credentials; same tier as outbound external comms. + ActionType.EXTERNAL_DATA_REQUEST: _HIGH_SCORE, ActionType.BUDGET_EXCEED: _HIGH_SCORE, # MEDIUM ActionType.CODE_CREATE: _MEDIUM_SCORE, diff --git a/src/synthorg/security/rules/risk_classifier.py b/src/synthorg/security/rules/risk_classifier.py index d014e24f54..b38070c56c 100644 --- a/src/synthorg/security/rules/risk_classifier.py +++ b/src/synthorg/security/rules/risk_classifier.py @@ -21,6 +21,7 @@ ActionType.CODE_DELETE: ApprovalRiskLevel.HIGH, ActionType.VCS_PUSH: ApprovalRiskLevel.HIGH, ActionType.COMMS_EXTERNAL: ApprovalRiskLevel.HIGH, + ActionType.EXTERNAL_DATA_REQUEST: ApprovalRiskLevel.HIGH, ActionType.BUDGET_EXCEED: ApprovalRiskLevel.HIGH, # MEDIUM ActionType.CODE_CREATE: ApprovalRiskLevel.MEDIUM, diff --git a/src/synthorg/security/timeout/risk_tier_classifier.py b/src/synthorg/security/timeout/risk_tier_classifier.py index 69b00e48d7..8d4bdb58be 100644 --- a/src/synthorg/security/timeout/risk_tier_classifier.py +++ b/src/synthorg/security/timeout/risk_tier_classifier.py @@ -42,6 +42,7 @@ def elevate_one_tier(level: ApprovalRiskLevel) -> ApprovalRiskLevel: ActionType.CODE_DELETE: ApprovalRiskLevel.HIGH, ActionType.VCS_PUSH: ApprovalRiskLevel.HIGH, ActionType.COMMS_EXTERNAL: ApprovalRiskLevel.HIGH, + ActionType.EXTERNAL_DATA_REQUEST: ApprovalRiskLevel.HIGH, ActionType.BUDGET_EXCEED: ApprovalRiskLevel.HIGH, # MEDIUM ActionType.CODE_CREATE: ApprovalRiskLevel.MEDIUM, diff --git 
a/src/synthorg/settings/definitions/__init__.py b/src/synthorg/settings/definitions/__init__.py index 7d586e3f7c..6e97f63451 100644 --- a/src/synthorg/settings/definitions/__init__.py +++ b/src/synthorg/settings/definitions/__init__.py @@ -14,6 +14,7 @@ company, coordination, engine, + external_api, hr, integrations, memory, @@ -40,6 +41,7 @@ "company", "coordination", "engine", + "external_api", "hr", "integrations", "memory", diff --git a/src/synthorg/settings/definitions/external_api.py b/src/synthorg/settings/definitions/external_api.py new file mode 100644 index 0000000000..9fcfc63f45 --- /dev/null +++ b/src/synthorg/settings/definitions/external_api.py @@ -0,0 +1,97 @@ +"""External-API namespace setting definitions. + +Governs the first-class external-access tool: its master feature flag / +provider discriminator and the default per-call limits applied when a +connection does not carry its own override. +""" + +from synthorg.settings.enums import SettingLevel, SettingNamespace, SettingType +from synthorg.settings.models import SettingDefinition +from synthorg.settings.registry import get_registry + +_r = get_registry() + +_r.register( + SettingDefinition( + namespace=SettingNamespace.EXTERNAL_API, + key="enabled", + type=SettingType.BOOLEAN, + default="true", + description=( + "Master switch for the governed external-access tool. When" + " false the tool is not registered, so agents cannot make" + " external API calls." + ), + group="General", + ) +) + +_r.register( + SettingDefinition( + namespace=SettingNamespace.EXTERNAL_API, + key="provider_type", + type=SettingType.STRING, + default="httpx", + description=( + "Discriminator selecting the ExternalAccessProvider strategy" + " used for egress. 'httpx' (default) makes DNS-pinned" + " requests directly; future strategies (e.g. a sidecar proxy)" + " register under their own key." 
+ ), + group="General", + level=SettingLevel.ADVANCED, + restart_required=True, + validator_pattern=r"^[a-z][a-z0-9_]*$", + ) +) + +_r.register( + SettingDefinition( + namespace=SettingNamespace.EXTERNAL_API, + key="default_max_response_bytes", + type=SettingType.INTEGER, + default="1048576", + description=( + "Hard cap on the response body size (bytes) read from an" + " external API before truncation, bounding agent memory." + ), + group="Limits", + level=SettingLevel.ADVANCED, + min_value=1024, + max_value=10485760, + ) +) + +_r.register( + SettingDefinition( + namespace=SettingNamespace.EXTERNAL_API, + key="default_timeout_seconds", + type=SettingType.FLOAT, + default="30.0", + description=( + "Maximum wall-clock time an external API request may run" + " before it is cancelled." + ), + group="Limits", + level=SettingLevel.ADVANCED, + min_value=1.0, + max_value=300.0, + ) +) + +_r.register( + SettingDefinition( + namespace=SettingNamespace.EXTERNAL_API, + key="default_max_rpm", + type=SettingType.INTEGER, + default="60", + description=( + "Default per-connection rate limit (requests per minute)" + " applied when a connection carries no rate_limiter override." + ), + group="Limits", + level=SettingLevel.ADVANCED, + min_value=1, + max_value=10000, + ) +) diff --git a/src/synthorg/settings/enums.py b/src/synthorg/settings/enums.py index 8cf7f3dc12..0857708244 100644 --- a/src/synthorg/settings/enums.py +++ b/src/synthorg/settings/enums.py @@ -33,6 +33,7 @@ class SettingNamespace(StrEnum): HR = "hr" WORKERS = "workers" TELEMETRY = "telemetry" + EXTERNAL_API = "external_api" class SettingType(StrEnum): diff --git a/src/synthorg/tools/_dns_pinning.py b/src/synthorg/tools/_dns_pinning.py new file mode 100644 index 0000000000..727b98fdc6 --- /dev/null +++ b/src/synthorg/tools/_dns_pinning.py @@ -0,0 +1,144 @@ +"""DNS-rebinding-safe httpx transport. 
+ +Closes the DNS-rebinding TOCTOU window between +:func:`synthorg.tools.network_validator.validate_url_host` (which resolves and +validates a hostname's IPs) and the actual TCP connect: the transport pins the +validated IP at ``connect_tcp`` time while letting httpcore pass the original +hostname to ``start_tls`` separately, so TLS SNI and certificate verification +still use the hostname. No custom SSL context required, and HTTPS is handled +correctly. + +Shared by the connection health check +(:mod:`synthorg.integrations.health.checks.generic_http`) and the governed +external-access tool's httpx provider so the proven pinning path is not +duplicated. +""" + +import ssl +from typing import TYPE_CHECKING, cast + +import httpcore +import httpx + +if TYPE_CHECKING: + from collections.abc import AsyncIterable, AsyncIterator, Iterable + + from httpcore._backends.base import SOCKET_OPTION + + +class PinnedDnsBackend(httpcore.AsyncNetworkBackend): + """httpcore network backend that pins a hostname to a validated IP. + + Intercepts ``connect_tcp`` and substitutes the validated IP for the + request's hostname before delegating to the inner backend. Because + httpcore passes ``server_hostname`` to ``start_tls`` separately from the + ``host`` arg of ``connect_tcp``, TLS SNI and certificate verification still + use the original hostname. 
+ """ + + def __init__( + self, + inner: httpcore.AsyncNetworkBackend, + *, + hostname: str, + ip: str, + ) -> None: + self._inner = inner + self._hostname = hostname.lower() + self._ip = ip + + async def connect_tcp( + self, + host: str, + port: int, + timeout: float | None = None, # noqa: ASYNC109 -- AsyncNetworkBackend interface + local_address: str | None = None, + socket_options: Iterable[SOCKET_OPTION] | None = None, + ) -> httpcore.AsyncNetworkStream: + target = self._ip if host.lower() == self._hostname else host + return await self._inner.connect_tcp( + target, + port, + timeout=timeout, + local_address=local_address, + socket_options=socket_options, + ) + + async def connect_unix_socket( + self, + path: str, + timeout: float | None = None, # noqa: ASYNC109 -- AsyncNetworkBackend interface + socket_options: Iterable[SOCKET_OPTION] | None = None, + ) -> httpcore.AsyncNetworkStream: + return await self._inner.connect_unix_socket( + path, + timeout=timeout, + socket_options=socket_options, + ) + + async def sleep(self, seconds: float) -> None: + await self._inner.sleep(seconds) + + +class PinnedDnsTransport(httpx.AsyncBaseTransport): + """httpx transport whose underlying pool uses a hostname-pinned backend. + + Constructed only when there is a hostname-to-IP pinning to apply. Calls + without a matching hostname fall through to the inner backend unchanged, so + this transport is safe to use on any URL. 
+ """ + + def __init__(self, *, hostname: str, ip: str) -> None: + self._pool = httpcore.AsyncConnectionPool( + ssl_context=ssl.create_default_context(), + network_backend=PinnedDnsBackend( + httpcore.AnyIOBackend(), + hostname=hostname, + ip=ip, + ), + ) + + async def handle_async_request(self, request: httpx.Request) -> httpx.Response: + if not isinstance(request.stream, httpx.AsyncByteStream): + msg = "Pinned-DNS transport requires an async byte stream" + raise TypeError(msg) + req = httpcore.Request( + method=request.method, + url=httpcore.URL( + scheme=request.url.raw_scheme, + host=request.url.raw_host, + port=request.url.port, + target=request.url.raw_path, + ), + headers=request.headers.raw, + content=request.stream, + extensions=request.extensions, + ) + resp = await self._pool.handle_async_request(req) + return httpx.Response( + status_code=resp.status, + headers=resp.headers, + stream=PinnedDnsResponseStream( + cast("AsyncIterable[bytes]", resp.stream), + ), + extensions=resp.extensions, + ) + + async def aclose(self) -> None: + await self._pool.aclose() + + +class PinnedDnsResponseStream(httpx.AsyncByteStream): + """Forwarding wrapper that adapts an httpcore async stream to httpx.""" + + def __init__(self, inner: AsyncIterable[bytes]) -> None: + self._inner = inner + + async def __aiter__(self) -> AsyncIterator[bytes]: + async for part in self._inner: + yield part + + async def aclose(self) -> None: + aclose = getattr(self._inner, "aclose", None) + if aclose is not None: + await aclose() diff --git a/src/synthorg/tools/external_api/__init__.py b/src/synthorg/tools/external_api/__init__.py new file mode 100644 index 0000000000..fc47880772 --- /dev/null +++ b/src/synthorg/tools/external_api/__init__.py @@ -0,0 +1,9 @@ +"""Governed external API/data access tool. 
+ +A first-class, governed wrapper over existing infrastructure (connection +catalog + secret backends for credential brokering, the bus-coordinated +sliding-window rate limiter, the SSRF ``NetworkPolicy`` + DNS-pinning egress +guard, and the approval gate). It replaces ad hoc curl-in-sandbox with a +single tool that brokers credentials, enforces rate limits, constrains egress, +and routes sensitive calls to human approval. +""" diff --git a/src/synthorg/tools/external_api/_args.py b/src/synthorg/tools/external_api/_args.py new file mode 100644 index 0000000000..bab2fa82ca --- /dev/null +++ b/src/synthorg/tools/external_api/_args.py @@ -0,0 +1,97 @@ +"""Typed arguments for the governed external-access tool.""" + +from typing import Self + +from pydantic import BaseModel, ConfigDict, Field, model_validator + +from synthorg.core.types import NotBlankStr # noqa: TC001 -- Pydantic field type + +# HTTP methods the tool accepts. Writes route to approval (per the +# sensitive-or-write gating rule); reads do not. +_READ_METHODS: frozenset[str] = frozenset({"GET", "HEAD"}) +_WRITE_METHODS: frozenset[str] = frozenset({"POST", "PUT", "PATCH", "DELETE"}) +_ALLOWED_METHODS: frozenset[str] = _READ_METHODS | _WRITE_METHODS +# Methods that may carry a request body. +_BODY_METHODS: frozenset[str] = frozenset({"POST", "PUT", "PATCH"}) + + +class ExternalApiArgs(BaseModel): + """Arguments for the ``external_api`` tool. + + The agent always names a catalog ``connection`` (which supplies the + base URL, credentials, egress hosts, and the sensitivity flag) and + targets a resource either by a relative ``path`` (joined to the + connection's base URL) or an absolute ``url`` (validated to resolve + within the connection's allowed hosts). 
+ """ + + model_config = ConfigDict(frozen=True, allow_inf_nan=False, extra="forbid") + + connection: NotBlankStr = Field( + description="Name of the pre-configured connection from the catalog.", + ) + method: str = Field( + default="GET", + description="HTTP method: GET, HEAD, POST, PUT, PATCH, or DELETE.", + ) + path: str = Field( + default="", + description=( + "Relative path joined to the connection's base URL. Provide" + " exactly one of path or url." + ), + ) + url: str = Field( + default="", + description=( + "Absolute URL. Must resolve to a host within the connection's" + " allowed hosts. Provide exactly one of path or url." + ), + ) + headers: dict[str, str] = Field( + default_factory=dict, + description=( + "Additional request headers. The connection's credentials are" + " injected automatically and must not be supplied here." + ), + ) + body: str | None = Field( + default=None, + description="Request body. Permitted only for POST, PUT, and PATCH.", + ) + approval_id: NotBlankStr | None = Field( + default=None, + description=( + "Identifier of a granted approval, supplied when re-issuing a" + " sensitive call after human approval. Optional: the call is" + " matched to its approval by content signature regardless." 
+ ), + ) + + @model_validator(mode="after") + def _normalise_and_validate(self) -> Self: + """Uppercase the method and enforce cross-field invariants.""" + normalised = self.method.strip().upper() + if normalised not in _ALLOWED_METHODS: + allowed = ", ".join(sorted(_ALLOWED_METHODS)) + msg = f"method must be one of {allowed}; got {self.method!r}" + raise ValueError(msg) + object.__setattr__(self, "method", normalised) + + has_path = bool(self.path.strip()) + has_url = bool(self.url.strip()) + if has_path == has_url: + msg = "provide exactly one of path or url" + raise ValueError(msg) + + if self.body is not None and normalised not in _BODY_METHODS: + allowed_body = ", ".join(sorted(_BODY_METHODS)) + msg = f"body is only permitted for {allowed_body}; got {normalised}" + raise ValueError(msg) + + return self + + @property + def is_write(self) -> bool: + """Whether the method mutates remote state (routes to approval).""" + return self.method in _WRITE_METHODS diff --git a/src/synthorg/tools/external_api/_credentials.py b/src/synthorg/tools/external_api/_credentials.py new file mode 100644 index 0000000000..e78a00c411 --- /dev/null +++ b/src/synthorg/tools/external_api/_credentials.py @@ -0,0 +1,62 @@ +"""Map brokered connection credentials to request auth headers. + +Runs in-process inside the tool; the returned headers carry secrets and MUST +NOT be logged (SEC-1). Credential field names follow the generic-HTTP +connection convention (``token``, ``api_key``, ``username``, ``password``, +``header_name``, ``header_value``). +""" + +import base64 + +from synthorg.integrations.connections.models import AuthMethod +from synthorg.tools.external_api.errors import ExternalApiCredentialError + + +def build_auth_headers( + auth_method: AuthMethod, + credentials: dict[str, str], +) -> dict[str, str]: + """Return the auth headers for *auth_method* from *credentials*. + + Args: + auth_method: The connection's configured authentication method. 
+ credentials: Decrypted credential fields from the secret backend. + + Returns: + Header name/value pairs to merge into the request. + + Raises: + ExternalApiCredentialError: If the method's required credential + field is absent (a misconfigured connection). + """ + match auth_method: + case AuthMethod.BEARER_TOKEN | AuthMethod.OAUTH2: + token = credentials.get("token") or credentials.get("access_token") + if not token: + msg = f"{auth_method.value} connection has no 'token'" + raise ExternalApiCredentialError(msg) + return {"Authorization": f"Bearer {token}"} + case AuthMethod.API_KEY: + header_name = credentials.get("header_name") + header_value = credentials.get("header_value") + if header_name and header_value: + return {header_name: header_value} + api_key = credentials.get("api_key") + if not api_key: + msg = "api_key connection has no 'api_key' or header pair" + raise ExternalApiCredentialError(msg) + return {"X-API-Key": api_key} + case AuthMethod.BASIC_AUTH: + username = credentials.get("username") + password = credentials.get("password") + if not username or not password: + msg = "basic_auth connection requires 'username' and 'password'" + raise ExternalApiCredentialError(msg) + token = base64.b64encode(f"{username}:{password}".encode()).decode("ascii") + return {"Authorization": f"Basic {token}"} + case AuthMethod.CUSTOM: + header_name = credentials.get("header_name") + header_value = credentials.get("header_value") + if header_name and header_value: + return {header_name: header_value} + return {} diff --git a/src/synthorg/tools/external_api/_runtime.py b/src/synthorg/tools/external_api/_runtime.py new file mode 100644 index 0000000000..8801bfe753 --- /dev/null +++ b/src/synthorg/tools/external_api/_runtime.py @@ -0,0 +1,28 @@ +"""Boot-scoped collaborators for the governed external-access tool. + +Built once at runtime-service construction and carried on the ``AgentEngine``. 
+The per-run registry augmentation pairs this bundle with the run's identity, +task, and effective autonomy to construct the tool. ``None`` when the feature +is disabled or no connection catalog is wired, in which case the tool is not +registered. +""" + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from synthorg.integrations.connections.catalog import ConnectionCatalog + from synthorg.tools.external_api.provider import ExternalAccessProvider + from synthorg.tools.network_validator import NetworkPolicy + + +@dataclass(frozen=True) +class ExternalApiRuntime: + """Boot-scoped dependencies the external-access tool needs per run.""" + + connection_catalog: ConnectionCatalog + provider: ExternalAccessProvider + network_policy: NetworkPolicy + max_response_bytes: int + timeout_seconds: float + default_max_rpm: int diff --git a/src/synthorg/tools/external_api/_signature.py b/src/synthorg/tools/external_api/_signature.py new file mode 100644 index 0000000000..22313878ae --- /dev/null +++ b/src/synthorg/tools/external_api/_signature.py @@ -0,0 +1,80 @@ +"""Content-addressed signature binding an approval to a specific call. + +A sensitive external call stores its signature in the approval's metadata at +park time. On resume the tool recomputes the signature for the re-issued call +and matches it against APPROVED, unconsumed approvals, so a grant authorises +exactly the call it was approved for (not a replay or a different call). +""" + +import hashlib +import json + +from pydantic import BaseModel, ConfigDict + +_SIGNATURE_METADATA_KEY = "external_api_signature" + + +def _hash(value: str | None) -> str: + """Stable SHA-256 hex digest of *value* (NUL-delimited sentinel for None).""" + payload = "\x00NONE\x00" if value is None else value + return hashlib.sha256(payload.encode("utf-8")).hexdigest() + + +class ApprovalSignature(BaseModel): + """Immutable fingerprint of a governed external call.
+ + Equality over the resolved request shape: connection, method, the + resolved URL, a hash of the body, and a hash of the agent-supplied + request headers (credentials are injected later and excluded, so an + approval cannot be invalidated by a credential rotation). + """ + + model_config = ConfigDict(frozen=True, allow_inf_nan=False, extra="forbid") + + connection: str + method: str + resolved_url: str + body_hash: str + headers_hash: str + + @classmethod + def build( + cls, + *, + connection: str, + method: str, + resolved_url: str, + body: str | None, + headers: dict[str, str], + ) -> ApprovalSignature: + """Construct a signature from the resolved call components.""" + canonical_headers = json.dumps( + sorted((k.lower(), v) for k, v in headers.items()), + separators=(",", ":"), + ) + return cls( + connection=connection, + method=method, + resolved_url=resolved_url, + body_hash=_hash(body), + headers_hash=_hash(canonical_headers), + ) + + def to_metadata(self) -> dict[str, str]: + """Serialise to an approval-metadata fragment.""" + return {_SIGNATURE_METADATA_KEY: self.model_dump_json()} + + @classmethod + def from_metadata(cls, metadata: dict[str, str]) -> ApprovalSignature | None: + """Parse a signature from approval metadata, or ``None`` if absent/invalid.""" + raw = metadata.get(_SIGNATURE_METADATA_KEY) + if raw is None: + return None + try: + return cls.model_validate_json(raw) + except ValueError: + return None + + def matches(self, other: ApprovalSignature | None) -> bool: + """Whether *other* is an identical call signature.""" + return other is not None and self == other diff --git a/src/synthorg/tools/external_api/errors.py b/src/synthorg/tools/external_api/errors.py new file mode 100644 index 0000000000..c11d4120c1 --- /dev/null +++ b/src/synthorg/tools/external_api/errors.py @@ -0,0 +1,86 @@ +"""Domain error hierarchy for the governed external-access tool. 
+ +Every failure path raises an ``ExternalApiError`` subclass of +:class:`synthorg.tools.errors.ToolError` so the +``check_domain_error_hierarchy.py`` gate stays clean and callers can +discriminate failures by ``error_code`` / class. +""" + +from typing import ClassVar + +from synthorg.core.error_taxonomy import ErrorCategory, ErrorCode +from synthorg.tools.errors import ToolError + + +class ExternalApiError(ToolError): + """Base for all governed external-access tool domain errors.""" + + status_code: ClassVar[int] = 500 + error_code: ClassVar[ErrorCode] = ErrorCode.TOOL_EXECUTION_ERROR + error_category: ClassVar[ErrorCategory] = ErrorCategory.INTERNAL + default_message: ClassVar[str] = "External API tool failure" + + +class ExternalApiArgumentError(ExternalApiError): + """Arguments violated a validation invariant the args model alone cannot express.""" + + status_code: ClassVar[int] = 422 + error_code: ClassVar[ErrorCode] = ErrorCode.TOOL_PARAMETER_ERROR + error_category: ClassVar[ErrorCategory] = ErrorCategory.VALIDATION + default_message: ClassVar[str] = "External API tool arguments invalid" + + +class ExternalApiConnectionNotFoundError(ExternalApiError): + """Named connection is absent from the connection catalog.""" + + status_code: ClassVar[int] = 404 + error_code: ClassVar[ErrorCode] = ErrorCode.CONNECTION_NOT_FOUND + error_category: ClassVar[ErrorCategory] = ErrorCategory.NOT_FOUND + default_message: ClassVar[str] = "External API connection not found" + + +class ExternalApiEgressBlockedError(ExternalApiError): + """Target host failed the SSRF / connection-host allowlist check.""" + + status_code: ClassVar[int] = 403 + error_code: ClassVar[ErrorCode] = ErrorCode.FORBIDDEN + error_category: ClassVar[ErrorCategory] = ErrorCategory.AUTH + default_message: ClassVar[str] = "External API egress blocked" + + +class ExternalApiCredentialError(ExternalApiError): + """Credentials could not be brokered or applied for the connection.""" + + status_code: ClassVar[int] = 
500 + error_code: ClassVar[ErrorCode] = ErrorCode.TOOL_EXECUTION_ERROR + error_category: ClassVar[ErrorCategory] = ErrorCategory.INTERNAL + default_message: ClassVar[str] = "External API credential brokering failed" + + +class ExternalApiApprovalMismatchError(ExternalApiError): + """A supplied approval did not match the call, or was already consumed.""" + + status_code: ClassVar[int] = 409 + error_code: ClassVar[ErrorCode] = ErrorCode.RESOURCE_CONFLICT + error_category: ClassVar[ErrorCategory] = ErrorCategory.CONFLICT + default_message: ClassVar[str] = ( + "External API approval mismatch or already consumed" + ) + + +class ExternalApiRateLimitedError(ExternalApiError): + """The connection's rate-limit window is exhausted.""" + + status_code: ClassVar[int] = 429 + error_code: ClassVar[ErrorCode] = ErrorCode.RATE_LIMITED + error_category: ClassVar[ErrorCategory] = ErrorCategory.RATE_LIMIT + default_message: ClassVar[str] = "External API rate limit exceeded" + + +class ExternalApiResponseError(ExternalApiError): + """The upstream request failed in transport (timeout, connection, protocol).""" + + status_code: ClassVar[int] = 502 + error_code: ClassVar[ErrorCode] = ErrorCode.TOOL_EXECUTION_ERROR + error_category: ClassVar[ErrorCategory] = ErrorCategory.INTERNAL + default_message: ClassVar[str] = "External API upstream request failed" diff --git a/src/synthorg/tools/external_api/external_api_tool.py b/src/synthorg/tools/external_api/external_api_tool.py new file mode 100644 index 0000000000..2fac5417b7 --- /dev/null +++ b/src/synthorg/tools/external_api/external_api_tool.py @@ -0,0 +1,427 @@ +"""Governed external API/data access tool.
+ +Brokers credentials from the connection catalog, constrains egress to the +connection's host via the SSRF ``NetworkPolicy`` + DNS pinning, enforces the +connection's bus-coordinated rate limit, and routes sensitive calls (a +connection flagged ``sensitive`` or any write method) to human approval with a +content-addressed, one-shot consumption guard. Delegates the actual egress to a +pluggable :class:`ExternalAccessProvider`. +""" + +from datetime import UTC +from typing import TYPE_CHECKING, Any, ClassVar +from uuid import uuid4 + +from pydantic import BaseModel +from pydantic import ValidationError as PydanticValidationError + +from synthorg.api.boundary import parse_typed +from synthorg.core.approval import ApprovalItem +from synthorg.core.clock import Clock, SystemClock +from synthorg.core.enums import ( + ActionType, + ApprovalRiskLevel, + ApprovalSource, + ApprovalStatus, + ToolCategory, +) +from synthorg.core.resilience_config import RateLimiterConfig +from synthorg.integrations.errors import ( + ConnectionRateLimitError, + SecretRetrievalError, +) +from synthorg.integrations.rate_limiting.decorator import with_connection_rate_limit +from synthorg.observability import get_logger, safe_error_description +from synthorg.observability.events.external_api import ( + EXTERNAL_API_APPROVAL_CONSUMED, + EXTERNAL_API_APPROVAL_REQUIRED, + EXTERNAL_API_CALL_STARTED, + EXTERNAL_API_CALL_SUCCEEDED, + EXTERNAL_API_EGRESS_BLOCKED, + EXTERNAL_API_RATE_LIMITED, + EXTERNAL_API_SIGNATURE_MISMATCH, +) +from synthorg.providers.url_utils import redact_url +from synthorg.tools.base import BaseTool, ToolExecutionResult +from synthorg.tools.external_api._args import ExternalApiArgs +from synthorg.tools.external_api._credentials import build_auth_headers +from synthorg.tools.external_api._signature import ApprovalSignature +from synthorg.tools.external_api.errors import ( + ExternalApiApprovalMismatchError, + ExternalApiConnectionNotFoundError, + ExternalApiCredentialError, + 
ExternalApiEgressBlockedError, + ExternalApiError, + ExternalApiResponseError, +) +from synthorg.tools.external_api.provider import ExternalAccessRequest +from synthorg.tools.network_validator import ( + NetworkPolicy, + extract_hostname, + validate_url_host, +) + +if TYPE_CHECKING: + from synthorg.approval.protocol import ApprovalStoreProtocol + from synthorg.integrations.connections.catalog import ConnectionCatalog + from synthorg.integrations.connections.models import Connection + from synthorg.security.autonomy.models import EffectiveAutonomy + from synthorg.security.timeout.risk_tier_classifier import ( + DefaultRiskTierClassifier, + ) + from synthorg.tools.external_api.provider import ExternalAccessProvider + +logger = get_logger(__name__) + +_ACTION_TYPE = ActionType.EXTERNAL_DATA_REQUEST.value + + +class ExternalApiTool(BaseTool): + """Agent-callable tool for governed external API/data access.""" + + args_model: ClassVar[type[BaseModel] | None] = ExternalApiArgs + + def __init__( # noqa: PLR0913 -- governance collaborators are all required + self, + *, + connection_catalog: ConnectionCatalog, + approval_store: ApprovalStoreProtocol, + provider: ExternalAccessProvider, + agent_id: str, + task_id: str | None = None, + network_policy: NetworkPolicy | None = None, + effective_autonomy: EffectiveAutonomy | None = None, + risk_classifier: DefaultRiskTierClassifier | None = None, + max_response_bytes: int, + timeout_seconds: float, + default_max_rpm: int, + clock: Clock | None = None, + ) -> None: + super().__init__( + name="external_api", + description=( + "Access an external API or data source through a configured" + " connection. Provide the connection name plus a relative path" + " (or an absolute url within the connection's hosts). Credentials" + " are brokered automatically; rate limits and egress are enforced;" + " sensitive calls require human approval. On approval, re-issue" + " the same call to proceed." 
+ ), + category=ToolCategory.EXTERNAL_DATA, + parameters_schema=ExternalApiArgs.model_json_schema(), + ) + self._catalog = connection_catalog + self._approval_store = approval_store + self._provider = provider + self._agent_id = agent_id + self._task_id = task_id + self._network_policy = network_policy or NetworkPolicy() + self._risk_classifier = risk_classifier + self._max_response_bytes = max_response_bytes + self._timeout_seconds = timeout_seconds + self._default_max_rpm = default_max_rpm + self._clock: Clock = clock if clock is not None else SystemClock() + self._auto_approved = ( + effective_autonomy is not None + and _ACTION_TYPE in effective_autonomy.auto_approve_actions + ) + + async def execute(self, *, arguments: dict[str, Any]) -> ToolExecutionResult: + """Run a governed external API call.""" + try: + args = parse_typed("tool.external_api", arguments, ExternalApiArgs) + except PydanticValidationError as exc: + return ToolExecutionResult( + content=f"Invalid arguments: {safe_error_description(exc)}", + is_error=True, + ) + + try: + return await self._run(args) + except ExternalApiError as exc: + return ToolExecutionResult(content=str(exc), is_error=True) + + async def _run(self, args: ExternalApiArgs) -> ToolExecutionResult: + """Execute the governed flow; raises ``ExternalApiError`` on failure.""" + conn = await self._catalog.get(args.connection) + if conn is None: + msg = f"Connection {args.connection!r} not found" + raise ExternalApiConnectionNotFoundError(msg) + + resolved_url = self._resolve_url(conn, args) + validation = await validate_url_host(resolved_url, self._network_policy) + if isinstance(validation, str): + logger.warning( + EXTERNAL_API_EGRESS_BLOCKED, + connection=args.connection, + url=redact_url(resolved_url), + reason=validation, + ) + msg = f"Egress blocked: {validation}" + raise ExternalApiEgressBlockedError(msg) + + signature = ApprovalSignature.build( + connection=args.connection, + method=args.method, + resolved_url=resolved_url, + 
body=args.body, + headers=args.headers, + ) + if (conn.sensitive or args.is_write) and not self._auto_approved: + gate = await self._gate_approval(args, signature) + if gate is not None: + return gate + + merged_headers = self._broker_headers(conn, await self._credentials(conn)) + merged_headers = {**args.headers, **merged_headers} + + pinned_ip = validation.resolved_ips[0] if validation.resolved_ips else None + pinned_hostname = validation.hostname if validation.resolved_ips else None + request = ExternalAccessRequest( + method=args.method, + url=resolved_url, + headers=merged_headers, + body=args.body, + timeout_seconds=self._timeout_seconds, + max_response_bytes=self._max_response_bytes, + pinned_ip=pinned_ip, + pinned_hostname=pinned_hostname, + ) + logger.info( + EXTERNAL_API_CALL_STARTED, + connection=args.connection, + method=args.method, + url=redact_url(resolved_url), + ) + return await self._egress(conn, request) + + def _resolve_url(self, conn: Connection, args: ExternalApiArgs) -> str: + """Build the target URL and confirm its host matches the connection. + + The agent may narrow within the connection (relative path, or an + absolute url on the same host) but never widen to another host. 
+ """ + if not conn.base_url: + msg = f"Connection {args.connection!r} has no base_url" + raise ExternalApiEgressBlockedError(msg) + base_host = extract_hostname(conn.base_url) + if args.path: + resolved = conn.base_url.rstrip("/") + "/" + args.path.lstrip("/") + else: + resolved = args.url + resolved_host = extract_hostname(resolved) + if ( + base_host is None + or resolved_host is None + or resolved_host.lower() != base_host.lower() + ): + logger.warning( + EXTERNAL_API_EGRESS_BLOCKED, + connection=args.connection, + url=redact_url(resolved), + reason="host_outside_connection", + ) + msg = ( + f"URL host is not within connection {args.connection!r}" + f" (allowed host: {base_host!r})" + ) + raise ExternalApiEgressBlockedError(msg) + return resolved + + async def _credentials(self, conn: Connection) -> dict[str, str]: + """Fetch decrypted credentials, mapping retrieval failure to a domain error.""" + try: + return await self._catalog.get_credentials(conn.name) + except SecretRetrievalError as exc: + logger.warning( + EXTERNAL_API_EGRESS_BLOCKED, + connection=conn.name, + error_type=type(exc).__name__, + error=safe_error_description(exc), + reason="credential_retrieval_failed", + ) + msg = "Failed to broker credentials for connection" + raise ExternalApiCredentialError(msg) from exc + + @staticmethod + def _broker_headers( + conn: Connection, + credentials: dict[str, str], + ) -> dict[str, str]: + """Map credentials to auth headers (never logged).""" + return build_auth_headers(conn.auth_method, credentials) + + async def _egress( + self, + conn: Connection, + request: ExternalAccessRequest, + ) -> ToolExecutionResult: + """Rate-limited egress with graceful rate-limit + transport-error handling.""" + config = conn.rate_limiter or RateLimiterConfig( + max_requests_per_minute=self._default_max_rpm, + ) + rate_limited = with_connection_rate_limit(conn.name, config=config)( + self._provider.request, + ) + try: + response = await rate_limited(request) + except 
ConnectionRateLimitError as exc: + logger.warning( + EXTERNAL_API_RATE_LIMITED, + connection=conn.name, + error=safe_error_description(exc), + ) + return ToolExecutionResult( + content=( + f"Rate limit exceeded for connection {conn.name!r}; retry later." + ), + is_error=True, + metadata={"rate_limited": True, "connection": conn.name}, + ) + except ExternalApiResponseError as exc: + return ToolExecutionResult(content=str(exc), is_error=True) + + logger.info( + EXTERNAL_API_CALL_SUCCEEDED, + connection=conn.name, + status_code=response.status_code, + truncated=response.truncated, + ) + return ToolExecutionResult( + content=response.body, + metadata={ + "status_code": response.status_code, + "truncated": response.truncated, + "connection": conn.name, + }, + ) + + async def _gate_approval( + self, + args: ExternalApiArgs, + signature: ApprovalSignature, + ) -> ToolExecutionResult | None: + """Consume a matching approval, or park for one. + + Returns ``None`` to proceed (a matching grant was consumed), or a + parking result when no approval exists yet. Raises + ``ExternalApiApprovalMismatchError`` when an explicitly-referenced + approval does not match this call, or the consume CAS loses a race. + """ + match = await self._find_matching_approval(args, signature) + if match is None: + return await self._park_for_approval(args, signature) + consumed = await self._approval_store.consume_if_approved(match) + if consumed is None: + logger.warning( + EXTERNAL_API_SIGNATURE_MISMATCH, + connection=args.connection, + approval_id=match, + reason="already_consumed_or_race", + ) + msg = "Approval was already used or is no longer valid" + raise ExternalApiApprovalMismatchError(msg) + logger.info( + EXTERNAL_API_APPROVAL_CONSUMED, + connection=args.connection, + approval_id=match, + ) + return None + + async def _find_matching_approval( + self, + args: ExternalApiArgs, + signature: ApprovalSignature, + ) -> str | None: + """Find an APPROVED, unconsumed approval matching this call. 
+ + With an explicit ``approval_id`` the match is strict: a missing, + un-approved, consumed, or signature-mismatched item raises + ``ExternalApiApprovalMismatchError`` (a deliberate replay/confusion + signal) rather than silently minting another approval. Without one, + a content-signature scan returns the id or ``None`` (park). + """ + if args.approval_id is not None: + item = await self._approval_store.get(args.approval_id) + if ( + item is None + or item.status is not ApprovalStatus.APPROVED + or item.consumed_at is not None + or not signature.matches( + ApprovalSignature.from_metadata(item.metadata), + ) + ): + logger.warning( + EXTERNAL_API_SIGNATURE_MISMATCH, + connection=args.connection, + approval_id=args.approval_id, + reason="explicit_approval_no_match", + ) + msg = "Supplied approval does not match this call or was already used" + raise ExternalApiApprovalMismatchError(msg) + return str(item.id) + candidates = await self._approval_store.list_items( + status=ApprovalStatus.APPROVED, + action_type=_ACTION_TYPE, + ) + for item in candidates: + if item.consumed_at is None and signature.matches( + ApprovalSignature.from_metadata(item.metadata), + ): + return str(item.id) + return None + + async def _park_for_approval( + self, + args: ExternalApiArgs, + signature: ApprovalSignature, + ) -> ToolExecutionResult: + """Create a PENDING approval bound to this call and signal parking.""" + approval_id = f"approval-{uuid4().hex}" + risk_level = self._classify_risk() + item = ApprovalItem( + id=approval_id, + action_type=_ACTION_TYPE, + title=f"External API call to {args.connection!r}", + description=( + f"Agent requests a {args.method} call against connection" + f" {args.connection!r}." 
+ ), + requested_by=self._agent_id, + risk_level=risk_level, + source=ApprovalSource.PARKED_CONTEXT, + created_at=self._clock.now().astimezone(UTC), + task_id=self._task_id, + metadata=signature.to_metadata(), + ) + await self._approval_store.add(item) + logger.info( + EXTERNAL_API_APPROVAL_REQUIRED, + connection=args.connection, + approval_id=approval_id, + risk_level=risk_level.value, + ) + return ToolExecutionResult( + content=( + f"Approval required (id={approval_id}) for this external call." + " Execution is paused until a human approves; on approval," + " re-issue the same call to proceed." + ), + metadata={ + "requires_parking": True, + "approval_id": approval_id, + "action_type": _ACTION_TYPE, + "risk_level": risk_level.value, + }, + ) + + def _classify_risk(self) -> ApprovalRiskLevel: + """Classify the call's risk, defaulting to HIGH when unavailable.""" + if self._risk_classifier is None: + return ApprovalRiskLevel.HIGH + try: + return self._risk_classifier.classify(_ACTION_TYPE) + except MemoryError, RecursionError: + raise + except Exception: + return ApprovalRiskLevel.HIGH diff --git a/src/synthorg/tools/external_api/httpx_provider.py b/src/synthorg/tools/external_api/httpx_provider.py new file mode 100644 index 0000000000..25d05a5e48 --- /dev/null +++ b/src/synthorg/tools/external_api/httpx_provider.py @@ -0,0 +1,95 @@ +"""Default httpx-based ExternalAccessProvider. + +Makes DNS-pinned, redirect-free requests via httpx, streaming the body up to +a hard byte budget. Transport-level failures surface as +:class:`ExternalApiResponseError`; HTTP responses (any status) are returned so +the agent can react to API-level outcomes. 
+""" + +import httpx + +from synthorg.observability import get_logger, safe_error_description +from synthorg.observability.events.external_api import ( + EXTERNAL_API_CALL_FAILED, +) +from synthorg.providers.url_utils import redact_url +from synthorg.tools._dns_pinning import PinnedDnsTransport +from synthorg.tools.external_api.errors import ExternalApiResponseError +from synthorg.tools.external_api.provider import ( + ExternalAccessRequest, + ExternalAccessResponse, +) + +logger = get_logger(__name__) + + +class HttpxExternalAccessProvider: + """httpx implementation of :class:`ExternalAccessProvider`. + + Stateless: per-request timeout and byte budget arrive on the + :class:`ExternalAccessRequest`, so a single instance is safe to share. + """ + + async def request( + self, + req: ExternalAccessRequest, + ) -> ExternalAccessResponse: + """Stream *req* via httpx with optional DNS pinning. + + Reads at most ``max_response_bytes + 1`` to detect truncation without + buffering an unbounded body. Never logs headers or body (SEC-1). 
+ """ + transport: httpx.AsyncBaseTransport | None = None + if req.pinned_ip is not None and req.pinned_hostname is not None: + transport = PinnedDnsTransport( + hostname=req.pinned_hostname, + ip=req.pinned_ip, + ) + budget = req.max_response_bytes + 1 + try: + async with ( + httpx.AsyncClient( + transport=transport, + follow_redirects=False, + ) as client, + client.stream( + method=req.method, + url=req.url, + headers=req.headers, + content=req.body, + timeout=req.timeout_seconds, + ) as response, + ): + chunks: list[bytes] = [] + total = 0 + async for chunk in response.aiter_bytes(): + chunks.append(chunk) + total += len(chunk) + if total >= budget: + break + status_code = response.status_code + resp_headers = dict(response.headers) + except httpx.HTTPError as exc: + logger.warning( + EXTERNAL_API_CALL_FAILED, + url=redact_url(req.url), + method=req.method, + error_type=type(exc).__name__, + error=safe_error_description(exc), + ) + msg = f"External API request failed: {safe_error_description(exc)}" + raise ExternalApiResponseError(msg) from exc + finally: + if transport is not None: + await transport.aclose() + + raw = b"".join(chunks) + truncated = len(raw) > req.max_response_bytes + if truncated: + raw = raw[: req.max_response_bytes] + return ExternalAccessResponse( + status_code=status_code, + headers=resp_headers, + body=raw.decode("utf-8", errors="replace"), + truncated=truncated, + ) diff --git a/src/synthorg/tools/external_api/provider.py b/src/synthorg/tools/external_api/provider.py new file mode 100644 index 0000000000..cfd687effd --- /dev/null +++ b/src/synthorg/tools/external_api/provider.py @@ -0,0 +1,71 @@ +"""Pluggable egress provider for the governed external-access tool. + +The tool owns governance (credential brokering, SSRF validation, rate +limiting, approval gating) and delegates the actual HTTP egress to an +:class:`ExternalAccessProvider`. The default :class:`httpx`-based strategy +makes DNS-pinned requests directly; a future strategy (e.g. 
a sidecar proxy) +can register under its own config discriminator without touching the tool. + +Credentials live in ``ExternalAccessRequest.headers`` and MUST NOT be logged +by any provider implementation (SEC-1). +""" + +from typing import Protocol, runtime_checkable + +from pydantic import BaseModel, ConfigDict, Field + +from synthorg.core.types import NotBlankStr # noqa: TC001 -- Pydantic field type + + +class ExternalAccessRequest(BaseModel): + """A fully-resolved, governance-cleared outbound request. + + Built by the tool after credential injection and SSRF validation. The + ``pinned_ip`` / ``pinned_hostname`` pair, when both set, instructs the + provider to pin DNS (close the rebinding TOCTOU window) while preserving + the hostname for TLS SNI. + """ + + model_config = ConfigDict(frozen=True, allow_inf_nan=False, extra="forbid") + + method: NotBlankStr + url: NotBlankStr + headers: dict[str, str] = Field(default_factory=dict) + body: str | None = None + timeout_seconds: float = Field(gt=0) + max_response_bytes: int = Field(gt=0) + pinned_ip: str | None = None + pinned_hostname: str | None = None + + +class ExternalAccessResponse(BaseModel): + """The upstream response, body already truncated to the byte budget.""" + + model_config = ConfigDict(frozen=True, allow_inf_nan=False, extra="forbid") + + status_code: int + headers: dict[str, str] = Field(default_factory=dict) + body: str + truncated: bool = False + + +@runtime_checkable +class ExternalAccessProvider(Protocol): + """Executes a governance-cleared external request and returns the response. + + Implementations: + * MUST NOT log request headers or body (they carry credentials). + * SHOULD return every HTTP response (including 4xx/5xx) so the agent + can react to API-level status; raise + :class:`~synthorg.tools.external_api.errors.ExternalApiResponseError` + only for transport-level failures (timeout, connection, protocol). 
+ * MUST honour ``follow_redirects=False`` semantics so a 3xx to an + un-validated host cannot bypass the egress allowlist. + """ + + async def request( + self, + req: ExternalAccessRequest, + ) -> ExternalAccessResponse: + """Perform *req* and return the (truncated) response.""" + ... diff --git a/src/synthorg/tools/external_api/provider_factory.py b/src/synthorg/tools/external_api/provider_factory.py new file mode 100644 index 0000000000..d89e4dfe8d --- /dev/null +++ b/src/synthorg/tools/external_api/provider_factory.py @@ -0,0 +1,37 @@ +"""Factory selecting an ExternalAccessProvider by config discriminator. + +Keyed off the ``external_api.provider_type`` setting. Ships a single safe +default ('httpx'); future strategies register here. +""" + +from typing import TYPE_CHECKING + +from synthorg.tools.external_api.errors import ExternalApiError +from synthorg.tools.external_api.httpx_provider import HttpxExternalAccessProvider + +if TYPE_CHECKING: + from synthorg.tools.external_api.provider import ExternalAccessProvider + +_PROVIDER_HTTPX = "httpx" + + +def build_external_access_provider( + *, + provider_type: str = _PROVIDER_HTTPX, +) -> ExternalAccessProvider: + """Build the external-access provider for *provider_type*. + + Args: + provider_type: Discriminator from the ``external_api.provider_type`` + setting. Only ``"httpx"`` is currently registered. + + Returns: + A concrete :class:`ExternalAccessProvider`. + + Raises: + ExternalApiError: If *provider_type* is not a registered strategy. 
+ """ + if provider_type == _PROVIDER_HTTPX: + return HttpxExternalAccessProvider() + msg = f"Unknown external-access provider type: {provider_type!r}" + raise ExternalApiError(msg) diff --git a/src/synthorg/tools/permissions.py b/src/synthorg/tools/permissions.py index e0e4abf18c..0d0ef37bdb 100644 --- a/src/synthorg/tools/permissions.py +++ b/src/synthorg/tools/permissions.py @@ -94,6 +94,7 @@ class ToolPermissionChecker: ToolCategory.ANALYTICS, ToolCategory.MEMORY, ToolCategory.BROWSER, + ToolCategory.EXTERNAL_DATA, } ), # all categories -- new ToolCategory members are auto-included; diff --git a/src/synthorg/workers/runtime_builder.py b/src/synthorg/workers/runtime_builder.py index 1faec3d124..2c1e034bf7 100644 --- a/src/synthorg/workers/runtime_builder.py +++ b/src/synthorg/workers/runtime_builder.py @@ -49,6 +49,7 @@ from synthorg.settings.mirrors import resolve_init_int from synthorg.tools.base import BaseTool # noqa: TC001 from synthorg.tools.factory import build_default_tools_from_config +from synthorg.tools.network_validator import NetworkPolicy from synthorg.tools.registry import ToolRegistry from synthorg.tools.sandbox.factory import build_sandbox_backends from synthorg.tools.sandbox.lifecycle.factory import create_lifecycle_strategy @@ -67,12 +68,14 @@ from synthorg.engine.pipeline.protocol import WorkPipeline from synthorg.providers.protocol import CompletionProvider from synthorg.providers.registry import ProviderRegistry + from synthorg.tools.external_api._runtime import ExternalApiRuntime from synthorg.tools.sandbox.protocol import SandboxBackend logger = get_logger(__name__) _WEB_TIMEOUT_NS: str = "tools" _WEB_TIMEOUT_KEY: str = "web_request_timeout_seconds" +_EXTERNAL_API_NS: str = SettingNamespace.EXTERNAL_API.value _GIT_TIMEOUT_NS: str = "tools" _GIT_TIMEOUT_KEY: str = "git_command_timeout_seconds" _DECOMPOSITION_NS: str = "coordination" @@ -247,12 +250,81 @@ async def _build_tool_registry( return ToolRegistry(tools), len(tools), 
sandbox_backends -def _construct_agent_engine( +async def _build_external_api_runtime( + app_state: AppState, +) -> ExternalApiRuntime | None: + """Resolve the boot-scoped external-access runtime, or ``None`` when off. + + Returns ``None`` (so the tool is not registered) when the feature flag + is disabled or no connection catalog is wired. Otherwise resolves the + provider discriminator and default per-call limits via the settings + resolver and builds the configured ``ExternalAccessProvider``. + + Fail-open: a resolution failure (missing setting, validation error, + unknown provider discriminator) keeps the rest of the runtime buildable + by logging a warning and returning ``None``, mirroring + :func:`_resolve_routing_scorer_config`. A misconfigured external-access + feature should not crash the whole agent runtime. + """ + if not app_state.has_connection_catalog: + return None + resolver = app_state.config_resolver + try: + if not await resolver.get_bool(_EXTERNAL_API_NS, "enabled"): + return None + + from synthorg.tools.external_api._runtime import ( # noqa: PLC0415 + ExternalApiRuntime, + ) + from synthorg.tools.external_api.provider_factory import ( # noqa: PLC0415 + build_external_access_provider, + ) + + provider_type = await resolver.get_str(_EXTERNAL_API_NS, "provider_type") + max_response_bytes = await resolver.get_int( + _EXTERNAL_API_NS, + "default_max_response_bytes", + ) + timeout_seconds = await resolver.get_float( + _EXTERNAL_API_NS, + "default_timeout_seconds", + ) + default_max_rpm = await resolver.get_int(_EXTERNAL_API_NS, "default_max_rpm") + provider = build_external_access_provider(provider_type=provider_type) + except MemoryError, RecursionError: + raise + except Exception as exc: + logger.warning( + API_APP_STARTUP, + service="external_api", + context="external_api_runtime_resolve", + note="external-access feature unavailable; tool not registered", + error_type=type(exc).__name__, + error=safe_error_description(exc), + ) + return None + + 
web = app_state.config.web + network_policy = ( + web.network_policy if web is not None and web.network_policy else None + ) + return ExternalApiRuntime( + connection_catalog=app_state.connection_catalog, + provider=provider, + network_policy=network_policy or NetworkPolicy(), + max_response_bytes=max_response_bytes, + timeout_seconds=timeout_seconds, + default_max_rpm=default_max_rpm, + ) + + +def _construct_agent_engine( # noqa: PLR0913 -- boot collaborators threaded in app_state: AppState, provider: CompletionProvider, registry: ProviderRegistry, tool_registry: ToolRegistry, coordination_metrics_collector: CoordinationMetricsCollector | None, + external_api_runtime: ExternalApiRuntime | None = None, ) -> AgentEngine: """Assemble the boot ``AgentEngine`` from live application state. @@ -288,6 +360,7 @@ def _construct_agent_engine( config_resolver=app_state.config_resolver, event_stream_hub=app_state.event_stream_hub, interrupt_store=app_state.interrupt_store, + external_api_runtime=external_api_runtime, clock=app_state.clock, ) @@ -541,12 +614,14 @@ async def build_runtime_services( extra_tools=red_team_seed.extra_tools, ) coordination_metrics_collector = _construct_coordination_collector(app_state) + external_api_runtime = await _build_external_api_runtime(app_state) engine = _construct_agent_engine( app_state, provider, registry, tool_registry, coordination_metrics_collector, + external_api_runtime, ) autonomy_resolver = AutonomyResolver( registry=ActionTypeRegistry(), diff --git a/tests/conformance/persistence/test_approval_repository.py b/tests/conformance/persistence/test_approval_repository.py index bc97a49d49..5868983935 100644 --- a/tests/conformance/persistence/test_approval_repository.py +++ b/tests/conformance/persistence/test_approval_repository.py @@ -541,3 +541,81 @@ async def test_transition_if_returns_false_on_missing_row( to_state=ApprovalStatus.EXPIRED, ) assert result is False + + async def 
test_consume_if_approved_wins_once_then_rejects_replay( + self, + backend: PersistenceBackend, + ) -> None: + repo = _approval_repo(backend) + approved = _make_item( + approval_id="consume-approved", status=ApprovalStatus.APPROVED + ) + await repo.save(approved) + consumed_at = datetime(2026, 3, 1, 9, 30, tzinfo=UTC) + + first = await repo.consume_if_approved(approved.id, consumed_at=consumed_at) + assert first is True + + fetched = await repo.get(approved.id) + assert fetched is not None + assert fetched.consumed_at is not None + assert fetched.consumed_at == consumed_at + # The approval remains APPROVED; consumption is orthogonal. + assert fetched.status is ApprovalStatus.APPROVED + + # Replay loses: the row is already consumed. + replay = await repo.consume_if_approved( + approved.id, + consumed_at=datetime(2026, 3, 1, 9, 31, tzinfo=UTC), + ) + assert replay is False + unchanged = await repo.get(approved.id) + assert unchanged is not None + assert unchanged.consumed_at == consumed_at + + async def test_consume_if_approved_returns_false_when_not_approved( + self, + backend: PersistenceBackend, + ) -> None: + repo = _approval_repo(backend) + pending = _make_item( + approval_id="consume-pending", status=ApprovalStatus.PENDING + ) + await repo.save(pending) + + result = await repo.consume_if_approved( + pending.id, + consumed_at=datetime(2026, 3, 1, tzinfo=UTC), + ) + assert result is False + fetched = await repo.get(pending.id) + assert fetched is not None + assert fetched.consumed_at is None + + async def test_consume_if_approved_returns_false_on_missing_row( + self, + backend: PersistenceBackend, + ) -> None: + repo = _approval_repo(backend) + + result = await repo.consume_if_approved( + NotBlankStr("consume-missing"), + consumed_at=datetime(2026, 3, 1, tzinfo=UTC), + ) + assert result is False + + async def test_consumed_at_round_trips( + self, + backend: PersistenceBackend, + ) -> None: + repo = _approval_repo(backend) + consumed_at = datetime(2026, 3, 2, 14, 
0, tzinfo=UTC) + item = _make_item( + approval_id="consume-rt", status=ApprovalStatus.APPROVED + ).model_copy(update={"consumed_at": consumed_at}) + await repo.save(item) + + fetched = await repo.get(item.id) + assert fetched is not None + assert fetched.consumed_at is not None + assert fetched.consumed_at == consumed_at diff --git a/tests/conformance/persistence/test_connection_repositories.py b/tests/conformance/persistence/test_connection_repositories.py index d444b47a4f..4ac61968a5 100644 --- a/tests/conformance/persistence/test_connection_repositories.py +++ b/tests/conformance/persistence/test_connection_repositories.py @@ -73,6 +73,24 @@ async def test_save_and_get_round_trip(self, backend: PersistenceBackend) -> Non async def test_get_missing_returns_none(self, backend: PersistenceBackend) -> None: assert await backend.connections.get(NotBlankStr("never-saved")) is None + async def test_sensitive_flag_round_trips( + self, backend: PersistenceBackend + ) -> None: + # Default is non-sensitive. + await backend.connections.save(_connection(name="conn-plain")) + plain = await backend.connections.get(NotBlankStr("conn-plain")) + assert plain is not None + assert plain.sensitive is False + + # Explicitly sensitive round-trips and survives upsert. 
+ sensitive = _connection(name="conn-secret").model_copy( + update={"sensitive": True}, + ) + await backend.connections.save(sensitive) + fetched = await backend.connections.get(NotBlankStr("conn-secret")) + assert fetched is not None + assert fetched.sensitive is True + async def test_save_is_idempotent_upsert(self, backend: PersistenceBackend) -> None: await backend.connections.save(_connection(metadata={"team": "a"})) await backend.connections.save(_connection(metadata={"team": "b"})) diff --git a/tests/e2e/test_external_api_governance_e2e.py b/tests/e2e/test_external_api_governance_e2e.py new file mode 100644 index 0000000000..b310dc51c9 --- /dev/null +++ b/tests/e2e/test_external_api_governance_e2e.py @@ -0,0 +1,231 @@ +"""End-to-end governance test for the external_api tool (#1991). + +Drives the tool through the real ``ToolInvoker`` escalation path, a real +``ConnectionCatalog`` (in-memory repository + stub secret backend, so +credential brokering round-trips), and a real ``ApprovalStore``, proving the +full acceptance criteria: an agent consumes an external API while building a +deliverable, with credentials brokered, rate limits enforced, egress +constrained, and a sensitive call gated to approval, then resumed and consumed +exactly once. + +The upstream HTTP egress is replaced by a deterministic stub +``ExternalAccessProvider`` so the test is hermetic; everything else (catalog, +secret round-trip, invoker escalation detection, approval consumption) is the +real production path. 
+""" + +from datetime import UTC, datetime + +import pytest + +from synthorg.api.approval_store import ApprovalStore +from synthorg.core.enums import ApprovalStatus +from synthorg.core.types import NotBlankStr +from synthorg.integrations.connections.catalog import ConnectionCatalog +from synthorg.integrations.connections.models import AuthMethod, ConnectionType +from synthorg.integrations.errors import ConnectionRateLimitError +from synthorg.persistence.integration_stubs import InMemoryConnectionRepository +from synthorg.providers.models import ToolCall +from synthorg.tools.external_api.external_api_tool import ExternalApiTool +from synthorg.tools.external_api.provider import ( + ExternalAccessRequest, + ExternalAccessResponse, +) +from synthorg.tools.invoker import ToolInvoker +from synthorg.tools.network_validator import NetworkPolicy +from synthorg.tools.registry import ToolRegistry + +pytestmark = pytest.mark.e2e + +_AGENT_ID = "agent-e2e" + + +class _StubSecretBackend: + """In-memory ``SecretBackend`` so credential brokering round-trips.""" + + def __init__(self) -> None: + self._store: dict[str, bytes] = {} + + @property + def backend_name(self) -> NotBlankStr: + return NotBlankStr("stub") + + async def store(self, secret_id: NotBlankStr, value: bytes) -> None: + self._store[str(secret_id)] = value + + async def retrieve(self, secret_id: NotBlankStr) -> bytes | None: + return self._store.get(str(secret_id)) + + async def delete(self, secret_id: NotBlankStr) -> bool: + return self._store.pop(str(secret_id), None) is not None + + async def rotate(self, old_id: NotBlankStr, new_value: bytes) -> NotBlankStr: + new_id = NotBlankStr(f"{old_id}-rotated") + self._store[str(new_id)] = new_value + self._store.pop(str(old_id), None) + return new_id + + async def close(self) -> None: + return None + + +class _StubProvider: + """Deterministic ``ExternalAccessProvider`` recording every request.""" + + def __init__(self) -> None: + self.requests: list[ExternalAccessRequest] 
= [] + self.error: Exception | None = None + + async def request(self, req: ExternalAccessRequest) -> ExternalAccessResponse: + self.requests.append(req) + if self.error is not None: + raise self.error + return ExternalAccessResponse( + status_code=200, + headers={}, + body=f"DATA::{req.url}", + truncated=False, + ) + + +async def _make_catalog() -> ConnectionCatalog: + """Build an in-memory catalog with a public and a sensitive connection.""" + catalog = ConnectionCatalog(InMemoryConnectionRepository(), _StubSecretBackend()) + await catalog.create( + name="public-api", + connection_type=ConnectionType.GENERIC_HTTP, + auth_method=AuthMethod.BEARER_TOKEN.value, + credentials={"base_url": "https://api.example.com", "token": "public-token"}, + base_url="https://api.example.com", + ) + await catalog.create( + name="crm-api", + connection_type=ConnectionType.GENERIC_HTTP, + auth_method=AuthMethod.BEARER_TOKEN.value, + credentials={"base_url": "https://crm.example.com", "token": "crm-token"}, + base_url="https://crm.example.com", + sensitive=True, + ) + return catalog + + +def _make_invoker( + catalog: ConnectionCatalog, + store: ApprovalStore, + provider: _StubProvider, +) -> ToolInvoker: + tool = ExternalApiTool( + connection_catalog=catalog, + approval_store=store, + provider=provider, + agent_id=_AGENT_ID, + task_id="task-e2e", + network_policy=NetworkPolicy(block_private_ips=False), + max_response_bytes=1_048_576, + timeout_seconds=30.0, + default_max_rpm=60, + ) + return ToolInvoker(ToolRegistry([tool]), agent_id=_AGENT_ID, task_id="task-e2e") + + +def _call(call_id: str, **arguments: object) -> ToolCall: + return ToolCall(id=call_id, name="external_api", arguments=arguments) + + +@pytest.mark.e2e +class TestExternalApiGovernanceE2E: + async def test_full_governance_lifecycle(self) -> None: + catalog = await _make_catalog() + store = ApprovalStore() + provider = _StubProvider() + invoker = _make_invoker(catalog, store, provider) + + # 1. 
Build a deliverable: a non-sensitive read proceeds, with the + # connection's credential brokered into the request and egress + # constrained to the connection's host. + read = await invoker.invoke( + _call("c1", connection="public-api", path="/widgets"), + ) + assert read.is_error is False + assert read.content == "DATA::https://api.example.com/widgets" + assert len(provider.requests) == 1 + assert provider.requests[0].headers["Authorization"] == "Bearer public-token" + # Credential never surfaces to the agent-visible result. + assert "public-token" not in read.content + assert not invoker.pending_escalations + + # 2. A sensitive connection gates to approval BEFORE any egress; the + # invoker surfaces the parking escalation. + gated = await invoker.invoke( + _call("c2", connection="crm-api", path="/customers"), + ) + assert gated.is_error is True + assert len(invoker.pending_escalations) == 1 + approval_id = invoker.pending_escalations[0].approval_id + # No egress to the sensitive host occurred. + assert all("crm.example.com" not in r.url for r in provider.requests) + + # 3. A human approves the parked request. + item = await store.get(approval_id) + assert item is not None + await store.save( + item.model_copy( + update={ + "status": ApprovalStatus.APPROVED, + "decided_at": datetime.now(UTC), + "decided_by": "operator", + }, + ), + ) + + # 4. On resume the agent re-issues the same call; the grant is matched + # by content signature, consumed exactly once, and egress proceeds. + resumed = await invoker.invoke( + _call("c3", connection="crm-api", path="/customers"), + ) + assert resumed.is_error is False + assert resumed.content == "DATA::https://crm.example.com/customers" + assert provider.requests[-1].headers["Authorization"] == "Bearer crm-token" + consumed = await store.get(approval_id) + assert consumed is not None + assert consumed.consumed_at is not None + + # 5. Replaying the now-consumed approval id is rejected (no egress). 
+ egress_count = len(provider.requests) + replay = await invoker.invoke( + _call( + "c4", + connection="crm-api", + path="/customers", + approval_id=approval_id, + ), + ) + assert replay.is_error is True + assert len(provider.requests) == egress_count + + async def test_egress_constrained_to_connection_host(self) -> None: + catalog = await _make_catalog() + provider = _StubProvider() + invoker = _make_invoker(catalog, ApprovalStore(), provider) + + blocked = await invoker.invoke( + _call( + "e1", + connection="public-api", + url="https://evil.example.net/exfil", + ), + ) + assert blocked.is_error is True + assert provider.requests == [] + + async def test_rate_limit_surfaces_gracefully(self) -> None: + catalog = await _make_catalog() + provider = _StubProvider() + provider.error = ConnectionRateLimitError("window full") + invoker = _make_invoker(catalog, ApprovalStore(), provider) + + limited = await invoker.invoke( + _call("r1", connection="public-api", path="/widgets"), + ) + assert limited.is_error is True + assert "rate limit" in limited.content.lower() diff --git a/tests/unit/api/test_approval_store.py b/tests/unit/api/test_approval_store.py index c820bbc04f..ca0649b427 100644 --- a/tests/unit/api/test_approval_store.py +++ b/tests/unit/api/test_approval_store.py @@ -239,6 +239,57 @@ async def test_returns_none_when_expired(self) -> None: assert result is None +@pytest.mark.unit +class TestConsumeIfApproved: + """consume_if_approved() one-shot atomic consumption guard.""" + + async def test_consumes_approved_item_once(self) -> None: + store = ApprovalStore() + now = _now() + item = _make_item( + status=ApprovalStatus.APPROVED, + decided_at=now, + decided_by="admin", + ) + await store.add(item) + + consumed = await store.consume_if_approved("approval-001") + assert consumed is not None + assert consumed.consumed_at is not None + # Consumption is orthogonal to the decision lifecycle. 
+ assert consumed.status is ApprovalStatus.APPROVED + + fetched = await store.get("approval-001") + assert fetched is not None + assert fetched.consumed_at is not None + + async def test_replay_returns_none(self) -> None: + store = ApprovalStore() + now = _now() + await store.add( + _make_item( + status=ApprovalStatus.APPROVED, + decided_at=now, + decided_by="admin", + ), + ) + first = await store.consume_if_approved("approval-001") + assert first is not None + second = await store.consume_if_approved("approval-001") + assert second is None + + async def test_returns_none_when_pending(self) -> None: + store = ApprovalStore() + await store.add(_make_item(status=ApprovalStatus.PENDING)) + result = await store.consume_if_approved("approval-001") + assert result is None + + async def test_returns_none_when_missing(self) -> None: + store = ApprovalStore() + result = await store.consume_if_approved("nonexistent") + assert result is None + + @pytest.mark.unit class TestApprovalStoreFilters: """Combined filter tests.""" diff --git a/tests/unit/core/test_enums.py b/tests/unit/core/test_enums.py index 0d69f9c8ca..195e1d5ca9 100644 --- a/tests/unit/core/test_enums.py +++ b/tests/unit/core/test_enums.py @@ -117,10 +117,11 @@ def test_workflow_node_type_has_9_members(self) -> None: def test_workflow_edge_type_has_7_members(self) -> None: assert len(WorkflowEdgeType) == 7 - def test_action_type_has_31_members(self) -> None: - assert len(ActionType) == 31 + def test_action_type_has_32_members(self) -> None: + assert len(ActionType) == 32 assert ActionType.MEMORY_READ.value == "memory:read" assert ActionType.BROWSER_NAVIGATE.value == "browser:navigate" + assert ActionType.EXTERNAL_DATA_REQUEST.value == "external_data:request" # ── String Values ────────────────────────────────────────────────── diff --git a/tests/unit/hr/test_registry_autonomy.py b/tests/unit/hr/test_registry_autonomy.py index a3927ec495..a07adb62fe 100644 --- a/tests/unit/hr/test_registry_autonomy.py +++ 
b/tests/unit/hr/test_registry_autonomy.py @@ -83,6 +83,9 @@ async def save_if_pending( ) -> ApprovalItem | None: return None + async def consume_if_approved(self, approval_id: Any) -> ApprovalItem | None: + return None + class TestUpdateAutonomy: """update_autonomy() requests promotions through the approval queue.""" diff --git a/tests/unit/security/test_action_types.py b/tests/unit/security/test_action_types.py index ecc9d0381c..5850b90ef8 100644 --- a/tests/unit/security/test_action_types.py +++ b/tests/unit/security/test_action_types.py @@ -11,7 +11,7 @@ @pytest.mark.unit class TestActionTypeCategory: def test_has_expected_member_count(self) -> None: - assert len(ActionTypeCategory) == 12 + assert len(ActionTypeCategory) == 13 @pytest.mark.parametrize( ("member", "value"), @@ -28,6 +28,7 @@ def test_has_expected_member_count(self) -> None: (ActionTypeCategory.ARCH, "arch"), (ActionTypeCategory.MEMORY, "memory"), (ActionTypeCategory.BROWSER, "browser"), + (ActionTypeCategory.EXTERNAL_DATA, "external_data"), ], ) def test_member_values(self, member: ActionTypeCategory, value: str) -> None: diff --git a/tests/unit/tools/external_api/test_errors.py b/tests/unit/tools/external_api/test_errors.py new file mode 100644 index 0000000000..a9e7581550 --- /dev/null +++ b/tests/unit/tools/external_api/test_errors.py @@ -0,0 +1,93 @@ +"""Tests for the external-access tool domain error hierarchy.""" + +import pytest + +from synthorg.core.domain_errors import DomainError +from synthorg.core.error_taxonomy import ErrorCategory, ErrorCode +from synthorg.tools.errors import ToolError +from synthorg.tools.external_api.errors import ( + ExternalApiApprovalMismatchError, + ExternalApiArgumentError, + ExternalApiConnectionNotFoundError, + ExternalApiCredentialError, + ExternalApiEgressBlockedError, + ExternalApiError, + ExternalApiRateLimitedError, + ExternalApiResponseError, +) + +_ALL_ERRORS = ( + ExternalApiError, + ExternalApiArgumentError, + ExternalApiConnectionNotFoundError, + 
ExternalApiEgressBlockedError, + ExternalApiCredentialError, + ExternalApiApprovalMismatchError, + ExternalApiRateLimitedError, + ExternalApiResponseError, +) + + +@pytest.mark.unit +class TestExternalApiErrorHierarchy: + @pytest.mark.parametrize("err_cls", _ALL_ERRORS) + def test_subclasses_tool_error_and_domain_error( + self, err_cls: type[ExternalApiError] + ) -> None: + assert issubclass(err_cls, ExternalApiError) + assert issubclass(err_cls, ToolError) + assert issubclass(err_cls, DomainError) + + @pytest.mark.parametrize( + ("err_cls", "code", "category", "status"), + [ + ( + ExternalApiArgumentError, + ErrorCode.TOOL_PARAMETER_ERROR, + ErrorCategory.VALIDATION, + 422, + ), + ( + ExternalApiConnectionNotFoundError, + ErrorCode.CONNECTION_NOT_FOUND, + ErrorCategory.NOT_FOUND, + 404, + ), + ( + ExternalApiEgressBlockedError, + ErrorCode.FORBIDDEN, + ErrorCategory.AUTH, + 403, + ), + ( + ExternalApiApprovalMismatchError, + ErrorCode.RESOURCE_CONFLICT, + ErrorCategory.CONFLICT, + 409, + ), + ( + ExternalApiRateLimitedError, + ErrorCode.RATE_LIMITED, + ErrorCategory.RATE_LIMIT, + 429, + ), + ], + ) + def test_code_category_status( + self, + err_cls: type[ExternalApiError], + code: ErrorCode, + category: ErrorCategory, + status: int, + ) -> None: + assert err_cls.error_code is code + assert err_cls.error_category is category + assert err_cls.status_code == status + + def test_raisable_with_context(self) -> None: + err = ExternalApiEgressBlockedError( + "blocked", + context={"connection": "crm-api"}, + ) + assert err.context["connection"] == "crm-api" + assert "blocked" in str(err) diff --git a/tests/unit/tools/external_api/test_external_api_tool.py b/tests/unit/tools/external_api/test_external_api_tool.py new file mode 100644 index 0000000000..c6f690bb46 --- /dev/null +++ b/tests/unit/tools/external_api/test_external_api_tool.py @@ -0,0 +1,380 @@ +"""Unit tests for the governed ExternalApiTool.""" + +from datetime import UTC, datetime +from typing import Any +from 
unittest.mock import AsyncMock + +import pytest +from tests._shared.mock_of import mock_of + +from synthorg.api.approval_store import ApprovalStore +from synthorg.core.enums import ApprovalStatus, AutonomyLevel +from synthorg.integrations.connections.catalog import ConnectionCatalog +from synthorg.integrations.connections.models import ( + AuthMethod, + Connection, + ConnectionType, +) +from synthorg.integrations.errors import ConnectionRateLimitError +from synthorg.security.autonomy.models import EffectiveAutonomy +from synthorg.tools.external_api.errors import ExternalApiResponseError +from synthorg.tools.external_api.external_api_tool import ExternalApiTool +from synthorg.tools.external_api.provider import ( + ExternalAccessRequest, + ExternalAccessResponse, +) +from synthorg.tools.network_validator import NetworkPolicy + +_ACTION_TYPE = "external_data:request" + + +class StubProvider: + """Records requests and returns a canned response (or raises).""" + + def __init__( + self, + response: ExternalAccessResponse | None = None, + *, + error: Exception | None = None, + ) -> None: + self.response = response or ExternalAccessResponse( + status_code=200, + headers={}, + body="payload", + truncated=False, + ) + self.error = error + self.requests: list[ExternalAccessRequest] = [] + + async def request(self, req: ExternalAccessRequest) -> ExternalAccessResponse: + self.requests.append(req) + if self.error is not None: + raise self.error + return self.response + + +def _connection( + *, + auth_method: AuthMethod = AuthMethod.BEARER_TOKEN, + sensitive: bool = False, +) -> Connection: + return Connection( + name="crm-api", + connection_type=ConnectionType.GENERIC_HTTP, + auth_method=auth_method, + base_url="https://api.example.com", + sensitive=sensitive, + ) + + +def _build_tool( # noqa: PLR0913 -- test helper mirrors the tool's collaborators + *, + conn: Connection | None, + credentials: dict[str, str] | None = None, + provider: StubProvider | None = None, + autonomy: 
EffectiveAutonomy | None = None, + network_policy: NetworkPolicy | None = None, + approval_store: ApprovalStore | None = None, +) -> ExternalApiTool: + catalog = mock_of[ConnectionCatalog]( + get=AsyncMock(spec=ConnectionCatalog.get, return_value=conn), + get_credentials=AsyncMock( + spec=ConnectionCatalog.get_credentials, + return_value=credentials or {"token": "sekret-token"}, + ), + ) + return ExternalApiTool( + connection_catalog=catalog, + approval_store=approval_store or ApprovalStore(), + provider=provider or StubProvider(), + agent_id="agent-1", + task_id="task-1", + network_policy=network_policy or NetworkPolicy(block_private_ips=False), + effective_autonomy=autonomy, + max_response_bytes=1_048_576, + timeout_seconds=30.0, + default_max_rpm=60, + ) + + +@pytest.mark.unit +class TestExternalApiToolHappyPath: + async def test_get_proceeds_and_returns_body(self) -> None: + provider = StubProvider() + tool = _build_tool(conn=_connection(), provider=provider) + result = await tool.execute( + arguments={"connection": "crm-api", "path": "/v2/contacts"}, + ) + assert result.is_error is False + assert result.content == "payload" + assert result.metadata["status_code"] == 200 + assert len(provider.requests) == 1 + assert provider.requests[0].url == "https://api.example.com/v2/contacts" + + async def test_returns_upstream_status_without_raising(self) -> None: + provider = StubProvider( + ExternalAccessResponse(status_code=404, body="missing", headers={}), + ) + tool = _build_tool(conn=_connection(), provider=provider) + result = await tool.execute( + arguments={"connection": "crm-api", "path": "/v2/missing"}, + ) + assert result.is_error is False + assert result.metadata["status_code"] == 404 + + +@pytest.mark.unit +class TestExternalApiToolCredentials: + async def test_bearer_token_injected_not_leaked(self) -> None: + provider = StubProvider() + tool = _build_tool( + conn=_connection(auth_method=AuthMethod.BEARER_TOKEN), + credentials={"token": "sekret-token"}, + 
provider=provider, + ) + result = await tool.execute( + arguments={"connection": "crm-api", "path": "/data"}, + ) + sent = provider.requests[0].headers + assert sent["Authorization"] == "Bearer sekret-token" + # Credential never surfaces to the agent. + assert "sekret-token" not in result.content + assert "sekret-token" not in str(result.metadata) + + async def test_api_key_header(self) -> None: + provider = StubProvider() + tool = _build_tool( + conn=_connection(auth_method=AuthMethod.API_KEY), + credentials={"api_key": "key-123"}, + provider=provider, + ) + await tool.execute(arguments={"connection": "crm-api", "path": "/data"}) + assert provider.requests[0].headers["X-API-Key"] == "key-123" + + async def test_basic_auth_header(self) -> None: + provider = StubProvider() + tool = _build_tool( + conn=_connection(auth_method=AuthMethod.BASIC_AUTH), + credentials={"username": "u", "password": "p"}, + provider=provider, + ) + await tool.execute(arguments={"connection": "crm-api", "path": "/data"}) + # base64("u:p") == "dTpw" + assert provider.requests[0].headers["Authorization"] == "Basic dTpw" + + +@pytest.mark.unit +class TestExternalApiToolEgress: + async def test_connection_not_found(self) -> None: + provider = StubProvider() + tool = _build_tool(conn=None, provider=provider) + result = await tool.execute( + arguments={"connection": "ghost", "path": "/x"}, + ) + assert result.is_error is True + assert "not found" in result.content.lower() + assert provider.requests == [] + + async def test_url_outside_connection_blocked(self) -> None: + provider = StubProvider() + tool = _build_tool(conn=_connection(), provider=provider) + result = await tool.execute( + arguments={"connection": "crm-api", "url": "https://evil.example.net/x"}, + ) + assert result.is_error is True + assert provider.requests == [] + + async def test_ssrf_private_ip_blocked(self) -> None: + provider = StubProvider() + conn = Connection( + name="crm-api", + connection_type=ConnectionType.GENERIC_HTTP, 
+ auth_method=AuthMethod.CUSTOM, + base_url="http://10.0.0.1", + ) + tool = _build_tool( + conn=conn, + provider=provider, + network_policy=NetworkPolicy(), # blocks private IPs + ) + result = await tool.execute( + arguments={"connection": "crm-api", "path": "/x"}, + ) + assert result.is_error is True + assert provider.requests == [] + + +@pytest.mark.unit +class TestExternalApiToolApprovalGating: + async def test_sensitive_connection_parks(self) -> None: + provider = StubProvider() + tool = _build_tool(conn=_connection(sensitive=True), provider=provider) + result = await tool.execute( + arguments={"connection": "crm-api", "path": "/data"}, + ) + assert result.metadata.get("requires_parking") is True + assert result.metadata["approval_id"].startswith("approval-") + assert result.metadata["action_type"] == _ACTION_TYPE + assert provider.requests == [] + + async def test_write_method_parks(self) -> None: + provider = StubProvider() + tool = _build_tool(conn=_connection(sensitive=False), provider=provider) + result = await tool.execute( + arguments={ + "connection": "crm-api", + "method": "POST", + "path": "/data", + "body": "{}", + }, + ) + assert result.metadata.get("requires_parking") is True + assert provider.requests == [] + + async def test_full_autonomy_bypasses_park(self) -> None: + provider = StubProvider() + autonomy = EffectiveAutonomy( + level=AutonomyLevel.FULL, + auto_approve_actions=frozenset({_ACTION_TYPE}), + human_approval_actions=frozenset(), + security_agent=False, + ) + tool = _build_tool( + conn=_connection(sensitive=True), + provider=provider, + autonomy=autonomy, + ) + result = await tool.execute( + arguments={"connection": "crm-api", "method": "POST", "path": "/d"}, + ) + assert result.metadata.get("requires_parking") is None + assert len(provider.requests) == 1 + + async def test_approval_consumed_then_proceeds(self) -> None: + store = ApprovalStore() + provider = StubProvider() + tool = _build_tool( + conn=_connection(sensitive=True), + 
provider=provider, + approval_store=store, + ) + args = {"connection": "crm-api", "path": "/data"} + parked = await tool.execute(arguments=args) + approval_id = parked.metadata["approval_id"] + + # Human approves. + item = await store.get(approval_id) + assert item is not None + approved = item.model_copy( + update={ + "status": ApprovalStatus.APPROVED, + "decided_at": datetime.now(UTC), + "decided_by": "human", + }, + ) + await store.save(approved) + + # Resume: re-issue the same call. + resumed = await tool.execute(arguments=args) + assert resumed.is_error is False + assert len(provider.requests) == 1 + consumed = await store.get(approval_id) + assert consumed is not None + assert consumed.consumed_at is not None + + async def test_replay_with_consumed_approval_id_errors(self) -> None: + store = ApprovalStore() + provider = StubProvider() + tool = _build_tool( + conn=_connection(sensitive=True), + provider=provider, + approval_store=store, + ) + args: dict[str, Any] = {"connection": "crm-api", "path": "/data"} + parked = await tool.execute(arguments=args) + approval_id = parked.metadata["approval_id"] + item = await store.get(approval_id) + assert item is not None + await store.save( + item.model_copy( + update={ + "status": ApprovalStatus.APPROVED, + "decided_at": datetime.now(UTC), + "decided_by": "human", + }, + ), + ) + await tool.execute(arguments=args) # consumes + # Replay with the now-consumed approval id is rejected. 
+ replay = await tool.execute(arguments={**args, "approval_id": approval_id}) + assert replay.is_error is True + assert len(provider.requests) == 1 + + async def test_unknown_approval_id_errors(self) -> None: + provider = StubProvider() + tool = _build_tool(conn=_connection(sensitive=True), provider=provider) + result = await tool.execute( + arguments={ + "connection": "crm-api", + "path": "/data", + "approval_id": "approval-does-not-exist", + }, + ) + assert result.is_error is True + assert provider.requests == [] + + +@pytest.mark.unit +class TestExternalApiToolResilience: + async def test_rate_limited_returns_graceful_result(self) -> None: + provider = StubProvider(error=ConnectionRateLimitError("window full")) + tool = _build_tool(conn=_connection(), provider=provider) + result = await tool.execute( + arguments={"connection": "crm-api", "path": "/data"}, + ) + assert result.is_error is True + assert result.metadata.get("rate_limited") is True + + async def test_transport_error_returns_error(self) -> None: + provider = StubProvider(error=ExternalApiResponseError("connection refused")) + tool = _build_tool(conn=_connection(), provider=provider) + result = await tool.execute( + arguments={"connection": "crm-api", "path": "/data"}, + ) + assert result.is_error is True + + async def test_truncation_flag_passthrough(self) -> None: + provider = StubProvider( + ExternalAccessResponse( + status_code=200, + body="abc", + headers={}, + truncated=True, + ), + ) + tool = _build_tool(conn=_connection(), provider=provider) + result = await tool.execute( + arguments={"connection": "crm-api", "path": "/data"}, + ) + assert result.metadata["truncated"] is True + + +@pytest.mark.unit +class TestExternalApiToolArgs: + async def test_both_path_and_url_rejected(self) -> None: + tool = _build_tool(conn=_connection()) + result = await tool.execute( + arguments={ + "connection": "crm-api", + "path": "/a", + "url": "https://api.example.com/b", + }, + ) + assert result.is_error is True + 
+ async def test_body_on_get_rejected(self) -> None: + tool = _build_tool(conn=_connection()) + result = await tool.execute( + arguments={"connection": "crm-api", "path": "/a", "body": "x"}, + ) + assert result.is_error is True diff --git a/tests/unit/tools/external_api/test_httpx_provider.py b/tests/unit/tools/external_api/test_httpx_provider.py new file mode 100644 index 0000000000..e411d6fe21 --- /dev/null +++ b/tests/unit/tools/external_api/test_httpx_provider.py @@ -0,0 +1,123 @@ +"""Unit tests for the httpx ExternalAccessProvider.""" + +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from typing import Any +from unittest.mock import AsyncMock, patch + +import httpx +import pytest + +from synthorg.tools.external_api.errors import ExternalApiResponseError +from synthorg.tools.external_api.httpx_provider import HttpxExternalAccessProvider +from synthorg.tools.external_api.provider import ExternalAccessRequest + + +class _RaisingStream: + """Async context manager that raises on entry.""" + + def __init__(self, exc: Exception) -> None: + self._exc = exc + + async def __aenter__(self) -> None: + raise self._exc + + async def __aexit__(self, *_args: Any) -> None: + pass # pragma: no cover + + +def _mock_stream_client( + response: httpx.Response | None = None, + *, + side_effect: Exception | None = None, +) -> AsyncMock: + """Build an AsyncMock ``httpx.AsyncClient`` supporting ``.stream()``.""" + client = AsyncMock() + client.__aenter__ = AsyncMock(return_value=client) + client.__aexit__ = AsyncMock(return_value=False) + if side_effect is not None: + client.stream = lambda **_kw: _RaisingStream(side_effect) + elif response is not None: + + async def _aiter_bytes() -> AsyncIterator[bytes]: + yield response.content + + response.aiter_bytes = _aiter_bytes # type: ignore[assignment] + + @asynccontextmanager + async def _stream(**_kwargs: object) -> AsyncIterator[httpx.Response]: + yield response + + client.stream = _stream + return 
client + + +def _request(*, max_response_bytes: int = 1_048_576) -> ExternalAccessRequest: + return ExternalAccessRequest( + method="GET", + url="https://api.example.com/data", + headers={"Authorization": "Bearer redacted"}, + body=None, + timeout_seconds=30.0, + max_response_bytes=max_response_bytes, + ) + + +@pytest.mark.unit +class TestHttpxExternalAccessProvider: + async def test_returns_response(self) -> None: + provider = HttpxExternalAccessProvider() + mock_response = httpx.Response(200, content=b"hello") + with patch( + "synthorg.tools.external_api.httpx_provider.httpx.AsyncClient" + ) as mock_cls: + mock_cls.return_value = _mock_stream_client(mock_response) + result = await provider.request(_request()) + assert result.status_code == 200 + assert result.body == "hello" + assert result.truncated is False + + async def test_returns_4xx_5xx_without_raising(self) -> None: + # HTTP responses (any status) are data, not transport failures. + provider = HttpxExternalAccessProvider() + mock_response = httpx.Response(503, content=b"upstream down") + with patch( + "synthorg.tools.external_api.httpx_provider.httpx.AsyncClient" + ) as mock_cls: + mock_cls.return_value = _mock_stream_client(mock_response) + result = await provider.request(_request()) + assert result.status_code == 503 + assert "upstream down" in result.body + + async def test_truncates_oversized_body(self) -> None: + provider = HttpxExternalAccessProvider() + mock_response = httpx.Response(200, content=b"abcdefghij") + with patch( + "synthorg.tools.external_api.httpx_provider.httpx.AsyncClient" + ) as mock_cls: + mock_cls.return_value = _mock_stream_client(mock_response) + result = await provider.request(_request(max_response_bytes=4)) + assert result.truncated is True + assert result.body == "abcd" + + async def test_transport_failure_raises_response_error(self) -> None: + provider = HttpxExternalAccessProvider() + with patch( + "synthorg.tools.external_api.httpx_provider.httpx.AsyncClient" + ) as 
mock_cls: + mock_cls.return_value = _mock_stream_client( + side_effect=httpx.ConnectError("refused"), + ) + with pytest.raises(ExternalApiResponseError): + await provider.request(_request()) + + async def test_timeout_raises_response_error(self) -> None: + provider = HttpxExternalAccessProvider() + with patch( + "synthorg.tools.external_api.httpx_provider.httpx.AsyncClient" + ) as mock_cls: + mock_cls.return_value = _mock_stream_client( + side_effect=httpx.TimeoutException("timed out"), + ) + with pytest.raises(ExternalApiResponseError): + await provider.request(_request()) diff --git a/tests/unit/tools/external_api/test_signature.py b/tests/unit/tools/external_api/test_signature.py new file mode 100644 index 0000000000..7f7eabd841 --- /dev/null +++ b/tests/unit/tools/external_api/test_signature.py @@ -0,0 +1,62 @@ +"""Tests for the content-addressed approval signature.""" + +import pytest + +from synthorg.tools.external_api._signature import ApprovalSignature + + +def _sig(**overrides: object) -> ApprovalSignature: + base: dict[str, object] = { + "connection": "crm-api", + "method": "POST", + "resolved_url": "https://api.example.com/v2/contacts", + "body": '{"name": "x"}', + "headers": {"Accept": "application/json"}, + } + base.update(overrides) + return ApprovalSignature.build(**base) # type: ignore[arg-type] + + +@pytest.mark.unit +class TestApprovalSignature: + def test_deterministic(self) -> None: + assert _sig() == _sig() + + def test_metadata_round_trip(self) -> None: + sig = _sig() + restored = ApprovalSignature.from_metadata(sig.to_metadata()) + assert restored == sig + + def test_from_metadata_absent_returns_none(self) -> None: + assert ApprovalSignature.from_metadata({}) is None + + def test_from_metadata_invalid_returns_none(self) -> None: + assert ( + ApprovalSignature.from_metadata({"external_api_signature": "not json"}) + is None + ) + + @pytest.mark.parametrize( + "overrides", + [ + {"method": "PUT"}, + {"resolved_url": 
"https://api.example.com/v2/other"}, + {"body": '{"name": "y"}'}, + {"connection": "other-api"}, + ], + ) + def test_differs_on_call_change(self, overrides: dict[str, object]) -> None: + assert _sig() != _sig(**overrides) + + def test_matches_helper(self) -> None: + assert _sig().matches(_sig()) is True + assert _sig().matches(None) is False + assert _sig().matches(_sig(method="GET")) is False + + def test_credential_headers_excluded(self) -> None: + # Auth headers are injected later, not part of the signed shape; the + # agent-supplied headers are what's signed. Different agent headers + # change the signature, but that is the caller's request shape. + assert _sig(headers={"Accept": "application/json"}) != _sig( + headers={"Accept": "text/plain"}, + ) diff --git a/web/src/__tests__/helpers/factories.ts b/web/src/__tests__/helpers/factories.ts index effbacfb1f..3f09148079 100644 --- a/web/src/__tests__/helpers/factories.ts +++ b/web/src/__tests__/helpers/factories.ts @@ -245,6 +245,7 @@ export function makeApproval(id: string, overrides?: Partial): created_at: new Date(Date.now() - 3600_000).toISOString(), // 1 hour ago decided_at: null, expires_at: null, + consumed_at: null, evidence_package: null, seconds_remaining: null, urgency_level: 'no_expiry', diff --git a/web/src/__tests__/stores/connections.test.ts b/web/src/__tests__/stores/connections.test.ts index 7691ba6946..6b12711421 100644 --- a/web/src/__tests__/stores/connections.test.ts +++ b/web/src/__tests__/stores/connections.test.ts @@ -48,6 +48,7 @@ const sampleConnection: Connection = { rate_limiter: null, secret_refs: [], webhook_receipt_retention_days: null, + sensitive: false, created_at: '2026-04-01T09:00:00Z', updated_at: '2026-04-12T08:00:00Z', } @@ -119,6 +120,7 @@ describe('useConnectionsStore', () => { auth_method: 'bearer_token', credentials: { token: 'abc' }, health_check_enabled: true, + sensitive: false, }) expect(result).toEqual(sampleConnection) @@ -128,6 +130,7 @@ 
describe('useConnectionsStore', () => { auth_method: 'bearer_token', credentials: { token: 'abc' }, health_check_enabled: true, + sensitive: false, }) expect(useConnectionsStore.getState().connections).toHaveLength(1) }) diff --git a/web/src/api/types/enum-values.gen.ts b/web/src/api/types/enum-values.gen.ts index 42459f3090..6b44413ca0 100644 --- a/web/src/api/types/enum-values.gen.ts +++ b/web/src/api/types/enum-values.gen.ts @@ -644,6 +644,7 @@ export const SETTING_NAMESPACE_VALUES = [ 'hr', 'workers', 'telemetry', + 'external_api', ] as const export type SettingNamespace = (typeof SETTING_NAMESPACE_VALUES)[number] @@ -766,6 +767,7 @@ export const TOOL_CATEGORY_VALUES = [ 'ontology', 'mcp', 'browser', + 'external_data', 'other', ] as const export type ToolCategory = (typeof TOOL_CATEGORY_VALUES)[number] diff --git a/web/src/api/types/openapi.gen.ts b/web/src/api/types/openapi.gen.ts index bf58bcb258..7b83ac8080 100644 --- a/web/src/api/types/openapi.gen.ts +++ b/web/src/api/types/openapi.gen.ts @@ -5960,6 +5960,11 @@ export type components = { /** ApprovalResponse */ readonly ApprovalResponse: { readonly action_type: string; + /** + * Format: date-time + * @description datetime with the constraint that the value must have timezone info + */ + readonly consumed_at: string | null; /** * Format: date-time * @description datetime with the constraint that the value must have timezone info @@ -6933,6 +6938,8 @@ export type components = { readonly rate_limiter: components["schemas"]["RateLimiterConfig"] | null; /** @default [] */ readonly secret_refs: readonly components["schemas"]["SecretRef"][]; + /** @default false */ + readonly sensitive: boolean; /** * Format: date-time * @description datetime with the constraint that the value must have timezone info @@ -7239,6 +7246,8 @@ export type components = { readonly [key: string]: string; } | null; readonly name: string; + /** @default false */ + readonly sensitive: boolean; readonly webhook_receipt_retention_days?: 
number | null; }; /** CreateCustomRuleRequest */ @@ -11458,7 +11467,7 @@ export type components = { * can be edited at runtime via the settings API. * @enum {string} */ - readonly SettingNamespace: "api" | "client" | "company" | "providers" | "memory" | "budget" | "security" | "coordination" | "observability" | "backup" | "engine" | "communication" | "a2a" | "integrations" | "meta" | "notifications" | "objectives" | "simulations" | "tools" | "settings" | "hr" | "workers" | "telemetry"; + readonly SettingNamespace: "api" | "client" | "company" | "providers" | "memory" | "budget" | "security" | "coordination" | "observability" | "backup" | "engine" | "communication" | "a2a" | "integrations" | "meta" | "notifications" | "objectives" | "simulations" | "tools" | "settings" | "hr" | "workers" | "telemetry" | "external_api"; /** * SettingSource * @description Origin of a resolved setting value. @@ -12222,7 +12231,7 @@ export type components = { * @description Category of a tool for access-level gating. 
* @enum {string} */ - readonly ToolCategory: "file_system" | "code_execution" | "version_control" | "web" | "database" | "terminal" | "design" | "communication" | "analytics" | "deployment" | "memory" | "ontology" | "mcp" | "browser" | "other"; + readonly ToolCategory: "file_system" | "code_execution" | "version_control" | "web" | "database" | "terminal" | "design" | "communication" | "analytics" | "deployment" | "memory" | "ontology" | "mcp" | "browser" | "external_data" | "other"; /** * ToolPermissions * @description Tool permissions @@ -12475,6 +12484,7 @@ export type components = { readonly metadata?: { readonly [key: string]: string; } | null; + readonly sensitive?: boolean | null; readonly webhook_receipt_retention_days?: number | null; }; /** UpdateCustomRuleRequest */ diff --git a/web/src/mocks/handlers/approvals.ts b/web/src/mocks/handlers/approvals.ts index 5e41f732be..567612b0fd 100644 --- a/web/src/mocks/handlers/approvals.ts +++ b/web/src/mocks/handlers/approvals.ts @@ -29,6 +29,7 @@ export function buildApproval( created_at: '2026-04-19T00:00:00Z', decided_at: null, expires_at: null, + consumed_at: null, evidence_package: null, seconds_remaining: null, urgency_level: 'normal', diff --git a/web/src/mocks/handlers/connections.ts b/web/src/mocks/handlers/connections.ts index 0f037f7dde..733992ea5c 100644 --- a/web/src/mocks/handlers/connections.ts +++ b/web/src/mocks/handlers/connections.ts @@ -28,6 +28,7 @@ export function buildConnection( rate_limiter: null, secret_refs: [], webhook_receipt_retention_days: null, + sensitive: false, created_at: NOW, updated_at: NOW, ...overrides, diff --git a/web/src/pages/approvals/ApprovalCard.stories.tsx b/web/src/pages/approvals/ApprovalCard.stories.tsx index 08ddec64df..f520f4a84a 100644 --- a/web/src/pages/approvals/ApprovalCard.stories.tsx +++ b/web/src/pages/approvals/ApprovalCard.stories.tsx @@ -19,6 +19,7 @@ const base: ApprovalResponse = { created_at: '2026-03-27T10:00:00Z', decided_at: null, expires_at: 
'2026-03-27T14:00:00Z', + consumed_at: null, evidence_package: null, seconds_remaining: 7200, urgency_level: 'high', diff --git a/web/src/pages/approvals/ApprovalDetailDrawer.stories.tsx b/web/src/pages/approvals/ApprovalDetailDrawer.stories.tsx index 6f5ed0df62..be2e40708a 100644 --- a/web/src/pages/approvals/ApprovalDetailDrawer.stories.tsx +++ b/web/src/pages/approvals/ApprovalDetailDrawer.stories.tsx @@ -19,6 +19,7 @@ const base: ApprovalResponse = { created_at: '2026-03-27T10:00:00Z', decided_at: null, expires_at: '2026-03-27T14:00:00Z', + consumed_at: null, evidence_package: null, seconds_remaining: 7200, urgency_level: 'high', diff --git a/web/src/pages/approvals/ApprovalTimeline.stories.tsx b/web/src/pages/approvals/ApprovalTimeline.stories.tsx index 355137d737..a0c82f2e7d 100644 --- a/web/src/pages/approvals/ApprovalTimeline.stories.tsx +++ b/web/src/pages/approvals/ApprovalTimeline.stories.tsx @@ -18,6 +18,7 @@ const base: ApprovalResponse = { created_at: '2026-03-27T10:00:00Z', decided_at: null, expires_at: '2026-03-27T14:00:00Z', + consumed_at: null, evidence_package: null, seconds_remaining: 14400, urgency_level: 'high', diff --git a/web/src/pages/connections/ConnectionCard.stories.tsx b/web/src/pages/connections/ConnectionCard.stories.tsx index d4f97f24d6..a1d02fae4b 100644 --- a/web/src/pages/connections/ConnectionCard.stories.tsx +++ b/web/src/pages/connections/ConnectionCard.stories.tsx @@ -16,6 +16,7 @@ const baseConnection: Connection = { rate_limiter: null, secret_refs: [], webhook_receipt_retention_days: null, + sensitive: false, created_at: '2026-04-01T09:00:00Z', updated_at: '2026-04-12T08:00:00Z', } diff --git a/web/src/pages/connections/ConnectionFormModal.stories.tsx b/web/src/pages/connections/ConnectionFormModal.stories.tsx index 884f9a6da2..bb0d6ddad9 100644 --- a/web/src/pages/connections/ConnectionFormModal.stories.tsx +++ b/web/src/pages/connections/ConnectionFormModal.stories.tsx @@ -71,6 +71,7 @@ export const EditMode: Story = { 
rate_limiter: null, secret_refs: [], webhook_receipt_retention_days: null, + sensitive: false, created_at: '2026-04-01T09:00:00Z', updated_at: '2026-04-12T08:00:00Z', }, diff --git a/web/src/pages/connections/ConnectionFormModal.tsx b/web/src/pages/connections/ConnectionFormModal.tsx index a7284348d1..40546eb55d 100644 --- a/web/src/pages/connections/ConnectionFormModal.tsx +++ b/web/src/pages/connections/ConnectionFormModal.tsx @@ -17,6 +17,7 @@ import { } from '@/components/ui/dialog' import { InputField } from '@/components/ui/input-field' import { SelectField } from '@/components/ui/select-field' +import { ToggleField } from '@/components/ui/toggle-field' import { validateA2APeerCredentials } from './connection-type-fields' import { cn } from '@/lib/utils' import { useConnectionsStore } from '@/stores/connections' @@ -49,6 +50,11 @@ interface FormState { * Parsed to `number | null` on submit. */ webhookRetention: string + /** + * When true, the governed external-access tool routes every call + * against this connection (read or write) to human approval. + */ + sensitive: boolean } const EMPTY_STATE: FormState = { @@ -57,6 +63,7 @@ const EMPTY_STATE: FormState = { topLevel: {}, credentials: {}, webhookRetention: '', + sensitive: false, } function makeInitialState( @@ -74,6 +81,7 @@ function makeInitialState( connection.webhook_receipt_retention_days === null ? '' : String(connection.webhook_receipt_retention_days), + sensitive: connection.sensitive, } } return { @@ -284,6 +292,7 @@ export function ConnectionFormModal({ credentials, base_url: form.topLevel.base_url?.trim() || null, health_check_enabled: true, + sensitive: form.sensitive, ...(supportsWebhookRetention ? 
{ webhook_receipt_retention_days: retentionValue } : {}), @@ -293,6 +302,7 @@ export function ConnectionFormModal({ } else if (connection) { const result = await updateConnection(connection.name, { base_url: form.topLevel.base_url?.trim() || null, + sensitive: form.sensitive, ...(supportsWebhookRetention ? { webhook_receipt_retention_days: retentionValue } : {}), @@ -415,6 +425,15 @@ export function ConnectionFormModal({ /> )} + + setForm((prev) => ({ ...prev, sensitive: checked })) + } + /> +