2 changes: 1 addition & 1 deletion python/src/mcp_server/features/projects/project_tools.py
@@ -8,7 +8,7 @@
import asyncio
import json
import logging
from typing import Any, Optional
from typing import Optional
from urllib.parse import urljoin

import httpx
2 changes: 1 addition & 1 deletion python/src/mcp_server/utils/__init__.py
@@ -18,4 +18,4 @@
"get_polling_timeout",
"get_max_polling_attempts",
"get_polling_interval",
]
]
2 changes: 1 addition & 1 deletion python/src/mcp_server/utils/error_handling.py
@@ -163,4 +163,4 @@ def _get_suggestion_for_status(status_code: int) -> Optional[str]:
503: "Service temporarily unavailable. Try again later",
504: "The operation timed out. The server may be overloaded",
}
return suggestions.get(status_code)
return suggestions.get(status_code)
5 changes: 3 additions & 2 deletions python/src/mcp_server/utils/http_client.py
@@ -5,7 +5,8 @@
"""

from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional
from typing import Optional
from collections.abc import AsyncIterator

import httpx

@@ -35,4 +36,4 @@ async def get_http_client(

# Future: Could add retry logic, custom headers, etc. here
async with httpx.AsyncClient(timeout=timeout) as client:
yield client
yield client
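For context, here is a minimal usage sketch of the get_http_client context manager touched above. It assumes the helper can be called with no arguments and that the import path mirrors the file shown; neither detail is confirmed by the diff.

```python
import asyncio

# Assumed import path, mirroring the file edited above (not shown in the diff).
from src.mcp_server.utils.http_client import get_http_client


async def fetch_status(url: str) -> int:
    # get_http_client is an async context manager that yields an httpx.AsyncClient
    # and closes it when the block exits.
    async with get_http_client() as client:
        response = await client.get(url)
        return response.status_code


if __name__ == "__main__":
    print(asyncio.run(fetch_status("https://example.com")))
```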
3 changes: 1 addition & 2 deletions python/src/mcp_server/utils/timeout_config.py
@@ -5,7 +5,6 @@
"""

import os
from typing import Optional

import httpx

@@ -77,4 +76,4 @@ def get_polling_interval(attempt: int) -> float:

# Exponential backoff: 1s, 2s, 4s, 5s, 5s, ...
interval = min(base_interval * (2**attempt), max_interval)
return float(interval)
return float(interval)
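As a quick illustration of the capped exponential backoff in get_polling_interval, the standalone sketch below reproduces the 1s, 2s, 4s, 5s, 5s sequence from the comment; the base_interval=1.0 and max_interval=5.0 defaults are assumptions, since the hunk does not show how they are loaded.

```python
def polling_interval(attempt: int, base_interval: float = 1.0, max_interval: float = 5.0) -> float:
    # Exponential backoff capped at max_interval: 1, 2, 4, 5, 5, ...
    return float(min(base_interval * (2 ** attempt), max_interval))


print([polling_interval(a) for a in range(6)])  # [1.0, 2.0, 4.0, 5.0, 5.0, 5.0]
```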
4 changes: 2 additions & 2 deletions python/src/server/api_routes/knowledge_api.py
@@ -517,7 +517,7 @@ async def upload_document(
safe_logfire_info(
f"📋 UPLOAD: Starting document upload | filename={file.filename} | content_type={file.content_type} | knowledge_type={knowledge_type}"
)

safe_logfire_info(
f"Starting document upload | filename={file.filename} | content_type={file.content_type} | knowledge_type={knowledge_type}"
)
@@ -907,7 +907,7 @@ async def stop_crawl_task(progress_id: str):
"""Stop a running crawl task."""
try:
from ..services.crawling import get_active_orchestration, unregister_orchestration

# Emit stopping status immediately
await sio.emit(
"crawl:stopping",
2 changes: 1 addition & 1 deletion python/src/server/api_routes/mcp_api.py
@@ -66,7 +66,7 @@ def _resolve_container(self):
"""Simple container resolution - just use fixed name."""
if not self.docker_client:
return None

try:
# Simple: Just look for the fixed container name
container = self.docker_client.containers.get("archon-mcp")
8 changes: 4 additions & 4 deletions python/src/server/config/config.py
@@ -101,16 +101,16 @@ def validate_supabase_url(url: str) -> bool:
# Allow HTTP for local development (host.docker.internal or localhost)
if parsed.scheme not in ("http", "https"):
raise ConfigurationError("Supabase URL must use HTTP or HTTPS")

# Require HTTPS for production (non-local) URLs
if parsed.scheme == "http":
hostname = parsed.hostname or ""

# Check for exact localhost and Docker internal hosts (security: prevent subdomain bypass)
local_hosts = ["localhost", "127.0.0.1", "host.docker.internal"]
if hostname in local_hosts or hostname.endswith(".localhost"):
return True

# Check if hostname is a private IP address
try:
ip = ipaddress.ip_address(hostname)
@@ -125,7 +125,7 @@ def validate_supabase_url(url: str) -> bool:
except ValueError:
# hostname is not a valid IP address, could be a domain name
pass

# If not a local host or private IP, require HTTPS
raise ConfigurationError(f"Supabase URL must use HTTPS for non-local environments (hostname: {hostname})")

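To make the HTTPS rule in validate_supabase_url concrete, here is a condensed, standalone sketch of the decision the hunk appears to implement. It is illustrative only: the real function raises ConfigurationError rather than returning False, and the helper name is hypothetical.

```python
import ipaddress
from urllib.parse import urlparse


def http_allowed(url: str) -> bool:
    """Accept HTTPS anywhere; accept plain HTTP only for local or private hosts."""
    parsed = urlparse(url)
    if parsed.scheme == "https":
        return True
    if parsed.scheme != "http":
        return False  # the real code raises ConfigurationError here
    hostname = parsed.hostname or ""
    if hostname in ("localhost", "127.0.0.1", "host.docker.internal") or hostname.endswith(".localhost"):
        return True
    try:
        return ipaddress.ip_address(hostname).is_private
    except ValueError:
        return False  # plain HTTP to a public domain name is rejected


print(http_allowed("http://host.docker.internal:54321"))  # True
print(http_allowed("http://api.example.com"))             # False, HTTPS required
```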
1 change: 0 additions & 1 deletion python/src/server/main.py
@@ -28,7 +28,6 @@
from .api_routes.projects_api import router as projects_router

# Import Socket.IO handlers to ensure they're registered
from .api_routes import socketio_handlers # This registers all Socket.IO event handlers

# Import modular API routers
from .api_routes.settings_api import router as settings_router
5 changes: 3 additions & 2 deletions python/src/server/services/crawling/crawling_service.py
@@ -8,7 +8,8 @@

import asyncio
import uuid
from typing import Dict, Any, List, Optional, Callable, Awaitable
from typing import Dict, Any, List, Optional
from collections.abc import Callable, Awaitable
from urllib.parse import urlparse

from ...config.logfire_config import safe_logfire_info, safe_logfire_error, get_logger
@@ -558,7 +559,7 @@ async def _crawl_by_url_type(self, url: str, request: Dict[str, Any]) -> tuple:
max_depth = request.get("max_depth", 1)
# Let the strategy handle concurrency from settings
# This will use CRAWL_MAX_CONCURRENT from database (default: 10)

crawl_results = await self.crawl_recursive_with_progress(
[url],
max_depth=max_depth,
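Several files in this PR make the same import change: abstract types such as Callable, Awaitable, and AsyncIterator now come from collections.abc instead of typing. A small sketch of the resulting style, with an illustrative alias name:

```python
import asyncio
from collections.abc import Awaitable, Callable

# On Python 3.9+, the collections.abc ABCs support subscription directly,
# so typing.Callable / typing.Awaitable are no longer needed for annotations.
ProgressCallback = Callable[[str, int], Awaitable[None]]


async def report_progress(message: str, percent: int) -> None:
    print(f"{percent:3d}% {message}")


callback: ProgressCallback = report_progress
asyncio.run(callback("crawling", 42))
```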
76 changes: 36 additions & 40 deletions python/src/server/services/crawling/document_storage_operations.py
@@ -5,16 +5,12 @@
Extracted from crawl_orchestration_service.py for better modularity.
"""
import asyncio
from typing import Dict, Any, List, Optional, Callable
from urllib.parse import urlparse
from typing import Dict, Any, List, Optional
from collections.abc import Callable

from ...config.logfire_config import safe_logfire_info, safe_logfire_error
from ..storage.storage_services import DocumentStorageService
from ..storage.document_storage_service import add_documents_to_supabase
from ..storage.code_storage_service import (
generate_code_summaries_batch,
add_code_examples_to_supabase
)
from ..source_management_service import update_source_info, extract_source_summary
from .code_extraction_service import CodeExtractionService

@@ -23,7 +19,7 @@ class DocumentStorageOperations:
"""
Handles document storage operations for crawled content.
"""

def __init__(self, supabase_client):
"""
Initialize document storage operations.
@@ -34,7 +30,7 @@ def __init__(self, supabase_client):
self.supabase_client = supabase_client
self.doc_storage_service = DocumentStorageService(supabase_client)
self.code_extraction_service = CodeExtractionService(supabase_client)

async def process_and_store_documents(
self,
crawl_results: List[Dict],
@@ -60,47 +56,47 @@ async def process_and_store_documents(
"""
# Initialize storage service for chunking
storage_service = DocumentStorageService(self.supabase_client)

# Prepare data for chunked storage
all_urls = []
all_chunk_numbers = []
all_contents = []
all_metadatas = []
source_word_counts = {}
url_to_full_document = {}

# Process and chunk each document
for doc_index, doc in enumerate(crawl_results):
# Check for cancellation during document processing
if cancellation_check:
cancellation_check()

source_url = doc.get('url', '')
markdown_content = doc.get('markdown', '')

if not markdown_content:
continue

# Store full document for code extraction context
url_to_full_document[source_url] = markdown_content

# CHUNK THE CONTENT
chunks = storage_service.smart_chunk_text(markdown_content, chunk_size=5000)

# Use the original source_id for all documents
source_id = original_source_id
safe_logfire_info(f"Using original source_id '{source_id}' for URL '{source_url}'")

# Process each chunk
for i, chunk in enumerate(chunks):
# Check for cancellation during chunk processing
if cancellation_check and i % 10 == 0: # Check every 10 chunks
cancellation_check()

all_urls.append(source_url)
all_chunk_numbers.append(i)
all_contents.append(chunk)

# Create metadata for each chunk
word_count = len(chunk.split())
metadata = {
Expand All @@ -116,29 +112,29 @@ async def process_and_store_documents(
'tags': request.get('tags', [])
}
all_metadatas.append(metadata)

# Accumulate word count
source_word_counts[source_id] = source_word_counts.get(source_id, 0) + word_count

# Yield control every 10 chunks to prevent event loop blocking
if i > 0 and i % 10 == 0:
await asyncio.sleep(0)

# Yield control after processing each document
if doc_index > 0 and doc_index % 5 == 0:
await asyncio.sleep(0)

# Create/update source record FIRST before storing documents
if all_contents and all_metadatas:
await self._create_source_records(
all_metadatas, all_contents, source_word_counts, request
)

safe_logfire_info(f"url_to_full_document keys: {list(url_to_full_document.keys())[:5]}")

# Log chunking results
safe_logfire_info(f"Document storage | documents={len(crawl_results)} | chunks={len(all_contents)} | avg_chunks_per_doc={len(all_contents)/len(crawl_results):.1f}")

Comment on lines 133 to +137
⚠️ Potential issue

Guard against division by zero in avg_chunks_per_doc logging

When crawl_results is empty, this will raise ZeroDivisionError.

-        safe_logfire_info(f"Document storage | documents={len(crawl_results)} | chunks={len(all_contents)} | avg_chunks_per_doc={len(all_contents)/len(crawl_results):.1f}")
+        doc_count = len(crawl_results)
+        avg = (len(all_contents) / doc_count) if doc_count else 0.0
+        safe_logfire_info(
+            f"Document storage | documents={doc_count} | chunks={len(all_contents)} | avg_chunks_per_doc={avg:.1f}"
+        )
🤖 Prompt for AI Agents
In python/src/server/services/crawling/document_storage_operations.py around lines 133 to 137, the logging expression computes avg_chunks_per_doc by dividing len(all_contents) by len(crawl_results), which will raise a ZeroDivisionError when crawl_results is empty; fix by computing a safe denominator (e.g., num_docs = len(crawl_results)) and only performing the division if num_docs > 0, otherwise using 0 or "N/A" for the average; update the safe_logfire_info call to log the computed average variable (or the fallback) instead of performing the raw division inline.

# Call add_documents_to_supabase with the correct parameters
await add_documents_to_supabase(
client=self.supabase_client,
@@ -153,17 +149,17 @@ async def process_and_store_documents(
provider=None, # Use configured provider
cancellation_check=cancellation_check # Pass cancellation check
)

# Calculate actual chunk count
chunk_count = len(all_contents)

return {
'chunk_count': chunk_count,
'total_word_count': sum(source_word_counts.values()),
'url_to_full_document': url_to_full_document,
'source_id': original_source_id
}

async def _create_source_records(
self,
all_metadatas: List[Dict],
@@ -184,23 +180,23 @@ async def _create_source_records(
unique_source_ids = set()
source_id_contents = {}
source_id_word_counts = {}

for i, metadata in enumerate(all_metadatas):
source_id = metadata['source_id']
unique_source_ids.add(source_id)

# Group content by source_id for better summaries
if source_id not in source_id_contents:
source_id_contents[source_id] = []
source_id_contents[source_id].append(all_contents[i])

# Track word counts per source_id
if source_id not in source_id_word_counts:
source_id_word_counts[source_id] = 0
source_id_word_counts[source_id] += metadata.get('word_count', 0)

safe_logfire_info(f"Found {len(unique_source_ids)} unique source_ids: {list(unique_source_ids)}")

# Create source records for ALL unique source_ids
for source_id in unique_source_ids:
# Get combined content for this specific source_id
@@ -211,15 +207,15 @@
combined_content += ' ' + chunk
else:
break

# Generate summary with fallback
try:
summary = extract_source_summary(source_id, combined_content)
except Exception as e:
safe_logfire_error(f"Failed to generate AI summary for '{source_id}': {str(e)}, using fallback")
# Fallback to simple summary
summary = f"Documentation from {source_id} - {len(source_contents)} pages crawled"

# Update source info in database BEFORE storing documents
safe_logfire_info(f"About to create/update source record for '{source_id}' (word count: {source_id_word_counts[source_id]})")
try:
@@ -257,7 +253,7 @@ async def _create_source_records(
except Exception as fallback_error:
safe_logfire_error(f"Both source creation attempts failed for '{source_id}': {str(fallback_error)}")
raise Exception(f"Unable to create source record for '{source_id}'. This will cause foreign key violations. Error: {str(fallback_error)}")

# Verify ALL source records exist before proceeding with document storage
if unique_source_ids:
for source_id in unique_source_ids:
Expand All @@ -269,9 +265,9 @@ async def _create_source_records(
except Exception as e:
safe_logfire_error(f"Source verification failed for '{source_id}': {str(e)}")
raise

safe_logfire_info(f"All {len(unique_source_ids)} source records verified - proceeding with document storage")

async def extract_and_store_code_examples(
self,
crawl_results: List[Dict],
@@ -300,5 +296,5 @@ async def extract_and_store_code_examples(
start_progress,
end_progress
)
return result

return result
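The chunking loop in process_and_store_documents relies on a cooperative-yield pattern, awaiting asyncio.sleep(0) every few iterations so that large crawls do not starve the event loop. A minimal, self-contained sketch of that pattern (names are illustrative):

```python
import asyncio


async def count_words(chunks: list[str]) -> int:
    total = 0
    for i, chunk in enumerate(chunks):
        total += len(chunk.split())
        # Yield control every 10 chunks so other coroutines can run,
        # mirroring the pattern in the diff above.
        if i > 0 and i % 10 == 0:
            await asyncio.sleep(0)
    return total


print(asyncio.run(count_words(["alpha beta gamma"] * 25)))  # 75
```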
2 changes: 1 addition & 1 deletion python/src/server/services/crawling/helpers/__init__.py
@@ -10,4 +10,4 @@
__all__ = [
'URLHandler',
'SiteConfig'
]
]