-
Notifications
You must be signed in to change notification settings - Fork 1
feat: governed external API/data access tool (#1991) #2032
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 6 commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
91e75fa
feat: governed external API/data access tool (#1991)
Aureliolo acb8ba6
fix: reject path-traversal segments in external_api URL resolution (#…
Aureliolo f0c7308
test: correct parking assertion in external_api e2e (#1991)
Aureliolo a060842
fix: register external_api namespace/events + consumed_at drift basel…
Aureliolo f11909f
docs: drop SEC-N tags and issue back-ref from external_api docstrings…
Aureliolo 094a32f
refactor: address pre-PR review for governed external API access (#1991)
Aureliolo a189767
fix: babysit round 2, 11 findings (8 coderabbit, 3 gemini) - approval…
Aureliolo 28e3c6b
fix: babysit round 4, 4 findings (4 coderabbit) - postgres consumed_a…
Aureliolo File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,226 @@ | ||
| """Lazy-expiration behaviour for :class:`ApprovalStore`. | ||
|
|
||
| The PENDING -> EXPIRED lazy transition (scalar ``_check_expiration_locked``, | ||
| the pure batch ``_compute_expiration`` / ``_compute_page`` companions, the | ||
| cache-only list path, and the best-effort expire callback) is a cohesive | ||
| slice of the store. It lives in its own mixin so the main store module | ||
| stays focused on the CRUD + CAS + cache-coherency concurrency model. | ||
|
|
||
| The mixin reaches back into the host store for shared state (``_clock``, | ||
| ``_repo``, ``_items``, ``_on_expire``); the ``TYPE_CHECKING`` block below | ||
| declares that surface so ``mypy`` type-checks the mixin in isolation. | ||
| """ | ||
|
|
||
| from typing import TYPE_CHECKING | ||
|
|
||
| from synthorg.core.approval import ApprovalItem # noqa: TC001 | ||
| from synthorg.core.enums import ApprovalRiskLevel, ApprovalStatus | ||
| from synthorg.observability import get_logger, safe_error_description | ||
| from synthorg.observability.events.api import ( | ||
| API_APPROVAL_EXPIRE_CALLBACK_FAILED, | ||
| API_APPROVAL_EXPIRED, | ||
| ) | ||
| from synthorg.observability.events.approval_gate import ( | ||
| APPROVAL_STATUS_TRANSITIONED, | ||
| ) | ||
| from synthorg.observability.metrics_hub import record_approval_decision | ||
|
|
||
| if TYPE_CHECKING: | ||
| from collections.abc import Callable | ||
|
|
||
| from synthorg.core.clock import Clock | ||
| from synthorg.core.types import NotBlankStr | ||
| from synthorg.persistence.approval_protocol import ApprovalRepository | ||
|
|
||
| logger = get_logger(__name__) | ||
|
|
||
|
|
||
| class ApprovalExpirationMixin: | ||
| """Lazy-expiration methods mixed into :class:`ApprovalStore`.""" | ||
|
|
||
| if TYPE_CHECKING: | ||
| _clock: Clock | ||
| _repo: ApprovalRepository | None | ||
| _items: dict[str, ApprovalItem] | ||
| _on_expire: Callable[[ApprovalItem], None] | None | ||
|
|
||
| def _compute_page( | ||
| self, | ||
| page: tuple[ApprovalItem, ...], | ||
| *, | ||
| status: ApprovalStatus | None, | ||
| risk_level: ApprovalRiskLevel | None, | ||
| ) -> tuple[ | ||
| list[ApprovalItem], | ||
| list[ApprovalItem], | ||
| dict[str, ApprovalItem], | ||
| ]: | ||
| """Pure: classify a repo page into (filtered, to_persist, page_cache). | ||
|
|
||
| Companion to :meth:`ApprovalStore._list_from_repo`. Walks ``page`` | ||
| once, computing lazy expiration via :meth:`_compute_expiration` and | ||
| applying caller-supplied filters. No I/O, no lock acquisition. | ||
|
|
||
| ``page_cache`` carries every row from the page (with the | ||
| possibly-EXPIRED replacement substituted in) so the caller | ||
| can refresh the entire page slice in ``_items``, not just the | ||
| EXPIRED transitions. ``to_persist`` carries only the rows | ||
| that flipped locally, which is the candidate set the caller | ||
| feeds to ``expire_if_pending`` for the compare-and-set. | ||
| """ | ||
| page_result: list[ApprovalItem] = [] | ||
| to_persist: list[ApprovalItem] = [] | ||
| page_cache: dict[str, ApprovalItem] = {} | ||
| for item in page: | ||
| checked = self._compute_expiration(item) | ||
| page_cache[item.id] = checked | ||
| if checked is not item: | ||
| to_persist.append(checked) | ||
| if status is not None and checked.status != status: | ||
| continue | ||
| if risk_level is not None and checked.risk_level != risk_level: | ||
| continue | ||
| page_result.append(checked) | ||
| return page_result, to_persist, page_cache | ||
|
|
||
| async def _list_from_cache_locked( | ||
| self, | ||
| *, | ||
| status: ApprovalStatus | None, | ||
| risk_level: ApprovalRiskLevel | None, | ||
| action_type: NotBlankStr | None, | ||
| ) -> tuple[ApprovalItem, ...]: | ||
| """Cache-only list path (no repository wired). | ||
|
|
||
| Falls through ``_check_expiration_locked`` per item because | ||
| without a repository there is no batch endpoint to amortise; | ||
| a per-item save is also a no-op (the in-memory cache is | ||
| already updated by ``_check_expiration_locked``). | ||
| """ | ||
| checked_items: list[ApprovalItem] = [] | ||
| for stored in list(self._items.values()): | ||
| checked = await self._check_expiration_locked(stored) | ||
| if status is not None and checked.status != status: | ||
| continue | ||
| if risk_level is not None and checked.risk_level != risk_level: | ||
| continue | ||
| if action_type is not None and checked.action_type != action_type: | ||
| continue | ||
| checked_items.append(checked) | ||
| return tuple(checked_items) | ||
|
|
||
| async def _check_expiration_locked( | ||
| self, | ||
| item: ApprovalItem, | ||
| ) -> ApprovalItem: | ||
| """Lazy expiration, assuming ``self._lock`` is held. | ||
|
|
||
| If the item is PENDING and has expired, transition it to | ||
| EXPIRED in both the cache and the repository. Callers MUST | ||
| hold ``self._lock``; the method performs cache + repo mutations | ||
| without re-acquiring it. | ||
|
|
||
| Args: | ||
| item: The item to check. | ||
|
|
||
| Returns: | ||
| The original or expired item. | ||
| """ | ||
| if ( | ||
| item.status == ApprovalStatus.PENDING | ||
| and item.expires_at is not None | ||
| and self._clock.now() >= item.expires_at | ||
| ): | ||
| expired = item.model_copy( | ||
| update={"status": ApprovalStatus.EXPIRED}, | ||
| ) | ||
| if self._repo is not None: | ||
| await self._repo.save(expired) | ||
| self._items[item.id] = expired | ||
| # State-transition log fires AFTER persistence + cache | ||
| # update succeed so the audit stream only records hops | ||
| # that actually landed. Pairs with the | ||
| # APPROVAL_STATUS_TRANSITIONED emissions on PENDING -> | ||
| # APPROVED / REJECTED in ``api/controllers/approvals.py``; | ||
| # ``API_APPROVAL_EXPIRED`` below is the terminal-state | ||
| # summary event that subscribers can use as a single | ||
| # signal that an approval has expired. | ||
| logger.info( | ||
| APPROVAL_STATUS_TRANSITIONED, | ||
| approval_id=item.id, | ||
| from_status=ApprovalStatus.PENDING.value, | ||
| to_status=ApprovalStatus.EXPIRED.value, | ||
| ) | ||
| logger.info( | ||
| API_APPROVAL_EXPIRED, | ||
| approval_id=item.id, | ||
| ) | ||
| record_approval_decision(outcome="expired") | ||
| if self._on_expire is not None: | ||
| try: | ||
| self._on_expire(expired) | ||
| except MemoryError, RecursionError: | ||
| raise | ||
| except Exception as exc: | ||
| # ERROR (matching ``_fire_expire_callback``): the | ||
| # approval is already EXPIRED in cache + repo, so | ||
| # the callback failure can't unwind the expiration, | ||
| # but a dropped downstream side effect (webhook, | ||
| # audit dispatch, workflow resume) is operationally | ||
| # meaningful and operators must be able to alert | ||
| # on it. Both paths emit at ERROR so alerting is | ||
| # not sensitive to which expiration path fired. | ||
| logger.error( | ||
| API_APPROVAL_EXPIRE_CALLBACK_FAILED, | ||
| approval_id=item.id, | ||
| error_type=type(exc).__name__, | ||
| error=safe_error_description(exc), | ||
| ) | ||
| return expired | ||
| return item | ||
|
|
||
| def _compute_expiration(self, item: ApprovalItem) -> ApprovalItem: | ||
| """Pure: return the (possibly-EXPIRED) item without I/O. | ||
|
|
||
| Companion to ``_check_expiration_locked`` for the batch path | ||
| in :meth:`ApprovalStore.list_items`. Returns the input unchanged | ||
| when no transition applies, or a fresh EXPIRED copy otherwise. | ||
| Persistence + audit logging + callback fire AFTER the batch | ||
| save in the caller, not here -- this method must be safe to | ||
| call inside a tight loop with no side effects. | ||
| """ | ||
| if ( | ||
| item.status == ApprovalStatus.PENDING | ||
| and item.expires_at is not None | ||
| and self._clock.now() >= item.expires_at | ||
| ): | ||
| return item.model_copy(update={"status": ApprovalStatus.EXPIRED}) | ||
| return item | ||
|
|
||
| def _fire_expire_callback(self, expired: ApprovalItem) -> None: | ||
| """Best-effort fire of ``_on_expire`` for a batched expiration. | ||
|
|
||
| Mirrors the callback handling in | ||
| :meth:`_check_expiration_locked`: a callback failure must not | ||
| unwind the expiration (the row is already EXPIRED in cache + | ||
| repo); emit ``API_APPROVAL_EXPIRE_CALLBACK_FAILED`` so | ||
| operators can filter callback failures from real expirations. | ||
| """ | ||
| if self._on_expire is None: | ||
| return | ||
| try: | ||
| self._on_expire(expired) | ||
| except MemoryError, RecursionError: | ||
| raise | ||
| except Exception as exc: | ||
| # ERROR rather than WARNING: the approval is already | ||
| # EXPIRED in cache + repo, so the callback can't | ||
| # propagate, but a failed downstream side effect (webhook, | ||
| # audit dispatch, workflow resume) is operationally | ||
| # meaningful and operators must be able to alert on it. | ||
| logger.error( | ||
| API_APPROVAL_EXPIRE_CALLBACK_FAILED, | ||
| approval_id=expired.id, | ||
| error_type=type(exc).__name__, | ||
| error=safe_error_description(exc), | ||
| ) | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.