Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 39 additions & 35 deletions docs/python-sdk/fastmcp-server-context.mdx

Large diffs are not rendered by default.

28 changes: 25 additions & 3 deletions docs/python-sdk/fastmcp-server-tasks-elicitation.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ in Docket workers. Unlike regular MCP requests, background tasks don't have
an active request context, so elicitation requires special handling:

1. Set task status to "input_required" via Redis
2. Send notifications/tasks/updated with elicitation metadata
2. Send notifications/tasks/status with elicitation metadata
3. Wait for client to send input via tasks/sendInput
4. Resume task execution with the provided input

Expand All @@ -26,7 +26,7 @@ internal APIs for background task coordination.
### `elicit_for_task` <sup><a href="https://github.com/jlowin/fastmcp/blob/main/src/fastmcp/server/tasks/elicitation.py#L42" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
elicit_for_task(task_id: str, session: ServerSession, message: str, schema: dict[str, Any], fastmcp: FastMCP) -> mcp.types.ElicitResult
elicit_for_task(task_id: str, session: ServerSession | None, message: str, schema: dict[str, Any], fastmcp: FastMCP) -> mcp.types.ElicitResult
```


Expand All @@ -50,7 +50,29 @@ in a Docket worker context where there's no active MCP request.
- `McpError`: If the elicitation request fails


### `handle_task_input` <sup><a href="https://github.com/jlowin/fastmcp/blob/main/src/fastmcp/server/tasks/elicitation.py#L176" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
### `relay_elicitation` <sup><a href="https://github.com/jlowin/fastmcp/blob/main/src/fastmcp/server/tasks/elicitation.py#L234" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
relay_elicitation(session: ServerSession, session_id: str, task_id: str, elicitation: dict[str, Any], fastmcp: FastMCP) -> None
```


Relay elicitation from a background task worker to the client.

Called by the notification subscriber when it detects an input_required
notification with elicitation metadata. Sends a standard elicitation/create
request to the client session, then uses handle_task_input() to push the
response to Redis so the blocked worker can resume.

**Args:**
- `session`: MCP ServerSession
- `session_id`: Session identifier
- `task_id`: Background task ID
- `elicitation`: Elicitation metadata (message, requestedSchema)
- `fastmcp`: FastMCP server instance


### `handle_task_input` <sup><a href="https://github.com/jlowin/fastmcp/blob/main/src/fastmcp/server/tasks/elicitation.py#L290" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
handle_task_input(task_id: str, session_id: str, action: str, content: dict[str, Any] | None, fastmcp: FastMCP) -> bool
Expand Down
2 changes: 1 addition & 1 deletion docs/python-sdk/fastmcp-server-tasks-handlers.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Handles queuing tool/prompt/resource executions to Docket as background tasks.

## Functions

### `submit_to_docket` <sup><a href="https://github.com/jlowin/fastmcp/blob/main/src/fastmcp/server/tasks/handlers.py#L31" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
### `submit_to_docket` <sup><a href="https://github.com/jlowin/fastmcp/blob/main/src/fastmcp/server/tasks/handlers.py#L34" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
submit_to_docket(task_type: Literal['tool', 'resource', 'template', 'prompt'], key: str, component: Tool | Resource | ResourceTemplate | Prompt, arguments: dict[str, Any] | None = None, task_meta: TaskMeta | None = None) -> mcp.types.CreateTaskResult
Expand Down
113 changes: 113 additions & 0 deletions docs/python-sdk/fastmcp-server-tasks-notifications.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
---
title: notifications
sidebarTitle: notifications
---

# `fastmcp.server.tasks.notifications`


Distributed notification queue for background task events (SEP-1686).

Enables distributed Docket workers to send MCP notifications to clients
without holding session references. Workers push to a Redis queue,
the MCP server process subscribes and forwards to the client's session.

Pattern: Fire-and-forward with retry
- One queue per session_id
- LPUSH/BRPOP for reliable ordered delivery
- Retry up to 3 times on delivery failure, then discard
- TTL-based expiration for stale messages

Note: Docket's execution.subscribe() handles task state/progress events via
Redis Pub/Sub. This module handles elicitation-specific notifications that
require reliable delivery (input_required prompts, cancel signals).


## Functions

