Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 3 additions & 33 deletions src/synthorg/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from litestar import Litestar, Router
from litestar.config.compression import CompressionConfig
from litestar.config.cors import CORSConfig
from litestar.datastructures import ResponseHeader, State
from litestar.datastructures import State
from litestar.middleware.rate_limit import RateLimitConfig as LitestarRateLimitConfig
from litestar.openapi import OpenAPIConfig
from litestar.openapi.plugins import ScalarRenderPlugin
Expand All @@ -32,7 +32,7 @@
from synthorg.api.controllers import ALL_CONTROLLERS
from synthorg.api.controllers.ws import ws_handler
from synthorg.api.exception_handlers import EXCEPTION_HANDLERS
from synthorg.api.middleware import CSPMiddleware, RequestLoggingMiddleware
from synthorg.api.middleware import RequestLoggingMiddleware, security_headers_hook
from synthorg.api.state import AppState
from synthorg.api.ws_models import WsEvent, WsEventType
from synthorg.budget.tracker import CostTracker # noqa: TC001
Expand Down Expand Up @@ -533,36 +533,7 @@ def create_app( # noqa: PLR0913
backend="brotli",
minimum_size=1000,
),
response_headers=[
ResponseHeader(
name="X-Content-Type-Options",
value="nosniff",
),
ResponseHeader(
name="X-Frame-Options",
value="DENY",
),
ResponseHeader(
name="Referrer-Policy",
value="strict-origin-when-cross-origin",
),
ResponseHeader(
name="Strict-Transport-Security",
value="max-age=63072000; includeSubDomains",
),
ResponseHeader(
name="Permissions-Policy",
value="geolocation=(), camera=(), microphone=()",
),
ResponseHeader(
name="Cross-Origin-Resource-Policy",
value="same-origin",
),
ResponseHeader(
name="Cache-Control",
value="no-store",
),
],
before_send=[security_headers_hook],
middleware=middleware,
plugins=plugins,
exception_handlers=EXCEPTION_HANDLERS, # type: ignore[arg-type]
Expand Down Expand Up @@ -613,7 +584,6 @@ def _build_middleware(api_config: ApiConfig) -> list[Middleware]:
auth_middleware = create_auth_middleware_class(auth)
return [
auth_middleware,
CSPMiddleware,
RequestLoggingMiddleware,
rate_limit.middleware,
]
125 changes: 78 additions & 47 deletions src/synthorg/api/middleware.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
"""Request middleware.

Provides ASGI middleware for request logging and path-aware
Content-Security-Policy headers.
"""Request middleware and before-send hooks.

Provides ASGI middleware for request logging, and a ``before_send``
hook that injects security headers (CSP, CORP, HSTS, etc.) into
**every** HTTP response — including exception-handler and
unmatched-route (404/405) responses.

Why ``before_send`` instead of ASGI middleware?
Litestar's ``before_send`` hook wraps the ASGI ``send`` callback at
the outermost layer (before the middleware stack), so it fires for
all responses. By contrast, user-defined ASGI middleware only runs
for matched routes — 404 and 405 responses from the router bypass it.
"""

import time
from typing import Any, Final

Comment thread
coderabbitai[bot] marked this conversation as resolved.
from litestar import Request
from litestar.datastructures import MutableScopeHeaders
from litestar.enums import ScopeType
from litestar.types import ASGIApp, Receive, Scope, Send # noqa: TC002
from litestar.types import ASGIApp, Message, Receive, Scope, Send # noqa: TC002

from synthorg.observability import get_logger
from synthorg.observability.events.api import (
Expand All @@ -19,63 +28,85 @@

logger = get_logger(__name__)

# ── Security headers ────────────────────────────────────────────
# Applied to every HTTP response via the before_send hook.

# Strict CSP for API routes — no inline scripts, self-origin only.
_API_CSP: Final[str] = "default-src 'self'; script-src 'self'"
_API_CSP: Final[str] = (
"default-src 'self'; script-src 'self'; base-uri 'self'; frame-ancestors 'none'"
)
Comment thread
greptile-apps[bot] marked this conversation as resolved.

# Relaxed CSP for /docs/ — Scalar UI loads resources from external origins.
# cdn.jsdelivr.net: JS bundle, CSS, fonts, source maps
# fonts.scalar.com: Scalar-hosted font files
# proxy.scalar.com: API proxy and registry features
# 'unsafe-inline' in script-src/style-src: required by Scalar UI which uses
# inline <script> and <style> elements. Accepted risk — /docs is read-only,
# unauthenticated, and serves no user-submitted content.
_DOCS_CSP: Final[str] = (
"default-src 'self'; "
"script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
"style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
"img-src 'self' data: https://cdn.jsdelivr.net; "
"font-src 'self' data: https://cdn.jsdelivr.net https://fonts.scalar.com; "
"connect-src 'self' https://cdn.jsdelivr.net https://proxy.scalar.com"
"connect-src 'self' https://cdn.jsdelivr.net https://proxy.scalar.com; "
"base-uri 'self'; "
"frame-ancestors 'none'"
)
Comment on lines 48 to 58

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Similar to the API CSP, the CSP for the documentation pages is missing the frame-ancestors directive. This could allow the documentation site to be framed by malicious pages, potentially leading to clickjacking attacks. To prevent this, you should add frame-ancestors 'none' to this policy as well.

Suggested change
_DOCS_CSP: Final[str] = (
"default-src 'self'; "
"script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
"style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
"img-src 'self' data: https://cdn.jsdelivr.net; "
"font-src 'self' data: https://cdn.jsdelivr.net https://fonts.scalar.com; "
"connect-src 'self' https://cdn.jsdelivr.net https://proxy.scalar.com"
"connect-src 'self' https://cdn.jsdelivr.net https://proxy.scalar.com; "
"base-uri 'self'"
)
_DOCS_CSP: Final[str] = (
"default-src 'self'; "
"script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
"style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
"img-src 'self' data: https://cdn.jsdelivr.net; "
"font-src 'self' data: https://cdn.jsdelivr.net https://fonts.scalar.com; "
"connect-src 'self' https://cdn.jsdelivr.net https://proxy.scalar.com; "
"base-uri 'self'; "
"frame-ancestors 'none'"
)



class CSPMiddleware:
"""ASGI middleware that applies path-aware Content-Security-Policy.

