Skip to content

Conversation

@tisnik
Copy link
Contributor

@tisnik tisnik commented Nov 12, 2025

Summary by CodeRabbit

  • New Features
    • Added a streaming query endpoint for real-time query responses with incremental event updates.
    • Support for both JSON and plain text media types for streaming content.
    • Enhanced error handling for rate limits, connection issues, and LLM errors.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Nov 12, 2025

Walkthrough

Adds a new streaming query endpoint module that implements SSE-based response handling for user queries processed through Llama Stack. The module provides event formatting utilities, specialized handlers for various streaming event types (turn start/complete, inference, tool execution, shield validation), error handling, and an async endpoint that validates access, retrieves responses, and streams formatted events back to clients.

Changes

Cohort / File(s) Summary
Streaming Query Endpoint
src/app/endpoints/streaming_query_v3.py
New file introducing SSE streaming endpoint handler, response retrieval logic from Llama Stack client, and public event formatting functions for streaming pipeline architecture
Event Formatting Utilities
src/app/endpoints/streaming_query_v3.py
Helper functions to serialize and format streaming data (format_stream_data, stream_event, stream_start_event, stream_end_event) supporting both JSON and plain text media types
Event Handlers (Private)
src/app/endpoints/streaming_query_v3.py
Specialized handlers for dispatching SSE events from Llama Stack chunks (stream_build_event, _handle_turn_start_event, _handle_turn_complete_event, _handle_inference_event, _handle_tool_execution_event, _handle_shield_event, _handle_heartbeat_event)
Error Handling
src/app/endpoints/streaming_query_v3.py
Error handlers for streaming failures (_handle_error_event, prompt_too_long_error, generic_llm_error) producing appropriate SSE or HTTP error responses

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Endpoint as streaming_query_endpoint_handler
    participant Auth as Auth & RBAC
    participant LlamaStack as Llama Stack Client
    participant EventHandler as stream_build_event
    participant SSE as SSE Response Stream

    Client->>Endpoint: HTTP POST /stream_query
    Endpoint->>Auth: Validate auth, user, conversation access
    Auth-->>Endpoint: Access granted
    Endpoint->>LlamaStack: Call retrieve_response(model, query)
    LlamaStack-->>Endpoint: AsyncIterator[AgentTurnResponseStreamChunk]
    
    Endpoint->>SSE: stream_start_event(conversation_id)
    SSE-->>Client: SSE: start
    
    loop For each chunk from LlamaStack
        Endpoint->>EventHandler: stream_build_event(chunk, chunk_id, metadata)
        alt Chunk Type: turn_start
            EventHandler->>SSE: emit turn_start event
        else Chunk Type: inference
            EventHandler->>SSE: emit inference (token) event
        else Chunk Type: tool_execution
            EventHandler->>SSE: emit tool_call + tool_result events
        else Chunk Type: shield
            EventHandler->>SSE: emit shield validation event
        else Chunk Type: turn_complete
            EventHandler->>SSE: emit turn_complete event
        else Error
            EventHandler->>SSE: emit error event
        end
        SSE-->>Client: SSE event(s)
    end
    
    Endpoint->>SSE: stream_end_event(metadata, summary, token_usage)
    SSE-->>Client: SSE: end (with metadata)
    
    Endpoint->>Endpoint: Conditionally persist transcript & cache
    Endpoint-->>Client: Close stream
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

  • Event handler patterns: Multiple similar private handler functions (_handle_*_event) following consistent patterns—repetition reduces per-function review complexity
  • Streaming pipeline integration: Core logic in stream_build_event dispatcher and streaming_query_endpoint_handler requires careful attention to event sequencing and state transitions
  • Error handling paths: Multiple error scenarios (API connection, rate limits, generic LLM errors) need verification for proper SSE/HTTP response formatting
  • Media type abstraction: JSON vs. plain text formatting across all event functions should be validated for consistency
  • Authentication and access control: RBAC checks and conversation ownership validation flow warrant verification

Possibly related PRs

  • LCORE-693: Added rag_chunks to streaming_query #585: Modifies stream_end_event signature to accept TurnSummary and handle RAG metadata, directly updating the end-of-stream event construction logic in this PR
  • Streaming improvements #254: Enhances stream_build_event dispatcher and adds specialized event handlers for turn/inference/tool/shield events, overlapping with the event handler implementations here

Suggested reviewers

  • jrobertboos
  • are-ces
  • umago

Pre-merge checks and finishing touches

