-
Notifications
You must be signed in to change notification settings - Fork 0
Feature/auth guard improvement #38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
99bb961
6225f65
dbf63d6
bfd43cd
91d1000
577a9fd
f8b914a
ad39afe
78b20d2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,91 +36,13 @@ def _search_documents(query_vector): | |
| {'query_embedding': query_vector, 'match_count': 3} | ||
| ).execute() | ||
|
|
||
| async def generate_chat_events(request: Request, query: str, history: List[HistoryMessage]): | ||
| from fastapi import APIRouter, Request, Depends, BackgroundTasks | ||
|
|
||
| async def generate_chat_events(request: Request, query: str, history: List[HistoryMessage], background_tasks: BackgroundTasks = None): | ||
| """ | ||
| Generator function that streams SSE events. | ||
| It yields 'metadata' first, then chunks of 'content'. | ||
| Generator function that streams SSE events using LangGraph. | ||
| """ | ||
| from app.services.llm import get_english_translation, get_response_stream_async | ||
| from app.services.embedding import embedding_service | ||
|
|
||
| # 1. Translate Korean query to English // Note: We don't translate history here to save costs and reduce latency | ||
| t0 = time.perf_counter() | ||
| try: | ||
| english_query = await asyncio.wait_for( | ||
| get_english_translation(query), | ||
| timeout=CHAT_TIMEOUT, | ||
| ) | ||
| t1 = time.perf_counter() | ||
| logger.info(f"Translation successful in {t1 - t0:.2f}s") | ||
| except asyncio.TimeoutError: | ||
| logger.warning(f"Translation timed out after {time.perf_counter() - t0:.2f}s") | ||
| yield {"event": "error", "data": "응답이 지연되고 있어요. 잠시 후 다시 시도해 주세요."} | ||
| return | ||
| except Exception: | ||
| logger.exception("Failed to translate query") | ||
| yield {"event": "error", "data": "오늘은 철학자도 사색의 시간이 필요하답니다. 내일 다시 지혜를 나누러 올게요."} | ||
| return | ||
|
|
||
| # 2. Generate vector representation | ||
| t2 = time.perf_counter() | ||
| try: | ||
| query_vector = await asyncio.wait_for( | ||
| embedding_service.agenerate_embedding(english_query), | ||
| timeout=CHAT_TIMEOUT, | ||
| ) | ||
| t3 = time.perf_counter() | ||
| logger.info(f"Embedding successful in {t3 - t2:.2f}s") | ||
| except asyncio.TimeoutError: | ||
| logger.warning(f"Embedding generation timed out after {time.perf_counter() - t2:.2f}s") | ||
| yield {"event": "error", "data": "응답이 지연되고 있어요. 잠시 후 다시 시도해 주세요."} | ||
| return | ||
| except Exception: | ||
| logger.exception("Failed to generate query embedding") | ||
| yield {"event": "error", "data": "오늘은 철학자도 사색의 시간이 필요하답니다. 내일 다시 지혜를 나누러 올게요."} | ||
| return | ||
|
|
||
| # 3. Perform hybrid search in Supabase | ||
| # We use the RPC match_documents function defined in schema.sql | ||
| t4 = time.perf_counter() | ||
| try: | ||
| async with _db_rpc_semaphore: | ||
| response = await asyncio.to_thread(_search_documents, query_vector) | ||
| documents = response.data or [] | ||
| t5 = time.perf_counter() | ||
| logger.info(f"Database search successful in {t5 - t4:.2f}s. Found {len(documents)} docs.") | ||
| except Exception: | ||
| logger.exception("Database search failed") | ||
| yield {"event": "error", "data": "검색 중 오류가 발생했습니다. 잠시 후 다시 시도해 주세요."} | ||
| return | ||
|
|
||
| if not documents: | ||
| logger.warning(f"No documents found for query in {time.perf_counter() - t4:.2f}s") | ||
| yield {"event": "content", "data": "관련 철학적 내용을 찾을 수 없습니다."} | ||
| return | ||
|
|
||
| # 4. Extract contexts and format metadata | ||
| contexts = [] | ||
| philosophers_meta = [] | ||
|
|
||
| for doc in documents: | ||
| contexts.append(doc['content']) | ||
| meta = doc['metadata'] | ||
| # Group metadata to send to the frontend | ||
| if meta not in philosophers_meta: | ||
| philosophers_meta.append(meta) | ||
|
|
||
| # 5. Emit Event 1: metadata (Structured JSON) | ||
| metadata_event = { | ||
| "philosophers": philosophers_meta | ||
| } | ||
| yield {"event": "metadata", "data": json.dumps(metadata_event, ensure_ascii=False)} | ||
|
|
||
| # Add a small delay for frontend to process metadata before sending content | ||
| await asyncio.sleep(0.1) | ||
|
|
||
| # 6. Emit Event 2: content (Text chunk streaming via LLM) | ||
| combined_context = "\n\n".join(contexts) | ||
| from app.services.graph import create_graph | ||
|
|
||
| MAX_HISTORY_MESSAGES = 20 | ||
| MAX_HISTORY_CHARS = 1000 | ||
|
|
@@ -141,45 +63,133 @@ async def generate_chat_events(request: Request, query: str, history: List[Histo | |
| formatted_parts.append(f"{role_name}: {content[:MAX_HISTORY_CHARS]}") | ||
|
|
||
| formatted_history = "\n\n".join(formatted_parts) | ||
|
|
||
| t6 = time.perf_counter() | ||
|
|
||
| t0 = time.perf_counter() | ||
| graph = create_graph() | ||
|
|
||
| metadata_sent = False | ||
| full_answer = "" | ||
| chunk_count = 0 | ||
| final_state = {} | ||
| client_disconnected = False | ||
|
|
||
| try: | ||
| chunk_count = 0 | ||
| disconnected = False | ||
| async for chunk in get_response_stream_async(context=combined_context, query=english_query, history=formatted_history): | ||
| if chunk_count == 0: | ||
| logger.info(f"First LLM chunk received in {time.perf_counter() - t6:.2f}s") | ||
|
|
||
| # If client disconnects, stop generating | ||
| async for event in graph.astream_events( | ||
| {"query": query, "history": formatted_history}, | ||
| version="v2" | ||
| ): | ||
| if await request.is_disconnected(): | ||
| disconnected = True | ||
| logger.info(f"Client disconnected during streaming after {chunk_count} chunks.") | ||
| logger.info("Client disconnected during streaming.") | ||
| client_disconnected = True | ||
| break | ||
|
|
||
| chunk_count += 1 | ||
| # Clean up chunk to avoid SSE formatting issues with newlines | ||
| chunk_clean = chunk.replace("\n", "\\n") | ||
| yield {"event": "content", "data": chunk_clean} | ||
| kind = event["event"] | ||
| tags = event.get("tags", []) | ||
|
|
||
| if not disconnected: | ||
| if chunk_count == 0: | ||
| logger.warning(f"LLM returned 0 chunks after {time.perf_counter() - t6:.2f}s. Sending a fallback message.") | ||
| yield {"event": "content", "data": "철학자는 난색을 표하며 서적을 뒤적거립니다. 대신 철학자가 답변을 해줄 만한 다른 질문은 없을까요?"} | ||
| # Emit metadata after the 'retrieve' node finishes | ||
| if kind == "on_chain_end" and event["name"] == "retrieve": | ||
| output = event["data"].get("output", {}) | ||
| if isinstance(output, dict) and "documents" in output and not metadata_sent: | ||
| documents = output["documents"] | ||
| philosophers_meta = [] | ||
| for doc in documents: | ||
| meta = doc.get('metadata') | ||
| if meta not in philosophers_meta: | ||
| philosophers_meta.append(meta) | ||
|
|
||
| if not documents: | ||
| # No documents found, we can still send an empty metadata | ||
| pass | ||
|
|
||
| metadata_event = { | ||
| "philosophers": philosophers_meta | ||
| } | ||
| yield {"event": "metadata", "data": json.dumps(metadata_event, ensure_ascii=False)} | ||
| metadata_sent = True | ||
| await asyncio.sleep(0.1) | ||
|
|
||
| # Watch for final generation streaming | ||
| elif kind == "on_chat_model_stream" and "final_generation" in tags: | ||
| chunk = event["data"]["chunk"].content | ||
| if isinstance(chunk, str) and chunk: | ||
| chunk_count += 1 | ||
| full_answer += chunk | ||
| chunk_clean = chunk.replace("\n", "\\n") | ||
| yield {"event": "content", "data": chunk_clean} | ||
|
|
||
| elif kind == "on_chain_end": | ||
| # Debug logging to identify the correct event name if needed | ||
| # logger.debug(f"Chain end: {event['name']}") | ||
|
|
||
| # Check if this is the final output of the graph | ||
| output = event["data"].get("output", {}) | ||
| if isinstance(output, dict) and ("documents" in output or "reformulated_query" in output): | ||
| final_state = output | ||
|
|
||
| if chunk_count == 0 and not full_answer: | ||
| # Check if graph final state already has an answer (e.g. from generate node) | ||
| # that was not streamed for some reason. | ||
| full_answer = str(final_state.get("answer") or "") | ||
| if full_answer: | ||
| yield {"event": "content", "data": full_answer.replace("\n", "\\n")} | ||
| chunk_count = 1 | ||
| else: | ||
| logger.info(f"Stream finished successfully. Total chunks: {chunk_count}, Total time: {time.perf_counter() - t0:.2f}s") | ||
| logger.warning("LLM returned 0 chunks and no final answer found.") | ||
| yield {"event": "content", "data": "철학자는 난색을 표하며 서적을 뒤적거립니다. 대신 철학자가 답변을 해줄 만한 다른 질문은 없을까요?"} | ||
|
|
||
| logger.info(f"Stream finished. Total chunks: {chunk_count}, Time: {time.perf_counter() - t0:.2f}s") | ||
|
|
||
| # evaluation background task | ||
| # Fix: Only enqueue evaluation if we have a non-empty answer to avoid log inconsistencies | ||
| if not client_disconnected and background_tasks and final_state and full_answer: | ||
| from app.services.evaluation import evaluate_and_log | ||
| contexts = [d["content"] for d in final_state.get("documents", [])] | ||
| logger.info("Scheduling background evaluation task...") | ||
| background_tasks.add_task( | ||
| evaluate_and_log, | ||
| query=query, | ||
| reformulated_query=final_state.get("reformulated_query", ""), | ||
| contexts=contexts, | ||
| answer=full_answer, | ||
| context_relevance=1.0 if final_state.get("is_relevant") else 0.0 | ||
| ) | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
| else: | ||
| logger.info(f"Skipping evaluation. final_state_exists: {bool(final_state)}, has_answer: {bool(full_answer)}") | ||
|
|
||
| except Exception: | ||
| logger.exception("Failed while streaming LLM response") | ||
| logger.exception("Failed while streaming LangGraph response") | ||
| yield {"event": "error", "data": "오늘은 철학자도 사색의 시간이 필요하답니다. 내일 다시 지혜를 나누러 올게요."} | ||
| return | ||
|
|
||
| from app.core.auth import get_current_user | ||
|
|
||
| @router.get("/eval-logs") | ||
| async def get_eval_logs(user: dict = Depends(get_current_user)): | ||
| """ | ||
| Fetch the latest evaluation logs from Supabase. | ||
| """ | ||
| try: | ||
| from app.services.database import get_client | ||
| # Offload sync Supabase call to worker thread to avoid blocking event loop | ||
| res = await asyncio.to_thread( | ||
| lambda: get_client().table("eval_logs").select("*").order("created_at", desc=True).limit(50).execute() | ||
| ) | ||
| return res.data | ||
|
Comment on lines
+171
to
+177
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: # First, let's check if the file exists and see the context around lines 170-173
fd "chat.py" --path "*/api/routes/*"Repository: SanghunYun95/philo-rag Length of output: 281 🏁 Script executed: # Also look for graph.py
fd "graph.py" --path "*/services/*"Repository: SanghunYun95/philo-rag Length of output: 281 🏁 Script executed: # Search for chat.py file
fd "chat\.py"Repository: SanghunYun95/philo-rag Length of output: 135 🏁 Script executed: # Search for graph.py file
fd "graph\.py"Repository: SanghunYun95/philo-rag Length of output: 128 🏁 Script executed: # Read chat.py with context around lines 170-173
wc -l backend/app/api/routes/chat.pyRepository: SanghunYun95/philo-rag Length of output: 100 🏁 Script executed: # Read chat.py lines 160-185 for full context
cat -n backend/app/api/routes/chat.py | sed -n '160,185p'Repository: SanghunYun95/philo-rag Length of output: 1179 🏁 Script executed: # Read graph.py lines 65-95 for the comparison pattern
cat -n backend/app/services/graph.py | sed -n '65,95p'Repository: SanghunYun95/philo-rag Length of output: 1264 비동기 라우트에서 Supabase 조회를
🤖 Prompt for AI Agents |
||
| except Exception as e: | ||
| logger.exception("Failed to fetch evaluation logs from database") | ||
| from fastapi import HTTPException | ||
| raise HTTPException( | ||
| status_code=500, | ||
| detail="Failed to fetch evaluation logs" | ||
| ) from e | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
|
|
||
| @router.post("") | ||
| @limiter.limit("5/minute") | ||
| async def chat_endpoint(request: Request, chat_request: ChatRequest): | ||
| async def chat_endpoint(request: Request, chat_request: ChatRequest, background_tasks: BackgroundTasks): | ||
| """ | ||
| Endpoint for accepting chat queries and returning a text/event-stream response. | ||
| """ | ||
| return EventSourceResponse(generate_chat_events(request, chat_request.query, chat_request.history)) | ||
| return EventSourceResponse(generate_chat_events(request, chat_request.query, chat_request.history, background_tasks)) | ||
|
|
||
| @router.post("/title") | ||
| @limiter.limit("10/minute") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| from fastapi import Depends, HTTPException, status | ||
| from fastapi.security import APIKeyHeader | ||
| from app.core.config import settings | ||
|
|
||
| # This is a simple API key authentication for the dashboard logs. | ||
| # For production, consider using a full OAuth2/Supabase Auth system. | ||
| API_KEY_NAME = "x-admin-key" | ||
| api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False) | ||
|
|
||
| async def get_current_user(api_key: str = Depends(api_key_header)): | ||
| """ | ||
| Validates the admin secret key from request headers. | ||
| """ | ||
| if not api_key or api_key != settings.ADMIN_SECRET_KEY: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_401_UNAUTHORIZED, | ||
| detail="Invalid or missing Admin Secret Key", | ||
| ) | ||
| return {"user": "admin"} |
Uh oh!
There was an error while loading. Please reload this page.