### `push_notification` <sup><a href="https://github.com/jlowin/fastmcp/blob/main/src/fastmcp/server/tasks/notifications.py#L48" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
push_notification(session_id: str, notification: dict[str, Any], docket: Docket) -> None
```


Push notification to session's queue (called from Docket worker).

Used for elicitation-specific notifications (input_required, cancel)
that need reliable delivery across distributed processes.

**Args:**
- `session_id`: Target session's identifier
- `notification`: MCP notification dict (method, params, _meta)
- `docket`: Docket instance for Redis access


### `notification_subscriber_loop` <sup><a href="https://github.com/jlowin/fastmcp/blob/main/src/fastmcp/server/tasks/notifications.py#L76" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
notification_subscriber_loop(session_id: str, session: ServerSession, docket: Docket, fastmcp: FastMCP) -> None
```


Subscribe to notification queue and forward to session.

Runs in the MCP server process. Bridges distributed workers to clients.

This loop:
1. Maintains a heartbeat (active subscriber marker for debugging)
2. Blocks on BRPOP waiting for notifications
3. Forwards notifications to the client's session
4. Retries failed deliveries, then discards (no dead-letter queue)

**Args:**
- `session_id`: Session identifier to subscribe to
- `session`: MCP ServerSession for sending notifications
- `docket`: Docket instance for Redis access
- `fastmcp`: FastMCP server instance (for elicitation relay)


### `ensure_subscriber_running` <sup><a href="https://github.com/jlowin/fastmcp/blob/main/src/fastmcp/server/tasks/notifications.py#L238" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
ensure_subscriber_running(session_id: str, session: ServerSession, docket: Docket, fastmcp: FastMCP) -> None
```


Start notification subscriber if not already running (idempotent).

Subscriber is created on first task submission and cleaned up on disconnect.
Safe to call multiple times for the same session.

**Args:**
- `session_id`: Session identifier
- `session`: MCP ServerSession
- `docket`: Docket instance
- `fastmcp`: FastMCP server instance (for elicitation relay)


### `stop_subscriber` <sup><a href="https://github.com/jlowin/fastmcp/blob/main/src/fastmcp/server/tasks/notifications.py#L278" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
stop_subscriber(session_id: str) -> None
```


Stop notification subscriber for a session.

Called when session disconnects. Pending messages remain in queue
for delivery if client reconnects (with TTL expiration).

**Args:**
- `session_id`: Session identifier


