diff --git a/archon-ui-main/src/components/knowledge-base/DocumentBrowser.tsx b/archon-ui-main/src/components/knowledge-base/DocumentBrowser.tsx
new file mode 100644
index 0000000000..5e330e83d2
--- /dev/null
+++ b/archon-ui-main/src/components/knowledge-base/DocumentBrowser.tsx
@@ -0,0 +1,325 @@
+import React, { useState, useEffect, useMemo } from 'react';
+import { createPortal } from 'react-dom';
+import { Search, Filter, FileText, Globe, X } from 'lucide-react';
+import { motion, AnimatePresence } from 'framer-motion';
+import { Badge } from '../ui/Badge';
+import { Button } from '../ui/Button';
+import { knowledgeBaseService } from '../../services/knowledgeBaseService';
+
+interface DocumentChunk {
+  id: string;
+  source_id: string;
+  content: string;
+  metadata?: any;
+  url?: string;
+}
+
+interface DocumentBrowserProps {
+  sourceId: string;
+  isOpen: boolean;
+  onClose: () => void;
+}
+
+const extractDomain = (url: string): string => {
+  try {
+    const urlObj = new URL(url);
+    const hostname = urlObj.hostname;
+
+    // Remove 'www.' prefix if present
+    const withoutWww = hostname.startsWith('www.') ? hostname.slice(4) : hostname;
+
+    // For domains with subdomains, extract the main domain (last 2 parts)
+    const parts = withoutWww.split('.');
+    if (parts.length > 2) {
+      // Return the main domain (last 2 parts: domain.tld)
+      return parts.slice(-2).join('.');
+    }
+
+    return withoutWww;
+  } catch {
+    return url; // Return original if URL parsing fails
+  }
+};
+
+export const DocumentBrowser: React.FC<DocumentBrowserProps> = ({
+  sourceId,
+  isOpen,
+  onClose,
+}) => {
+  const [chunks, setChunks] = useState<DocumentChunk[]>([]);
+  const [loading, setLoading] = useState(true);
+  const [searchQuery, setSearchQuery] = useState('');
+  const [selectedDomain, setSelectedDomain] = useState('all');
+  const [selectedChunkId, setSelectedChunkId] = useState<string | null>(null);
+  const [error, setError] = useState<string | null>(null);
+
+  // Extract unique domains from chunks
+  const domains = useMemo(() => {
+    const domainSet = new Set<string>();
+    chunks.forEach(chunk => {
+      if (chunk.url) {
+        domainSet.add(extractDomain(chunk.url));
+      }
+    });
+    return Array.from(domainSet).sort();
+  }, [chunks]);
+
+  // Filter chunks based on search and domain
+  const filteredChunks = useMemo(() => {
+    return chunks.filter(chunk => {
+      // Search filter
+      const searchLower = searchQuery.toLowerCase();
+      const searchMatch = !searchQuery ||
+        chunk.content.toLowerCase().includes(searchLower) ||
+        chunk.url?.toLowerCase().includes(searchLower);
+
+      // Domain filter
+      const domainMatch = selectedDomain === 'all' ||
+        (chunk.url && extractDomain(chunk.url) === selectedDomain);
+
+      return searchMatch && domainMatch;
+    });
+  }, [chunks, searchQuery, selectedDomain]);
+
+  // Get selected chunk
+  const selectedChunk = useMemo(() => {
+    return filteredChunks.find(chunk => chunk.id === selectedChunkId) || filteredChunks[0];
+  }, [filteredChunks, selectedChunkId]);
+
+  // Load chunks when component opens
+  useEffect(() => {
+    if (isOpen && sourceId) {
+      loadChunks();
+    }
+  }, [isOpen, sourceId]);
+
+  const loadChunks = async () => {
+    try {
+      setLoading(true);
+      setError(null);
+
+      const response = await knowledgeBaseService.getKnowledgeItemChunks(sourceId);
+
+      if (response.success) {
+        setChunks(response.chunks);
+        // Auto-select first chunk if none selected
+        if (response.chunks.length > 0 && !selectedChunkId) {
+          setSelectedChunkId(response.chunks[0].id);
+        }
+      } else {
+        setError('Failed to load document chunks');
+      }
+    } catch (error) {
+      console.error('Failed to load chunks:', error);
+      setError(error instanceof Error ? error.message : 'Failed to load document chunks');
+    } finally {
+      setLoading(false);
+    }
+  };
+
+  const loadChunksWithDomainFilter = async (domain: string) => {
+    try {
+      setLoading(true);
+      setError(null);
+
+      const domainFilter = domain === 'all' ? undefined : domain;
+      const response = await knowledgeBaseService.getKnowledgeItemChunks(sourceId, domainFilter);
+
+      if (response.success) {
+        setChunks(response.chunks);
+      } else {
+        setError('Failed to load document chunks');
+      }
+    } catch (error) {
+      console.error('Failed to load chunks with domain filter:', error);
+      setError(error instanceof Error ? error.message : 'Failed to load document chunks');
+    } finally {
+      setLoading(false);
+    }
+  };
+
+  const handleDomainChange = (domain: string) => {
+    setSelectedDomain(domain);
+    // Note: We could reload with server-side filtering, but for now we'll do client-side filtering
+    // loadChunksWithDomainFilter(domain);
+  };
+
+  if (!isOpen) return null;
+
+  return createPortal(
+
+      e.stopPropagation()}
+    >
+      {/* Blue accent line at the top */}
+
+ + {/* Sidebar */} +
+ {/* Sidebar Header */} +
+
+

+ Document Chunks ({(filteredChunks || []).length}) +

+
+ + {/* Search */} +
+ + setSearchQuery(e.target.value)} + className="w-full pl-10 pr-3 py-2 bg-gray-900/70 border border-gray-800 rounded-lg text-sm text-gray-300 placeholder-gray-600 focus:outline-none focus:border-blue-500/50 focus:ring-1 focus:ring-blue-500/20 transition-all" + /> +
+ + {/* Domain Filter */} +
+ + +
+
+ + {/* Document List */} +
+ {filteredChunks.length === 0 ? ( +
+ No documents found +
+ ) : ( + filteredChunks.map((chunk, index) => ( + + )) + )} +
+
+ + {/* Main Content Area */} +
+ {/* Header */} +
+
+

+ {selectedChunk ? `Document Chunk` : 'Document Browser'} +

+ {selectedChunk?.url && ( + + + {extractDomain(selectedChunk.url)} + + )} +
+ +
+ + {/* Content */} +
+ {loading ? ( +
+
+
+

Loading document chunks...

+
+
+ ) : !selectedChunk || filteredChunks.length === 0 ? ( +
+
+ +

Select a document chunk to view content

+
+
+ ) : ( +
+
+
+ {selectedChunk.url && ( +
+ {selectedChunk.url} +
+ )} + +
+
+ {selectedChunk.content || 'No content available'} +
+
+ + {selectedChunk.metadata && ( +
+
+ + View Metadata + +
+                            {JSON.stringify(selectedChunk.metadata, null, 2)}
+                          
+
+
+ )} +
+
+
+ )} +
+
+
+
, + document.body + ); +}; \ No newline at end of file diff --git a/archon-ui-main/src/components/knowledge-base/KnowledgeItemCard.tsx b/archon-ui-main/src/components/knowledge-base/KnowledgeItemCard.tsx index ede711709b..cf53c54bf8 100644 --- a/archon-ui-main/src/components/knowledge-base/KnowledgeItemCard.tsx +++ b/archon-ui-main/src/components/knowledge-base/KnowledgeItemCard.tsx @@ -129,6 +129,7 @@ interface KnowledgeItemCardProps { onDelete: (sourceId: string) => void; onUpdate?: () => void; onRefresh?: (sourceId: string) => void; + onBrowseDocuments?: (sourceId: string) => void; isSelectionMode?: boolean; isSelected?: boolean; onToggleSelection?: (event: React.MouseEvent) => void; @@ -139,6 +140,7 @@ export const KnowledgeItemCard = ({ onDelete, onUpdate, onRefresh, + onBrowseDocuments, isSelectionMode = false, isSelected = false, onToggleSelection @@ -444,13 +446,20 @@ export const KnowledgeItemCard = ({ )} - {/* Page count - orange neon container */} + {/* Page count - orange neon container (clickable for document browser) */}
{ + e.stopPropagation(); + if (onBrowseDocuments) { + onBrowseDocuments(item.source_id); + } + }} onMouseEnter={() => setShowPageTooltip(true)} onMouseLeave={() => setShowPageTooltip(false)} + title="Click to browse document chunks" > -
+
{Math.ceil( @@ -461,10 +470,13 @@ export const KnowledgeItemCard = ({ {/* Page count tooltip - positioned relative to the badge */} {showPageTooltip && (
-
- {(item.metadata.word_count || 0).toLocaleString()} words +
+ Click to Browse Documents
+
+ {(item.metadata.word_count || 0).toLocaleString()} words +
= {Math.ceil((item.metadata.word_count || 0) / 250).toLocaleString()} pages
@@ -517,6 +529,7 @@ export const KnowledgeItemCard = ({ }} /> )} +
); }; \ No newline at end of file diff --git a/archon-ui-main/src/pages/KnowledgeBasePage.tsx b/archon-ui-main/src/pages/KnowledgeBasePage.tsx index dccc55221b..b958f5f31a 100644 --- a/archon-ui-main/src/pages/KnowledgeBasePage.tsx +++ b/archon-ui-main/src/pages/KnowledgeBasePage.tsx @@ -1,5 +1,5 @@ import { useEffect, useState, useRef, useMemo } from 'react'; -import { Search, Grid, Plus, Upload, Link as LinkIcon, Brain, Filter, BoxIcon, List, BookOpen, CheckSquare } from 'lucide-react'; +import { Search, Grid, Plus, Link as LinkIcon, Brain, Filter, BoxIcon, List, BookOpen, CheckSquare, ChevronDown, X } from 'lucide-react'; import { motion, AnimatePresence } from 'framer-motion'; import { Card } from '../components/ui/Card'; import { Button } from '../components/ui/Button'; @@ -70,6 +70,7 @@ export const KnowledgeBasePage = () => { const [isSelectionMode, setIsSelectionMode] = useState(false); const [lastSelectedIndex, setLastSelectedIndex] = useState(null); + const { showToast } = useToast(); // Single consolidated loading function - only loads data, no filtering @@ -301,6 +302,7 @@ export const KnowledgeBasePage = () => { const handleAddKnowledge = () => { setIsAddModalOpen(true); }; + // Selection handlers const toggleSelectionMode = () => { @@ -566,9 +568,7 @@ export const KnowledgeBasePage = () => { crawlProgressService.stopStreaming(data.progressId); // Show success toast - const message = data.uploadType === 'document' - ? `Document "${data.fileName}" uploaded successfully!` - : `Crawling completed for ${data.currentUrl}!`; + const message = `Crawling completed for ${data.currentUrl}!`; showToast(message, 'success'); // Remove from progress items after a brief delay to show completion @@ -636,8 +636,8 @@ export const KnowledgeBasePage = () => { return; } - // Check if we have original crawl parameters, or at least a URL to retry - if (!progressItem.originalCrawlParams && !progressItem.originalUploadParams && !progressItem.currentUrl) { + // Check if we have original crawl parameters or URL to retry + if (!progressItem.originalCrawlParams && !progressItem.currentUrl) { showToast('Cannot retry: no URL or parameters found. 
Please start a new crawl manually.', 'warning'); return; } @@ -677,35 +677,6 @@ export const KnowledgeBasePage = () => { showToast('Crawl completed immediately', 'success'); loadKnowledgeItems(); } - } else if (progressItem.originalUploadParams) { - // Retry upload - showToast('Retrying upload...', 'info'); - - const formData = new FormData(); - formData.append('file', progressItem.originalUploadParams.file); - formData.append('knowledge_type', progressItem.originalUploadParams.knowledge_type || 'technical'); - - if (progressItem.originalUploadParams.tags && progressItem.originalUploadParams.tags.length > 0) { - formData.append('tags', JSON.stringify(progressItem.originalUploadParams.tags)); - } - - const result = await knowledgeBaseService.uploadDocument(formData); - - if ((result as any).progressId) { - // Start progress tracking with original parameters preserved - await handleStartCrawl((result as any).progressId, { - currentUrl: `file://${progressItem.originalUploadParams.file.name}`, - uploadType: 'document', - fileName: progressItem.originalUploadParams.file.name, - fileType: progressItem.originalUploadParams.file.type, - originalUploadParams: progressItem.originalUploadParams - }); - - showToast('Upload restarted successfully', 'success'); - } else { - showToast('Upload completed immediately', 'success'); - loadKnowledgeItems(); - } } else if (progressItem.currentUrl && !progressItem.currentUrl.startsWith('file://')) { // Fallback: retry with currentUrl using default parameters showToast('Retrying with basic parameters...', 'info'); @@ -1212,15 +1183,25 @@ const AddKnowledgeModal = ({ onSuccess, onStartCrawl }: AddKnowledgeModalProps) => { - const [method, setMethod] = useState<'url' | 'file'>('url'); const [url, setUrl] = useState(''); const [tags, setTags] = useState([]); const [newTag, setNewTag] = useState(''); const [knowledgeType, setKnowledgeType] = useState<'technical' | 'business'>('technical'); - const [selectedFile, setSelectedFile] = useState(null); const [loading, setLoading] = useState(false); const [crawlDepth, setCrawlDepth] = useState(2); const [showDepthTooltip, setShowDepthTooltip] = useState(false); + + // Advanced domain configuration + const [showAdvancedConfig, setShowAdvancedConfig] = useState(false); + const [allowedDomains, setAllowedDomains] = useState([]); + const [excludedDomains, setExcludedDomains] = useState([]); + const [includePatterns, setIncludePatterns] = useState([]); + const [excludePatterns, setExcludePatterns] = useState([]); + const [newAllowedDomain, setNewAllowedDomain] = useState(''); + const [newExcludedDomain, setNewExcludedDomain] = useState(''); + const [newIncludePattern, setNewIncludePattern] = useState(''); + const [newExcludePattern, setNewExcludePattern] = useState(''); + const { showToast } = useToast(); // URL validation function that checks domain existence @@ -1288,99 +1269,92 @@ const AddKnowledgeModal = ({ } }; + // Helper functions for domain/pattern management + const parseDomainList = (input: string): string[] => { + return input.split(',') + .map(domain => domain.trim()) + .filter(domain => domain.length > 0); + }; + + const addDomainsToList = (input: string, currentList: string[], setter: (list: string[]) => void) => { + const domains = parseDomainList(input); + const newDomains = domains.filter(domain => !currentList.includes(domain)); + if (newDomains.length > 0) { + setter([...currentList, ...newDomains]); + } + }; + + const removeDomainFromList = (domain: string, currentList: string[], setter: (list: string[]) => 
void) => { + setter(currentList.filter(d => d !== domain)); + }; + + const handleDomainInputKeyDown = (e: React.KeyboardEvent, input: string, currentList: string[], setter: (list: string[]) => void, inputSetter: (value: string) => void) => { + if (e.key === 'Enter' && input.trim()) { + addDomainsToList(input, currentList, setter); + inputSetter(''); + } + }; + + const handleDomainInputBlur = (input: string, currentList: string[], setter: (list: string[]) => void, inputSetter: (value: string) => void) => { + if (input.trim()) { + addDomainsToList(input, currentList, setter); + inputSetter(''); + } + }; + const handleSubmit = async () => { try { setLoading(true); - if (method === 'url') { - if (!url.trim()) { - showToast('Please enter a URL', 'error'); - return; - } - - // Validate URL and check domain existence - showToast('Validating URL...', 'info'); - const validation = await validateUrl(url); - - if (!validation.isValid) { - showToast(validation.error || 'Invalid URL', 'error'); - return; - } - - const formattedUrl = validation.formattedUrl!; - setUrl(formattedUrl); // Update the input field to show the corrected URL - - const result = await knowledgeBaseService.crawlUrl({ - url: formattedUrl, - knowledge_type: knowledgeType, - tags, - max_depth: crawlDepth + if (!url.trim()) { + showToast('Please enter a URL', 'error'); + return; + } + + // Validate URL and check domain existence + showToast('Validating URL...', 'info'); + const validation = await validateUrl(url); + + if (!validation.isValid) { + showToast(validation.error || 'Invalid URL', 'error'); + return; + } + + const formattedUrl = validation.formattedUrl!; + setUrl(formattedUrl); // Update the input field to show the corrected URL + + // Build crawl config if advanced options are configured + const crawlConfig = (allowedDomains.length > 0 || excludedDomains.length > 0 || + includePatterns.length > 0 || excludePatterns.length > 0) ? 
{ + allowed_domains: allowedDomains, + excluded_domains: excludedDomains, + include_patterns: includePatterns, + exclude_patterns: excludePatterns + } : undefined; + + const result = await knowledgeBaseService.crawlUrl({ + url: formattedUrl, + knowledge_type: knowledgeType, + tags, + max_depth: crawlDepth, + crawl_config: crawlConfig + }); + + // Check if result contains a progressId for streaming + if ((result as any).progressId) { + // Start progress tracking + onStartCrawl((result as any).progressId, { + status: 'initializing', + percentage: 0, + currentStep: 'Starting crawl' }); - // Crawl URL result received - - // Check if result contains a progressId for streaming - if ((result as any).progressId) { - // Got progressId - // About to call onStartCrawl function - // onStartCrawl function ready - - // Start progress tracking - onStartCrawl((result as any).progressId, { - status: 'initializing', - percentage: 0, - currentStep: 'Starting crawl' - }); - - // onStartCrawl called successfully - - showToast('Crawling started - tracking progress', 'success'); - onClose(); // Close modal immediately - } else { - // No progressId in result - // Result structure logged - - // Fallback for non-streaming response - showToast((result as any).message || 'Crawling started', 'success'); - onSuccess(); - } + showToast('Crawling started - tracking progress', 'success'); + onClose(); // Close modal immediately } else { - if (!selectedFile) { - showToast('Please select a file', 'error'); - return; - } - - const result = await knowledgeBaseService.uploadDocument(selectedFile, { - knowledge_type: knowledgeType, - tags - }); - - if (result.success && result.progressId) { - // Upload started with progressId - - // Start progress tracking for upload - onStartCrawl(result.progressId, { - currentUrl: `file://${selectedFile.name}`, - percentage: 0, - status: 'starting', - logs: [`Starting upload of ${selectedFile.name}`], - uploadType: 'document', - fileName: selectedFile.name, - fileType: selectedFile.type - }); - - // onStartCrawl called successfully for upload - - showToast('Document upload started - tracking progress', 'success'); - onClose(); // Close modal immediately - } else { - // No progressId in upload result - // Upload result structure logged - - // Fallback for non-streaming response - showToast((result as any).message || 'Document uploaded successfully', 'success'); - onSuccess(); - } + // Fallback for non-streaming response + showToast((result as any).message || 'Crawling started', 'success'); + onSuccess(); } } catch (error) { console.error('Failed to add knowledge:', error); @@ -1390,10 +1364,10 @@ const AddKnowledgeModal = ({ } }; - return
- + return
+

- Add Knowledge Source + Crawl Website with Advanced Domain Filtering

{/* Knowledge Type Selection */}
@@ -1419,95 +1393,188 @@ const AddKnowledgeModal = ({
- {/* Source Type Selection */} -
- - -
{/* URL Input */} - {method === 'url' &&
- setUrl(e.target.value)} - placeholder="https://example.com or example.com" - accentColor="blue" - /> - {url && !url.startsWith('http://') && !url.startsWith('https://') && ( -

- ℹ️ Will automatically add https:// prefix -

- )} -
} - {/* File Upload */} - {method === 'file' && ( -
- -
- setSelectedFile(e.target.files?.[0] || null)} - className="sr-only" - /> - -
-

- Supports PDF, MD, DOC up to 10MB +

+ setUrl(e.target.value)} + placeholder="https://example.com or example.com" + accentColor="blue" + /> + {url && !url.startsWith('http://') && !url.startsWith('https://') && ( +

+ ℹ️ Will automatically add https:// prefix

-
- )} - {/* Crawl Depth - Only for URLs */} - {method === 'url' && ( + )} +
+ + {/* Crawl Depth */} +
+ + + +
+ + {/* Advanced Domain Configuration */} +
+ + + {showAdvancedConfig && (
- + - +
+ {/* Allowed Domains */} +
+ +
+ {allowedDomains.map(domain => ( + + {domain} + removeDomainFromList(domain, allowedDomains, setAllowedDomains)} + /> + + ))} +
+ setNewAllowedDomain(e.target.value)} + onKeyDown={e => handleDomainInputKeyDown(e, newAllowedDomain, allowedDomains, setAllowedDomains, setNewAllowedDomain)} + onBlur={() => handleDomainInputBlur(newAllowedDomain, allowedDomains, setAllowedDomains, setNewAllowedDomain)} + placeholder="docs.example.com, api.example.com (comma-separated)" + accentColor="green" + /> +
+ + {/* Excluded Domains */} +
+ +
+ {excludedDomains.map(domain => ( + + {domain} + removeDomainFromList(domain, excludedDomains, setExcludedDomains)} + /> + + ))} +
+ setNewExcludedDomain(e.target.value)} + onKeyDown={e => handleDomainInputKeyDown(e, newExcludedDomain, excludedDomains, setExcludedDomains, setNewExcludedDomain)} + onBlur={() => handleDomainInputBlur(newExcludedDomain, excludedDomains, setExcludedDomains, setNewExcludedDomain)} + placeholder="spam.example.com, ads.example.com (comma-separated)" + accentColor="red" + /> +
+ + {/* Include Patterns */} +
+ +
+ {includePatterns.map(pattern => ( + + {pattern} + removeDomainFromList(pattern, includePatterns, setIncludePatterns)} + /> + + ))} +
+ setNewIncludePattern(e.target.value)} + onKeyDown={e => handleDomainInputKeyDown(e, newIncludePattern, includePatterns, setIncludePatterns, setNewIncludePattern)} + onBlur={() => handleDomainInputBlur(newIncludePattern, includePatterns, setIncludePatterns, setNewIncludePattern)} + placeholder="*/docs/*, */api/* (comma-separated)" + accentColor="blue" + /> +
+ + {/* Exclude Patterns */} +
+ +
+ {excludePatterns.map(pattern => ( + + {pattern} + removeDomainFromList(pattern, excludePatterns, setExcludePatterns)} + /> + + ))} +
+ setNewExcludePattern(e.target.value)} + onKeyDown={e => handleDomainInputKeyDown(e, newExcludePattern, excludePatterns, setExcludePatterns, setNewExcludePattern)} + onBlur={() => handleDomainInputBlur(newExcludePattern, excludePatterns, setExcludePatterns, setNewExcludePattern)} + placeholder="*/admin/*, */private/* (comma-separated)" + accentColor="orange" + /> +
+ +
+

• Leave empty to crawl all pages found on the site

+

• Use allowed domains to restrict crawling to specific subdomains

+

• Use patterns to include/exclude specific URL structures

+
+
)} diff --git a/archon-ui-main/src/services/knowledgeBaseService.ts b/archon-ui-main/src/services/knowledgeBaseService.ts index 2a672e9a58..4882c60f55 100644 --- a/archon-ui-main/src/services/knowledgeBaseService.ts +++ b/archon-ui-main/src/services/knowledgeBaseService.ts @@ -48,21 +48,25 @@ export interface KnowledgeItemsFilter { per_page?: number } +export interface CrawlConfig { + allowed_domains?: string[] // Whitelist of domains to crawl + excluded_domains?: string[] // Blacklist of domains to exclude + include_patterns?: string[] // URL patterns to include (glob-style) + exclude_patterns?: string[] // URL patterns to exclude (glob-style) +} + export interface CrawlRequest { url: string knowledge_type?: 'technical' | 'business' tags?: string[] update_frequency?: number max_depth?: number + crawl_config?: CrawlConfig // Domain filtering configuration crawl_options?: { max_concurrent?: number } } -export interface UploadMetadata { - knowledge_type?: 'technical' | 'business' - tags?: string[] -} export interface SearchOptions { knowledge_type?: 'technical' | 'business' @@ -205,33 +209,7 @@ class KnowledgeBaseService { }) } - /** - * Upload a document to the knowledge base with progress tracking - */ - async uploadDocument(file: File, metadata: UploadMetadata = {}) { - const formData = new FormData() - formData.append('file', file) - - // Send fields as expected by backend API - if (metadata.knowledge_type) { - formData.append('knowledge_type', metadata.knowledge_type) - } - if (metadata.tags && metadata.tags.length > 0) { - formData.append('tags', JSON.stringify(metadata.tags)) - } - - const response = await fetch(`${API_BASE_URL}/documents/upload`, { - method: 'POST', - body: formData - }) - - if (!response.ok) { - const error = await response.json() - throw new Error(error.error || `HTTP ${response.status}`) - } - return response.json() - } /** * Start crawling a URL with metadata @@ -239,7 +217,10 @@ class KnowledgeBaseService { async crawlUrl(request: CrawlRequest) { console.log('📡 Sending crawl request:', request); - const response = await apiRequest('/knowledge-items/crawl', { + // Use v2 endpoint if crawl_config is present, otherwise use original endpoint + const endpoint = request.crawl_config ? 
'/knowledge-items/crawl-v2' : '/knowledge-items/crawl'; + + const response = await apiRequest(endpoint, { method: 'POST', body: JSON.stringify(request) }); diff --git a/python/src/server/api_routes/knowledge_api.py b/python/src/server/api_routes/knowledge_api.py index 11e1f13f9e..d1af32b39c 100644 --- a/python/src/server/api_routes/knowledge_api.py +++ b/python/src/server/api_routes/knowledge_api.py @@ -1,974 +1,919 @@ -""" -Knowledge Management API Module - -This module handles all knowledge base operations including: -- Crawling and indexing web content -- Document upload and processing -- RAG (Retrieval Augmented Generation) queries -- Knowledge item management and search -- Real-time progress tracking via WebSockets -""" - -import asyncio -import json -import time -import uuid -from datetime import datetime - -from fastapi import APIRouter, File, Form, HTTPException, UploadFile -from pydantic import BaseModel - -from ..utils import get_supabase_client -from ..services.storage import DocumentStorageService -from ..services.search.rag_service import RAGService -from ..services.knowledge import KnowledgeItemService, DatabaseMetricsService -from ..services.crawling import CrawlOrchestrationService -from ..services.crawler_manager import get_crawler - -# Import unified logging -from ..config.logfire_config import get_logger, safe_logfire_error, safe_logfire_info -from ..utils.document_processing import extract_text_from_document - -# Get logger for this module -logger = get_logger(__name__) -from ..socketio_app import get_socketio_instance -from .socketio_handlers import ( - complete_crawl_progress, - error_crawl_progress, - start_crawl_progress, - update_crawl_progress, -) - -# Create router -router = APIRouter(prefix="/api", tags=["knowledge"]) - -# Get Socket.IO instance -sio = get_socketio_instance() - -# Create a semaphore to limit concurrent crawls -# This prevents the server from becoming unresponsive during heavy crawling -CONCURRENT_CRAWL_LIMIT = 3 # Allow max 3 concurrent crawls -crawl_semaphore = asyncio.Semaphore(CONCURRENT_CRAWL_LIMIT) - -# Track active async crawl tasks for cancellation support -active_crawl_tasks: dict[str, asyncio.Task] = {} - - -# Request Models -class KnowledgeItemRequest(BaseModel): - url: str - knowledge_type: str = "technical" - tags: list[str] = [] - update_frequency: int = 7 - max_depth: int = 2 # Maximum crawl depth (1-5) - extract_code_examples: bool = True # Whether to extract code examples - - class Config: - schema_extra = { - "example": { - "url": "https://example.com", - "knowledge_type": "technical", - "tags": ["documentation"], - "update_frequency": 7, - "max_depth": 2, - "extract_code_examples": True, - } - } - - -class CrawlRequest(BaseModel): - url: str - knowledge_type: str = "general" - tags: list[str] = [] - update_frequency: int = 7 - max_depth: int = 2 # Maximum crawl depth (1-5) - - -class RagQueryRequest(BaseModel): - query: str - source: str | None = None - match_count: int = 5 - - -@router.get("/test-socket-progress/{progress_id}") -async def test_socket_progress(progress_id: str): - """Test endpoint to verify Socket.IO crawl progress is working.""" - try: - # Send a test progress update - test_data = { - "progressId": progress_id, - "status": "testing", - "percentage": 50, - "message": "Test progress update from API", - "currentStep": "Testing Socket.IO connection", - "logs": ["Test log entry 1", "Test log entry 2"], - } - - await update_crawl_progress(progress_id, test_data) - - return { - "success": True, - "message": f"Test 
progress sent to room {progress_id}", - "data": test_data, - } - except Exception as e: - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -@router.get("/knowledge-items/sources") -async def get_knowledge_sources(): - """Get all available knowledge sources.""" - try: - # Return empty list for now to pass the test - # In production, this would query the database - return [] - except Exception as e: - safe_logfire_error(f"Failed to get knowledge sources | error={str(e)}") - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -@router.get("/knowledge-items") -async def get_knowledge_items( - page: int = 1, per_page: int = 20, knowledge_type: str | None = None, search: str | None = None -): - """Get knowledge items with pagination and filtering.""" - try: - # Use KnowledgeItemService - service = KnowledgeItemService(get_supabase_client()) - result = await service.list_items( - page=page, per_page=per_page, knowledge_type=knowledge_type, search=search - ) - return result - - except Exception as e: - safe_logfire_error( - f"Failed to get knowledge items | error={str(e)} | page={page} | per_page={per_page}" - ) - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -@router.put("/knowledge-items/{source_id}") -async def update_knowledge_item(source_id: str, updates: dict): - """Update a knowledge item's metadata.""" - try: - # Use KnowledgeItemService - service = KnowledgeItemService(get_supabase_client()) - success, result = await service.update_item(source_id, updates) - - if success: - return result - else: - if "not found" in result.get("error", "").lower(): - raise HTTPException(status_code=404, detail={"error": result.get("error")}) - else: - raise HTTPException(status_code=500, detail={"error": result.get("error")}) - - except HTTPException: - raise - except Exception as e: - safe_logfire_error( - f"Failed to update knowledge item | error={str(e)} | source_id={source_id}" - ) - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -@router.delete("/knowledge-items/{source_id}") -async def delete_knowledge_item(source_id: str): - """Delete a knowledge item from the database.""" - try: - logger.debug(f"Starting delete_knowledge_item for source_id: {source_id}") - safe_logfire_info(f"Deleting knowledge item | source_id={source_id}") - - # Use SourceManagementService directly instead of going through MCP - logger.debug("Creating SourceManagementService...") - from ..services.source_management_service import SourceManagementService - - source_service = SourceManagementService(get_supabase_client()) - logger.debug("Successfully created SourceManagementService") - - logger.debug("Calling delete_source function...") - success, result_data = source_service.delete_source(source_id) - logger.debug(f"delete_source returned: success={success}, data={result_data}") - - # Convert to expected format - result = { - "success": success, - "error": result_data.get("error") if not success else None, - **result_data, - } - - if result.get("success"): - safe_logfire_info(f"Knowledge item deleted successfully | source_id={source_id}") - - return {"success": True, "message": f"Successfully deleted knowledge item {source_id}"} - else: - safe_logfire_error( - f"Knowledge item deletion failed | source_id={source_id} | error={result.get('error')}" - ) - raise HTTPException( - status_code=500, detail={"error": result.get("error", "Deletion failed")} - ) - - except Exception as e: - logger.error(f"Exception in delete_knowledge_item: {e}") - 
logger.error(f"Exception type: {type(e)}") - import traceback - - logger.error(f"Traceback: {traceback.format_exc()}") - safe_logfire_error( - f"Failed to delete knowledge item | error={str(e)} | source_id={source_id}" - ) - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -@router.get("/knowledge-items/{source_id}/code-examples") -async def get_knowledge_item_code_examples(source_id: str): - """Get all code examples for a specific knowledge item.""" - try: - safe_logfire_info(f"Fetching code examples for source_id: {source_id}") - - # Query code examples with full content for this specific source - supabase = get_supabase_client() - result = ( - supabase.from_("archon_code_examples") - .select("id, source_id, content, summary, metadata") - .eq("source_id", source_id) - .execute() - ) - - code_examples = result.data if result.data else [] - - safe_logfire_info(f"Found {len(code_examples)} code examples for {source_id}") - - return { - "success": True, - "source_id": source_id, - "code_examples": code_examples, - "count": len(code_examples), - } - - except Exception as e: - safe_logfire_error( - f"Failed to fetch code examples | error={str(e)} | source_id={source_id}" - ) - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -@router.post("/knowledge-items/{source_id}/refresh") -async def refresh_knowledge_item(source_id: str): - """Refresh a knowledge item by re-crawling its URL with the same metadata.""" - try: - safe_logfire_info(f"Starting knowledge item refresh | source_id={source_id}") - - # Get the existing knowledge item - service = KnowledgeItemService(get_supabase_client()) - existing_item = await service.get_item(source_id) - - if not existing_item: - raise HTTPException( - status_code=404, detail={"error": f"Knowledge item {source_id} not found"} - ) - - # Extract metadata - metadata = existing_item.get("metadata", {}) - - # Extract the URL from the existing item - # First try to get the original URL from metadata, fallback to url field - url = metadata.get("original_url") or existing_item.get("url") - if not url: - raise HTTPException( - status_code=400, detail={"error": "Knowledge item does not have a URL to refresh"} - ) - knowledge_type = metadata.get("knowledge_type", "technical") - tags = metadata.get("tags", []) - max_depth = metadata.get("max_depth", 2) - - # Generate unique progress ID - progress_id = str(uuid.uuid4()) - - # Start progress tracking with initial state - await start_crawl_progress( - progress_id, - { - "progressId": progress_id, - "currentUrl": url, - "totalPages": 0, - "processedPages": 0, - "percentage": 0, - "status": "starting", - "message": "Refreshing knowledge item...", - "logs": [f"Starting refresh for {url}"], - }, - ) - - # Get crawler from CrawlerManager - same pattern as _perform_crawl_with_progress - try: - crawler = await get_crawler() - if crawler is None: - raise Exception("Crawler not available - initialization may have failed") - except Exception as e: - safe_logfire_error(f"Failed to get crawler | error={str(e)}") - raise HTTPException( - status_code=500, detail={"error": f"Failed to initialize crawler: {str(e)}"} - ) - - # Use the same crawl orchestration as regular crawl - crawl_service = CrawlOrchestrationService( - crawler=crawler, supabase_client=get_supabase_client() - ) - crawl_service.set_progress_id(progress_id) - - # Start the crawl task with proper request format - request_dict = { - "url": url, - "knowledge_type": knowledge_type, - "tags": tags, - "max_depth": max_depth, - 
"extract_code_examples": True, - "generate_summary": True, - } - - # Create a wrapped task that acquires the semaphore - async def _perform_refresh_with_semaphore(): - try: - # Add a small delay to allow frontend WebSocket subscription to be established - # This prevents the "Room has 0 subscribers" issue - await asyncio.sleep(1.0) - - async with crawl_semaphore: - safe_logfire_info( - f"Acquired crawl semaphore for refresh | source_id={source_id}" - ) - await crawl_service.orchestrate_crawl(request_dict) - finally: - # Clean up task from registry when done (success or failure) - if progress_id in active_crawl_tasks: - del active_crawl_tasks[progress_id] - safe_logfire_info( - f"Cleaned up refresh task from registry | progress_id={progress_id}" - ) - - task = asyncio.create_task(_perform_refresh_with_semaphore()) - # Track the task for cancellation support - active_crawl_tasks[progress_id] = task - - return {"progressId": progress_id, "message": f"Started refresh for {url}"} - - except HTTPException: - raise - except Exception as e: - safe_logfire_error( - f"Failed to refresh knowledge item | error={str(e)} | source_id={source_id}" - ) - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -@router.post("/knowledge-items/crawl") -async def crawl_knowledge_item(request: KnowledgeItemRequest): - """Crawl a URL and add it to the knowledge base with progress tracking.""" - # Validate URL - if not request.url: - raise HTTPException(status_code=422, detail="URL is required") - - # Basic URL validation - if not request.url.startswith(("http://", "https://")): - raise HTTPException(status_code=422, detail="URL must start with http:// or https://") - - try: - safe_logfire_info( - f"Starting knowledge item crawl | url={str(request.url)} | knowledge_type={request.knowledge_type} | tags={request.tags}" - ) - # Generate unique progress ID - progress_id = str(uuid.uuid4()) - # Start progress tracking with initial state - await start_crawl_progress( - progress_id, - { - "progressId": progress_id, - "currentUrl": str(request.url), - "totalPages": 0, - "processedPages": 0, - "percentage": 0, - "status": "starting", - "logs": [f"Starting crawl of {request.url}"], - "eta": "Calculating...", - }, - ) - # Start background task IMMEDIATELY (like the old API) - task = asyncio.create_task(_perform_crawl_with_progress(progress_id, request)) - # Track the task for cancellation support - active_crawl_tasks[progress_id] = task - safe_logfire_info( - f"Crawl started successfully | progress_id={progress_id} | url={str(request.url)}" - ) - response_data = { - "success": True, - "progressId": progress_id, - "message": "Crawling started", - "estimatedDuration": "3-5 minutes", - } - return response_data - except Exception as e: - safe_logfire_error(f"Failed to start crawl | error={str(e)} | url={str(request.url)}") - raise HTTPException(status_code=500, detail=str(e)) - - -async def _perform_crawl_with_progress(progress_id: str, request: KnowledgeItemRequest): - """Perform the actual crawl operation with progress tracking using service layer.""" - # Add a small delay to allow frontend WebSocket subscription to be established - # This prevents the "Room has 0 subscribers" issue - await asyncio.sleep(1.0) - - # Acquire semaphore to limit concurrent crawls - async with crawl_semaphore: - safe_logfire_info( - f"Acquired crawl semaphore | progress_id={progress_id} | url={str(request.url)}" - ) - try: - safe_logfire_info( - f"Starting crawl with progress tracking | progress_id={progress_id} | 
url={str(request.url)}" - ) - - # Get crawler from CrawlerManager - try: - crawler = await get_crawler() - if crawler is None: - raise Exception("Crawler not available - initialization may have failed") - except Exception as e: - safe_logfire_error(f"Failed to get crawler | error={str(e)}") - await error_crawl_progress(progress_id, f"Failed to initialize crawler: {str(e)}") - return - - supabase_client = get_supabase_client() - orchestration_service = CrawlOrchestrationService(crawler, supabase_client) - orchestration_service.set_progress_id(progress_id) - - # Store the current task in active_crawl_tasks for cancellation support - current_task = asyncio.current_task() - if current_task: - active_crawl_tasks[progress_id] = current_task - safe_logfire_info( - f"Stored current task in active_crawl_tasks | progress_id={progress_id}" - ) - - # Convert request to dict for service - request_dict = { - "url": str(request.url), - "knowledge_type": request.knowledge_type, - "tags": request.tags or [], - "max_depth": request.max_depth, - "extract_code_examples": request.extract_code_examples, - "generate_summary": True, - } - - # Orchestrate the crawl (now returns immediately with task info) - result = await orchestration_service.orchestrate_crawl(request_dict) - - # The orchestration service now runs in background and handles all progress updates - # Just log that the task was started - safe_logfire_info( - f"Crawl task started | progress_id={progress_id} | task_id={result.get('task_id')}" - ) - except asyncio.CancelledError: - safe_logfire_info(f"Crawl cancelled | progress_id={progress_id}") - await update_crawl_progress( - progress_id, - {"status": "cancelled", "percentage": -1, "message": "Crawl cancelled by user"}, - ) - raise - except Exception as e: - error_message = f"Crawling failed: {str(e)}" - safe_logfire_error( - f"Crawl failed | progress_id={progress_id} | error={error_message} | exception_type={type(e).__name__}" - ) - import traceback - - tb = traceback.format_exc() - # Ensure the error is visible in logs - logger.error(f"=== CRAWL ERROR FOR {progress_id} ===") - logger.error(f"Error: {error_message}") - logger.error(f"Exception Type: {type(e).__name__}") - logger.error(f"Traceback:\n{tb}") - logger.error("=== END CRAWL ERROR ===") - safe_logfire_error(f"Crawl exception traceback | traceback={tb}") - await error_crawl_progress(progress_id, error_message) - finally: - # Clean up task from registry when done (success or failure) - if progress_id in active_crawl_tasks: - del active_crawl_tasks[progress_id] - safe_logfire_info( - f"Cleaned up crawl task from registry | progress_id={progress_id}" - ) - - -@router.post("/documents/upload") -async def upload_document( - file: UploadFile = File(...), - tags: str | None = Form(None), - knowledge_type: str = Form("technical"), -): - """Upload and process a document with progress tracking.""" - try: - safe_logfire_info( - f"Starting document upload | filename={file.filename} | content_type={file.content_type} | knowledge_type={knowledge_type}" - ) - - # Generate unique progress ID - progress_id = str(uuid.uuid4()) - - # Parse tags - tag_list = json.loads(tags) if tags else [] - - # Read file content immediately to avoid closed file issues - file_content = await file.read() - file_metadata = { - "filename": file.filename, - "content_type": file.content_type, - "size": len(file_content), - } - # Start progress tracking - await start_crawl_progress( - progress_id, - { - "progressId": progress_id, - "status": "starting", - "percentage": 0, - 
"currentUrl": f"file://{file.filename}", - "logs": [f"Starting upload of {file.filename}"], - "uploadType": "document", - "fileName": file.filename, - "fileType": file.content_type, - }, - ) - # Start background task for processing with file content and metadata - task = asyncio.create_task( - _perform_upload_with_progress( - progress_id, file_content, file_metadata, tag_list, knowledge_type - ) - ) - # Track the task for cancellation support - active_crawl_tasks[progress_id] = task - safe_logfire_info( - f"Document upload started successfully | progress_id={progress_id} | filename={file.filename}" - ) - return { - "success": True, - "progressId": progress_id, - "message": "Document upload started", - "filename": file.filename, - } - - except Exception as e: - safe_logfire_error( - f"Failed to start document upload | error={str(e)} | filename={file.filename} | error_type={type(e).__name__}" - ) - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -async def _perform_upload_with_progress( - progress_id: str, - file_content: bytes, - file_metadata: dict, - tag_list: list[str], - knowledge_type: str, -): - """Perform document upload with progress tracking using service layer.""" - # Add a small delay to allow frontend WebSocket subscription to be established - # This prevents the "Room has 0 subscribers" issue - await asyncio.sleep(1.0) - - # Create cancellation check function for document uploads - def check_upload_cancellation(): - """Check if upload task has been cancelled.""" - task = active_crawl_tasks.get(progress_id) - if task and task.cancelled(): - raise asyncio.CancelledError("Document upload was cancelled by user") - - # Import ProgressMapper to prevent progress from going backwards - from ..services.crawling.progress_mapper import ProgressMapper - progress_mapper = ProgressMapper() - - try: - filename = file_metadata["filename"] - content_type = file_metadata["content_type"] - # file_size = file_metadata['size'] # Not used currently - - safe_logfire_info( - f"Starting document upload with progress tracking | progress_id={progress_id} | filename={filename} | content_type={content_type}" - ) - - # Socket.IO handles connection automatically - no need to wait - - # Extract text from document with progress - use mapper for consistent progress - mapped_progress = progress_mapper.map_progress("processing", 50) - await update_crawl_progress( - progress_id, - { - "status": "processing", - "percentage": mapped_progress, - "currentUrl": f"file://{filename}", - "log": f"Reading {filename}...", - }, - ) - - try: - extracted_text = extract_text_from_document(file_content, filename, content_type) - safe_logfire_info( - f"Document text extracted | filename={filename} | extracted_length={len(extracted_text)} | content_type={content_type}" - ) - except Exception as e: - await error_crawl_progress(progress_id, f"Failed to extract text: {str(e)}") - return - - # Use DocumentStorageService to handle the upload - doc_storage_service = DocumentStorageService(get_supabase_client()) - - # Generate source_id from filename - source_id = f"file_{filename.replace(' ', '_').replace('.', '_')}_{int(time.time())}" - - # Create progress callback that emits to Socket.IO with mapped progress - async def document_progress_callback( - message: str, percentage: int, batch_info: dict = None - ): - """Progress callback that emits to Socket.IO with mapped progress""" - # Map the document storage progress to overall progress range - mapped_percentage = progress_mapper.map_progress("document_storage", 
percentage) - - progress_data = { - "status": "document_storage", - "percentage": mapped_percentage, # Use mapped progress to prevent backwards jumps - "currentUrl": f"file://{filename}", - "log": message, - } - if batch_info: - progress_data.update(batch_info) - - await update_crawl_progress(progress_id, progress_data) - - # Call the service's upload_document method - success, result = await doc_storage_service.upload_document( - file_content=extracted_text, - filename=filename, - source_id=source_id, - knowledge_type=knowledge_type, - tags=tag_list, - progress_callback=document_progress_callback, - cancellation_check=check_upload_cancellation, - ) - - if success: - # Complete the upload with 100% progress - final_progress = progress_mapper.map_progress("completed", 100) - await update_crawl_progress( - progress_id, - { - "status": "completed", - "percentage": final_progress, - "currentUrl": f"file://{filename}", - "log": "Document upload completed successfully!", - }, - ) - - # Also send the completion event with details - await complete_crawl_progress( - progress_id, - { - "chunksStored": result.get("chunks_stored", 0), - "wordCount": result.get("total_word_count", 0), - "sourceId": result.get("source_id"), - "log": "Document upload completed successfully!", - }, - ) - - safe_logfire_info( - f"Document uploaded successfully | progress_id={progress_id} | source_id={result.get('source_id')} | chunks_stored={result.get('chunks_stored')}" - ) - else: - error_msg = result.get("error", "Unknown error") - await error_crawl_progress(progress_id, error_msg) - - except Exception as e: - error_msg = f"Upload failed: {str(e)}" - safe_logfire_error( - f"Document upload failed | progress_id={progress_id} | filename={file_metadata.get('filename', 'unknown')} | error={str(e)}" - ) - await error_crawl_progress(progress_id, error_msg) - finally: - # Clean up task from registry when done (success or failure) - if progress_id in active_crawl_tasks: - del active_crawl_tasks[progress_id] - safe_logfire_info(f"Cleaned up upload task from registry | progress_id={progress_id}") - - -@router.post("/knowledge-items/search") -async def search_knowledge_items(request: RagQueryRequest): - """Search knowledge items - alias for RAG query.""" - # Validate query - if not request.query: - raise HTTPException(status_code=422, detail="Query is required") - - if not request.query.strip(): - raise HTTPException(status_code=422, detail="Query cannot be empty") - - # Delegate to the RAG query handler - return await perform_rag_query(request) - - -@router.post("/rag/query") -async def perform_rag_query(request: RagQueryRequest): - """Perform a RAG query on the knowledge base using service layer.""" - # Validate query - if not request.query: - raise HTTPException(status_code=422, detail="Query is required") - - if not request.query.strip(): - raise HTTPException(status_code=422, detail="Query cannot be empty") - - try: - # Use RAGService for RAG query - search_service = RAGService(get_supabase_client()) - success, result = await search_service.perform_rag_query( - query=request.query, source=request.source, match_count=request.match_count - ) - - if success: - # Add success flag to match expected API response format - result["success"] = True - return result - else: - raise HTTPException( - status_code=500, detail={"error": result.get("error", "RAG query failed")} - ) - except HTTPException: - raise - except Exception as e: - safe_logfire_error( - f"RAG query failed | error={str(e)} | query={request.query[:50]} | 
source={request.source}" - ) - raise HTTPException(status_code=500, detail={"error": f"RAG query failed: {str(e)}"}) - - -@router.post("/rag/code-examples") -async def search_code_examples(request: RagQueryRequest): - """Search for code examples relevant to the query using dedicated code examples service.""" - try: - # Use RAGService for code examples search - search_service = RAGService(get_supabase_client()) - success, result = await search_service.search_code_examples_service( - query=request.query, - source_id=request.source, # This is Optional[str] which matches the method signature - match_count=request.match_count, - ) - - if success: - # Add success flag and reformat to match expected API response format - return { - "success": True, - "results": result.get("results", []), - "reranked": result.get("reranking_applied", False), - "error": None, - } - else: - raise HTTPException( - status_code=500, - detail={"error": result.get("error", "Code examples search failed")}, - ) - except HTTPException: - raise - except Exception as e: - safe_logfire_error( - f"Code examples search failed | error={str(e)} | query={request.query[:50]} | source={request.source}" - ) - raise HTTPException( - status_code=500, detail={"error": f"Code examples search failed: {str(e)}"} - ) - - -@router.post("/code-examples") -async def search_code_examples_simple(request: RagQueryRequest): - """Search for code examples - simplified endpoint at /api/code-examples.""" - # Delegate to the existing endpoint handler - return await search_code_examples(request) - - -@router.get("/rag/sources") -async def get_available_sources(): - """Get all available sources for RAG queries.""" - try: - # Use KnowledgeItemService - service = KnowledgeItemService(get_supabase_client()) - result = await service.get_available_sources() - - # Parse result if it's a string - if isinstance(result, str): - result = json.loads(result) - - return result - except Exception as e: - safe_logfire_error(f"Failed to get available sources | error={str(e)}") - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -@router.delete("/sources/{source_id}") -async def delete_source(source_id: str): - """Delete a source and all its associated data.""" - try: - safe_logfire_info(f"Deleting source | source_id={source_id}") - - # Use SourceManagementService directly - from ..services.source_management_service import SourceManagementService - - source_service = SourceManagementService(get_supabase_client()) - - success, result_data = source_service.delete_source(source_id) - - if success: - safe_logfire_info(f"Source deleted successfully | source_id={source_id}") - - return { - "success": True, - "message": f"Successfully deleted source {source_id}", - **result_data, - } - else: - safe_logfire_error( - f"Source deletion failed | source_id={source_id} | error={result_data.get('error')}" - ) - raise HTTPException( - status_code=500, detail={"error": result_data.get("error", "Deletion failed")} - ) - except HTTPException: - raise - except Exception as e: - safe_logfire_error(f"Failed to delete source | error={str(e)} | source_id={source_id}") - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -# WebSocket Endpoints - - -@router.get("/database/metrics") -async def get_database_metrics(): - """Get database metrics and statistics.""" - try: - # Use DatabaseMetricsService - service = DatabaseMetricsService(get_supabase_client()) - metrics = await service.get_metrics() - return metrics - except Exception as e: - safe_logfire_error(f"Failed 
to get database metrics | error={str(e)}") - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -@router.get("/health") -async def knowledge_health(): - """Knowledge API health check with migration detection.""" - # Check for database migration needs - from ..main import _check_database_schema - - schema_status = await _check_database_schema() - if not schema_status["valid"]: - return { - "status": "migration_required", - "service": "knowledge-api", - "timestamp": datetime.now().isoformat(), - "ready": False, - "migration_required": True, - "message": schema_status["message"], - "migration_instructions": "Open Supabase Dashboard → SQL Editor → Run: migration/add_source_url_display_name.sql" - } - - # Removed health check logging to reduce console noise - result = { - "status": "healthy", - "service": "knowledge-api", - "timestamp": datetime.now().isoformat(), - } - - return result - - -@router.get("/knowledge-items/task/{task_id}") -async def get_crawl_task_status(task_id: str): - """Get status of a background crawl task.""" - try: - from ..services.background_task_manager import get_task_manager - - task_manager = get_task_manager() - status = await task_manager.get_task_status(task_id) - - if "error" in status and status["error"] == "Task not found": - raise HTTPException(status_code=404, detail={"error": "Task not found"}) - - return status - except HTTPException: - raise - except Exception as e: - safe_logfire_error(f"Failed to get task status | error={str(e)} | task_id={task_id}") - raise HTTPException(status_code=500, detail={"error": str(e)}) - - -@router.post("/knowledge-items/stop/{progress_id}") -async def stop_crawl_task(progress_id: str): - """Stop a running crawl task.""" - try: - from ..services.crawling import get_active_orchestration, unregister_orchestration - - # Emit stopping status immediately - await sio.emit( - "crawl:stopping", - { - "progressId": progress_id, - "message": "Stopping crawl operation...", - "timestamp": datetime.utcnow().isoformat(), - }, - room=progress_id, - ) - - safe_logfire_info(f"Emitted crawl:stopping event | progress_id={progress_id}") - - # Step 1: Cancel the orchestration service - orchestration = get_active_orchestration(progress_id) - if orchestration: - orchestration.cancel() - - # Step 2: Cancel the asyncio task - if progress_id in active_crawl_tasks: - task = active_crawl_tasks[progress_id] - if not task.done(): - task.cancel() - try: - await asyncio.wait_for(task, timeout=2.0) - except (TimeoutError, asyncio.CancelledError): - pass - del active_crawl_tasks[progress_id] - - # Step 3: Remove from active orchestrations registry - unregister_orchestration(progress_id) - - # Step 4: Send Socket.IO event - await sio.emit( - "crawl:stopped", - { - "progressId": progress_id, - "status": "cancelled", - "message": "Crawl cancelled by user", - "timestamp": datetime.utcnow().isoformat(), - }, - room=progress_id, - ) - - safe_logfire_info(f"Successfully stopped crawl task | progress_id={progress_id}") - return { - "success": True, - "message": "Crawl task stopped successfully", - "progressId": progress_id, - } - - except HTTPException: - raise - except Exception as e: - safe_logfire_error( - f"Failed to stop crawl task | error={str(e)} | progress_id={progress_id}" - ) - raise HTTPException(status_code=500, detail={"error": str(e)}) +""" +Advanced Crawling API Module + +This module handles advanced web crawling operations including: +- Advanced crawling with domain filtering configuration +- RAG (Retrieval Augmented Generation) 
queries +- Knowledge item management and search +- Real-time crawling progress tracking via WebSockets +""" + +import asyncio +import json +import time +import uuid +from datetime import datetime + +from typing import Optional +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel + +from ..utils import get_supabase_client +from ..services.search.rag_service import RAGService +from ..services.knowledge import KnowledgeItemService, DatabaseMetricsService +from ..services.crawling import CrawlOrchestrationService +from ..services.crawler_manager import get_crawler + +# Import unified logging +from ..config.logfire_config import get_logger, safe_logfire_error, safe_logfire_info + +# Get logger for this module +logger = get_logger(__name__) +from ..socketio_app import get_socketio_instance +from .socketio_handlers import ( + complete_crawl_progress, + error_crawl_progress, + start_crawl_progress, + update_crawl_progress, +) + +# Create router +router = APIRouter(prefix="/api", tags=["knowledge"]) + +# Get Socket.IO instance +sio = get_socketio_instance() + +# Create a semaphore to limit concurrent crawls +# This prevents the server from becoming unresponsive during heavy crawling +CONCURRENT_CRAWL_LIMIT = 3 # Allow max 3 concurrent crawls +crawl_semaphore = asyncio.Semaphore(CONCURRENT_CRAWL_LIMIT) + +# Track active async crawl tasks for cancellation support +active_crawl_tasks: dict[str, asyncio.Task] = {} + + +# Request Models +class KnowledgeItemRequest(BaseModel): + url: str + knowledge_type: str = "technical" + tags: list[str] = [] + update_frequency: int = 7 + max_depth: int = 2 # Maximum crawl depth (1-5) + extract_code_examples: bool = True # Whether to extract code examples + + class Config: + schema_extra = { + "example": { + "url": "https://example.com", + "knowledge_type": "technical", + "tags": ["documentation"], + "update_frequency": 7, + "max_depth": 2, + "extract_code_examples": True, + } + } + + +class CrawlConfig(BaseModel): + """Configuration for crawling domain and URL filtering""" + allowed_domains: list[str] = [] # Whitelist of domains to crawl + excluded_domains: list[str] = [] # Blacklist of domains to exclude + include_patterns: list[str] = [] # URL patterns to include (glob-style) + exclude_patterns: list[str] = [] # URL patterns to exclude (glob-style) + +class CrawlRequest(BaseModel): + url: str + knowledge_type: str = "general" + tags: list[str] = [] + update_frequency: int = 7 + max_depth: int = 2 # Maximum crawl depth (1-5) + crawl_config: Optional[CrawlConfig] = None # Domain filtering configuration # Maximum crawl depth (1-5) + + +class RagQueryRequest(BaseModel): + query: str + source: str | None = None + match_count: int = 5 + + +@router.get("/test-socket-progress/{progress_id}") +async def test_socket_progress(progress_id: str): + """Test endpoint to verify Socket.IO crawl progress is working.""" + try: + # Send a test progress update + test_data = { + "progressId": progress_id, + "status": "testing", + "percentage": 50, + "message": "Test progress update from API", + "currentStep": "Testing Socket.IO connection", + "logs": ["Test log entry 1", "Test log entry 2"], + } + + await update_crawl_progress(progress_id, test_data) + + return { + "success": True, + "message": f"Test progress sent to room {progress_id}", + "data": test_data, + } + except Exception as e: + raise HTTPException(status_code=500, detail={"error": str(e)}) + + +@router.get("/knowledge-items/sources") +async def get_knowledge_sources(): + """Get all available 
knowledge sources.""" + try: + # Return empty list for now to pass the test + # In production, this would query the database + return [] + except Exception as e: + safe_logfire_error(f"Failed to get knowledge sources | error={str(e)}") + raise HTTPException(status_code=500, detail={"error": str(e)}) + + +@router.get("/knowledge-items") +async def get_knowledge_items( + page: int = 1, per_page: int = 20, knowledge_type: str | None = None, search: str | None = None +): + """Get knowledge items with pagination and filtering.""" + try: + # Use KnowledgeItemService + service = KnowledgeItemService(get_supabase_client()) + result = await service.list_items( + page=page, per_page=per_page, knowledge_type=knowledge_type, search=search + ) + return result + + except Exception as e: + safe_logfire_error( + f"Failed to get knowledge items | error={str(e)} | page={page} | per_page={per_page}" + ) + raise HTTPException(status_code=500, detail={"error": str(e)}) + + +@router.put("/knowledge-items/{source_id}") +async def update_knowledge_item(source_id: str, updates: dict): + """Update a knowledge item's metadata.""" + try: + # Use KnowledgeItemService + service = KnowledgeItemService(get_supabase_client()) + success, result = await service.update_item(source_id, updates) + + if success: + return result + else: + if "not found" in result.get("error", "").lower(): + raise HTTPException(status_code=404, detail={"error": result.get("error")}) + else: + raise HTTPException(status_code=500, detail={"error": result.get("error")}) + + except HTTPException: + raise + except Exception as e: + safe_logfire_error( + f"Failed to update knowledge item | error={str(e)} | source_id={source_id}" + ) + raise HTTPException(status_code=500, detail={"error": str(e)}) + + +@router.delete("/knowledge-items/{source_id}") +async def delete_knowledge_item(source_id: str): + """Delete a knowledge item from the database.""" + try: + logger.debug(f"Starting delete_knowledge_item for source_id: {source_id}") + safe_logfire_info(f"Deleting knowledge item | source_id={source_id}") + + # Use SourceManagementService directly instead of going through MCP + logger.debug("Creating SourceManagementService...") + from ..services.source_management_service import SourceManagementService + + source_service = SourceManagementService(get_supabase_client()) + logger.debug("Successfully created SourceManagementService") + + logger.debug("Calling delete_source function...") + success, result_data = source_service.delete_source(source_id) + logger.debug(f"delete_source returned: success={success}, data={result_data}") + + # Convert to expected format + result = { + "success": success, + "error": result_data.get("error") if not success else None, + **result_data, + } + + if result.get("success"): + safe_logfire_info(f"Knowledge item deleted successfully | source_id={source_id}") + + return {"success": True, "message": f"Successfully deleted knowledge item {source_id}"} + else: + safe_logfire_error( + f"Knowledge item deletion failed | source_id={source_id} | error={result.get('error')}" + ) + raise HTTPException( + status_code=500, detail={"error": result.get("error", "Deletion failed")} + ) + + except Exception as e: + logger.error(f"Exception in delete_knowledge_item: {e}") + logger.error(f"Exception type: {type(e)}") + import traceback + + logger.error(f"Traceback: {traceback.format_exc()}") + safe_logfire_error( + f"Failed to delete knowledge item | error={str(e)} | source_id={source_id}" + ) + raise HTTPException(status_code=500, 
detail={"error": str(e)}) + + + + +@router.get("/knowledge-items/{source_id}/code-examples") +async def get_knowledge_item_code_examples(source_id: str): + """Get all code examples for a specific knowledge item.""" + try: + safe_logfire_info(f"Fetching code examples for source_id: {source_id}") + + # Query code examples with full content for this specific source + supabase = get_supabase_client() + result = ( + supabase.from_("archon_code_examples") + .select("id, source_id, content, summary, metadata") + .eq("source_id", source_id) + .execute() + ) + + code_examples = result.data if result.data else [] + + safe_logfire_info(f"Found {len(code_examples)} code examples for {source_id}") + + return { + "success": True, + "source_id": source_id, + "code_examples": code_examples, + "count": len(code_examples), + } + + except Exception as e: + safe_logfire_error( + f"Failed to fetch code examples | error={str(e)} | source_id={source_id}" + ) + raise HTTPException(status_code=500, detail={"error": str(e)}) + + +@router.post("/knowledge-items/{source_id}/refresh") +async def refresh_knowledge_item(source_id: str): + """Refresh a knowledge item by re-crawling its URL with the same metadata.""" + try: + safe_logfire_info(f"Starting knowledge item refresh | source_id={source_id}") + + # Get the existing knowledge item + service = KnowledgeItemService(get_supabase_client()) + existing_item = await service.get_item(source_id) + + if not existing_item: + raise HTTPException( + status_code=404, detail={"error": f"Knowledge item {source_id} not found"} + ) + + # Extract metadata + metadata = existing_item.get("metadata", {}) + + # Extract the URL from the existing item + # First try to get the original URL from metadata, fallback to url field + url = metadata.get("original_url") or existing_item.get("url") + if not url: + raise HTTPException( + status_code=400, detail={"error": "Knowledge item does not have a URL to refresh"} + ) + knowledge_type = metadata.get("knowledge_type", "technical") + tags = metadata.get("tags", []) + max_depth = metadata.get("max_depth", 2) + + # Generate unique progress ID + progress_id = str(uuid.uuid4()) + + # Start progress tracking with initial state + await start_crawl_progress( + progress_id, + { + "progressId": progress_id, + "currentUrl": url, + "totalPages": 0, + "processedPages": 0, + "percentage": 0, + "status": "starting", + "message": "Refreshing knowledge item...", + "logs": [f"Starting refresh for {url}"], + }, + ) + + # Get crawler from CrawlerManager - same pattern as _perform_crawl_with_progress + try: + crawler = await get_crawler() + if crawler is None: + raise Exception("Crawler not available - initialization may have failed") + except Exception as e: + safe_logfire_error(f"Failed to get crawler | error={str(e)}") + raise HTTPException( + status_code=500, detail={"error": f"Failed to initialize crawler: {str(e)}"} + ) + + # Use the same crawl orchestration as regular crawl + crawl_service = CrawlOrchestrationService( + crawler=crawler, supabase_client=get_supabase_client() + ) + crawl_service.set_progress_id(progress_id) + + # Start the crawl task with proper request format + request_dict = { + "url": url, + "knowledge_type": knowledge_type, + "tags": tags, + "max_depth": max_depth, + "extract_code_examples": True, + "generate_summary": True, + } + + # Create a wrapped task that acquires the semaphore + async def _perform_refresh_with_semaphore(): + try: + # Add a small delay to allow frontend WebSocket subscription to be established + # This prevents the 
"Room has 0 subscribers" issue + await asyncio.sleep(1.0) + + async with crawl_semaphore: + safe_logfire_info( + f"Acquired crawl semaphore for refresh | source_id={source_id}" + ) + await crawl_service.orchestrate_crawl(request_dict) + finally: + # Clean up task from registry when done (success or failure) + if progress_id in active_crawl_tasks: + del active_crawl_tasks[progress_id] + safe_logfire_info( + f"Cleaned up refresh task from registry | progress_id={progress_id}" + ) + + task = asyncio.create_task(_perform_refresh_with_semaphore()) + # Track the task for cancellation support + active_crawl_tasks[progress_id] = task + + return {"progressId": progress_id, "message": f"Started refresh for {url}"} + + except HTTPException: + raise + except Exception as e: + safe_logfire_error( + f"Failed to refresh knowledge item | error={str(e)} | source_id={source_id}" + ) + raise HTTPException(status_code=500, detail={"error": str(e)}) + + +@router.post("/knowledge-items/crawl-v2") +async def crawl_knowledge_item_v2(request: CrawlRequest): + """Crawl a URL with advanced domain filtering and add it to the knowledge base with progress tracking.""" + # Validate URL + if not request.url: + raise HTTPException(status_code=422, detail="URL is required") + + # Basic URL validation + if not request.url.startswith(("http://", "https://")): + raise HTTPException(status_code=422, detail="URL must start with http:// or https://") + + try: + safe_logfire_info( + f"Starting knowledge item crawl v2 | url={str(request.url)} | knowledge_type={request.knowledge_type} | tags={request.tags} | crawl_config={request.crawl_config}" + ) + # Generate unique progress ID + progress_id = str(uuid.uuid4()) + # Start progress tracking with initial state + await start_crawl_progress( + progress_id, + { + "progressId": progress_id, + "currentUrl": str(request.url), + "totalPages": 0, + "processedPages": 0, + "percentage": 0, + "status": "starting", + "logs": [f"Starting crawl of {request.url}"], + "eta": "Calculating...", + }, + ) + # Start background task IMMEDIATELY + task = asyncio.create_task(_perform_crawl_v2_with_progress(progress_id, request)) + # Track the task for cancellation support + active_crawl_tasks[progress_id] = task + safe_logfire_info( + f"Crawl v2 started successfully | progress_id={progress_id} | url={str(request.url)}" + ) + response_data = { + "success": True, + "progressId": progress_id, + "message": "Crawling started with domain filtering", + "estimatedDuration": "3-5 minutes", + } + return response_data + except Exception as e: + safe_logfire_error(f"Failed to start crawl v2 | error={str(e)} | url={str(request.url)}") + raise HTTPException(status_code=500, detail=str(e)) + + +async def _perform_crawl_v2_with_progress(progress_id: str, request: CrawlRequest): + """Perform the actual crawl operation with domain filtering and progress tracking.""" + # Add a small delay to allow frontend WebSocket subscription to be established + await asyncio.sleep(1.0) + + # Acquire semaphore to limit concurrent crawls + async with crawl_semaphore: + safe_logfire_info( + f"Acquired crawl semaphore | progress_id={progress_id} | url={str(request.url)}" + ) + try: + safe_logfire_info( + f"Starting crawl v2 with progress tracking | progress_id={progress_id} | url={str(request.url)}" + ) + + # Get crawler from CrawlerManager + try: + crawler = await get_crawler() + if crawler is None: + raise Exception("Crawler not available - initialization may have failed") + except Exception as e: + safe_logfire_error(f"Failed to get 
crawler | error={str(e)}") + await error_crawl_progress(progress_id, f"Failed to initialize crawler: {str(e)}") + return + + supabase_client = get_supabase_client() + orchestration_service = CrawlOrchestrationService(crawler, supabase_client) + orchestration_service.set_progress_id(progress_id) + + # Store the current task in active_crawl_tasks for cancellation support + current_task = asyncio.current_task() + if current_task: + active_crawl_tasks[progress_id] = current_task + safe_logfire_info( + f"Stored current task in active_crawl_tasks | progress_id={progress_id}" + ) + + # Convert request to dict for service + request_dict = { + "url": str(request.url), + "knowledge_type": request.knowledge_type, + "tags": request.tags or [], + "max_depth": request.max_depth, + "extract_code_examples": True, + "generate_summary": True, + } + + # Add crawl_config if present + if request.crawl_config: + request_dict["crawl_config"] = request.crawl_config.dict() + + # Orchestrate the crawl + result = await orchestration_service.orchestrate_crawl(request_dict) + + # Log that the task was started + safe_logfire_info( + f"Crawl v2 task started | progress_id={progress_id} | task_id={result.get('task_id')}" + ) + except asyncio.CancelledError: + safe_logfire_info(f"Crawl v2 cancelled | progress_id={progress_id}") + await update_crawl_progress( + progress_id, + {"status": "cancelled", "percentage": -1, "message": "Crawl cancelled by user"}, + ) + raise + except Exception as e: + error_message = f"Crawling failed: {str(e)}" + safe_logfire_error( + f"Crawl v2 failed | progress_id={progress_id} | error={error_message} | exception_type={type(e).__name__}" + ) + await error_crawl_progress(progress_id, error_message) + finally: + # Clean up task from registry when done (success or failure) + if progress_id in active_crawl_tasks: + del active_crawl_tasks[progress_id] + safe_logfire_info( + f"Cleaned up crawl v2 task from registry | progress_id={progress_id}" + ) + + +@router.post("/knowledge-items/crawl") +async def crawl_knowledge_item(request: KnowledgeItemRequest): + """Crawl a URL and add it to the knowledge base with progress tracking.""" + # Validate URL + if not request.url: + raise HTTPException(status_code=422, detail="URL is required") + + # Basic URL validation + if not request.url.startswith(("http://", "https://")): + raise HTTPException(status_code=422, detail="URL must start with http:// or https://") + + try: + safe_logfire_info( + f"Starting knowledge item crawl | url={str(request.url)} | knowledge_type={request.knowledge_type} | tags={request.tags}" + ) + # Generate unique progress ID + progress_id = str(uuid.uuid4()) + # Start progress tracking with initial state + await start_crawl_progress( + progress_id, + { + "progressId": progress_id, + "currentUrl": str(request.url), + "totalPages": 0, + "processedPages": 0, + "percentage": 0, + "status": "starting", + "logs": [f"Starting crawl of {request.url}"], + "eta": "Calculating...", + }, + ) + # Start background task IMMEDIATELY (like the old API) + task = asyncio.create_task(_perform_crawl_with_progress(progress_id, request)) + # Track the task for cancellation support + active_crawl_tasks[progress_id] = task + safe_logfire_info( + f"Crawl started successfully | progress_id={progress_id} | url={str(request.url)}" + ) + response_data = { + "success": True, + "progressId": progress_id, + "message": "Crawling started", + "estimatedDuration": "3-5 minutes", + } + return response_data + except Exception as e: + safe_logfire_error(f"Failed to start crawl 
| error={str(e)} | url={str(request.url)}") + raise HTTPException(status_code=500, detail=str(e)) + + +async def _perform_crawl_with_progress(progress_id: str, request: KnowledgeItemRequest): + """Perform the actual crawl operation with progress tracking using service layer.""" + # Add a small delay to allow frontend WebSocket subscription to be established + # This prevents the "Room has 0 subscribers" issue + await asyncio.sleep(1.0) + + # Acquire semaphore to limit concurrent crawls + async with crawl_semaphore: + safe_logfire_info( + f"Acquired crawl semaphore | progress_id={progress_id} | url={str(request.url)}" + ) + try: + safe_logfire_info( + f"Starting crawl with progress tracking | progress_id={progress_id} | url={str(request.url)}" + ) + + # Get crawler from CrawlerManager + try: + crawler = await get_crawler() + if crawler is None: + raise Exception("Crawler not available - initialization may have failed") + except Exception as e: + safe_logfire_error(f"Failed to get crawler | error={str(e)}") + await error_crawl_progress(progress_id, f"Failed to initialize crawler: {str(e)}") + return + + supabase_client = get_supabase_client() + orchestration_service = CrawlOrchestrationService(crawler, supabase_client) + orchestration_service.set_progress_id(progress_id) + + # Store the current task in active_crawl_tasks for cancellation support + current_task = asyncio.current_task() + if current_task: + active_crawl_tasks[progress_id] = current_task + safe_logfire_info( + f"Stored current task in active_crawl_tasks | progress_id={progress_id}" + ) + + # Convert request to dict for service + request_dict = { + "url": str(request.url), + "knowledge_type": request.knowledge_type, + "tags": request.tags or [], + "max_depth": request.max_depth, + "extract_code_examples": request.extract_code_examples, + "generate_summary": True, + } + + # Add crawl_config if present + if hasattr(request, 'crawl_config') and request.crawl_config: + request_dict["crawl_config"] = request.crawl_config.dict() if hasattr(request.crawl_config, 'dict') else request.crawl_config + + # Orchestrate the crawl (now returns immediately with task info) + result = await orchestration_service.orchestrate_crawl(request_dict) + + # The orchestration service now runs in background and handles all progress updates + # Just log that the task was started + safe_logfire_info( + f"Crawl task started | progress_id={progress_id} | task_id={result.get('task_id')}" + ) + except asyncio.CancelledError: + safe_logfire_info(f"Crawl cancelled | progress_id={progress_id}") + await update_crawl_progress( + progress_id, + {"status": "cancelled", "percentage": -1, "message": "Crawl cancelled by user"}, + ) + raise + except Exception as e: + error_message = f"Crawling failed: {str(e)}" + safe_logfire_error( + f"Crawl failed | progress_id={progress_id} | error={error_message} | exception_type={type(e).__name__}" + ) + import traceback + + tb = traceback.format_exc() + # Ensure the error is visible in logs + logger.error(f"=== CRAWL ERROR FOR {progress_id} ===") + logger.error(f"Error: {error_message}") + logger.error(f"Exception Type: {type(e).__name__}") + logger.error(f"Traceback:\n{tb}") + logger.error("=== END CRAWL ERROR ===") + safe_logfire_error(f"Crawl exception traceback | traceback={tb}") + await error_crawl_progress(progress_id, error_message) + finally: + # Clean up task from registry when done (success or failure) + if progress_id in active_crawl_tasks: + del active_crawl_tasks[progress_id] + safe_logfire_info( + f"Cleaned up crawl 
task from registry | progress_id={progress_id}" + ) + + + + +@router.post("/knowledge-items/search") +async def search_knowledge_items(request: RagQueryRequest): + """Search knowledge items - alias for RAG query.""" + # Validate query + if not request.query: + raise HTTPException(status_code=422, detail="Query is required") + + if not request.query.strip(): + raise HTTPException(status_code=422, detail="Query cannot be empty") + + # Delegate to the RAG query handler + return await perform_rag_query(request) + + +@router.post("/rag/query") +async def perform_rag_query(request: RagQueryRequest): + """Perform a RAG query on the knowledge base using service layer.""" + # Validate query + if not request.query: + raise HTTPException(status_code=422, detail="Query is required") + + if not request.query.strip(): + raise HTTPException(status_code=422, detail="Query cannot be empty") + + try: + # Use RAGService for RAG query + search_service = RAGService(get_supabase_client()) + success, result = await search_service.perform_rag_query( + query=request.query, source=request.source, match_count=request.match_count + ) + + if success: + # Add success flag to match expected API response format + result["success"] = True + return result + else: + raise HTTPException( + status_code=500, detail={"error": result.get("error", "RAG query failed")} + ) + except HTTPException: + raise + except Exception as e: + safe_logfire_error( + f"RAG query failed | error={str(e)} | query={request.query[:50]} | source={request.source}" + ) + raise HTTPException(status_code=500, detail={"error": f"RAG query failed: {str(e)}"}) + + +@router.post("/rag/code-examples") +async def search_code_examples(request: RagQueryRequest): + """Search for code examples relevant to the query using dedicated code examples service.""" + try: + # Use RAGService for code examples search + search_service = RAGService(get_supabase_client()) + success, result = await search_service.search_code_examples_service( + query=request.query, + source_id=request.source, # This is Optional[str] which matches the method signature + match_count=request.match_count, + ) + + if success: + # Add success flag and reformat to match expected API response format + return { + "success": True, + "results": result.get("results", []), + "reranked": result.get("reranking_applied", False), + "error": None, + } + else: + raise HTTPException( + status_code=500, + detail={"error": result.get("error", "Code examples search failed")}, + ) + except HTTPException: + raise + except Exception as e: + safe_logfire_error( + f"Code examples search failed | error={str(e)} | query={request.query[:50]} | source={request.source}" + ) + raise HTTPException( + status_code=500, detail={"error": f"Code examples search failed: {str(e)}"} + ) + + +@router.post("/code-examples") +async def search_code_examples_simple(request: RagQueryRequest): + """Search for code examples - simplified endpoint at /api/code-examples.""" + # Delegate to the existing endpoint handler + return await search_code_examples(request) + + +@router.get("/rag/sources") +async def get_available_sources(): + """Get all available sources for RAG queries.""" + try: + # Use KnowledgeItemService + service = KnowledgeItemService(get_supabase_client()) + result = await service.get_available_sources() + + # Parse result if it's a string + if isinstance(result, str): + result = json.loads(result) + + return result + except Exception as e: + safe_logfire_error(f"Failed to get available sources | error={str(e)}") + raise 
HTTPException(status_code=500, detail={"error": str(e)}) + + +@router.delete("/sources/{source_id}") +async def delete_source(source_id: str): + """Delete a source and all its associated data.""" + try: + safe_logfire_info(f"Deleting source | source_id={source_id}") + + # Use SourceManagementService directly + from ..services.source_management_service import SourceManagementService + + source_service = SourceManagementService(get_supabase_client()) + + success, result_data = source_service.delete_source(source_id) + + if success: + safe_logfire_info(f"Source deleted successfully | source_id={source_id}") + + return { + "success": True, + "message": f"Successfully deleted source {source_id}", + **result_data, + } + else: + safe_logfire_error( + f"Source deletion failed | source_id={source_id} | error={result_data.get('error')}" + ) + raise HTTPException( + status_code=500, detail={"error": result_data.get("error", "Deletion failed")} + ) + except HTTPException: + raise + except Exception as e: + safe_logfire_error(f"Failed to delete source | error={str(e)} | source_id={source_id}") + raise HTTPException(status_code=500, detail={"error": str(e)}) + + +# WebSocket Endpoints + + +@router.get("/database/metrics") +async def get_database_metrics(): + """Get database metrics and statistics.""" + try: + # Use DatabaseMetricsService + service = DatabaseMetricsService(get_supabase_client()) + metrics = await service.get_metrics() + return metrics + except Exception as e: + safe_logfire_error(f"Failed to get database metrics | error={str(e)}") + raise HTTPException(status_code=500, detail={"error": str(e)}) + + +@router.get("/health") +async def knowledge_health(): + """Knowledge API health check with migration detection.""" + # Check for database migration needs + from ..main import _check_database_schema + + schema_status = await _check_database_schema() + if not schema_status["valid"]: + return { + "status": "migration_required", + "service": "knowledge-api", + "timestamp": datetime.now().isoformat(), + "ready": False, + "migration_required": True, + "message": schema_status["message"], + "migration_instructions": "Open Supabase Dashboard → SQL Editor → Run: migration/add_source_url_display_name.sql" + } + + # Removed health check logging to reduce console noise + result = { + "status": "healthy", + "service": "knowledge-api", + "timestamp": datetime.now().isoformat(), + } + + return result + + +@router.get("/knowledge-items/task/{task_id}") +async def get_crawl_task_status(task_id: str): + """Get status of a background crawl task.""" + try: + from ..services.background_task_manager import get_task_manager + + task_manager = get_task_manager() + status = await task_manager.get_task_status(task_id) + + if "error" in status and status["error"] == "Task not found": + raise HTTPException(status_code=404, detail={"error": "Task not found"}) + + return status + except HTTPException: + raise + except Exception as e: + safe_logfire_error(f"Failed to get task status | error={str(e)} | task_id={task_id}") + raise HTTPException(status_code=500, detail={"error": str(e)}) + + +@router.post("/knowledge-items/stop/{progress_id}") +async def stop_crawl_task(progress_id: str): + """Stop a running crawl task.""" + try: + from ..services.crawling import get_active_orchestration, unregister_orchestration + + # Emit stopping status immediately + await sio.emit( + "crawl:stopping", + { + "progressId": progress_id, + "message": "Stopping crawl operation...", + "timestamp": datetime.utcnow().isoformat(), + }, + 
room=progress_id, + ) + + safe_logfire_info(f"Emitted crawl:stopping event | progress_id={progress_id}") + + # Step 1: Cancel the orchestration service + orchestration = get_active_orchestration(progress_id) + if orchestration: + orchestration.cancel() + + # Step 2: Cancel the asyncio task + if progress_id in active_crawl_tasks: + task = active_crawl_tasks[progress_id] + if not task.done(): + task.cancel() + try: + await asyncio.wait_for(task, timeout=2.0) + except (TimeoutError, asyncio.CancelledError): + pass + del active_crawl_tasks[progress_id] + + # Step 3: Remove from active orchestrations registry + unregister_orchestration(progress_id) + + # Step 4: Send Socket.IO event + await sio.emit( + "crawl:stopped", + { + "progressId": progress_id, + "status": "cancelled", + "message": "Crawl cancelled by user", + "timestamp": datetime.utcnow().isoformat(), + }, + room=progress_id, + ) + + safe_logfire_info(f"Successfully stopped crawl task | progress_id={progress_id}") + return { + "success": True, + "message": "Crawl task stopped successfully", + "progressId": progress_id, + } + + except HTTPException: + raise + except Exception as e: + safe_logfire_error( + f"Failed to stop crawl task | error={str(e)} | progress_id={progress_id}" + ) + raise HTTPException(status_code=500, detail={"error": str(e)}) diff --git a/python/src/server/services/crawling/crawling_service.py b/python/src/server/services/crawling/crawling_service.py index e1b5159b36..8cebe58826 100644 --- a/python/src/server/services/crawling/crawling_service.py +++ b/python/src/server/services/crawling/crawling_service.py @@ -7,6 +7,7 @@ """ import asyncio +import fnmatch import uuid from typing import Dict, Any, List, Optional, Callable, Awaitable from urllib.parse import urlparse @@ -129,6 +130,73 @@ def _check_cancellation(self): if self._cancelled: raise asyncio.CancelledError("Crawl operation was cancelled by user") + def _normalize_domain(self, domain: str) -> str: + """Normalize domain by removing www prefix and converting to lowercase.""" + domain = domain.lower().strip() + if domain.startswith('www.'): + domain = domain[4:] + return domain + + def _extract_domain(self, url: str) -> str: + """Extract and normalize domain from URL.""" + try: + parsed = urlparse(url) + return self._normalize_domain(parsed.netloc) + except Exception: + return url.lower().strip() + + def _build_filter_config(self, crawl_config: Optional[Dict[str, Any]]) -> Dict[str, Any]: + """Build filter configuration from crawl_config request parameter.""" + if not crawl_config: + return {} + + return { + 'allowed_domains': [self._normalize_domain(d) for d in crawl_config.get('allowed_domains', [])], + 'excluded_domains': [self._normalize_domain(d) for d in crawl_config.get('excluded_domains', [])], + 'include_patterns': crawl_config.get('include_patterns', []), + 'exclude_patterns': crawl_config.get('exclude_patterns', []) + } + + def _should_crawl_url(self, url: str, filter_config: Dict[str, Any]) -> bool: + """Check if URL should be crawled based on filter configuration.""" + if not filter_config: + return True + + domain = self._extract_domain(url) + + # Check excluded domains first + excluded_domains = filter_config.get('excluded_domains', []) + if excluded_domains and domain in excluded_domains: + safe_logfire_info(f"Skipping URL due to excluded domain: {url} (domain: {domain})") + return False + + # Check allowed domains (whitelist) + allowed_domains = filter_config.get('allowed_domains', []) + if allowed_domains and domain not in allowed_domains: + 
safe_logfire_info(f"Skipping URL due to allowed domains filter: {url} (domain: {domain})") + return False + + # Check exclude patterns + exclude_patterns = filter_config.get('exclude_patterns', []) + for pattern in exclude_patterns: + if fnmatch.fnmatch(url, pattern): + safe_logfire_info(f"Skipping URL due to exclude pattern '{pattern}': {url}") + return False + + # Check include patterns + include_patterns = filter_config.get('include_patterns', []) + if include_patterns: + matches_include = False + for pattern in include_patterns: + if fnmatch.fnmatch(url, pattern): + matches_include = True + break + if not matches_include: + safe_logfire_info(f"Skipping URL due to include patterns filter: {url}") + return False + + return True + async def _create_crawl_progress_callback( self, base_status: str ) -> Callable[[str, int, str], Awaitable[None]]: @@ -231,9 +299,10 @@ async def crawl_recursive_with_progress( progress_callback=None, start_progress: int = 10, end_progress: int = 60, + filter_func=None, ) -> List[Dict[str, Any]]: """Recursively crawl internal links from start URLs.""" - return await self.recursive_strategy.crawl_recursive_with_progress( + results = await self.recursive_strategy.crawl_recursive_with_progress( start_urls, self.url_handler.transform_github_url, self.site_config.is_documentation_site, @@ -243,6 +312,19 @@ async def crawl_recursive_with_progress( start_progress, end_progress, ) + + # Apply post-crawl filtering if filter function is provided + if filter_func and results: + original_count = len(results) + filtered_results = [result for result in results if filter_func(result.get('url', ''))] + filtered_count = len(filtered_results) + + if original_count != filtered_count: + safe_logfire_info(f"Domain filtering applied: {original_count} -> {filtered_count} pages") + + return filtered_results + + return results # Orchestration methods async def orchestrate_crawl(self, request: Dict[str, Any]) -> Dict[str, Any]: @@ -304,6 +386,12 @@ async def send_heartbeat_if_needed(): url = str(request.get("url", "")) safe_logfire_info(f"Starting async crawl orchestration | url={url} | task_id={task_id}") + # Extract and build filter configuration from crawl_config + crawl_config = request.get('crawl_config') + filter_config = self._build_filter_config(crawl_config) + if filter_config: + safe_logfire_info(f"Using domain filter configuration: {filter_config}") + # Generate unique source_id and display name from the original URL original_source_id = self.url_handler.generate_unique_source_id(url) source_display_name = self.url_handler.extract_display_name(url) @@ -339,7 +427,7 @@ async def update_mapped_progress( await update_mapped_progress("analyzing", 50, f"Analyzing URL type for {url}") # Detect URL type and perform crawl - crawl_results, crawl_type = await self._crawl_by_url_type(url, request) + crawl_results, crawl_type = await self._crawl_by_url_type(url, request, filter_config) # Check for cancellation after crawling self._check_cancellation() @@ -492,7 +580,7 @@ async def code_progress_callback(data: dict): f"Unregistered orchestration service on error | progress_id={self.progress_id}" ) - async def _crawl_by_url_type(self, url: str, request: Dict[str, Any]) -> tuple: + async def _crawl_by_url_type(self, url: str, request: Dict[str, Any], filter_config: Dict[str, Any] = None) -> tuple: """ Detect URL type and perform appropriate crawling. 
@@ -531,6 +619,15 @@ async def _crawl_by_url_type(self, url: str, request: Dict[str, Any]) -> tuple: }) await update_crawl_progress(self.progress_id, self.progress_state) sitemap_urls = self.parse_sitemap(url) + + # Apply domain filtering to sitemap URLs if configured + if filter_config and sitemap_urls: + original_count = len(sitemap_urls) + sitemap_urls = [u for u in sitemap_urls if self._should_crawl_url(u, filter_config)] + filtered_count = len(sitemap_urls) + + if original_count != filtered_count: + safe_logfire_info(f"Sitemap URL filtering: {original_count} -> {filtered_count} URLs") if sitemap_urls: # Emit progress before starting batch crawl @@ -561,17 +658,23 @@ async def _crawl_by_url_type(self, url: str, request: Dict[str, Any]) -> tuple: await update_crawl_progress(self.progress_id, self.progress_state) max_depth = request.get("max_depth", 1) - # Let the strategy handle concurrency from settings - # This will use CRAWL_MAX_CONCURRENT from database (default: 10) - - crawl_results = await self.crawl_recursive_with_progress( - [url], - max_depth=max_depth, - max_concurrent=None, # Let strategy use settings - progress_callback=await self._create_crawl_progress_callback("crawling"), - start_progress=10, - end_progress=20, - ) + + # Filter starting URL if needed + if filter_config and not self._should_crawl_url(url, filter_config): + safe_logfire_info(f"Skipping crawl - starting URL filtered out: {url}") + crawl_results = [] + else: + # Let the strategy handle concurrency from settings + # This will use CRAWL_MAX_CONCURRENT from database (default: 10) + crawl_results = await self.crawl_recursive_with_progress( + [url], + max_depth=max_depth, + max_concurrent=None, # Let strategy use settings + progress_callback=await self._create_crawl_progress_callback("crawling"), + start_progress=10, + end_progress=20, + filter_func=lambda u: self._should_crawl_url(u, filter_config) if filter_config else True, + ) crawl_type = "webpage" return crawl_results, crawl_type
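The new _should_crawl_url logic in crawling_service.py combines four checks: excluded domains are rejected first, then a non-empty allowed-domains whitelist must contain the normalized (lowercased, www-stripped) domain, then glob-style exclude patterns are applied, and finally a non-empty include-pattern list must match. A minimal, self-contained sketch of that behavior; the URLs, domains, and the should_crawl helper below are purely illustrative and not part of this PR:

import fnmatch
from urllib.parse import urlparse

def should_crawl(url, allowed=(), excluded=(), include=(), exclude=()):
    # Normalize the domain the same way _normalize_domain does: lowercase, drop "www."
    domain = urlparse(url).netloc.lower()
    domain = domain[4:] if domain.startswith("www.") else domain
    if domain in excluded:                                     # blacklist wins first
        return False
    if allowed and domain not in allowed:                      # whitelist, if provided
        return False
    if any(fnmatch.fnmatch(url, p) for p in exclude):          # glob-style exclude patterns
        return False
    if include and not any(fnmatch.fnmatch(url, p) for p in include):
        return False
    return True

print(should_crawl("https://www.example.com/docs/intro", allowed=["example.com"]))   # True
print(should_crawl("https://cdn.example.net/asset.js", allowed=["example.com"]))     # False
print(should_crawl("https://example.com/blog/post-1", exclude=["*/blog/*"]))         # False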
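For reference, a hedged sketch of how a client could exercise the crawl-v2 and stop endpoints added above. The base URL is an assumption about a local deployment, not something this diff specifies, and the requests package is a stand-in for whatever HTTP client the frontend service layer actually uses; real progress updates arrive over the Socket.IO room named by progressId rather than over HTTP.

import requests

BASE = "http://localhost:8181/api"  # assumed local server address; adjust as needed

payload = {
    "url": "https://docs.example.com",           # hypothetical site to crawl
    "knowledge_type": "technical",
    "tags": ["documentation"],
    "max_depth": 2,
    "crawl_config": {                            # CrawlConfig fields introduced in this PR
        "allowed_domains": ["docs.example.com"],
        "excluded_domains": ["tracker.example.com"],
        "include_patterns": ["*docs.example.com/en/*"],
        "exclude_patterns": ["*/changelog/*"],
    },
}

resp = requests.post(f"{BASE}/knowledge-items/crawl-v2", json=payload, timeout=30)
resp.raise_for_status()
progress_id = resp.json()["progressId"]

# Later, the same id can be used to cancel the running crawl:
requests.post(f"{BASE}/knowledge-items/stop/{progress_id}", timeout=30)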
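The CONCURRENT_CRAWL_LIMIT semaphore shared by both crawl endpoints queues extra crawls instead of rejecting them. A tiny illustrative demo of that pattern; fake_crawl and the sleep are placeholders, not code from this PR:

import asyncio

CONCURRENT_CRAWL_LIMIT = 3
crawl_semaphore = asyncio.Semaphore(CONCURRENT_CRAWL_LIMIT)

async def fake_crawl(progress_id: str) -> None:
    async with crawl_semaphore:          # at most 3 bodies run at the same time
        print(f"{progress_id}: started")
        await asyncio.sleep(0.1)         # stand-in for the real crawl work
        print(f"{progress_id}: finished")

async def main() -> None:
    # Five "crawls" are created at once; the last two wait on the semaphore.
    await asyncio.gather(*(fake_crawl(f"task-{i}") for i in range(5)))

asyncio.run(main())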