API routes get a strict policy (self-origin only). The ``/docs/``
path gets a relaxed policy that allows Scalar UI resources from
``cdn.jsdelivr.net``, ``fonts.scalar.com``, and
``proxy.scalar.com``.
# Static security headers (path-independent).
_SECURITY_HEADERS: Final[dict[str, str]] = {
"X-Content-Type-Options": "nosniff",
"X-Frame-Options": "DENY",
"Referrer-Policy": "strict-origin-when-cross-origin",
"Strict-Transport-Security": "max-age=63072000; includeSubDomains",
"Permissions-Policy": "geolocation=(), camera=(), microphone=()",
"Cross-Origin-Resource-Policy": "same-origin",
"Cross-Origin-Opener-Policy": "same-origin",
Comment thread
greptile-apps[bot] marked this conversation as resolved.
Outdated
"Cache-Control": "no-store",
}


async def security_headers_hook(message: Message, scope: Scope) -> None:
"""Inject security headers into every HTTP response.

Registered as a Litestar ``before_send`` hook so it fires for
**all** HTTP responses — successful, exception-handler, and
router-level 404/405.

Adds static security headers (CORP, HSTS, X-Content-Type-Options,
etc.) and a path-aware Content-Security-Policy (strict for API,
relaxed for ``/docs/`` to allow Scalar UI resources).

Uses ``__setitem__`` (not ``add``) so that if any handler or
middleware already set a header, the known-good value overwrites
it rather than creating a duplicate.

Args:
message: ASGI message dict (only ``http.response.start``
is processed).
scope: ASGI connection scope.
"""

def __init__(self, app: ASGIApp) -> None:
self.app = app

async def __call__(
self,
scope: Scope,
receive: Receive,
send: Send,
) -> None:
"""Inject the appropriate CSP header based on request path."""
if scope["type"] != ScopeType.HTTP:
await self.app(scope, receive, send)
return

path: str = scope.get("path", "")
is_docs = path == "/docs" or path.startswith("/docs/")
csp_value = _DOCS_CSP if is_docs else _API_CSP

async def inject_csp(message: Any) -> None:
if (
isinstance(message, dict)
and message.get("type") == "http.response.start"
):
headers = list(message.get("headers", []))
headers.append(
(b"content-security-policy", csp_value.encode()),
)
message = {**message, "headers": headers}
await send(message)

await self.app(scope, receive, inject_csp)
if scope.get("type") != ScopeType.HTTP:
return
if message.get("type") != "http.response.start":
return

headers = MutableScopeHeaders.from_message(message)

# Static security headers — overwrite to prevent duplicates.
for name, value in _SECURITY_HEADERS.items():
headers[name] = value

# Path-aware headers
path: str = scope.get("path", "")
is_docs = path == "/docs" or path.startswith("/docs/")
headers["Content-Security-Policy"] = _DOCS_CSP if is_docs else _API_CSP

# Relax COOP for /docs — Scalar UI may use cross-origin popups
# for OAuth/API proxy features via proxy.scalar.com.
if is_docs:
headers["Cross-Origin-Opener-Policy"] = "unsafe-none"
Comment thread
greptile-apps[bot] marked this conversation as resolved.


class RequestLoggingMiddleware:
Expand Down
17 changes: 4 additions & 13 deletions tests/unit/api/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from litestar.testing import TestClient

from synthorg.api.app import create_app
from synthorg.api.middleware import _SECURITY_HEADERS


@pytest.mark.unit
Expand Down Expand Up @@ -34,26 +35,16 @@ def test_openapi_schema_accessible(self, test_client: TestClient[Any]) -> None:

@pytest.mark.parametrize(
("header", "expected"),
[
("X-Content-Type-Options", "nosniff"),
("X-Frame-Options", "DENY"),
("Referrer-Policy", "strict-origin-when-cross-origin"),
("Permissions-Policy", "geolocation=(), camera=(), microphone=()"),
(
"Strict-Transport-Security",
"max-age=63072000; includeSubDomains",
),
("Cross-Origin-Resource-Policy", "same-origin"),
("Cache-Control", "no-store"),
],
list(_SECURITY_HEADERS.items()),
)
def test_security_response_headers(
self,
test_client: TestClient[Any],
header: str,
expected: str,
) -> None:
response = test_client.get("/docs/openapi.json")
# Use a non-docs endpoint — /docs paths relax COOP for Scalar UI.
response = test_client.get("/api/v1/health")
assert response.headers.get(header) == expected


Expand Down
Loading
Loading