Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 198 additions & 0 deletions litellm/proxy/rag_endpoints/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,184 @@
router = APIRouter()


def _build_file_metadata_entry(
response: Any,
file_data: Optional[Tuple[str, bytes, str]] = None,
file_url: Optional[str] = None,
) -> Dict[str, Any]:
"""
Build a file metadata entry for storing in vector_store_metadata.

Args:
response: The response from litellm.aingest containing file_id
file_data: Optional tuple of (filename, content, content_type)
file_url: Optional URL if file was ingested from URL

Returns:
Dictionary with file metadata (file_id, filename, file_url, ingested_at, etc.)
"""
from datetime import datetime, timezone

# Extract file_id from response
file_id = None
if hasattr(response, "get"):
file_id = response.get("file_id")
elif hasattr(response, "file_id"):
file_id = response.file_id

# Extract file information from file_data tuple
filename = None
file_size = None
content_type = None

if file_data:
filename = file_data[0]
file_size = len(file_data[1]) if len(file_data) > 1 else None
content_type = file_data[2] if len(file_data) > 2 else None

# Build file metadata entry
file_entry = {
"file_id": file_id,
"filename": filename,
"file_url": file_url,
"ingested_at": datetime.now(timezone.utc).isoformat(),
}

# Add optional fields if available
if file_size is not None:
file_entry["file_size"] = file_size
if content_type is not None:
file_entry["content_type"] = content_type

return file_entry


async def _save_vector_store_to_db_from_rag_ingest(
    response: Any,
    ingest_options: Dict[str, Any],
    prisma_client,
    user_api_key_dict: UserAPIKeyAuth,
    file_data: Optional[Tuple[str, bytes, str]] = None,
    file_url: Optional[str] = None,
) -> None:
    """
    Helper function to save a newly created vector store from RAG ingest to the database.

    This function:
    - Extracts vector store ID and config from the ingest response
    - Checks if the vector store already exists in the database
    - Creates a new database entry (with the first file's metadata) if it doesn't exist
    - Otherwise appends the file's metadata to the existing entry's
      ``ingested_files`` list

    Database errors are logged and swallowed — ingestion already succeeded,
    so a metadata-save failure must not fail the request.

    Args:
        response: The response from litellm.aingest()
        ingest_options: The ingest options containing vector store config
        prisma_client: The Prisma database client
        user_api_key_dict: User API key authentication info
            (NOTE(review): currently unused in the body — presumably kept for
            future ownership/audit tracking; confirm before removing)
        file_data: Optional (filename, content, content_type) tuple for a
            direct file upload
        file_url: Optional source URL when the file was ingested from a URL
    """
    # Local import to avoid a circular import at module load time —
    # TODO confirm this is why it is function-scoped.
    from litellm.proxy.vector_store_endpoints.management_endpoints import (
        create_vector_store_in_db,
    )

    # Handle both dict and object responses
    if hasattr(response, "get"):
        vector_store_id = response.get("vector_store_id")
    elif hasattr(response, "vector_store_id"):
        vector_store_id = response.vector_store_id
    else:
        verbose_proxy_logger.warning(
            f"Unable to extract vector_store_id from response type: {type(response)}"
        )
        return

    # Nothing to persist without a usable string ID.
    if vector_store_id is None or not isinstance(vector_store_id, str):
        verbose_proxy_logger.warning(
            "Vector store ID is None or not a string, skipping database save"
        )
        return

    vector_store_config = ingest_options.get("vector_store", {})
    custom_llm_provider = vector_store_config.get("custom_llm_provider")

    # Extract litellm_vector_store_params for custom name and description
    litellm_vector_store_params = ingest_options.get("litellm_vector_store_params", {})
    custom_vector_store_name = litellm_vector_store_params.get("vector_store_name")
    custom_vector_store_description = litellm_vector_store_params.get("vector_store_description")

    # Build file metadata entry using helper
    file_entry = _build_file_metadata_entry(
        response=response,
        file_data=file_data,
        file_url=file_url,
    )

    try:
        # Check if vector store already exists in database
        existing_vector_store = (
            await prisma_client.db.litellm_managedvectorstorestable.find_unique(
                where={"vector_store_id": vector_store_id}
            )
        )

        # Only create if it doesn't exist
        if existing_vector_store is None:
            verbose_proxy_logger.info(
                f"Saving newly created vector store {vector_store_id} to database"
            )

            # Initialize metadata with first file
            initial_metadata = {
                "ingested_files": [file_entry]
            }

            # Use custom name if provided, otherwise default
            vector_store_name = custom_vector_store_name or f"RAG Vector Store - {vector_store_id[:8]}"
            vector_store_description = custom_vector_store_description or "Created via RAG ingest endpoint"

            await create_vector_store_in_db(
                vector_store_id=vector_store_id,
                custom_llm_provider=custom_llm_provider or "openai",
                prisma_client=prisma_client,
                vector_store_name=vector_store_name,
                vector_store_description=vector_store_description,
                vector_store_metadata=initial_metadata,
            )

            verbose_proxy_logger.info(
                f"Vector store {vector_store_id} saved to database successfully"
            )
        else:
            verbose_proxy_logger.info(
                f"Vector store {vector_store_id} already exists, appending file to metadata"
            )

            # Update existing vector store with new file.
            # Stored metadata may come back as a JSON string or a dict
            # depending on the Prisma column mapping, so normalize first.
            existing_metadata = existing_vector_store.vector_store_metadata or {}
            if isinstance(existing_metadata, str):
                import json
                existing_metadata = json.loads(existing_metadata)

            ingested_files = existing_metadata.get("ingested_files", [])
            ingested_files.append(file_entry)
            existing_metadata["ingested_files"] = ingested_files

            # Update the vector store (serialize back to JSON for storage)
            from litellm.proxy.utils import safe_dumps
            await prisma_client.db.litellm_managedvectorstorestable.update(
                where={"vector_store_id": vector_store_id},
                data={"vector_store_metadata": safe_dumps(existing_metadata)}
            )

            verbose_proxy_logger.info(
                f"Added file {file_entry.get('filename') or file_entry.get('file_url', 'Unknown')} to vector store {vector_store_id} metadata"
            )
    except Exception as db_error:
        # Log the error but don't fail the request since ingestion succeeded
        verbose_proxy_logger.exception(
            f"Failed to save vector store {vector_store_id} to database: {db_error}"
        )


async def parse_rag_ingest_request(
request: Request,
) -> Tuple[Dict[str, Any], Optional[Tuple[str, bytes, str]], Optional[str], Optional[str]]:
Expand Down Expand Up @@ -158,6 +336,7 @@ async def rag_ingest(
add_litellm_data_to_request,
general_settings,
llm_router,
prisma_client,
proxy_config,
version,
)
Expand Down Expand Up @@ -189,6 +368,25 @@ async def rag_ingest(
**request_data,
)

# Save vector store to database if it was newly created and prisma_client is available
verbose_proxy_logger.debug(
f"RAG Ingest - Checking database save conditions: prisma_client={prisma_client is not None}, response={response is not None}, response_type={type(response)}"
)

if prisma_client is not None and response is not None:
await _save_vector_store_to_db_from_rag_ingest(
response=response,
ingest_options=ingest_options,
prisma_client=prisma_client,
user_api_key_dict=user_api_key_dict,
file_data=file_data,
file_url=file_url,
)
else:
verbose_proxy_logger.warning(
f"Skipping database save: prisma_client={prisma_client is not None}, response={response is not None}"
)

return response

except HTTPException:
Expand Down
Loading
Loading