diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000000..4862a5272e
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,22 @@
+# Changelog
+
+All notable changes to Archon are documented in this file.
+
+The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
+and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+
+## [Unreleased]
+
+## [0.1.1] - 2026-01-23
+
+### Added
+- Changelog for release tracking
+
+### Changed
+- Version metadata bumped to 0.1.1 across backend, UI, and agent work orders
+- README now shows the current release
+
+---
+
+[Unreleased]: https://github.com/AeyeOps/archon/compare/v0.1.1...HEAD
+[0.1.1]: https://github.com/AeyeOps/archon/compare/v0.1.0...v0.1.1
diff --git a/README.md b/README.md
index c3815ee2e6..9696e76a0a 100644
--- a/README.md
+++ b/README.md
@@ -20,6 +20,8 @@
 
 ---
 
+**Current Release**: v0.1.1
+
 ## 🎯 What is Archon?
 
 > Archon is currently in beta! Expect things to not work 100%, and please feel free to share any feedback and contribute with fixes/new features! Thank you to everyone for all the excitement we have for Archon already, as well as the bug reports, PRs, and discussions. It's a lot for our small team to get through but we're committed to addressing everything and making Archon into the best tool it possibly can be!
diff --git a/archon-ui-main/package.json b/archon-ui-main/package.json
index 9e1b4e642d..09423ecacc 100644
--- a/archon-ui-main/package.json
+++ b/archon-ui-main/package.json
@@ -1,6 +1,6 @@
 {
   "name": "archon-ui",
-  "version": "0.1.0",
+  "version": "0.1.1",
   "private": true,
   "type": "module",
   "scripts": {
diff --git a/archon-ui-main/src/App.tsx b/archon-ui-main/src/App.tsx
index acb88734bc..6ec6df8c61 100644
--- a/archon-ui-main/src/App.tsx
+++ b/archon-ui-main/src/App.tsx
@@ -102,7 +102,7 @@ const AppContent = () => {
   return (
     <>
-
+
       {/* Migration Banner - shows when backend is up but DB schema needs work */}
@@ -141,4 +141,4 @@ export function App() {
     )}
   );
-}
\ No newline at end of file
+}
diff --git a/migration/cleanup_stale_credentials.py b/migration/cleanup_stale_credentials.py
new file mode 100644
index 0000000000..28aaab838b
--- /dev/null
+++ b/migration/cleanup_stale_credentials.py
@@ -0,0 +1,143 @@
+#!/usr/bin/env python3
+"""
+Cleanup stale encrypted credentials that can't be decrypted with current SUPABASE_SERVICE_KEY.
+
+This script runs during bootstrap to prevent silent decryption failures when the
+encryption key changes (e.g., fresh Supabase init, key rotation).
+
+Usage:
+    python cleanup_stale_credentials.py
+
+Environment variables:
+    DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DB_NAME - Database connection
+    SUPABASE_SERVICE_KEY - Used to derive encryption key
+"""
+
+import base64
+import os
+import sys
+
+import psycopg2
+
+# Encryption imports - same as credential_service.py
+from cryptography.fernet import Fernet, InvalidToken
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
+
+
+def get_encryption_key() -> bytes:
+    """Generate encryption key from SUPABASE_SERVICE_KEY - mirrors credential_service.py"""
+    service_key = os.getenv("SUPABASE_SERVICE_KEY", "default-key-for-development")
+
+    kdf = PBKDF2HMAC(
+        algorithm=hashes.SHA256(),
+        length=32,
+        salt=b"static_salt_for_credentials",  # Must match credential_service.py
+        iterations=100000,
+    )
+    key = base64.urlsafe_b64encode(kdf.derive(service_key.encode()))
+    return key
+
+
+def try_decrypt(encrypted_value: str, fernet: Fernet) -> bool:
+    """Attempt to decrypt a value, return True if successful."""
+    if not encrypted_value:
+        return True  # Empty values are fine
+
+    try:
+        encrypted_bytes = base64.urlsafe_b64decode(encrypted_value.encode("utf-8"))
+        fernet.decrypt(encrypted_bytes)
+        return True
+    except (InvalidToken, ValueError):  # InvalidToken: wrong key; ValueError: bad base64
+        return False
+
+
+def main():
+    # Database connection from environment
+    db_config = {
+        "host": os.getenv("DB_HOST", "localhost"),
+        "port": int(os.getenv("DB_PORT", "5432")),
+        "user": os.getenv("DB_USER", "postgres"),
+        "password": os.getenv("DB_PASSWORD", "postgres"),
+        "dbname": os.getenv("DB_NAME", "postgres"),
+    }
+
+    service_key = os.getenv("SUPABASE_SERVICE_KEY")
+    if not service_key:
+        print("⚠ SUPABASE_SERVICE_KEY not set, skipping credential cleanup")
+        return 0
+
+    try:
+        conn = psycopg2.connect(**db_config)
+        conn.autocommit = True
+        cursor = conn.cursor()
+    except Exception as e:
+        print(f"⚠ Could not connect to database: {e}")
+        return 0  # Non-fatal - migrations may not have run yet
+
+    # Check if table exists
+    cursor.execute("""
+        SELECT EXISTS (
+            SELECT FROM information_schema.tables
+            WHERE table_name = 'archon_settings'
+        )
+    """)
+    if not cursor.fetchone()[0]:
+        print("✓ archon_settings table doesn't exist yet, skipping credential cleanup")
+        conn.close()
+        return 0
+
+    # Get all encrypted credentials
+    cursor.execute("""
+        SELECT key, encrypted_value
+        FROM archon_settings
+        WHERE is_encrypted = true AND encrypted_value IS NOT NULL AND encrypted_value != ''
+    """)
+    encrypted_credentials = cursor.fetchall()
+
+    if not encrypted_credentials:
+        print("✓ No encrypted credentials to validate")
+        conn.close()
+        return 0
+
+    # Create Fernet cipher with current key
+    try:
+        encryption_key = get_encryption_key()
+        fernet = Fernet(encryption_key)
+    except Exception as e:
+        print(f"⚠ Could not create encryption cipher: {e}")
+        conn.close()
+        return 0
+
+    # Test each credential
+    stale_keys = []
+    valid_count = 0
+
+    for key, encrypted_value in encrypted_credentials:
+        if try_decrypt(encrypted_value, fernet):
+            valid_count += 1
+        else:
+            stale_keys.append(key)
+
+    # Delete stale credentials
+    if stale_keys:
+        print(f"⚠ Found {len(stale_keys)} stale encrypted credential(s) that cannot be decrypted:")
+        for key in stale_keys:
+            print(f"  - {key}")
+
+        # Delete them
+        cursor.execute(
+            "DELETE FROM archon_settings WHERE key = ANY(%s)",
+            (stale_keys,)
+        )
+        print(f"✓ Deleted {len(stale_keys)} stale credential(s). Re-enter them in the Settings UI.")
+
+    if valid_count > 0:
+        print(f"✓ Validated {valid_count} encrypted credential(s)")
+
+    conn.close()
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/migration/complete_setup.sql b/migration/complete_setup.sql
index 609d38c1d7..fed9edab0d 100644
--- a/migration/complete_setup.sql
+++ b/migration/complete_setup.sql
@@ -48,6 +48,22 @@ BEGIN
 END;
 $$ language 'plpgsql';
 
+DO $$
+BEGIN
+    IF EXISTS (
+        SELECT 1
+        FROM pg_trigger
+        WHERE tgname = 'update_archon_settings_updated_at'
+          AND tgrelid = 'archon_settings'::regclass
+    ) THEN
+        RAISE NOTICE 'Trigger update_archon_settings_updated_at already exists, dropping before recreation.';
+        EXECUTE 'DROP TRIGGER update_archon_settings_updated_at ON archon_settings';
+    ELSE
+        RAISE NOTICE 'Creating trigger update_archon_settings_updated_at.';
+    END IF;
+END
+$$;
+
 CREATE TRIGGER update_archon_settings_updated_at
     BEFORE UPDATE ON archon_settings
     FOR EACH ROW
@@ -72,7 +88,8 @@ INSERT INTO archon_settings (key, value, is_encrypted, category, description) VALUES
 ('MCP_TRANSPORT', 'dual', false, 'server_config', 'MCP server transport mode - sse (web clients), stdio (IDE clients), or dual (both)'),
 ('HOST', 'localhost', false, 'server_config', 'Host to bind to if using sse as the transport (leave empty if using stdio)'),
 ('PORT', '8051', false, 'server_config', 'Port to listen on if using sse as the transport (leave empty if using stdio)'),
-('MODEL_CHOICE', 'gpt-4.1-nano', false, 'rag_strategy', 'The LLM you want to use for summaries and contextual embeddings. Generally this is a very cheap and fast LLM like gpt-4.1-nano');
+('MODEL_CHOICE', 'gpt-4.1-nano', false, 'rag_strategy', 'The LLM you want to use for summaries and contextual embeddings. Generally this is a very cheap and fast LLM like gpt-4.1-nano')
+ON CONFLICT (key) DO NOTHING;
 
 -- RAG Strategy Configuration (all default to true)
 INSERT INTO archon_settings (key, value, is_encrypted, category, description) VALUES
@@ -80,12 +97,14 @@ INSERT INTO archon_settings (key, value, is_encrypted, category, description) VALUES
 ('CONTEXTUAL_EMBEDDINGS_MAX_WORKERS', '3', false, 'rag_strategy', 'Maximum parallel workers for contextual embedding generation (1-10)'),
 ('USE_HYBRID_SEARCH', 'true', false, 'rag_strategy', 'Combines vector similarity search with keyword search for better results'),
 ('USE_AGENTIC_RAG', 'true', false, 'rag_strategy', 'Enables code example extraction, storage, and specialized code search functionality'),
-('USE_RERANKING', 'true', false, 'rag_strategy', 'Applies cross-encoder reranking to improve search result relevance');
+('USE_RERANKING', 'true', false, 'rag_strategy', 'Applies cross-encoder reranking to improve search result relevance')
+ON CONFLICT (key) DO NOTHING;
 
 -- Monitoring Configuration
 INSERT INTO archon_settings (key, value, is_encrypted, category, description) VALUES
 ('LOGFIRE_ENABLED', 'true', false, 'monitoring', 'Enable or disable Pydantic Logfire logging and observability platform'),
-('PROJECTS_ENABLED', 'true', false, 'features', 'Enable or disable Projects and Tasks functionality');
+('PROJECTS_ENABLED', 'true', false, 'features', 'Enable or disable Projects and Tasks functionality')
+ON CONFLICT (key) DO NOTHING;
 
 -- Placeholder for sensitive credentials (to be added via Settings UI)
 INSERT INTO archon_settings (key, encrypted_value, is_encrypted, category, description) VALUES
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 128e433290..919c5c2fea 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "archon"
-version = "0.1.0"
+version = "0.1.1"
 description = "Archon - the command center for AI coding assistants."
 readme = "README.md"
 requires-python = ">=3.12"
diff --git a/python/src/agent_work_orders/__init__.py b/python/src/agent_work_orders/__init__.py
index e0b7fb7820..4e5a655407 100644
--- a/python/src/agent_work_orders/__init__.py
+++ b/python/src/agent_work_orders/__init__.py
@@ -4,4 +4,4 @@
 Provides workflow-based agent execution in isolated sandboxes.
 """
 
-__version__ = "0.1.0"
+__version__ = "0.1.1"
diff --git a/python/src/agent_work_orders/server.py b/python/src/agent_work_orders/server.py
index d7aee851df..e76f2ff897 100644
--- a/python/src/agent_work_orders/server.py
+++ b/python/src/agent_work_orders/server.py
@@ -101,7 +101,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
 app = FastAPI(
     title="Agent Work Orders API",
     description="Independent agent work order service for workflow-based agent execution",
-    version="0.1.0",
+    version="0.1.1",
     lifespan=lifespan,
 )
 
@@ -125,7 +125,7 @@ async def health_check() -> dict[str, Any]:
     health_status: dict[str, Any] = {
         "status": "healthy",
         "service": "agent-work-orders",
-        "version": "0.1.0",
+        "version": "0.1.1",
         "enabled": config.ENABLED,
         "dependencies": {},
     }
@@ -260,7 +260,7 @@ async def root() -> dict:
     """Root endpoint with service information"""
     return {
         "service": "agent-work-orders",
-        "version": "0.1.0",
+        "version": "0.1.1",
         "description": "Independent agent work order service",
         "docs": "/docs",
         "health": "/health",
diff --git a/python/src/agents/document_agent.py b/python/src/agents/document_agent.py
index 9e9c5fbfc7..597bb69c77 100644
--- a/python/src/agents/document_agent.py
+++ b/python/src/agents/document_agent.py
@@ -76,7 +76,7 @@ def _create_agent(self, **kwargs) -> Agent:
         agent = Agent(
             model=self.model,
             deps_type=DocumentDependencies,
-            result_type=DocumentOperation,
+            output_type=DocumentOperation,
             system_prompt="""You are a Document Management Assistant that helps users create, update, and modify project documents through conversation.
 
 **Your Capabilities:**
diff --git a/python/src/mcp_server/mcp_server.py b/python/src/mcp_server/mcp_server.py
index f4796acbaa..f67e62a9d7 100644
--- a/python/src/mcp_server/mcp_server.py
+++ b/python/src/mcp_server/mcp_server.py
@@ -14,11 +14,11 @@
 API service and frontend, not through MCP tools.
 """
 
+import asyncio
 import json
 import logging
 import os
 import sys
-import threading
 import time
 import traceback
 from collections.abc import AsyncIterator
@@ -64,7 +64,10 @@
 from src.server.services.mcp_session_manager import get_session_manager
 
 # Global initialization lock and flag
-_initialization_lock = threading.Lock()
+# NOTE: asyncio.Lock is event-loop bound - safe for a single Uvicorn worker (FastMCP's
+# intended deployment), but not across threads/loops. If multi-worker or cross-thread
+# access is needed, use threading.Lock with run_in_executor for async compatibility.
+_initialization_lock = asyncio.Lock()
 _initialization_complete = False
 _shared_context = None
 
@@ -134,58 +137,65 @@ async def perform_health_checks(context: ArchonContext):
 async def lifespan(server: FastMCP) -> AsyncIterator[ArchonContext]:
     """
     Lifecycle manager - no heavy dependencies.
+    Uses asyncio.Lock to avoid blocking the event loop.
     """
     global _initialization_complete, _shared_context
 
-    # Quick check without lock
-    if _initialization_complete and _shared_context:
+    # Quick check without lock (use 'is not None' for safety)
+    if _initialization_complete and _shared_context is not None:
         logger.info("♻️ Reusing existing context for new SSE connection")
         yield _shared_context
         return
 
-    # Acquire lock for initialization
-    with _initialization_lock:
-        # Double-check pattern
-        if _initialization_complete and _shared_context:
-            logger.info("♻️ Reusing existing context for new SSE connection")
-            yield _shared_context
-            return
+    # Capture context locally to avoid race between lock release and yield
+    ctx: ArchonContext | None = None
 
-        logger.info("🚀 Starting MCP server...")
-
-        try:
-            # Initialize session manager
-            logger.info("🔐 Initializing session manager...")
-            session_manager = get_session_manager()
-            logger.info("✓ Session manager initialized")
-
-            # Initialize service client for HTTP calls
-            logger.info("🌐 Initializing service client...")
-            service_client = get_mcp_service_client()
-            logger.info("✓ Service client initialized")
-
-            # Create context
-            context = ArchonContext(service_client=service_client)
-
-            # Perform initial health check
-            await perform_health_checks(context)
-
-            logger.info("✓ MCP server ready")
-
-            # Store context globally
-            _shared_context = context
-            _initialization_complete = True
-
-            yield context
-
-        except Exception as e:
-            logger.error(f"💥 Critical error in lifespan setup: {e}")
-            logger.error(traceback.format_exc())
-            raise
-        finally:
-            # Clean up resources
-            logger.info("🧹 Cleaning up MCP server...")
-            logger.info("✅ MCP server shutdown complete")
+    # Acquire async lock for initialization (doesn't block event loop)
+    async with _initialization_lock:
+        # Double-check pattern (use 'is not None' for safety)
+        if _initialization_complete and _shared_context is not None:
+            logger.info("♻️ Reusing existing context for new SSE connection (after lock)")
+            ctx = _shared_context
+        else:
+            logger.info("🚀 Starting MCP server...")
+
+            try:
+                # Initialize session manager
+                # NOTE: These sync calls run inside the lock and block the event loop
+                # while they execute, so slow init here stalls concurrent connection
+                # attempts. Consider run_in_executor if they become I/O-bound
+                # bottlenecks under load.
+                logger.info("🔐 Initializing session manager...")
+                session_manager = get_session_manager()
+                logger.info("✓ Session manager initialized")
+
+                # Initialize service client for HTTP calls
+                logger.info("🌐 Initializing service client...")
+                service_client = get_mcp_service_client()
+                logger.info("✓ Service client initialized")
+
+                # Create context
+                ctx = ArchonContext(service_client=service_client)
+
+                # Perform initial health check
+                await perform_health_checks(ctx)
+
+                logger.info("✓ MCP server ready")
+
+                # Store context globally (assign last for atomicity)
+                _shared_context = ctx
+                _initialization_complete = True
+
+            except Exception as e:
+                logger.error(f"💥 Critical error in lifespan setup: {e}")
+                logger.error(traceback.format_exc())
+                raise
+
+    # Yield outside the lock to allow concurrent connections
+    if ctx is not None:
+        yield ctx
+    else:
+        raise RuntimeError("MCP context initialization failed")
 
 
 # Define MCP instructions for Claude Code and other clients
diff --git a/python/src/server/api_routes/knowledge_api.py b/python/src/server/api_routes/knowledge_api.py
index 052f75216e..3e9a5ada2c 100644
--- a/python/src/server/api_routes/knowledge_api.py
+++ b/python/src/server/api_routes/knowledge_api.py
@@ -1035,7 +1035,10 @@ def check_upload_cancellation():
 
     # Create progress callback for tracking document processing
     async def document_progress_callback(
-        message: str, percentage: int, batch_info: dict = None
+        message: str,
+        percentage: int,
+        batch_info: dict | None = None,
+        **extra_fields,
     ):
         """Progress callback for tracking document processing"""
         # Map the document storage progress to overall progress range
@@ -1047,7 +1050,8 @@ async def document_progress_callback(
             progress=mapped_percentage,
             log=message,
             currentUrl=f"file://{filename}",
-            **(batch_info or {})
+            **(batch_info or {}),
+            **extra_fields,
         )
 
@@ -1256,9 +1260,9 @@ async def get_database_metrics():
 async def knowledge_health():
     """Knowledge API health check with migration detection."""
     # Check for database migration needs
-    from ..main import _check_database_schema
+    from ..main import _check_database_schema_cached
 
-    schema_status = await _check_database_schema()
+    schema_status = await _check_database_schema_cached()
     if not schema_status["valid"]:
         return {
             "status": "migration_required",
diff --git a/python/src/server/config/config.py b/python/src/server/config/config.py
index df03503747..193c15a6be 100644
--- a/python/src/server/config/config.py
+++ b/python/src/server/config/config.py
@@ -136,9 +136,14 @@ def validate_supabase_url(url: str) -> bool:
 
     if parsed.scheme == "http":
         hostname = parsed.hostname or ""
-        # Check for exact localhost and Docker internal hosts (security: prevent subdomain bypass)
+        # Check for exact localhost, Docker internal hosts, and Supabase network names
+        # (security: prevent subdomain bypass by using exact matches or known suffixes)
         local_hosts = ["localhost", "127.0.0.1", "host.docker.internal"]
-        if hostname in local_hosts or hostname.endswith(".localhost"):
+        if (
+            hostname in local_hosts
+            or hostname.endswith(".localhost")
+            or hostname.endswith("_supabase")  # Docker Compose Supabase containers
+        ):
             return True
 
         # Check if hostname is a private IP address
diff --git a/python/src/server/config/version.py b/python/src/server/config/version.py
index 97b743027a..5af780d807 100644
--- a/python/src/server/config/version.py
+++ b/python/src/server/config/version.py
@@ -4,7 +4,7 @@
 
 # Current version of Archon
 # Update this with each release
-ARCHON_VERSION = "0.1.0"
+ARCHON_VERSION = "0.1.1"
 
 # Repository information for GitHub API
 GITHUB_REPO_OWNER = "coleam00"
diff --git a/python/src/server/main.py b/python/src/server/main.py
index b7d272a6bd..9a81d0f2db 100644
--- a/python/src/server/main.py
+++ b/python/src/server/main.py
@@ -216,10 +216,16 @@ async def root():
 
 # Health check endpoint
 @app.get("/health")
 async def health_check(response: Response):
-    """Health check endpoint that indicates true readiness including credential loading."""
+    """
+    Non-blocking health check that responds immediately without I/O.
+
+    This endpoint checks process state without waiting on database or other I/O operations
+    to prevent health check timeouts when the server is under load or experiencing issues.
+    """
     from datetime import datetime
+    import asyncio
 
-    # Check if initialization is complete
+    # Check if initialization is complete (synchronous check - fast)
     if not _initialization_complete:
         response.status_code = 503  # Service Unavailable
         return {
@@ -230,9 +236,31 @@ async def health_check(response: Response):
             "ready": False,
         }
 
-    # Check for required database schema
-    schema_status = await _check_database_schema()
-    if not schema_status["valid"]:
+    # Check schema status with timeout - don't block health check
+    try:
+        schema_status = await asyncio.wait_for(
+            _check_database_schema_cached(),
+            timeout=2.0  # 2 second max - health check must respond quickly
+        )
+    except asyncio.TimeoutError:
+        # Timeout means we can't verify schema, but server is responsive
+        api_logger.warning("Health check: schema validation timed out (server is responsive but database check slow)")
+        schema_status = {
+            "valid": True,  # Assume valid to keep health check passing
+            "message": "Schema check timed out - assumed valid",
+            "timeout": True
+        }
+    except Exception as e:
+        # Any error - log but don't fail health check
+        api_logger.warning(f"Health check: schema check failed: {e}")
+        schema_status = {
+            "valid": True,  # Assume valid to keep server running
+            "message": f"Schema check error: {type(e).__name__}",
+            "error": True
+        }
+
+    # Only return unhealthy if schema is definitely invalid (not just timeout/error)
+    if schema_status.get("valid") is False and not schema_status.get("timeout") and not schema_status.get("error"):
         response.status_code = 503  # Service Unavailable
         return {
             "status": "migration_required",
@@ -251,7 +279,8 @@ async def health_check(response: Response):
         "timestamp": datetime.now().isoformat(),
         "ready": True,
         "credentials_loaded": True,
-        "schema_valid": True,
+        "schema_valid": schema_status.get("valid", True),
+        "schema_check_timeout": schema_status.get("timeout", False),
     }
 
@@ -265,20 +294,50 @@ async def api_health_check(response: Response):
 
 # Cache schema check result to avoid repeated database queries
 _schema_check_cache = {"valid": None, "checked_at": 0}
 
-async def _check_database_schema():
-    """Check if required database schema exists - only for existing users who need migration."""
+async def _check_database_schema_cached():
+    """
+    Check if required database schema exists with caching.
+
+    Returns immediately from cache when possible. Only checks database if:
+    - Never checked before
+    - Previous check was inconclusive (error)
+    - Cached failure result is older than 30 seconds
+
+    This function is designed to be fast and non-blocking for health checks.
+    """
     import time
+    import asyncio
 
-    # If we've already confirmed schema is valid, don't check again
+    # If we've already confirmed schema is valid, return immediately (no I/O)
     if _schema_check_cache["valid"] is True:
         return {"valid": True, "message": "Schema is up to date (cached)"}
 
-    # If we recently failed, don't spam the database (wait at least 30 seconds)
+    # If we recently failed, return cached result immediately (no I/O)
     current_time = time.time()
     if (_schema_check_cache["valid"] is False and
         current_time - _schema_check_cache["checked_at"] < 30):
         return _schema_check_cache["result"]
 
+    # Need to check database - wrap in timeout to prevent blocking
+    try:
+        # Run database check with timeout
+        result = await asyncio.wait_for(
+            _perform_database_schema_check(current_time),
+            timeout=1.5  # Database check itself must complete in 1.5s
+        )
+        return result
+    except asyncio.TimeoutError:
+        # Database check timed out - don't cache, allow retry
+        api_logger.warning("Database schema check timed out")
+        return {
+            "valid": True,  # Assume valid to prevent health check failures
+            "message": "Database check timed out",
+            "timeout": True
+        }
+
+
+async def _perform_database_schema_check(current_time: float):
+    """Perform the actual database schema check (can be slow)."""
     try:
         from .services.client_manager import get_supabase_client
diff --git a/python/src/server/services/crawling/discovery_service.py b/python/src/server/services/crawling/discovery_service.py
index 103a277296..f1c3230f8f 100644
--- a/python/src/server/services/crawling/discovery_service.py
+++ b/python/src/server/services/crawling/discovery_service.py
@@ -290,16 +290,24 @@ def _resolve_and_validate_hostname(self, hostname: str) -> bool:
             logger.warning(f"Error resolving hostname {hostname}: {e}")
             return False
 
+    # Expected content types for discovery files (to detect soft 404s)
+    EXPECTED_CONTENT_TYPES = {
+        '.txt': ['text/plain', 'text/markdown', 'text/x-markdown'],
+        '.md': ['text/plain', 'text/markdown', 'text/x-markdown'],
+        '.xml': ['text/xml', 'application/xml', 'application/rss+xml', 'application/atom+xml'],
+    }
+
     def _check_url_exists(self, url: str) -> bool:
         """
         Check if a URL exists and returns a successful response.
 
         Includes SSRF protection by validating hostnames and blocking private IPs.
+        Also validates content-type to detect soft 404s (HTML pages returned for missing files).
 
         Args:
            url: URL to check
 
         Returns:
-            True if URL returns 200, False otherwise
+            True if URL returns 200 with valid content-type, False otherwise
         """
         try:
             # Parse URL to extract hostname
@@ -358,9 +366,28 @@ def _check_url_exists(self, url: str) -> bool:
                 return False
 
             # Check response status
-            success = resp.status_code == 200
-            logger.debug(f"URL check: {url} -> {resp.status_code} ({'exists' if success else 'not found'})")
-            return success
+            if resp.status_code != 200:
+                logger.debug(f"URL check: {url} -> {resp.status_code} (not found)")
+                return False
+
+            # Validate content-type to detect soft 404s
+            content_type = resp.headers.get('content-type', '').lower().split(';')[0].strip()
+            url_path = parsed.path.lower()
+
+            # Check if URL has an extension we should validate
+            for ext, valid_types in self.EXPECTED_CONTENT_TYPES.items():
+                if url_path.endswith(ext):
+                    if content_type and content_type not in valid_types:
+                        # Soft 404: server returned HTML for a text/xml file
+                        if content_type == 'text/html':
+                            logger.info(f"Soft 404 detected: {url} returned text/html instead of {valid_types}")
+                            return False
+                        # Log warning but allow other content types
+                        logger.debug(f"Unexpected content-type for {url}: {content_type} (expected {valid_types})")
+                    break
+
+            logger.debug(f"URL check: {url} -> 200 (exists, content-type: {content_type})")
+            return True
         finally:
             if hasattr(resp, 'close'):
diff --git a/python/src/server/services/credential_service.py b/python/src/server/services/credential_service.py
index f4fb275be9..a1a0d01b7f 100644
--- a/python/src/server/services/credential_service.py
+++ b/python/src/server/services/credential_service.py
@@ -5,6 +5,7 @@
 Credentials include API keys, service credentials, and application configuration.
 """
 
+import asyncio
 import base64
 import os
 import re
@@ -17,6 +18,7 @@
 from cryptography.fernet import Fernet
 from cryptography.hazmat.primitives import hashes
 from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
+from postgrest.exceptions import APIError
 from supabase import Client, create_client
 
 from ..config.logfire_config import get_logger
@@ -125,37 +127,50 @@ def _decrypt_value(self, encrypted_value: str) -> str:
 
     async def load_all_credentials(self) -> dict[str, Any]:
         """Load all credentials from database and cache them."""
-        try:
-            supabase = self._get_supabase_client()
-
-            # Fetch all credentials
-            result = supabase.table("archon_settings").select("*").execute()
-
-            credentials = {}
-            for item in result.data:
-                key = item["key"]
-                if item["is_encrypted"] and item["encrypted_value"]:
-                    # For encrypted values, we store the encrypted version
-                    # Decryption happens when the value is actually needed
-                    credentials[key] = {
-                        "encrypted_value": item["encrypted_value"],
-                        "is_encrypted": True,
-                        "category": item["category"],
-                        "description": item["description"],
-                    }
-                else:
-                    # Plain text values
-                    credentials[key] = item["value"]
+        supabase = self._get_supabase_client()
+        retries = 5
+        delay = 2
 
-            self._cache = credentials
-            self._cache_initialized = True
-            logger.info(f"Loaded {len(credentials)} credentials from database")
-
-            return credentials
+        for attempt in range(1, retries + 1):
+            try:
+                result = supabase.table("archon_settings").select("*").execute()
+
+                credentials = {}
+                for item in result.data:
+                    key = item["key"]
+                    if item["is_encrypted"] and item["encrypted_value"]:
+                        credentials[key] = {
+                            "encrypted_value": item["encrypted_value"],
+                            "is_encrypted": True,
+                            "category": item["category"],
+                            "description": item["description"],
+                        }
+                    else:
+                        credentials[key] = item["value"]
+
+                self._cache = credentials
+                self._cache_initialized = True
+                logger.info(f"Loaded {len(credentials)} credentials from database")
+                return credentials
+
+            except APIError as api_error:
+                if api_error.code == "PGRST205" and attempt < retries:
+                    logger.warning(
+                        "Supabase schema cache not ready (attempt %s/%s); retrying in %ss",
+                        attempt,
+                        retries,
+                        delay,
+                    )
+                    await asyncio.sleep(delay)
+                    delay = min(delay * 2, 10)
+                    continue
+                logger.error(f"Error loading credentials: {api_error}")
+                raise
+            except Exception as e:
+                logger.error(f"Error loading credentials: {e}")
+                raise
 
-        except Exception as e:
-            logger.error(f"Error loading credentials: {e}")
-            raise
+        raise RuntimeError("Failed to load credentials after retries")
 
     async def get_credential(self, key: str, default: Any = None, decrypt: bool = True) -> Any:
         """Get a credential value by key."""
diff --git a/python/src/server/services/storage/code_storage_service.py b/python/src/server/services/storage/code_storage_service.py
index c38918e7f7..de708feea1 100644
--- a/python/src/server/services/storage/code_storage_service.py
+++ b/python/src/server/services/storage/code_storage_service.py
@@ -573,7 +573,7 @@ def _get_setting_fallback(key: str, default: str) -> str:
     return grouped_blocks
 
 
-def generate_code_example_summary(
+async def generate_code_example_summary(
     code: str, context_before: str, context_after: str, language: str = "", provider: str = None
 ) -> dict[str, str]:
     """
@@ -589,10 +589,8 @@ def generate_code_example_summary(
     Returns:
         A dictionary with 'summary' and 'example_name'
     """
-    import asyncio
-
-    # Run the async version in the current thread
-    return asyncio.run(_generate_code_example_summary_async(code, context_before, context_after, language, provider))
+    # Call the async version directly (no event loop creation needed)
+    return await _generate_code_example_summary_async(code, context_before, context_after, language, provider)
 
 
 async def _generate_code_example_summary_async(
diff --git a/python/src/server/services/storage/document_storage_service.py b/python/src/server/services/storage/document_storage_service.py
index 898417581b..f9f5c67fd9 100644
--- a/python/src/server/services/storage/document_storage_service.py
+++ b/python/src/server/services/storage/document_storage_service.py
@@ -11,6 +11,12 @@
 from ...config.logfire_config import safe_span, search_logger
 from ..embeddings.contextual_embedding_service import generate_contextual_embeddings_batch
 from ..embeddings.embedding_service import create_embeddings_batch
+from .embedding_schema_support import (
+    determine_embedding_column,
+    legacy_column_in_use,
+    note_multi_dim_success,
+    should_retry_with_legacy_column,
+)
 
 
 async def add_documents_to_supabase(
@@ -385,20 +391,14 @@ async def embedding_progress_wrapper(message: str, percentage: float):
 
                     # Determine the correct embedding column based on dimension
                     embedding_dim = len(embedding) if isinstance(embedding, list) else len(embedding.tolist())
-                    embedding_column = None
-
-                    if embedding_dim == 768:
-                        embedding_column = "embedding_768"
-                    elif embedding_dim == 1024:
-                        embedding_column = "embedding_1024"
-                    elif embedding_dim == 1536:
-                        embedding_column = "embedding_1536"
-                    elif embedding_dim == 3072:
-                        embedding_column = "embedding_3072"
-                    else:
-                        # Default to closest supported dimension
-                        search_logger.warning(f"Unsupported embedding dimension {embedding_dim}, using embedding_1536")
-                        embedding_column = "embedding_1536"
+                    embedding_column = determine_embedding_column(embedding_dim)
+                    if (
+                        not legacy_column_in_use()
+                        and embedding_column != f"embedding_{embedding_dim}"
+                    ):
+                        search_logger.warning(
+                            f"Unsupported embedding dimension {embedding_dim}, using {embedding_column}"
+                        )
 
                     # Get page_id for this URL if available
                     page_id = url_to_page_id.get(batch_urls[j]) if url_to_page_id else None
@@ -439,7 +439,12 @@ async def embedding_progress_wrapper(message: str, percentage: float):
                         raise
 
                 try:
-                    client.table("archon_crawled_pages").insert(batch_data).execute()
+                    client.table("archon_crawled_pages").upsert(
+                        batch_data,
+                        on_conflict="url,chunk_number"
+                    ).execute()
+                    if not legacy_column_in_use():
+                        note_multi_dim_success()
                     total_chunks_stored += len(batch_data)
 
                     # Increment completed batches and report simple progress
@@ -468,6 +473,9 @@ async def embedding_progress_wrapper(message: str, percentage: float):
                         break
 
                 except Exception as e:
+                    if should_retry_with_legacy_column(e, batch_data):
+                        retry_delay = 1.0
+                        continue
                     if retry < max_retries - 1:
                         search_logger.warning(
                             f"Error inserting batch (attempt {retry + 1}/{max_retries}): {e}"
@@ -497,7 +505,12 @@ async def embedding_progress_wrapper(message: str, percentage: float):
                             raise
 
                         try:
-                            client.table("archon_crawled_pages").insert(record).execute()
+                            client.table("archon_crawled_pages").upsert(
+                                record,
+                                on_conflict="url,chunk_number"
+                            ).execute()
+                            if not legacy_column_in_use():
+                                note_multi_dim_success()
                             successful_inserts += 1
                             total_chunks_stored += 1
                         except Exception as individual_error:
diff --git a/python/src/server/services/storage/embedding_schema_support.py b/python/src/server/services/storage/embedding_schema_support.py
new file mode 100644
index 0000000000..71ee518b97
--- /dev/null
+++ b/python/src/server/services/storage/embedding_schema_support.py
@@ -0,0 +1,104 @@
+"""Embedding schema compatibility helpers.
+
+Some deployments still rely on the original single-column `embedding` layout while
+newer migrations expose dedicated columns per vector dimension. These helpers
+allow storage services to detect missing dimension columns at runtime and fall
+back to the legacy column without crashing uploads.
+"""
+
+from __future__ import annotations
+
+from collections.abc import Iterable, MutableMapping
+
+from ...config.logfire_config import search_logger
+
+# Column names used in the upgraded schema
+DIMENSION_COLUMNS: tuple[str, ...] = (
+    "embedding_384",
+    "embedding_768",
+    "embedding_1024",
+    "embedding_1536",
+    "embedding_3072",
+)
+LEGACY_COLUMN = "embedding"
+
+# Cache whether the database supports the upgraded schema. ``None`` means we
+# haven't determined support yet, ``True`` means the per-dimension columns exist,
+# and ``False`` means we must fall back to the legacy column.
+_multi_dim_schema_supported: bool | None = None
+
+# Mapping of common embedding vector sizes to their dedicated columns.
+_DIMENSION_TO_COLUMN = {
+    384: "embedding_384",
+    768: "embedding_768",
+    1024: "embedding_1024",
+    1536: "embedding_1536",
+    3072: "embedding_3072",
+}
+
+
+def determine_embedding_column(dimension: int) -> str:
+    """Return the appropriate column name for a given embedding dimension."""
+
+    if _multi_dim_schema_supported is False:
+        return LEGACY_COLUMN
+
+    # Default to the OpenAI dimension column when the exact size is unknown.
+    return _DIMENSION_TO_COLUMN.get(dimension, _DIMENSION_TO_COLUMN.get(1536, LEGACY_COLUMN))
+
+
+def note_multi_dim_success() -> None:
+    """Record that the upgraded schema appears to be available."""
+
+    global _multi_dim_schema_supported
+    if _multi_dim_schema_supported is None:
+        _multi_dim_schema_supported = True
+
+
+def should_retry_with_legacy_column(error: Exception, records: Iterable[MutableMapping[str, object]]) -> bool:
+    """Inspect an error and decide whether to retry using the legacy column.
+
+    When the PostgREST API returns a message like
+    "Could not find the 'embedding_1536' column", we switch the in-memory
+    records to use the legacy ``embedding`` column and ask the caller to retry.
+    """
+
+    message = str(error)
+    if not any(column in message for column in DIMENSION_COLUMNS):
+        return False
+
+    _mark_schema_as_legacy()
+    converted = _convert_records_to_legacy(records)
+
+    if converted:
+        search_logger.warning(
+            "Multi-dimensional embedding columns unavailable. Falling back to legacy 'embedding' column."
+        )
+        return True
+
+    return False
+
+
+def _mark_schema_as_legacy() -> None:
+    global _multi_dim_schema_supported
+    _multi_dim_schema_supported = False
+
+
+def _convert_records_to_legacy(records: Iterable[MutableMapping[str, object]]) -> bool:
+    """Move any per-dimension embeddings to the legacy column."""
+
+    converted = False
+    for record in records:
+        for column in DIMENSION_COLUMNS:
+            if column in record:
+                # Do not clobber an existing legacy value if the caller already set it.
+                record.setdefault(LEGACY_COLUMN, record.pop(column))
+                converted = True
+    return converted
+
+
+def legacy_column_in_use() -> bool:
+    """Expose whether we are currently using the legacy embedding column."""
+
+    return _multi_dim_schema_supported is False
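
A minimal standalone sketch of the key-derivation contract that `cleanup_stale_credentials.py` and `credential_service.py` must share, using only the `cryptography` package. The salt, iteration count, and default key are copied from the diff above; the round trip at the bottom is an illustrative self-check, not part of the migration:

```python
import base64
import os

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC


def derive_key(service_key: str) -> bytes:
    """Derive the Fernet key the same way both files do."""
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=b"static_salt_for_credentials",  # must stay identical in both files
        iterations=100000,
    )
    return base64.urlsafe_b64encode(kdf.derive(service_key.encode()))


if __name__ == "__main__":
    key = derive_key(os.getenv("SUPABASE_SERVICE_KEY", "default-key-for-development"))
    token = Fernet(key).encrypt(b"probe")
    assert Fernet(key).decrypt(token) == b"probe"  # round trip proves key agreement
    print("key derivation round trip OK")
```

If the salt or iteration count in either file drifts, every stored credential becomes undecryptable, which is exactly the condition the cleanup script exists to recover from.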
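The lifespan rewrite in `mcp_server.py` is an instance of double-checked initialization around an `asyncio.Lock`, with the yield moved outside the critical section. A self-contained sketch of the pattern, where names are illustrative and `asyncio.sleep` stands in for the real session-manager and service-client setup:

```python
import asyncio

_lock = asyncio.Lock()
_ctx: dict | None = None


async def get_context() -> dict:
    global _ctx
    if _ctx is not None:  # fast path: no lock once initialized
        return _ctx
    async with _lock:
        if _ctx is None:  # double-check under the lock
            await asyncio.sleep(0.1)  # stand-in for the slow one-time setup
            _ctx = {"ready": True}
    return _ctx  # handed out after the lock is released


async def main() -> None:
    # Ten concurrent callers; initialization runs exactly once.
    results = await asyncio.gather(*(get_context() for _ in range(10)))
    assert all(r is results[0] for r in results)
    print("context initialized once, shared by all callers")


if __name__ == "__main__":
    asyncio.run(main())
```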
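The content-type validation added to `_check_url_exists` boils down to one rule: a 200 response carrying `text/html` for a `.txt`/`.md`/`.xml` discovery path is treated as a soft 404, while other unexpected types are merely logged. A rough standalone sketch of that rule using `requests` directly; the URL at the bottom is illustrative:

```python
import requests

EXPECTED = {
    ".txt": ["text/plain", "text/markdown", "text/x-markdown"],
    ".md": ["text/plain", "text/markdown", "text/x-markdown"],
    ".xml": ["text/xml", "application/xml", "application/rss+xml", "application/atom+xml"],
}


def looks_like_soft_404(url: str) -> bool:
    """True when a discovery-file URL answers 200 but smells like a miss."""
    resp = requests.get(url, timeout=5)
    try:
        if resp.status_code != 200:
            return True
        ctype = resp.headers.get("content-type", "").lower().split(";")[0].strip()
        path = url.lower().split("?")[0]
        for ext, ok_types in EXPECTED.items():
            if path.endswith(ext) and ctype not in ok_types:
                # Only an HTML payload is rejected, matching _check_url_exists.
                return ctype == "text/html"
        return False
    finally:
        resp.close()


if __name__ == "__main__":
    print(looks_like_soft_404("https://example.com/llms.txt"))  # illustrative URL
```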
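Finally, a sketch of how the `embedding_schema_support` helpers behave against a simulated PostgREST missing-column error. The import path assumes the repo layout shown in the diff, the error message mimics PGRST shape, and relying on the module's cached state like this is for illustration only (the cache starts as `None` in a fresh process):

```python
from src.server.services.storage import embedding_schema_support as ess

records = [{"url": "https://example.com/docs", "chunk_number": 0, "embedding_1536": [0.0] * 1536}]
err = Exception("Could not find the 'embedding_1536' column of 'archon_crawled_pages' in the schema cache")

assert ess.determine_embedding_column(1536) == "embedding_1536"  # upgraded schema assumed at first
if ess.should_retry_with_legacy_column(err, records):
    # The record was rewritten in place to target the legacy column...
    assert "embedding" in records[0] and "embedding_1536" not in records[0]
    # ...and the module now routes every dimension to 'embedding'.
    assert ess.determine_embedding_column(1536) == "embedding"
    assert ess.legacy_column_in_use()
```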