feat: enhance JWT security checks and improve OTP verification logic #138
Conversation
Reviewer's Guide

This PR tightens OTP verification by fetching the latest unused token per user, tracking failed attempts, and hardening password reset against user enumeration, while also adding startup warnings for weak JWT configuration and corresponding unit tests.

Sequence diagram for enhanced OTP verification flow

```mermaid
sequenceDiagram
    actor User
    participant Client
    participant AuthService
    participant TokenRepository
    User->>Client: Submit OTP code
    Client->>AuthService: _verify_otp(user_id, otp_code, token_type)
    AuthService->>TokenRepository: find_latest_by_user(user_id, token_type)
    TokenRepository-->>AuthService: VerificationTokenDoc or None
    alt no_token_found
        AuthService-->>Client: ValidationError Invalid or expired verification code
    else token_found
        AuthService->>AuthService: Check token_doc.expires_at
        alt token_expired
            AuthService-->>Client: ValidationError Verification code has expired
        else token_not_expired
            AuthService->>AuthService: Check token_doc.attempts >= MAX_VERIFICATION_ATTEMPTS
            alt too_many_attempts
                AuthService-->>Client: ValidationError Too many failed attempts
            else attempts_remaining
                AuthService->>AuthService: otp_hash = hash_token(otp_code)
                AuthService->>AuthService: Compare token_doc.token_hash with otp_hash
                alt hash_mismatch
                    AuthService->>TokenRepository: increment_attempts(token_doc.id)
                    TokenRepository-->>AuthService: bool modified
                    AuthService-->>Client: ValidationError Invalid or expired verification code
                else hash_match
                    AuthService->>TokenRepository: mark_as_used(token_doc.id)
                    TokenRepository-->>AuthService: bool marked
                    alt mark_failed
                        AuthService-->>Client: AppError Failed to mark token as used
                    else mark_succeeded
                        AuthService-->>Client: OTP verification succeeds
                    end
                end
            end
        end
    end
```
Class diagram for updated AuthService and TokenRepository

```mermaid
classDiagram
    class TokenRepository {
        +_col
        +mark_as_used(token_id)
        +find_latest_by_user(user_id, token_type)
        +increment_attempts(token_id)
        +delete_by_user(user_id, token_type)
    }
    class AuthService {
        -_token_repo
        -_user_repo
        +_verify_otp(user_id, otp_code, token_type)
        +reset_password(email, otp_code, new_password)
    }
    AuthService --> TokenRepository : uses
    class VerificationTokenDoc {
        +id
        +user_id
        +token_type
        +token_hash
        +created_at
        +expires_at
        +used_at
        +attempts
        +from_mongo(doc)
    }
    TokenRepository --> VerificationTokenDoc : returns
```
File-Level Changes
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
📝 Walkthrough

Added path-aware CORS and global security headers middleware, new CORS private-origins config and startup JWT-secret warnings; extended the token repository with latest-token lookup and attempt increments; OTP/password-reset now use the latest token and increment attempts; introduced a device-auth code flow with routes, DTOs, template, rate limits, and tests.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Client
    participant Routes as Auth Routes
    participant Auth as AuthService
    participant Repo as TokenRepository
    participant DB as Database
    Client->>Routes: POST /auth/device/token { code }
    Routes->>Auth: exchange_device_code(code)
    Auth->>Repo: find_by_hash_and_type(hashed_code, DEVICE_AUTH)
    Repo->>DB: query token by hash/type/used_at
    DB-->>Repo: token_doc or none
    alt token not found or expired
        Auth-->>Routes: raise ValidationError("invalid or expired")
        Routes-->>Client: 400 Invalid or expired
    else token found
        Auth->>Repo: mark token used (update used_at)
        Repo->>DB: update token.used_at
        Auth->>Auth: validate user exists and ACTIVE
        Auth->>Auth: issue access_token & refresh_token (AMR "ext")
        Auth-->>Routes: return tokens + user
        Routes-->>Client: 200 DeviceTokenResponse
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Hey - I've found 3 issues and left some high-level feedback:
- In `_verify_otp`, the result of `_token_repo.increment_attempts` is ignored; consider at least logging when `modified_count == 0` so you can detect cases where the token was not updated (e.g. deleted concurrently or wrong ID).
- In `reset_password`, the user-not-found path only does a synchronous `hash_token` call before raising, while the valid-user path goes through `_verify_otp` (hash + DB lookup + updates); if you’re aiming to mitigate user enumeration via timing, consider using a dummy user ID and running `_verify_otp` (or a similar async path) for non-existent users as well.
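Sourcery's second point can be sketched as follows. This is an illustrative shape only: `find_by_email` and `verify_otp` are passed in as stand-ins for the service's real collaborators, and `token_hex` stands in for a fresh `ObjectId()`.

```python
import asyncio
from secrets import token_hex


class ValidationError(Exception):
    pass


async def reset_password(find_by_email, verify_otp, email: str, otp_code: str) -> None:
    """Enumeration-resistant shape: unknown emails still run the async OTP path."""
    user_id = await find_by_email(email)
    if user_id is None:
        # Run the same async verification path with a throwaway user id so the
        # unknown-email branch does comparable hash + DB-shaped work.
        dummy_id = token_hex(12)  # stands in for ObjectId()
        try:
            await verify_otp(dummy_id, otp_code, "password_reset")
        except ValidationError:
            pass  # result deliberately discarded
        raise ValidationError("invalid email or code")
    await verify_otp(user_id, otp_code, "password_reset")
    # ... hash and store the new password here ...
```

Both branches now traverse `verify_otp`, narrowing the timing gap between known and unknown emails.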
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `_verify_otp`, the result of `_token_repo.increment_attempts` is ignored; consider at least logging when `modified_count == 0` so you can detect cases where the token was not updated (e.g. deleted concurrently or wrong ID).
- In `reset_password`, the user-not-found path only does a synchronous `hash_token` call before raising, while the valid-user path goes through `_verify_otp` (hash + DB lookup + updates); if you’re aiming to mitigate user enumeration via timing, consider using a dummy user ID and running `_verify_otp` (or a similar async path) for non-existent users as well.
## Individual Comments
### Comment 1
<location path="tests/unit/services/test_auth_service.py" line_range="718-720" />
<code_context>
await svc.reset_password("test@example.com", "wrong-code", "NewValidPass1!")
+ @pytest.mark.asyncio
+ async def test_reset_password_wrong_code_increments_attempts(self):
+ svc = make_auth_service()
+ user = make_user_doc(password_set=True)
+ token_doc = self._make_token_doc(USER_OID, "654321")
+ svc._user_repo.find_by_email.return_value = user
+ svc._token_repo.find_latest_by_user.return_value = token_doc
+ svc._token_repo.increment_attempts.return_value = True
+
+ with pytest.raises(ValidationError, match="Invalid or expired"):
+ await svc.reset_password("test@example.com", "000000", "NewValidPass1!")
+ svc._token_repo.increment_attempts.assert_awaited_once()
+
@pytest.mark.asyncio
</code_context>
<issue_to_address>
**suggestion (testing):** Consider also asserting increment_attempts is called with the correct token id in reset-password tests
To better mirror the email verification tests and tighten this check, assert that `increment_attempts` is awaited with `token_doc.id`, not just that it was awaited once. This verifies the service uses the token from `find_latest_by_user` when incrementing attempts.
```suggestion
with pytest.raises(ValidationError, match="Invalid or expired"):
await svc.reset_password("test@example.com", "000000", "NewValidPass1!")
svc._token_repo.increment_attempts.assert_awaited_once_with(token_doc.id)
```
</issue_to_address>
### Comment 2
<location path="tests/unit/services/test_auth_service.py" line_range="434" />
<code_context>
async def test_reset_password_wrong_otp_raises(self):
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test for reset_password when the email does not exist to cover the new timing-safe branch
The new behavior hashes the OTP and raises `ValidationError("invalid email or code")` when `find_by_email` returns `None` to prevent user enumeration, but there’s no explicit test for this case. Please add a test that sets `svc._user_repo.find_by_email.return_value = None` and asserts that the same `ValidationError` message is raised, so this timing-safe behavior is preserved through future changes.
</issue_to_address>
### Comment 3
<location path="tests/unit/services/test_auth_service.py" line_range="399" />
<code_context>
token_doc = self._make_token_doc(USER_OID, "123456")
svc._user_repo.find_by_id.return_value = user
- svc._token_repo.find_by_hash.return_value = token_doc
+ svc._token_repo.find_latest_by_user.return_value = token_doc
svc._token_repo.mark_as_used.return_value = True
svc._user_repo.update.return_value = True
</code_context>
<issue_to_address>
**suggestion (testing):** Add unit tests for TokenRepository.find_latest_by_user and increment_attempts behavior
These service tests now rely on `find_latest_by_user` and `increment_attempts`, but I don’t see repository-level tests for them. Please add focused tests (e.g., in `test_token_repository`) to cover: returning the latest unused token by user/type, returning `None` when no match exists, `increment_attempts` updating `attempts` and returning `True`/`False` based on `modified_count`, and behavior when Mongo operations raise `PyMongoError`, including logging paths.
Suggested implementation:
```python
import datetime
from unittest.mock import MagicMock
import pytest
from bson import ObjectId
from pymongo.errors import PyMongoError
from app.repositories.token_repository import TokenRepository
def _make_token_doc(user_id: ObjectId, token_type: str = "email_verification", used: bool = False, attempts: int = 0):
return {
"_id": ObjectId(),
"user_id": user_id,
"type": token_type,
"token": "123456",
"used": used,
"attempts": attempts,
"created_at": datetime.datetime.utcnow(),
}
class TestTokenRepositoryFindLatestByUser:
def test_returns_latest_unused_token_for_user_and_type(self):
collection = MagicMock()
logger = MagicMock()
repo = TokenRepository(collection=collection, logger=logger)
user_id = ObjectId()
token_doc = _make_token_doc(user_id, token_type="email_verification", used=False)
collection.find_one.return_value = token_doc
result = repo.find_latest_by_user(user_id=user_id, token_type="email_verification")
collection.find_one.assert_called_once_with(
{
"user_id": user_id,
"type": "email_verification",
"used": False,
},
sort=[("created_at", -1)],
)
assert result == token_doc
def test_returns_none_when_no_unused_token_found(self):
collection = MagicMock()
logger = MagicMock()
repo = TokenRepository(collection=collection, logger=logger)
user_id = ObjectId()
collection.find_one.return_value = None
result = repo.find_latest_by_user(user_id=user_id, token_type="email_verification")
collection.find_one.assert_called_once()
assert result is None
def test_logs_and_returns_none_when_pymongoerror_raised(self):
collection = MagicMock()
logger = MagicMock()
repo = TokenRepository(collection=collection, logger=logger)
user_id = ObjectId()
collection.find_one.side_effect = PyMongoError("boom")
result = repo.find_latest_by_user(user_id=user_id, token_type="email_verification")
collection.find_one.assert_called_once()
logger.exception.assert_called_once()
assert result is None
class TestTokenRepositoryIncrementAttempts:
def test_increment_attempts_updates_and_returns_true_when_modified(self):
collection = MagicMock()
logger = MagicMock()
repo = TokenRepository(collection=collection, logger=logger)
token_id = ObjectId()
update_result = MagicMock()
update_result.modified_count = 1
collection.update_one.return_value = update_result
success = repo.increment_attempts(token_id)
collection.update_one.assert_called_once_with(
{"_id": token_id},
{"$inc": {"attempts": 1}},
)
assert success is True
def test_increment_attempts_returns_false_when_not_modified(self):
collection = MagicMock()
logger = MagicMock()
repo = TokenRepository(collection=collection, logger=logger)
token_id = ObjectId()
update_result = MagicMock()
update_result.modified_count = 0
collection.update_one.return_value = update_result
success = repo.increment_attempts(token_id)
collection.update_one.assert_called_once()
assert success is False
def test_increment_attempts_logs_and_returns_false_on_pymongoerror(self):
collection = MagicMock()
logger = MagicMock()
repo = TokenRepository(collection=collection, logger=logger)
token_id = ObjectId()
collection.update_one.side_effect = PyMongoError("boom")
success = repo.increment_attempts(token_id)
collection.update_one.assert_called_once()
logger.exception.assert_called_once()
assert success is False
```
These tests assume:
1. `TokenRepository` is importable from `app.repositories.token_repository` and its constructor accepts `collection` and `logger` keyword arguments. If your actual constructor differs, adjust the instantiation in the tests accordingly.
2. `find_latest_by_user` has signature `find_latest_by_user(user_id, token_type)` and calls `collection.find_one` with a filter on `user_id`, `type`, and `used=False`, sorted by `created_at` descending, returning either the raw document or `None`. Adjust the expected filter/sort or return assertions if your implementation uses a different schema or wraps documents in model objects.
3. `increment_attempts` has signature `increment_attempts(token_id)` and calls `collection.update_one({"_id": token_id}, {"$inc": {"attempts": 1}})`, returning `True` when `modified_count == 1`, `False` otherwise, and logging via `logger.exception` when a `PyMongoError` is raised. If logging uses a different method (e.g. `error` instead of `exception`), update the logger assertions.
4. If your test layout or import paths differ (e.g. repository tests live in a different directory/module), move this new test file accordingly and update the import path to `TokenRepository`.
</issue_to_address>
Actionable comments posted: 1
🧹 Nitpick comments (1)
repositories/token_repository.py (1)
87-109: LGTM with performance note. The implementation is correct: queries for unused tokens by user/type and returns the newest one. Error handling follows the established pattern.

Consider adding a compound index to optimize this query path:

```python
await tokens_col.create_index(
    [("user_id", 1), ("token_type", 1), ("used_at", 1), ("created_at", -1)]
)
```

Without it, the query may perform a collection scan on larger token collections (per `repositories/indexes.py:80-84`, only single-field indexes exist currently).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@repositories/token_repository.py` around lines 87 - 109, The query in find_latest_by_user (which looks up {"user_id", "token_type", "used_at": None} sorted by created_at desc) needs a compound index to avoid collection scans; add an index creation call on the tokens collection (tokens_col.create_index with [("user_id", 1), ("token_type", 1), ("used_at", 1), ("created_at", -1)]) in the app's index creation/initialization routine (the module that currently creates single-field indexes) so the find_one query is covered and performant.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@services/auth_service.py`:
- Around line 509-512: The timing mitigation in the branch where user is falsy
is incomplete: it only calls hash_token(otp_code) while the valid-user path
performs DB work (find_latest_by_user, mark_as_used, update), so add a simulated
DB call to better equalize timing; specifically, in the "if not user" branch
call the same repository method (find_latest_by_user) using a freshly generated
dummy ObjectId() (or equivalent repository-safe no-op) and ignore its result
before raising ValidationError, keeping the existing hash_token(otp_code)
call—this mirrors the DB interaction of the real path while keeping behavior
unchanged.
---
Nitpick comments:
In `@repositories/token_repository.py`:
- Around line 87-109: The query in find_latest_by_user (which looks up
{"user_id", "token_type", "used_at": None} sorted by created_at desc) needs a
compound index to avoid collection scans; add an index creation call on the
tokens collection (tokens_col.create_index with [("user_id", 1), ("token_type",
1), ("used_at", 1), ("created_at", -1)]) in the app's index
creation/initialization routine (the module that currently creates single-field
indexes) so the find_one query is covered and performant.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 39121c09-460e-4ea0-8836-29a4e0b07940
📒 Files selected for processing (4)
- app.py
- repositories/token_repository.py
- services/auth_service.py
- tests/unit/services/test_auth_service.py
Pull request overview
Improves authentication security and OTP verification behavior by switching OTP lookups to “latest unused token per user”, adding failed-attempt tracking, and warning at startup when JWT signing configuration is weak.
Changes:
- Update OTP verification to fetch the latest unused token by `(user_id, token_type)` and increment attempts on wrong codes.
- Add `find_latest_by_user` and `increment_attempts` to the token repository.
- Expand unit tests to cover max-attempts behavior and attempt increments; add startup warnings for weak JWT config.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| `services/auth_service.py` | Updates OTP verification logic and adjusts reset-password flow to add a timing-mitigation hook. |
| `repositories/token_repository.py` | Adds repository helpers for latest-token lookup and atomic attempts increment. |
| `tests/unit/services/test_auth_service.py` | Updates mocks and adds test coverage for attempts/max-attempts behavior. |
| `app.py` | Adds non-fatal startup warnings for insecure/weak JWT configuration. |
Actionable comments posted: 5
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@app.py`:
- Around line 195-202: SecurityHeadersMiddleware is registered between other
middlewares (e.g., SplitCORSMiddleware and MaxContentLengthMiddleware), causing
synthesized responses (204/413) to bypass security headers; move
SecurityHeadersMiddleware to the outer edge of the stack by registering/adding
SecurityHeadersMiddleware before SplitCORSMiddleware and
MaxContentLengthMiddleware (i.e., ensure
app.add_middleware(SecurityHeadersMiddleware, ...) occurs earlier than the calls
that add SplitCORSMiddleware and MaxContentLengthMiddleware) so HSTS/CSP/nosniff
are applied to all responses.
- Around line 135-147: Detect and warn when RSA keys are half-configured: add a
check on settings.jwt for a mismatched RSA key pair (one of
settings.jwt.jwt_private_key / settings.jwt.jwt_public_key present but not both)
and emit a warning (e.g. "jwt_rsa_half_configured") before falling back to
HS256; reference settings.jwt.use_rs256, settings.jwt.jwt_secret,
settings.jwt.jwt_private_key, and settings.jwt.jwt_public_key and log a clear
detail message that one RSA key is missing and the app will silently switch
algorithms so operators can fix the config.
In `@middleware/security.py`:
- Around line 80-97: configure_cors currently only forwards private_origins and
_set_cors_headers always sets Access-Control-Allow-Origin="*" for public routes,
ignoring the configured public allowlist; change configure_cors to pass the
public origins (settings.cors_origins or settings.cors_public_origins) into
SplitCORSMiddleware and update SplitCORSMiddleware._set_cors_headers to check
the origin against that public-origin list (similar to the private branch) and
set Access-Control-Allow-Origin to the specific origin when it matches,
otherwise do not set a wildcard; ensure you reference SplitCORSMiddleware,
configure_cors, and _set_cors_headers and preserve existing private-origin
behavior.
- Around line 59-61: The current middleware short-circuits every OPTIONS request
with an Origin by calling self._preflight(origin, classification), which
interferes with real OPTIONS handlers for routes classified as "none"; change
the condition to only return self._preflight(...) when the route classification
indicates CORS should be handled (i.e., classification != "none" or a specific
CORS classification), otherwise let the request continue to normal handlers;
update the if in the code that checks request.method == "OPTIONS" and origin to
additionally check classification != "none" (or the appropriate CORS-specific
value) before calling self._preflight.
In `@tests/smoke/test_config_defaults.py`:
- Around line 48-51: In test_default_cors_private_origins_empty change the
assertion to check the AppSettings schema default rather than an
instantiated/resolved value: read the model field default for
cors_private_origins from the AppSettings class (e.g. use
AppSettings.__fields__['cors_private_origins'].default for Pydantic v1 or
AppSettings.model_fields['cors_private_origins'].default for Pydantic v2) and
assert that equals [] instead of instantiating AppSettings().
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: aee1ceca-e953-42c3-9917-54bbe6708e37
📒 Files selected for processing (8)
- .env.example
- app.py
- config.py
- middleware/security.py
- tests/integration/test_middleware.py
- tests/smoke/conftest.py
- tests/smoke/test_config_defaults.py
- tests/smoke/test_middleware_stack.py
Actionable comments posted: 2
♻️ Duplicate comments (1)
middleware/security.py (1)
80-89: ⚠️ Potential issue | 🟡 Minor

Public-route CORS hardcoding issue persists, plus missing `Vary` for disallowed private origins.

The previously raised concern about public routes ignoring `cors_origins` remains unaddressed. Additionally, there's a caching vulnerability for private routes: when the request origin is not in the allowlist, no `Vary: Origin` header is set. This can cause shared caches (CDNs, reverse proxies) to serve stale responses:

- Request from `evil.com` (not in allowlist) → response cached without `Vary: Origin`
- Request from `allowed.com` → cache returns response from step 1 (no CORS headers)
- Legitimate browser-based client is blocked

Fix: always set `Vary` for private classification:

```diff
     def _set_cors_headers(
         self, response: Response, origin: str, classification: str
     ) -> None:
         """Set CORS headers based on route classification."""
         if classification == "public":
             response.headers["Access-Control-Allow-Origin"] = "*"
-        elif classification == "private" and origin in self._private_origins:
-            response.headers["Access-Control-Allow-Origin"] = origin
-            response.headers["Access-Control-Allow-Credentials"] = "true"
+        elif classification == "private":
+            response.headers["Vary"] = "Origin"
+            if origin in self._private_origins:
+                response.headers["Access-Control-Allow-Origin"] = origin
+                response.headers["Access-Control-Allow-Credentials"] = "true"
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@middleware/security.py` around lines 80 - 89, The _set_cors_headers method currently hardcodes Access-Control-Allow-Origin="*" for public routes and omits a Vary header when a private route request origin is not allowlisted; update _set_cors_headers to use the configured cors origins (e.g. self._cors_origins or self._public_origins) instead of "*" for the "public" classification and always set response.headers["Vary"] = "Origin" for classification "private" (regardless of whether origin is in self._private_origins) so caches vary on Origin; keep the existing behavior of setting Access-Control-Allow-Origin and Access-Control-Allow-Credentials only when origin is allowlisted.
🧹 Nitpick comments (3)
services/auth_service.py (1)
624-640: Minor: Consider atomic find-and-mark to prevent race conditions.

The `find_by_hash_and_type` and `mark_as_used` calls are separate operations. While the check on line 639 (`if not marked`) catches concurrent attempts (the second request will fail to mark), an atomic `findOneAndUpdate` with `{used_at: None}` in the filter would be more robust.

Current behavior is acceptable: the first request succeeds, subsequent concurrent requests get an `AppError`. The race window is small.

♻️ Optional: Atomic token consumption

In `TokenRepository`, add:

```python
async def consume_token_atomic(
    self, token_hash: str, token_type: str
) -> VerificationTokenDoc | None:
    """Atomically find and mark a token as used.

    Returns None if not found or already used.
    """
    doc = await self._col.find_one_and_update(
        {"token_hash": token_hash, "token_type": token_type, "used_at": None},
        {"$set": {"used_at": datetime.now(timezone.utc)}},
        return_document=ReturnDocument.BEFORE,
    )
    return VerificationTokenDoc.from_mongo(doc) if doc else None
```

Then in `exchange_device_code`:

```python
token_doc = await self._token_repo.consume_token_atomic(token_hash, TOKEN_TYPE_DEVICE_AUTH)
if not token_doc:
    raise AuthenticationError("invalid or expired device auth code")
# expiry check still needed on token_doc.expires_at
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@services/auth_service.py` around lines 624 - 640, The current two-step token lookup and consumption in exchange_device_code (using TokenRepository.find_by_hash_and_type then mark_as_used) can race; implement an atomic consumer in TokenRepository (e.g., consume_token_atomic) that does a single findOneAndUpdate filtering on token_hash, token_type and used_at: None and sets used_at to now, returning the pre-update document; then replace the find_by_hash_and_type + mark_as_used sequence in exchange_device_code with a single call to consume_token_atomic and keep the expiry check on the returned VerificationTokenDoc (so if consume_token_atomic returns None throw AuthenticationError).

middleware/security.py (2)
17-18: Consider documenting the intentional prefix overlap.

`/auth/device` appears in `_PUBLIC_PREFIXES` while `/auth` is in `_PRIVATE_PREFIXES`. The classifier works correctly because public prefixes are checked first, but this implicit ordering dependency could cause subtle bugs during future maintenance.

A brief comment would clarify the intent:

```diff
+# Note: /auth/device must remain in PUBLIC to override /auth classification
 _PRIVATE_PREFIXES = ("/auth", "/oauth", "/dashboard")
 _PUBLIC_PREFIXES = ("/api/v1", "/auth/device", "/stats", "/export", "/metric")
```
Verify each finding against the current code and only fix it if needed. In `@middleware/security.py` around lines 17 - 18, Add a brief inline comment explaining the intentional overlap between _PUBLIC_PREFIXES and _PRIVATE_PREFIXES (specifically that "/auth/device" is listed in _PUBLIC_PREFIXES while "/auth" is in _PRIVATE_PREFIXES) and document the ordering dependency that public prefixes are checked before private prefixes in the classifier; place this comment adjacent to the _PUBLIC_PREFIXES/_PRIVATE_PREFIXES definitions (or in the module docstring) so future maintainers know the behavior and won’t reorder or remove entries accidentally.
70-78: Preflight headers set inconsistently for disallowed private origins.

When a private route receives a preflight from an origin not in the allowlist, `_set_cors_headers` sets no headers (correct), but lines 75-77 still emit `Access-Control-Allow-Methods`, `Access-Control-Allow-Headers`, and `Access-Control-Max-Age`. Without `Access-Control-Allow-Origin`, the browser rejects the preflight regardless, making these headers superfluous noise.

Optional: only set preflight headers when the CORS origin is accepted:

```diff
     def _preflight(self, origin: str, classification: str) -> Response:
         """Return a 204 preflight response with appropriate CORS headers."""
         response = PlainTextResponse("", status_code=204)
-        if classification != "none":
-            self._set_cors_headers(response, origin, classification)
+        if classification == "none":
+            return response
+
+        self._set_cors_headers(response, origin, classification)
+        # Only add preflight-specific headers if ACAO was actually set
+        if "Access-Control-Allow-Origin" in response.headers:
             response.headers["Access-Control-Allow-Methods"] = _ALLOWED_METHODS
             response.headers["Access-Control-Allow-Headers"] = _ALLOWED_HEADERS
             response.headers["Access-Control-Max-Age"] = "86400"
         return response
```
Verify each finding against the current code and only fix it if needed. In `@middleware/security.py` around lines 70 - 78, The preflight helper _preflight currently calls _set_cors_headers(origin, classification) but always emits Access-Control-Allow-Methods/Headers/Max-Age even when _set_cors_headers refused the origin; change the logic so those three headers are only added when the origin was actually accepted: either make _set_cors_headers return a boolean (e.g., True if it set Access-Control-Allow-Origin) and check that return value before adding the other headers, or call _set_cors_headers and then guard adding Access-Control-Allow-Methods/Headers/Max-Age on response.headers.get("Access-Control-Allow-Origin") being present; update _preflight accordingly (function name: _preflight, helper: _set_cors_headers, headers: Access-Control-Allow-Methods, Access-Control-Allow-Headers, Access-Control-Max-Age).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@routes/auth_routes.py`:
- Around line 469-473: The code calls auth_service.create_device_auth_code with
profile.id (a str) but AuthService.create_device_auth_code expects an ObjectId;
change the call to pass the actual ObjectId user.user_id instead — i.e., after
fetching profile via auth_service.get_user_profile(str(user.user_id)), call
auth_service.create_device_auth_code(user.user_id, profile.email) (or otherwise
convert to ObjectId) so the types match and avoid passing profile.id to
create_device_auth_code.
In `@static/js/auth.js`:
- Around line 294-295: The redirect uses the unvalidated nextUrl from the query
string (nextUrl and window.location.href) causing an open-redirect; fix by
validating the value before navigating: parse the next parameter and only allow
a relative path that begins with a single '/' (disallow values starting with
'//' or containing a different origin) or construct a URL and ensure its origin
equals location.origin, otherwise fall back to '/dashboard'; update the redirect
logic where nextUrl is read to perform this check and only assign
window.location.href when the URL passes validation.
---
Duplicate comments:
In `@middleware/security.py`:
- Around line 80-89: The _set_cors_headers method currently hardcodes
Access-Control-Allow-Origin="*" for public routes and omits a Vary header when a
private route request origin is not allowlisted; update _set_cors_headers to use
the configured cors origins (e.g. self._cors_origins or self._public_origins)
instead of "*" for the "public" classification and always set
response.headers["Vary"] = "Origin" for classification "private" (regardless of
whether origin is in self._private_origins) so caches vary on Origin; keep the
existing behavior of setting Access-Control-Allow-Origin and
Access-Control-Allow-Credentials only when origin is allowlisted.
---
Nitpick comments:
In `@middleware/security.py`:
- Around line 17-18: Add a brief inline comment explaining the intentional
overlap between _PUBLIC_PREFIXES and _PRIVATE_PREFIXES (specifically that
"/auth/device" is listed in _PUBLIC_PREFIXES while "/auth" is in
_PRIVATE_PREFIXES) and document the ordering dependency that public prefixes are
checked before private prefixes in the classifier; place this comment adjacent
to the _PUBLIC_PREFIXES/_PRIVATE_PREFIXES definitions (or in the module
docstring) so future maintainers know the behavior and won’t reorder or remove
entries accidentally.
- Around line 70-78: The preflight helper _preflight currently calls
_set_cors_headers(origin, classification) but always emits
Access-Control-Allow-Methods/Headers/Max-Age even when _set_cors_headers refused
the origin; change the logic so those three headers are only added when the
origin was actually accepted: either make _set_cors_headers return a boolean
(e.g., True if it set Access-Control-Allow-Origin) and check that return value
before adding the other headers, or call _set_cors_headers and then guard adding
Access-Control-Allow-Methods/Headers/Max-Age on
response.headers.get("Access-Control-Allow-Origin") being present; update
_preflight accordingly (function name: _preflight, helper: _set_cors_headers,
headers: Access-Control-Allow-Methods, Access-Control-Allow-Headers,
Access-Control-Max-Age).
In `@services/auth_service.py`:
- Around line 624-640: The current two-step token lookup and consumption in
exchange_device_code (using TokenRepository.find_by_hash_and_type then
mark_as_used) can race; implement an atomic consumer in TokenRepository (e.g.,
consume_token_atomic) that does a single findOneAndUpdate filtering on
token_hash, token_type and used_at: None and sets used_at to now, returning the
pre-update document; then replace the find_by_hash_and_type + mark_as_used
sequence in exchange_device_code with a single call to consume_token_atomic and
keep the expiry check on the returned VerificationTokenDoc (so if
consume_token_atomic returns None throw AuthenticationError).
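A sketch of the intended semantics over an in-memory list of documents. The real atomicity would come from a single MongoDB findOneAndUpdate (in PyMongo, `find_one_and_update` returning the pre-update document); this Python loop only imitates the contract for illustration:

```python
from datetime import datetime, timezone

def consume_token_atomic(tokens: list[dict], token_hash: str, token_type: str):
    """Find one unused token matching hash and type, mark it used, and return
    the pre-update document; None if nothing matched.

    In production this must be ONE find_one_and_update filtering on
    {token_hash, token_type, used_at: None}, so lookup and consumption
    cannot race the way the two-step find + mark_as_used sequence can.
    """
    for doc in tokens:
        if (doc["token_hash"] == token_hash
                and doc["token_type"] == token_type
                and doc["used_at"] is None):
            before = dict(doc)  # pre-update snapshot, like ReturnDocument.BEFORE
            doc["used_at"] = datetime.now(timezone.utc)
            return before
    return None
```

The caller then keeps the expiry check on the returned document and raises `AuthenticationError` when the result is None.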
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 3001bd74-2ea3-4471-84d8-7aafaa0bc6d8
📒 Files selected for processing (11)
- middleware/rate_limiter.py
- middleware/security.py
- repositories/token_repository.py
- routes/auth_routes.py
- schemas/dto/requests/auth.py
- schemas/dto/responses/auth.py
- schemas/models/token.py
- services/auth_service.py
- static/js/auth.js
- templates/device_callback.html
- tests/unit/services/test_auth_service.py
✅ Files skipped from review due to trivial changes (3)
- schemas/models/token.py
- middleware/rate_limiter.py
- templates/device_callback.html
🚧 Files skipped from review as they are similar to previous changes (1)
- repositories/token_repository.py
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
services/auth_service.py (1)
164-202: ⚠️ Potential issue | 🟠 Major
Make the OTP attempt cap atomic.
Line 164 reads the token and Line 202 increments attempts in a separate operation. Two parallel wrong-code submissions can both clear the attempts >= MAX_VERIFICATION_ATTEMPTS guard on Line 188 and each increment the same token, so the five-attempt limit is still bypassable under concurrency. Fold the threshold check into a single repository update so the read and increment happen atomically.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@services/auth_service.py` around lines 164 - 202, The current flow in the OTP verification method reads the token with self._token_repo.find_latest_by_user and later calls self._token_repo.increment_attempts(token_doc.id) which allows a race that bypasses MAX_VERIFICATION_ATTEMPTS; change this to perform the attempts check and increment in one atomic repository call (e.g., add a method like increment_attempts_if_below_max(token_id, max_attempts) or increment_and_get_if_below(token_id, max_attempts) on the token repo) and use its return value to decide whether to raise the "Too many failed attempts" ValidationError; update the code that compares hash_token(otp_code) and calls the repo so that it calls this new atomic method instead of the separate increment_attempts, and remove the in-memory attempts check (token_doc.attempts) so the guard is enforced by the repo.
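The filter-and-increment idea behind such a repository method can be sketched as follows. In MongoDB it would be one find_one_and_update with `{"attempts": {"$lt": max_attempts}}` in the filter and `{"$inc": {"attempts": 1}}` in the update, so the check and the increment cannot interleave; the in-memory version below only illustrates the contract:

```python
MAX_VERIFICATION_ATTEMPTS = 5  # assumed cap, matching the review's "five-attempt limit"

def increment_attempts_if_below_max(doc: dict, max_attempts: int) -> bool:
    """Sketch of the atomic guard: the attempts check and the increment happen
    as one operation, so two concurrent wrong-code submissions cannot both
    pass the threshold. Returns True if the attempt was recorded, False once
    the cap is reached (caller raises the "Too many failed attempts" error).
    """
    if doc["attempts"] < max_attempts:
        doc["attempts"] += 1
        return True
    return False
```

With this in place, the in-memory `token_doc.attempts` comparison in the service can be deleted; the repository result alone decides whether to reject.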
♻️ Duplicate comments (2)
app.py (2)
135-147: ⚠️ Potential issue | 🟠 Major
Still missing a warning for half-configured RSA keys.
On Lines 136-147, if only one of JWT_PRIVATE_KEY/JWT_PUBLIC_KEY is set and JWT_SECRET is strong, startup logs no warning even though auth silently falls back to HS256. This is easy to miss in production.
Suggested patch:
-if settings.jwt and not settings.jwt.use_rs256:
-    if not settings.jwt.jwt_secret:
+if settings.jwt:
+    has_private = bool(settings.jwt.jwt_private_key)
+    has_public = bool(settings.jwt.jwt_public_key)
+    if has_private != has_public:
+        log.warning(
+            "jwt_key_pair_incomplete",
+            detail="JWT_PRIVATE_KEY and JWT_PUBLIC_KEY must be set together; "
+            "otherwise the app falls back to HS256.",
+        )
+    elif not settings.jwt.use_rs256 and not settings.jwt.jwt_secret:
         log.warning(
             "jwt_config_insecure",
             detail="RS256 keys not set and JWT_SECRET is empty — tokens can be forged. "
             "Set JWT_PRIVATE_KEY + JWT_PUBLIC_KEY or a strong JWT_SECRET.",
         )
-    elif len(settings.jwt.jwt_secret) < 32:
+    elif not settings.jwt.use_rs256 and len(settings.jwt.jwt_secret) < 32:
         log.warning(
             "jwt_secret_weak",
             detail="JWT_SECRET is shorter than 32 characters — consider using RS256 keys or a longer secret.",
         )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app.py` around lines 135 - 147, Add a warning when RSA keys are half-configured: inside the existing branch handling HS256 (where settings.jwt is truthy and not settings.jwt.use_rs256), check if exactly one of settings.jwt.private_key or settings.jwt.public_key is present (XOR). If so, call log.warning (e.g., "jwt_rsa_half_configured") with a detail message that one of JWT_PRIVATE_KEY / JWT_PUBLIC_KEY is set while the other is missing and auth will fall back to HS256, so operators are alerted even if settings.jwt.jwt_secret is strong. Use the existing log.warning pattern and the same settings.jwt symbol names to locate and add this check alongside the jwt_secret checks.
195-202: ⚠️ Potential issue | 🟠 Major
SecurityHeadersMiddleware should be moved to the outer edge of the stack.
With the current order (Line 196 before Lines 198/202), middleware that can short-circuit responses may bypass security headers. Move SecurityHeadersMiddleware to be registered last so headers are applied consistently. (In FastAPI/Starlette, each app.add_middleware call wraps the existing stack, so the last middleware added runs first, i.e. outermost.)
Suggested patch:
-# 3. Security headers
-app.add_middleware(SecurityHeadersMiddleware, hsts_enabled=settings.is_production)
-# 4. Body size limit
+# 3. Body size limit
 app.add_middleware(
     MaxContentLengthMiddleware, max_content_length=settings.max_content_length
 )
-# 5. Request logging — innermost, logs all requests with request_id
+# 4. Request logging
 app.add_middleware(RequestLoggingMiddleware)
+# 5. Security headers — outer edge to cover all responses
+app.add_middleware(SecurityHeadersMiddleware, hsts_enabled=settings.is_production)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app.py` around lines 195 - 202, The SecurityHeadersMiddleware is currently added before other middleware so it may be bypassed; reorder the app.add_middleware calls so that SecurityHeadersMiddleware is registered last (i.e., call app.add_middleware(SecurityHeadersMiddleware, hsts_enabled=...) after registering MaxContentLengthMiddleware and RequestLoggingMiddleware) to ensure it runs outermost and always applies headers; locate the three add_middleware invocations (SecurityHeadersMiddleware, MaxContentLengthMiddleware, RequestLoggingMiddleware) and move the SecurityHeadersMiddleware call to the end.
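The ordering claim can be checked with a tiny stand-in for Starlette's add_middleware stacking: each registration wraps the current app, so the last middleware added runs outermost. The middleware names are just labels for the demo:

```python
def build_stack(handlers):
    """Mimic Starlette's add_middleware semantics: each added middleware wraps
    the current app, so the LAST one added runs FIRST (outermost)."""
    def app(trace):
        trace.append("app")  # the innermost endpoint
    for name in handlers:    # handlers listed in add_middleware call order
        inner = app
        def app(trace, name=name, inner=inner):
            trace.append(name)  # record who ran; outermost appends first
            inner(trace)
    return app

trace: list = []
build_stack(["CORS", "MaxContentLength", "RequestLogging", "SecurityHeaders"])(trace)
# SecurityHeaders was added last, so it runs outermost and sees every response
```

Registering SecurityHeadersMiddleware last therefore guarantees its headers are applied even when an inner middleware short-circuits.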
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@routes/auth_routes.py`:
- Around line 467-470: The callback currently leaks the device auth code by
returning RedirectResponse with the code in the query string; change the handoff
so the code is never sent as part of the request URL. Instead of
RedirectResponse(f"/auth/device/callback?code={code}"), deliver the code
out-of-band—e.g., redirect to "/auth/device/callback#code=…" (use the URL
fragment) or store the code server-side (session or short-lived store keyed to a
nonce) and return a redirect that only contains the nonce; update the handler
that expects the code (the callback receiver) to read it from the fragment via
client-side JS or to exchange the nonce for the code server-side; modify
auth_service.create_device_auth_code, the route that issues the
RedirectResponse, and the callback handler accordingly so the code never appears
in query params or server logs.
- Around line 485-487: The TemplateResponse call for device_callback.html uses
the removed overload templates.TemplateResponse(name, context); change it to the
Starlette 1.0+ signature by calling templates.TemplateResponse with the request
first, then the template name and context (mirror how verify_page() does it) so
update the templates.TemplateResponse invocation that returns
device_callback.html to pass (request, "device_callback.html", {"code": code}).
In `@services/auth_service.py`:
- Around line 514-520: The code currently returns a generic ValidationError when
the user is missing but lets _verify_otp(user.id, otp_code,
TOKEN_TYPE_PASSWORD_RESET) raise distinct errors for real users, enabling
enumeration; change this by wrapping the call to _verify_otp in a try/except
that catches its specific exceptions and re-raises a single generic
ValidationError("invalid email or code"), keep the dummy timing
hash_token(otp_code) behavior for the missing-user branch, and avoid emitting
distinct error texts or logs that could differentiate real vs missing accounts
(use the same message for all failures).
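The wrap-and-collapse pattern can be sketched as follows; `ValidationError`, the helper parameters, and the message text are stand-ins for the project's actual symbols:

```python
class ValidationError(Exception):
    pass

GENERIC_MSG = "invalid email or code"

def verify_password_reset_otp(user, otp_code, verify_otp, hash_token):
    """Sketch: every failure path raises the SAME generic error, so responses
    cannot be used to tell real accounts from missing ones."""
    if user is None:
        hash_token(otp_code)  # dummy hash keeps timing comparable to real users
        raise ValidationError(GENERIC_MSG)
    try:
        verify_otp(user.id, otp_code)
    except ValidationError:
        # Swallow the specific cause (expired, wrong code, too many attempts)
        # and re-raise with the one generic message.
        raise ValidationError(GENERIC_MSG) from None
```

Internal logs can still record the specific cause before re-raising, as long as the client-visible message and status stay identical across branches.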
---
Outside diff comments:
In `@services/auth_service.py`:
- Around line 164-202: The current flow in the OTP verification method reads the
token with self._token_repo.find_latest_by_user and later calls
self._token_repo.increment_attempts(token_doc.id) which allows a race that
bypasses MAX_VERIFICATION_ATTEMPTS; change this to perform the attempts check
and increment in one atomic repository call (e.g., add a method like
increment_attempts_if_below_max(token_id, max_attempts) or
increment_and_get_if_below(token_id, max_attempts) on the token repo) and use
its return value to decide whether to raise the "Too many failed attempts"
ValidationError; update the code that compares hash_token(otp_code) and calls
the repo so that it calls this new atomic method instead of the separate
increment_attempts, and remove the in-memory attempts check (token_doc.attempts)
so the guard is enforced by the repo.
---
Duplicate comments:
In `@app.py`:
- Around line 135-147: Add a warning when RSA keys are half-configured: inside
the existing branch handling HS256 (where settings.jwt is truthy and not
settings.jwt.use_rs256), check if exactly one of settings.jwt.private_key or
settings.jwt.public_key is present (XOR). If so, call log.warning (e.g.,
"jwt_rsa_half_configured") with a detail message that one of JWT_PRIVATE_KEY /
JWT_PUBLIC_KEY is set while the other is missing and auth will fall back to
HS256, so operators are alerted even if settings.jwt.jwt_secret is strong. Use
the existing log.warning pattern and the same settings.jwt symbol names to
locate and add this check alongside the jwt_secret checks.
- Around line 195-202: The SecurityHeadersMiddleware is currently added before
other middleware so it may be bypassed; reorder the app.add_middleware calls so
that SecurityHeadersMiddleware is registered last (i.e., call
app.add_middleware(SecurityHeadersMiddleware, hsts_enabled=...) after
registering MaxContentLengthMiddleware and RequestLoggingMiddleware) to ensure
it runs outermost and always applies headers; locate the three add_middleware
invocations (SecurityHeadersMiddleware, MaxContentLengthMiddleware,
RequestLoggingMiddleware) and move the SecurityHeadersMiddleware call to the
end.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c6c8a549-47b2-462f-ac14-720d130acdfc
📒 Files selected for processing (7)
- app.py
- routes/auth_routes.py
- schemas/dto/requests/auth.py
- schemas/dto/responses/auth.py
- schemas/models/token.py
- services/auth_service.py
- tests/unit/services/test_auth_service.py
✅ Files skipped from review due to trivial changes (1)
- schemas/dto/responses/auth.py
🚧 Files skipped from review as they are similar to previous changes (1)
- schemas/models/token.py
…n and integration tests
Summary by Sourcery
Strengthen OTP verification and password reset flows while adding security warnings for weak JWT configuration.
New Features:
Bug Fixes:
Enhancements:
Tests:
Summary by CodeRabbit
New Features
Bug Fixes
Tests