### `get_subscriber_count` <sup><a href="https://github.com/jlowin/fastmcp/blob/main/src/fastmcp/server/tasks/notifications.py#L298" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
get_subscriber_count() -> int
```


Get number of active subscribers (for monitoring).

82 changes: 82 additions & 0 deletions examples/task_elicitation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""
Background task elicitation demo.

A background task (Docket) that pauses mid-execution to ask the user a
question, waits for the answer, then resumes and finishes.

Works with both in-memory and Redis backends:

# In-memory (single process, no Redis needed)
FASTMCP_DOCKET_URL=memory:// uv run python examples/task_elicitation.py

# Redis (distributed, needs a worker running separately)
# Terminal 1: docker compose -f examples/tasks/docker-compose.yml up -d
# Terminal 2: FASTMCP_DOCKET_URL=redis://localhost:24242/0 \
# uv run fastmcp tasks worker examples/task_elicitation.py
# Terminal 3: FASTMCP_DOCKET_URL=redis://localhost:24242/0 \
# uv run python examples/task_elicitation.py

Requires the `docket` extra (included in dev dependencies).
"""

import asyncio
from dataclasses import dataclass

from mcp.types import TextContent

from fastmcp import Context, FastMCP
from fastmcp.client import Client
from fastmcp.server.elicitation import AcceptedElicitation

mcp = FastMCP("Task Elicitation Demo")


@dataclass
class DinnerPrefs:
cuisine: str
vegetarian: bool


@mcp.tool(task=True)
async def plan_dinner(ctx: Context) -> str:
"""Plan a dinner menu, asking the user what they're in the mood for."""

await ctx.report_progress(0, 2, "Asking what you'd like...")

result = await ctx.elicit(
"What kind of dinner are you in the mood for?",
response_type=DinnerPrefs,
)

if not isinstance(result, AcceptedElicitation):
return "Dinner cancelled!"

prefs = result.data
await ctx.report_progress(1, 2, "Planning your menu...")
await asyncio.sleep(1)
await ctx.report_progress(2, 2, "Done!")

veg = "vegetarian " if prefs.vegetarian else ""
return f"Tonight's menu: a lovely {veg}{prefs.cuisine} dinner!"


async def handle_elicitation(message, response_type, params, context):
"""Handle elicitation requests from background tasks."""
print(f" Server asks: {message}")
print(" Responding with: cuisine=Thai, vegetarian=True")
return DinnerPrefs(cuisine="Thai", vegetarian=True)


async def main():
async with Client(mcp, elicitation_handler=handle_elicitation) as client:
print("Starting background task...")
task = await client.call_tool("plan_dinner", {}, task=True)
print(f" task_id = {task.task_id}\n")

result = await task.result()
assert isinstance(result.content[0], TextContent)
print(f"\nResult: {result.content[0].text}")


if __name__ == "__main__":
asyncio.run(main())
2 changes: 1 addition & 1 deletion src/fastmcp/server/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1133,7 +1133,7 @@ async def _elicit_for_task(

return await elicit_for_task(
task_id=self._task_id, # type: ignore[arg-type]
session=self.session,
session=self._session,
message=message,
schema=schema,
fastmcp=self.fastmcp,
Expand Down
7 changes: 6 additions & 1 deletion src/fastmcp/server/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@

from fastmcp.server.tasks.capabilities import get_task_capabilities
from fastmcp.server.tasks.config import TaskConfig, TaskMeta, TaskMode
from fastmcp.server.tasks.elicitation import elicit_for_task, handle_task_input
from fastmcp.server.tasks.elicitation import (
elicit_for_task,
handle_task_input,
relay_elicitation,
)
from fastmcp.server.tasks.keys import (
build_task_key,
get_client_task_id_from_key,
Expand All @@ -29,5 +33,6 @@
"handle_task_input",
"parse_task_key",
"push_notification",
"relay_elicitation",
"stop_subscriber",
]
60 changes: 58 additions & 2 deletions src/fastmcp/server/tasks/elicitation.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

async def elicit_for_task(
task_id: str,
session: ServerSession,
session: ServerSession | None,
message: str,
schema: dict[str, Any],
fastmcp: FastMCP,
Expand Down Expand Up @@ -134,7 +134,7 @@ async def elicit_for_task(
"ttl": ELICIT_TTL_SECONDS * 1000,
},
"_meta": {
"modelcontextprotocol.io/related-task": {
"io.modelcontextprotocol/related-task": {
"taskId": task_id,
"status": "input_required",
"statusMessage": message,
Expand Down Expand Up @@ -231,6 +231,62 @@ async def elicit_for_task(
return mcp.types.ElicitResult(action="cancel", content=None)


async def relay_elicitation(
session: ServerSession,
session_id: str,
task_id: str,
elicitation: dict[str, Any],
fastmcp: FastMCP,
) -> None:
"""Relay elicitation from a background task worker to the client.

Called by the notification subscriber when it detects an input_required
notification with elicitation metadata. Sends a standard elicitation/create
request to the client session, then uses handle_task_input() to push the
response to Redis so the blocked worker can resume.

Args:
session: MCP ServerSession
session_id: Session identifier
task_id: Background task ID
elicitation: Elicitation metadata (message, requestedSchema)
fastmcp: FastMCP server instance
"""
try:
result = await session.elicit(
message=elicitation["message"],
requestedSchema=elicitation["requestedSchema"],
)
await handle_task_input(
task_id=task_id,
session_id=session_id,
action=result.action,
content=result.content,
fastmcp=fastmcp,
)
logger.debug(
"Relayed elicitation response for task %s (action=%s)",
task_id,
result.action,
)
except Exception as e:
logger.warning("Failed to relay elicitation for task %s: %s", task_id, e)
# Push a cancel response so the worker's BLPOP doesn't block forever
success = await handle_task_input(
task_id=task_id,
session_id=session_id,
action="cancel",
content=None,
fastmcp=fastmcp,
)
if not success:
logger.warning(
"Failed to push cancel response for task %s "
"(worker may block until TTL)",
task_id,
)


async def handle_task_input(
task_id: str,
session_id: str,
Expand Down
4 changes: 2 additions & 2 deletions src/fastmcp/server/tasks/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ async def submit_to_docket(
"pollInterval": poll_interval_ms,
},
"_meta": {
"modelcontextprotocol.io/related-task": {
"io.modelcontextprotocol/related-task": {
"taskId": server_task_id,
}
},
Expand Down Expand Up @@ -173,7 +173,7 @@ async def submit_to_docket(
)

try:
await ensure_subscriber_running(session_id, ctx.session, docket)
await ensure_subscriber_running(session_id, ctx.session, docket, ctx.fastmcp)

# Register cleanup callback on session exit (once per session)
# This ensures subscriber is stopped when the session disconnects
Expand Down
Loading