Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/fastmcp/server/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,10 @@ def lifespan_context(self) -> dict[str, Any]:
Returns an empty dict if no lifespan was configured or if the MCP
session is not yet established.

In background tasks (Docket workers), where request_context is not
available, falls back to reading from the FastMCP server's lifespan
result directly.

Example:
```python
@server.tool
Expand All @@ -330,6 +334,11 @@ def my_tool(ctx: Context) -> str:
"""
rc = self.request_context
if rc is None:
# In background tasks, request_context is not available.
# Fall back to the server's lifespan result directly (#3095).
result = self.fastmcp._lifespan_result
if result is not None:
return result
return {}
return rc.lifespan_context

Expand Down
93 changes: 89 additions & 4 deletions src/fastmcp/server/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import weakref
from collections.abc import AsyncGenerator, Callable
from contextlib import AsyncExitStack, asynccontextmanager
from contextvars import ContextVar
from contextvars import ContextVar, Token
from dataclasses import dataclass
from functools import lru_cache
from typing import TYPE_CHECKING, Any, Protocol, cast, get_type_hints, runtime_checkable
Expand Down Expand Up @@ -165,6 +165,9 @@ def get_task_session(session_id: str) -> ServerSession | None:
)
_current_docket: ContextVar[Docket | None] = ContextVar("docket", default=None)
_current_worker: ContextVar[Worker | None] = ContextVar("worker", default=None)
_task_access_token: ContextVar[AccessToken | None] = ContextVar(
"task_access_token", default=None
)


# --- Docket availability check ---
Expand Down Expand Up @@ -478,7 +481,8 @@ def get_access_token() -> AccessToken | None:
This function first tries to get the token from the current HTTP request's scope,
which is more reliable for long-lived connections where the SDK's auth_context_var
may become stale after token refresh. Falls back to the SDK's context var if no
request is available.
request is available. In background tasks (Docket workers), falls back to the
token snapshot stored in Redis at task submission time.

Returns:
The access token if an authenticated user is available, None otherwise.
Expand All @@ -501,6 +505,21 @@ def get_access_token() -> AccessToken | None:
if access_token is None:
access_token = _sdk_get_access_token()

# Fall back to background task snapshot (#3095)
# In Docket workers, neither HTTP request nor SDK context var are available.
# The token was snapshotted in Redis at submit_to_docket() time and restored
# into this ContextVar by _CurrentContext.__aenter__().
if access_token is None:
task_token = _task_access_token.get()
if task_token is not None:
# Check expiration: if expires_at is set and past, treat as expired
if task_token.expires_at is not None:
from datetime import datetime, timezone

if task_token.expires_at < int(datetime.now(timezone.utc).timestamp()):
return None
return task_token

if access_token is None or isinstance(access_token, AccessToken):
return access_token

Expand Down Expand Up @@ -718,14 +737,54 @@ async def resolve_dependencies(
# so that get_dependency_parameters can detect them.


async def _restore_task_access_token(
session_id: str, task_id: str
) -> Token[AccessToken | None] | None:
"""Restore the access token snapshot from Redis into a ContextVar.

Called when setting up context in a Docket worker. The token was stored at
submit_to_docket() time. If the token has expired, it is not restored
(get_access_token() will return None).

Returns:
The ContextVar token for resetting, or None if nothing was restored.
"""
docket = _current_docket.get()
# Fall back to docket's own worker ContextVar, which is set by the worker
# even when _CurrentContext hasn't run (no ctx: Context in signature)
if docket is None:
try:
from docket.dependencies import Dependency as DocketDependency

docket = DocketDependency.docket.get()
except (ImportError, LookupError):
pass
if docket is None:
return None
Comment on lines +752 to +763
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

rg -n "DocketDependency\.docket" --type=py -C2

Repository: jlowin/fastmcp

Length of output: 439


🏁 Script executed:

# Check if there are other imports or uses of docket library
rg -n "from docket" --type=py -C1

Repository: jlowin/fastmcp

Length of output: 6373


🏁 Script executed:

# Look at the full context of the file to understand the pattern
head -n 20 src/fastmcp/server/dependencies.py

Repository: jlowin/fastmcp

Length of output: 768


🌐 Web query:

docket library Python Dependency ContextVar docket attribute documentation

💡 Result:

In Docket (PyPI package pydocket), “dependency injection” is done by declaring default parameter values like CurrentDocket(), CurrentWorker(), CurrentExecution(), TaskKey(), TaskArgument(), or Depends(fn) in your task signature; Docket resolves those at runtime when executing the task. (chrisguidry.github.io)

Those “current-*” dependencies are contextual (they’re meant to give you “the current docket/worker/execution for this running task” without threading it through every call). In Python, the standard way libraries implement safe per-request/per-task ambient context across async boundaries is contextvars.ContextVar. ContextVar objects have a read-only .name attribute (so if a library creates something like ContextVar("docket"), its “docket attribute”/name is just "docket"). (docs.python.org)

If you’re coming from FastMCP: FastMCP re-exports these DI concepts (and notes its DI is “powered by Docket”; task-related dependencies like CurrentDocket/CurrentWorker require fastmcp[tasks]). (gofastmcp.com)

Citations:


🌐 Web query:

pydocket Dependency.docket ContextVar stable API

💡 Result:

Dependency.docket (a ContextVar hanging off the internal Dependency base class) should be treated as internal / not a stable public API. Docket is still marked “Beta” on PyPI, so internals are especially likely to change. [1]

If you need “current docket/worker/execution” context in a supported way, use the documented built-in dependencies instead:

  • CurrentDocket() for the current Docket [2]
  • CurrentWorker() / CurrentExecution() for worker/execution context [2]
  • TaskKey() / TaskArgument() for task identity/arguments (often used inside your own dependency functions) [2]

And for custom DI, use the public helpers Depends(...) (per-task) and Shared(...) (worker-scoped), rather than reaching into Dependency internals. [3]

Bottom line: don’t rely on Dependency.docket / ContextVar attributes directly; prefer CurrentDocket() (and friends) or Depends(...). [2][3]

References:
[1] https://pypi.org/project/pydocket/
[2] https://chrisguidry.github.io/docket/dependencies/
[3] https://chrisguidry.github.io/docket/api-reference/


Replace DocketDependency.docket with the documented public API.

The fallback to DocketDependency.docket.get() relies on an internal, unstable API that is not part of Docket's public contract. According to Docket's documentation, Dependency.docket is an internal implementation detail and should not be used directly. Additionally, Docket is currently in Beta status on PyPI, making internal APIs especially prone to breaking changes.

Use the documented public dependency CurrentDocket() instead, which is the official way to access the current docket context in Docket-powered applications. If the fallback is needed to handle cases where _CurrentContext hasn't propagated, consider whether CurrentDocket() would better address the underlying issue, or add a comment explaining why this fallback is necessary.


token_key = docket.key(f"fastmcp:task:{session_id}:{task_id}:access_token")
try:
async with docket.redis() as redis:
token_data = await redis.get(token_key)
if token_data is not None:
restored = AccessToken.model_validate_json(token_data)
return _task_access_token.set(restored)
except Exception:
# Don't let token restoration failures break task execution
pass
return None


class _CurrentContext(Dependency): # type: ignore[misc]
"""Async context manager for Context dependency.

In foreground (request) mode: returns the active context from _current_context.
In background (Docket worker) mode: creates a task-aware Context with task_id.
In background (Docket worker) mode: creates a task-aware Context with task_id
and restores the access token snapshot from Redis.
"""

_context: Context | None = None
_access_token_cv_token: Any = None

async def __aenter__(self) -> Context:
from fastmcp.server.context import Context, _current_context
Expand All @@ -750,6 +809,12 @@ async def __aenter__(self) -> Context:
)
# Enter the context to set up ContextVars
await self._context.__aenter__()

# Restore access token snapshot from Redis (#3095)
self._access_token_cv_token = await _restore_task_access_token(
task_info.session_id, task_info.task_id
)

return self._context

# Neither foreground nor background context available
Expand All @@ -761,6 +826,10 @@ async def __aenter__(self) -> Context:
)

async def __aexit__(self, *args: object) -> None:
# Clean up access token ContextVar
if self._access_token_cv_token is not None:
_task_access_token.reset(self._access_token_cv_token)
self._access_token_cv_token = None
# Clean up if we created a context for background task
if self._context is not None:
await self._context.__aexit__(*args)
Expand Down Expand Up @@ -994,8 +1063,22 @@ async def get_auth_type(headers: dict = CurrentHeaders()) -> str:
class _CurrentAccessToken(Dependency): # type: ignore[misc]
"""Async context manager for AccessToken dependency."""

_access_token_cv_token: Any = None

async def __aenter__(self) -> AccessToken:
token = get_access_token()

# If no token found and we're in a Docket worker, try restoring from
# Redis. This handles the case where ctx: Context is not in the
# function signature, so _CurrentContext never ran the restoration.
if token is None:
task_info = get_task_context()
if task_info is not None:
self._access_token_cv_token = await _restore_task_access_token(
task_info.session_id, task_info.task_id
)
token = get_access_token()

if token is None:
raise RuntimeError(
"No access token found. Ensure authentication is configured "
Expand All @@ -1004,7 +1087,9 @@ async def __aenter__(self) -> AccessToken:
return token

async def __aexit__(self, *args: object) -> None:
pass
if self._access_token_cv_token is not None:
_task_access_token.reset(self._access_token_cv_token)
self._access_token_cv_token = None


def CurrentAccessToken() -> AccessToken:
Expand Down
13 changes: 12 additions & 1 deletion src/fastmcp/server/tasks/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from mcp.shared.exceptions import McpError
from mcp.types import INTERNAL_ERROR, ErrorData

from fastmcp.server.dependencies import _current_docket, get_context
from fastmcp.server.dependencies import _current_docket, get_access_token, get_context
from fastmcp.server.tasks.config import TaskMeta
from fastmcp.server.tasks.keys import build_task_key

Expand Down Expand Up @@ -96,10 +96,21 @@ async def submit_to_docket(
f"fastmcp:task:{session_id}:{server_task_id}:poll_interval"
)
poll_interval_ms = int(component.task_config.poll_interval.total_seconds() * 1000)

# Snapshot the current access token (if any) for background task access (#3095)
access_token = get_access_token()
access_token_key = docket.key(
f"fastmcp:task:{session_id}:{server_task_id}:access_token"
)

async with docket.redis() as redis:
await redis.set(task_meta_key, task_key, ex=ttl_seconds)
await redis.set(created_at_key, created_at.isoformat(), ex=ttl_seconds)
await redis.set(poll_interval_key, str(poll_interval_ms), ex=ttl_seconds)
if access_token is not None:
await redis.set(
access_token_key, access_token.model_dump_json(), ex=ttl_seconds
)

# Register session for Context access in background workers (SEP-1686)
# This enables elicitation/sampling from background tasks via weakref
Expand Down
Loading