fix: propagate origin_request_id to background task workers#3175
fix: propagate origin_request_id to background task workers#3175jlowin merged 2 commits intoPrefectHQ:mainfrom
Conversation
WalkthroughThe changes extend the Context class to support background task execution by adding Possibly related PRs
🚥 Pre-merge checks | ✅ 2 | ❌ 2❌ Failed checks (2 warnings)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
⚔️ Resolve merge conflicts (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/fastmcp/server/dependencies.py (1)
49-74:⚠️ Potential issue | 🟡 Minor
OptionalCurrentContextmissing from__all__.
CurrentContextis exported in__all__, but the newOptionalCurrentContextis not. If it's intended as a public API, add it for consistency and discoverability.Proposed fix
__all__ = [ "AccessToken", "CurrentAccessToken", "CurrentContext", "CurrentDocket", "CurrentFastMCP", "CurrentHeaders", "CurrentRequest", "CurrentWorker", + "OptionalCurrentContext", "Progress",
🧹 Nitpick comments (2)
src/fastmcp/server/dependencies.py (1)
879-888: String-matching on exception message is fragile.
_OptionalCurrentContext.__aenter__checks"No active context found" in str(exc)to distinguish "no context" from otherRuntimeErrors. If the message in_CurrentContext.__aenter__(line 862) is ever reworded, this silently breaks. Consider a dedicated exception subclass or a sentinel attribute instead.♻️ Suggested approach
+class _NoActiveContextError(RuntimeError): + """Raised when no foreground or background context is available.""" + pass class _CurrentContext(Dependency): ... async def __aenter__(self) -> Context: ... # Neither foreground nor background context available - raise RuntimeError( + raise _NoActiveContextError( "No active context found. This can happen if:\n" ... ) class _OptionalCurrentContext(_CurrentContext): async def __aenter__(self) -> Context | None: try: return await super().__aenter__() - except RuntimeError as exc: - if "No active context found" in str(exc): - return None - raise + except _NoActiveContextError: + return Nonesrc/fastmcp/__init__.py (1)
21-21: Consider whetherIconThemewarrants top-level re-export.
IconThemeis a narrowLiteral["light", "dark"]alias used only for icon theming. Per the project's coding guidelines, only the most fundamental types should be re-exported tofastmcp.*. Users can pass"light"/"dark"string literals directly without needing this type, and power users can import fromfastmcp.utilities.types. That said, the docs already referencefrom fastmcp import IconTheme, so this may be intentional.Based on learnings: "only re-export to
fastmcp.*for the most fundamental types."Also applies to: 35-35
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: cf308da05b
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| origin_request_id = ( | ||
| str(ctx.request_context.request_id) if ctx.request_context is not None else None |
There was a problem hiding this comment.
Preserve origin request ID when submitting from task context
submit_to_docket snapshots lineage from ctx.request_context.request_id, but in background workers request_context is always None, so no origin_request_id key is written for child submissions. This breaks request correlation for task chains (a task enqueueing another task), because downstream workers cannot recover the original request ID and logs/status notifications lose lineage even though Context.origin_request_id is available in that context.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Honestly I think if a task calls another task, it should provide all the necessary info to it. I don't think we want to magically propagate that.
7d3c50d to
0d3f714
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (4)
src/fastmcp/server/tasks/handlers.py (1)
111-120: Consider using a Redis pipeline for the batch of SET operations.There are now up to 5 individual
redis.setcalls inside theasync with docket.redis()block. A pipeline would reduce round-trips. This is a pre-existing pattern so not blocking, but worth noting as the number of writes grows.♻️ Suggested pipeline approach
async with docket.redis() as redis: - await redis.set(task_meta_key, task_key, ex=ttl_seconds) - await redis.set(created_at_key, created_at.isoformat(), ex=ttl_seconds) - await redis.set(poll_interval_key, str(poll_interval_ms), ex=ttl_seconds) - if origin_request_id is not None: - await redis.set(origin_request_id_key, origin_request_id, ex=ttl_seconds) - if access_token is not None: - await redis.set( - access_token_key, access_token.model_dump_json(), ex=ttl_seconds - ) + async with redis.pipeline(transaction=False) as pipe: + pipe.set(task_meta_key, task_key, ex=ttl_seconds) + pipe.set(created_at_key, created_at.isoformat(), ex=ttl_seconds) + pipe.set(poll_interval_key, str(poll_interval_ms), ex=ttl_seconds) + if origin_request_id is not None: + pipe.set(origin_request_id_key, origin_request_id, ex=ttl_seconds) + if access_token is not None: + pipe.set( + access_token_key, access_token.model_dump_json(), ex=ttl_seconds + ) + await pipe.execute()src/fastmcp/server/context.py (1)
384-390:report_progressstill usesself.request_id— safe today but inconsistent withlog().In background tasks,
request_contextisNone, soprogress_tokenisNoneand the method returns early at Line 382 before reachingself.request_id. However, for consistency with thelog()fix, consider updating this toself.origin_request_idas well to prevent future breakage if the early-return logic changes.♻️ Consistency fix
await self.session.send_progress_notification( progress_token=progress_token, progress=progress, total=total, message=message, - related_request_id=self.request_id, + related_request_id=self.origin_request_id, )src/fastmcp/server/dependencies.py (2)
823-864:get_task_sessionmay returnNoneif the session was garbage-collected — background task context will be created with no usable session.At Line 835,
get_task_session(task_info.session_id)can returnNoneif the submitting client disconnected. TheContextis still created (Line 842-847) and entered, but any subsequent call toctx.session(e.g., fromctx.log()) will raiseRuntimeError("session is not available...").This is arguably expected behavior (can't send messages to a disconnected client), but a proactive warning log when
session is Nonewould help with debugging.♻️ Add a warning when session is gone
session = get_task_session(task_info.session_id) + if session is None: + _logger.warning( + "Session %s no longer available for background task %s; " + "context operations requiring a session will fail.", + task_info.session_id, + task_info.task_id, + ) # Get server from ContextVar server = get_server()
877-886: String-based exception matching is fragile.Line 884 checks
"No active context found" in str(exc), which is coupled to the exact error message text at Line 860. If that message is ever rephrased, this catch silently stops working andNonewon't be returned — instead, theRuntimeErrorwill propagate.Consider using a sentinel or a dedicated exception subclass for more robust matching.
♻️ Example: use a custom exception
+class _NoActiveContextError(RuntimeError): + """Raised when no foreground or background context is available.""" + pass + class _CurrentContext(Dependency): ... async def __aenter__(self) -> Context: ... # Neither foreground nor background context available - raise RuntimeError( + raise _NoActiveContextError( "No active context found. This can happen if:\n" ... ) class _OptionalCurrentContext(_CurrentContext): async def __aenter__(self) -> Context | None: try: return await super().__aenter__() - except RuntimeError as exc: - if "No active context found" in str(exc): - return None - raise + except _NoActiveContextError: + return None
| __all__ = [ | ||
| "AccessToken", | ||
| "CurrentAccessToken", | ||
| "CurrentContext", | ||
| "CurrentDocket", | ||
| "CurrentFastMCP", | ||
| "CurrentHeaders", | ||
| "CurrentRequest", | ||
| "CurrentWorker", | ||
| "Progress", | ||
| "TaskContextInfo", | ||
| "TokenClaim", | ||
| "get_access_token", | ||
| "get_context", | ||
| "get_http_headers", | ||
| "get_http_request", | ||
| "get_server", | ||
| "get_task_context", | ||
| "get_task_session", | ||
| "is_docket_available", | ||
| "register_task_session", | ||
| "require_docket", | ||
| "resolve_dependencies", | ||
| "transform_context_annotations", | ||
| "without_injected_parameters", | ||
| ] |
There was a problem hiding this comment.
__all__ exports four undefined names — from fastmcp.server.dependencies import * will raise AttributeError.
Static analysis (Ruff F822) confirms that CurrentAccessToken, CurrentHeaders, CurrentRequest, and TokenClaim are not defined or imported in this file, yet they appear in __all__. This will break wildcard imports and mislead IDE autocompletion.
🐛 Proposed fix: remove undefined names
__all__ = [
"AccessToken",
- "CurrentAccessToken",
"CurrentContext",
"CurrentDocket",
"CurrentFastMCP",
- "CurrentHeaders",
- "CurrentRequest",
"CurrentWorker",
+ "OptionalCurrentContext",
"Progress",
"TaskContextInfo",
- "TokenClaim",
"get_access_token",
"get_context",
"get_http_headers",
"get_http_request",
"get_server",
"get_task_context",
"get_task_session",
"is_docket_available",
"register_task_session",
"require_docket",
"resolve_dependencies",
"transform_context_annotations",
"without_injected_parameters",
]Also note that OptionalCurrentContext (defined at Line 914) is missing from __all__ despite being a new public API.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| __all__ = [ | |
| "AccessToken", | |
| "CurrentAccessToken", | |
| "CurrentContext", | |
| "CurrentDocket", | |
| "CurrentFastMCP", | |
| "CurrentHeaders", | |
| "CurrentRequest", | |
| "CurrentWorker", | |
| "Progress", | |
| "TaskContextInfo", | |
| "TokenClaim", | |
| "get_access_token", | |
| "get_context", | |
| "get_http_headers", | |
| "get_http_request", | |
| "get_server", | |
| "get_task_context", | |
| "get_task_session", | |
| "is_docket_available", | |
| "register_task_session", | |
| "require_docket", | |
| "resolve_dependencies", | |
| "transform_context_annotations", | |
| "without_injected_parameters", | |
| ] | |
| __all__ = [ | |
| "AccessToken", | |
| "CurrentContext", | |
| "CurrentDocket", | |
| "CurrentFastMCP", | |
| "CurrentWorker", | |
| "OptionalCurrentContext", | |
| "Progress", | |
| "TaskContextInfo", | |
| "get_access_token", | |
| "get_context", | |
| "get_http_headers", | |
| "get_http_request", | |
| "get_server", | |
| "get_task_context", | |
| "get_task_session", | |
| "is_docket_available", | |
| "register_task_session", | |
| "require_docket", | |
| "resolve_dependencies", | |
| "transform_context_annotations", | |
| "without_injected_parameters", | |
| ] |
🧰 Tools
🪛 Ruff (0.15.0)
[error] 51-51: Undefined name CurrentAccessToken in __all__
(F822)
[error] 55-55: Undefined name CurrentHeaders in __all__
(F822)
[error] 56-56: Undefined name CurrentRequest in __all__
(F822)
[error] 60-60: Undefined name TokenClaim in __all__
(F822)
0d3f714 to
c0956a2
Compare
Test Failure AnalysisSummary: The static analysis workflow failed due to formatting issues and an unused type ignore comment. Root Cause: The PR introduced a new class Suggested Solution:
The specific changes needed: File: # Change this line (line 877):
class _OptionalCurrentContext(_CurrentContext): # type: ignore[misc]
# To this:
class _OptionalCurrentContext(_CurrentContext):Then run Detailed AnalysisType Checker FailureThe type checker is warning that the
Since Ruff Format FailureTwo files need formatting:
Running Related Files
|
255a2d9 to
23a1918
Compare
Test Failure AnalysisSummary: The Root Cause: In ``` Suggested Solution: The subclass at line 888 should not inherit from
Option 1 avoids breaking existing callers of Detailed AnalysisThe Full Found 1 diagnostic The parent class Note: Related Files
|
|
first: congrats on the 3.0 ga push — two betas, two rcs, 21 new contributors, ~100k opt-in prerelease installs, upgrade guides, and the move to prefecthq… that’s a seriously demanding week. quick ask: could you take a look at this pr when you have a moment? it unblocks full-featured task mode in the wild (my gemini-research-mcp server can’t run correctly without it because background work loses request lineage). i’m aiming to get this in 3.0.2 if it fits the release bar. what would you need to see (or what would you want changed) to feel good about merging / backporting it? happy to reshape/split it to match your preference and keep risk low. |
chrisguidry
left a comment
There was a problem hiding this comment.
This makes sense to me! Thanks!!
de5dc39 to
8f53d7d
Compare
Refactor OptionalCurrentContext to wrap CurrentContext instead of overriding __aenter__ with a wider return type. Adds a background-task origin_request_id round-trip test and applies ruff formatting.
8f53d7d to
839f0d0
Compare
|
Thanks @gfortaine, let's get this merged! |
Closes #2877. Follows #2905, #2906.
Description
In task mode, tool code runs in a Docket worker where request-scoped context isn’t present. Without an originating request id, background logs and traces lose the ability to reliably correlate work back to the request that queued it.
This PR snapshots the submitting request id as
origin_request_idwhen a task is enqueued, restores it into the workerContext, and uses that value for log correlation (related_request_id) so foreground and background executions stay traceable.Using AI to generate code: GitHub Copilot (GPT-5.3-Codex).
Contributors Checklist
Review Checklist