❌ Failed checks (1 inconclusive)
Check name Status Explanation Resolution
Title check ❓ Inconclusive The title 'Streaming query v3' is vague and lacks specificity about what this endpoint does or its purpose. Consider a more descriptive title such as 'Add SSE-based streaming query endpoint with Llama Stack integration' or 'Implement streaming_query_v3 endpoint for real-time LLM responses'.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Docstring Coverage ✅ Passed Docstring coverage is 94.44% which is sufficient. The required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (6)
src/app/endpoints/streaming_query_v3.py (6)

248-249: Log unknown event types at warning level instead of error.

Line 248 logs unknown event types as errors but returns an empty string, silently dropping the event. This makes it difficult to diagnose missing events in production. Consider either:

  • Logging at warning level since this appears to be a known gap rather than a critical error
  • Yielding a diagnostic event instead of an empty string

396-428: Update parameter name and docstring for consistency.

The parameter is named _chunk_id (with underscore prefix indicating it's unused) but the docstring at line 408 refers to it as chunk_id. Either remove the underscore if the parameter is needed, or update the docstring to reflect that it's unused.


534-564: Consider logging unhandled delta types for observability.

The function handles delta.type == "tool_call" and delta.type == "text" but silently ignores other delta types. While this may be intentional, adding a debug log for unhandled types would improve observability:

elif chunk.event.payload.delta.type not in ("tool_call", "text"):
    logger.debug("Unhandled delta type: %s", chunk.event.payload.delta.type)

799-836: Consider extracting the chunk processing loop into a separate function.

The nested response_generator function spans 127 lines (785-912) with multiple responsibilities: iterating chunks, updating summaries, handling token metrics, storing transcripts, and managing cache. While the logic is correct, extracting the core chunk processing loop (lines 812-836) into a helper function would improve readability and testability.


1063-1070: Mime type case handling in document conversion.

Line 1066 uses .lower() on mime_type but compares against lowercase strings. While this works correctly, the list comprehension rebuilds the entire document list even when no conversion is needed. Consider a more explicit approach:

documents: list[Document] = []
for doc in query_request.get_documents():
    if doc["mime_type"].lower() in ("application/json", "application/xml"):
        documents.append({"content": doc["content"], "mime_type": "text/plain"})
    else:
        documents.append(doc)

However, the existing code is concise and correct, so this is optional.


1010-1013: TODO comment about redacting attachments before sending to LLM.

There's a TODO at line 1010 to redact attachment content before sending to the LLM. This could be a security or privacy concern if attachments contain sensitive information. Consider raising an issue to track this work.

Do you want me to create an issue to track the implementation of attachment content redaction?

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 65acb98 and fb1e963.

📒 Files selected for processing (1)
  • src/app/endpoints/streaming_query_v3.py (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: luis5tb
Repo: lightspeed-core/lightspeed-stack PR: 727
File: src/app/endpoints/a2a.py:43-43
Timestamp: 2025-10-29T13:05:22.438Z
Learning: In the lightspeed-stack repository, endpoint files in src/app/endpoints/ intentionally use a shared logger name "app.endpoints.handlers" rather than __name__, allowing unified logging configuration across all endpoint handlers (query.py, streaming_query.py, a2a.py).
🧬 Code graph analysis (1)
src/app/endpoints/streaming_query_v3.py (14)
src/models/responses.py (4)
  • ToolCall (157-162)
  • ForbiddenResponse (1120-1142)
  • UnauthorizedResponse (1094-1117)
  • QuotaExceededResponse (1145-1210)
src/app/database.py (1)
  • get_session (34-40)
src/app/endpoints/query.py (7)
  • get_rag_toolgroups (848-875)
  • is_input_shield (575-588)
  • is_output_shield (562-572)
  • select_model_and_provider_id (468-546)
  • validate_attachments_metadata (816-845)
  • persist_user_conversation_details (113-145)
  • evaluate_model_hints (148-187)
src/authentication/__init__.py (1)
  • get_auth_dependency (14-52)
src/authorization/middleware.py (1)
  • authorize (111-122)
src/client.py (2)
  • AsyncLlamaStackClientHolder (18-55)
  • get_client (49-55)
src/configuration.py (3)
  • configuration (73-77)
  • llama_stack_configuration (87-91)
  • mcp_servers (101-105)
src/metrics/utils.py (1)
  • update_llm_token_count_from_turn (60-77)
src/models/cache_entry.py (1)
  • CacheEntry (7-24)
src/models/requests.py (2)
  • QueryRequest (73-225)
  • get_documents (197-204)
src/utils/mcp_headers.py (2)
  • handle_mcp_headers_with_toolgroups (56-90)
  • mcp_headers_dependency (15-26)
src/utils/token_counter.py (2)
  • TokenCounter (18-41)
  • extract_token_usage_from_turn (44-94)
src/utils/transcripts.py (1)
  • store_transcript (40-99)
src/utils/types.py (2)
  • TurnSummary (89-163)
  • append_tool_calls_from_llama (96-117)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: build-pr
  • GitHub Check: e2e_tests (ci)
  • GitHub Check: e2e_tests (azure)
  • GitHub Check: Konflux kflux-prd-rh02 / lightspeed-stack-on-pull-request
🔇 Additional comments (1)
src/app/endpoints/streaming_query_v3.py (1)

913-914: Metrics increment before response generation completes.

The LLM call metric is incremented at line 914 before the response_generator starts executing. This means the metric counts initiated requests, not completed ones. If this is intentional, consider adding a comment to clarify. If you want to count completed requests, increment inside response_generator after the stream finishes.

Would you like to clarify the intended semantics of this metric?

Comment on lines +100 to +103
400: {
"description": "Missing or invalid credentials provided by client",
"model": UnauthorizedResponse,
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Misleading HTTP 400 description.

The description for status code 400 states "Missing or invalid credentials provided by client" but maps to UnauthorizedResponse. HTTP 400 is for bad requests (malformed syntax, invalid parameters), while authentication/authorization issues should use 401/403. Consider either:

  • Changing the status code to 401 if this handles authentication
  • Updating the description to match bad request scenarios (e.g., "Invalid request parameters")
🤖 Prompt for AI Agents
In src/app/endpoints/streaming_query_v3.py around lines 100 to 103, the HTTP 400
entry currently describes authentication issues and maps to UnauthorizedResponse
which is misleading; either change the status code to 401 and keep the
UnauthorizedResponse and an authentication-focused description, or keep 400 but
update the description to reflect a bad request (e.g., "Invalid request
parameters") and swap the model to a BadRequestResponse (or appropriate schema).
Make the change so status code, description, and response model consistently
reflect either an authentication error (401 + UnauthorizedResponse) or a bad
request (400 + BadRequestResponse).

Comment on lines +171 to +227
def stream_end_event(
metadata_map: dict,
summary: TurnSummary, # pylint: disable=unused-argument
token_usage: TokenCounter,
media_type: str = MEDIA_TYPE_JSON,
) -> str:
"""
Yield the end of the data stream.

Format and return the end event for a streaming response,
including referenced document metadata and token usage information.

Parameters:
metadata_map (dict): A mapping containing metadata about
referenced documents.
summary (TurnSummary): Summary of the conversation turn.
token_usage (TokenCounter): Token usage information.
media_type (str): The media type for the response format.

Returns:
str: A Server-Sent Events (SSE) formatted string
representing the end of the data stream.
"""
if media_type == MEDIA_TYPE_TEXT:
ref_docs_string = "\n".join(
f'{v["title"]}: {v["docs_url"]}'
for v in filter(
lambda v: ("docs_url" in v) and ("title" in v),
metadata_map.values(),
)
)
return f"\n\n---\n\n{ref_docs_string}" if ref_docs_string else ""

# For JSON media type, we need to create a proper structure
# Since we don't have access to summary here, we'll create a basic structure
referenced_docs_dict = [
{
"doc_url": v.get("docs_url"),
"doc_title": v.get("title"),
}
for v in metadata_map.values()
if "docs_url" in v and "title" in v
]

return format_stream_data(
{
"event": "end",
"data": {
"rag_chunks": [], # TODO(jboos): implement RAG chunks when summary is available
"referenced_documents": referenced_docs_dict,
"truncated": None, # TODO(jboos): implement truncated
"input_tokens": token_usage.input_tokens,
"output_tokens": token_usage.output_tokens,
},
"available_quotas": {}, # TODO(jboos): implement available quotas
}
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Use the summary parameter to populate RAG chunks.

The summary parameter is marked as unused (line 173) and there's a TODO at line 219 to "implement RAG chunks when summary is available," but summary is already available as a parameter. You can populate rag_chunks using summary.rag_chunks instead of leaving it as an empty list.

Apply this diff to utilize the summary parameter:

 def stream_end_event(
     metadata_map: dict,
-    summary: TurnSummary,  # pylint: disable=unused-argument
+    summary: TurnSummary,
     token_usage: TokenCounter,
     media_type: str = MEDIA_TYPE_JSON,
 ) -> str:
     ...
     return format_stream_data(
         {
             "event": "end",
             "data": {
-                "rag_chunks": [],  # TODO(jboos): implement RAG chunks when summary is available
+                "rag_chunks": [
+                    {
+                        "content": chunk.content,
+                        "source": chunk.source,
+                        "score": chunk.score,
+                    }
+                    for chunk in summary.rag_chunks
+                ],
                 "referenced_documents": referenced_docs_dict,
                 "truncated": None,  # TODO(jboos): implement truncated
                 "input_tokens": token_usage.input_tokens,
                 "output_tokens": token_usage.output_tokens,
             },
             "available_quotas": {},  # TODO(jboos): implement available quotas
         }
     )

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/app/endpoints/streaming_query_v3.py around lines 171 to 227, the summary
parameter is unused and rag_chunks is left as an empty list; populate the JSON
end event with RAG chunks from the provided summary by replacing the empty
"rag_chunks": [] with summary.rag_chunks (or an appropriate transformed list if
needed), ensure any serialization-safe conversion is applied (e.g., mapping
objects to dicts or primitives), and remove the now-unnecessary pylint disable
for unused-argument if summary becomes used.

Comment on lines +283 to +287
if hasattr(chunk, "error"):
yield from _handle_error_event(chunk, chunk_id, media_type)

event_type = chunk.event.payload.event_type
step_type = getattr(chunk.event.payload, "step_type", None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Add None check for chunk.event before accessing payload.

The function accesses chunk.event.payload at line 286 without first checking if chunk.event is None. However, the main handler at line 813-814 shows that chunk.event can be None. This will cause an AttributeError when processing such chunks.

Apply this diff to add the necessary check:

 def stream_build_event(
     chunk: Any,
     chunk_id: int,
     metadata_map: dict,
     media_type: str = MEDIA_TYPE_JSON,
     conversation_id: str | None = None,
 ) -> Iterator[str]:
     if hasattr(chunk, "error"):
         yield from _handle_error_event(chunk, chunk_id, media_type)
+        return
 
+    if chunk.event is None:
+        yield from _handle_heartbeat_event(chunk_id, media_type)
+        return
+
     event_type = chunk.event.payload.event_type
     step_type = getattr(chunk.event.payload, "step_type", None)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if hasattr(chunk, "error"):
yield from _handle_error_event(chunk, chunk_id, media_type)
event_type = chunk.event.payload.event_type
step_type = getattr(chunk.event.payload, "step_type", None)
def stream_build_event(
chunk: Any,
chunk_id: int,
metadata_map: dict,
media_type: str = MEDIA_TYPE_JSON,
conversation_id: str | None = None,
) -> Iterator[str]:
if hasattr(chunk, "error"):
yield from _handle_error_event(chunk, chunk_id, media_type)
return
if chunk.event is None:
yield from _handle_heartbeat_event(chunk_id, media_type)
return
event_type = chunk.event.payload.event_type
step_type = getattr(chunk.event.payload, "step_type", None)
🤖 Prompt for AI Agents
In src/app/endpoints/streaming_query_v3.py around lines 283 to 287, the code
accesses chunk.event.payload without confirming chunk.event is not None; add a
check to only access payload when chunk.event is present (e.g., if
hasattr(chunk, "event") and chunk.event is not None) and otherwise skip
payload-related handling or route to the existing error/none branch so no
AttributeError is raised.

Comment on lines +414 to +421
# Use provided conversation_id or generate one if not available
if conversation_id is None:
conversation_id = str(uuid.uuid4())

if media_type == MEDIA_TYPE_TEXT:
yield (
f"data: {json.dumps({'event': 'start', 'data': {'conversation_id': conversation_id}})}\n\n" # pylint: disable=line-too-long
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Inconsistent formatting for TEXT media type in turn start event.

For MEDIA_TYPE_TEXT at line 420, the function yields JSON-formatted data with an SSE prefix, but this contradicts the plain text format. Looking at other handlers, TEXT media type should return plain text, not SSE-formatted JSON. Either:

  • Remove this special case and always use SSE formatting (lines 423-428)
  • Return plain text like "Stream started for conversation {conversation_id}\n\n"
🤖 Prompt for AI Agents
In src/app/endpoints/streaming_query_v3.py around lines 414 to 421, the TEXT
media-type branch yields SSE-style JSON for the turn-start event which is
inconsistent with the plain-text handling used elsewhere; change the branch so
that for MEDIA_TYPE_TEXT it yields plain text (for example: "Stream started for
conversation {conversation_id}\n\n") instead of the JSON SSE line, or
alternatively remove the special-case branch so the shared SSE formatting (lines
423-428) is used for all media types — implement the plain-text replacement for
MEDIA_TYPE_TEXT to match other TEXT handlers or remove the branch to defer to
the common SSE code.

@tisnik tisnik closed this Nov 12, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant