Adding Native OpenSearch support for Mem0 #2211

Merged 9 commits on Feb 20, 2025
2 changes: 1 addition & 1 deletion Makefile
@@ -13,7 +13,7 @@ install:
install_all:
poetry install
poetry run pip install groq together boto3 litellm ollama chromadb sentence_transformers vertexai \
google-generativeai elasticsearch
google-generativeai elasticsearch opensearch-py

# Format code with ruff
format:
59 changes: 59 additions & 0 deletions docs/components/vectordbs/dbs/opensearch.mdx
@@ -0,0 +1,59 @@
[OpenSearch](https://opensearch.org/) is an open-source, enterprise-grade search and observability suite that brings order to unstructured data at scale. OpenSearch supports k-NN (k-Nearest Neighbors) and allows you to store and retrieve high-dimensional vector embeddings efficiently.

### Installation

OpenSearch support requires additional dependencies. Install them with:

```bash
pip install opensearch-py>=2.8.0
```
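
If you don't already have a cluster to point at, one quick way to get a local node for testing (not production) is the official Docker image. This is only a sketch; image flags can vary by OpenSearch version, and here the bundled security plugin is disabled so the node serves plain HTTP on port 9200, matching the `use_ssl`/`verify_certs` defaults shown below:

```bash
docker run -d --name opensearch-local \
  -p 9200:9200 -p 9600:9600 \
  -e "discovery.type=single-node" \
  -e "DISABLE_SECURITY_PLUGIN=true" \
  opensearchproject/opensearch:latest
```

With security disabled, the `user` and `password` values in the config are only needed to satisfy mem0's config validation; against a secured cluster, supply real credentials and enable SSL.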

### Usage

```python
import os
from mem0 import Memory

os.environ["OPENAI_API_KEY"] = "sk-xx"

config = {
"vector_store": {
"provider": "opensearch",
"config": {
"collection_name": "mem0",
"host": "localhost",
"port": 9200,
"embedding_model_dims": 1536
}
}
}

m = Memory.from_config(config)
m.add("Likes to play cricket on weekends", user_id="alice", metadata={"category": "hobbies"})
```
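
Once memories are stored, you query them through the same `Memory` object; a brief continuation of the example above (assuming mem0's standard `search` and `get_all` methods):

```python
# Semantic search scoped to a single user
related = m.search("What does Alice do on weekends?", user_id="alice")

# Everything stored for that user
all_memories = m.get_all(user_id="alice")
```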

### Config

Let's see the available parameters for the `opensearch` config (a fuller configuration example follows the table):

| Parameter | Description | Default Value |
| ---------------------- | -------------------------------------------------- | ------------- |
| `collection_name` | The name of the index to store the vectors | `mem0` |
| `embedding_model_dims` | Dimensions of the embedding model | `1536` |
| `host` | The host where the OpenSearch server is running | `localhost` |
| `port` | The port where the OpenSearch server is running | `9200` |
| `api_key` | API key for authentication | `None` |
| `user` | Username for basic authentication | `None` |
| `password` | Password for basic authentication | `None` |
| `verify_certs` | Whether to verify SSL certificates | `False` |
| `auto_create_index` | Whether to automatically create the index | `True` |
| `use_ssl` | Whether to use SSL for connection | `False` |
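
Note that the config model in this PR requires either `api_key` or both `user` and `password` to be supplied; the remaining parameters fall back to the defaults above. A fuller configuration sketch using these options (all values are placeholders):

```python
config = {
    "vector_store": {
        "provider": "opensearch",
        "config": {
            "collection_name": "mem0",
            "host": "your-opensearch-host.example.com",
            "port": 9200,
            "user": "admin",
            "password": "<your-opensearch-password>",
            "embedding_model_dims": 1536,
            "use_ssl": True,
            "verify_certs": True,
            "auto_create_index": True,
        }
    }
}
```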

### Features

- Fast and efficient vector search
- Deployable on-premises, in containers, or on managed platforms such as Amazon OpenSearch Service
- Multiple authentication and security methods (basic authentication, API keys, LDAP, SAML, and OpenID Connect)
- Automatic index creation with optimized mappings for vector search
- Memory optimization through disk-based vector search and quantization
- Real-time analytics and observability
1 change: 1 addition & 0 deletions docs/components/vectordbs/overview.mdx
@@ -18,6 +18,7 @@ See the list of supported vector databases below.
<Card title="Azure AI Search" href="/components/vectordbs/dbs/azure_ai_search"></Card>
<Card title="Redis" href="/components/vectordbs/dbs/redis"></Card>
<Card title="Elasticsearch" href="/components/vectordbs/dbs/elasticsearch"></Card>
<Card title="OpenSearch" href="/components/vectordbs/dbs/opensearch"></Card>
</CardGroup>

## Usage
3 changes: 2 additions & 1 deletion docs/docs.json
@@ -114,7 +114,8 @@
"components/vectordbs/dbs/milvus",
"components/vectordbs/dbs/azure_ai_search",
"components/vectordbs/dbs/redis",
"components/vectordbs/dbs/elasticsearch"
"components/vectordbs/dbs/elasticsearch",
"components/vectordbs/dbs/opensearch"
]
}
]
42 changes: 42 additions & 0 deletions mem0/configs/vector_stores/opensearch.py
@@ -0,0 +1,42 @@
from typing import Any, Dict, Optional

from pydantic import BaseModel, Field, model_validator


class OpenSearchConfig(BaseModel):
collection_name: str = Field("mem0", description="Name of the index")
host: str = Field("localhost", description="OpenSearch host")
port: int = Field(9200, description="OpenSearch port")
user: Optional[str] = Field(None, description="Username for authentication")
password: Optional[str] = Field(None, description="Password for authentication")
api_key: Optional[str] = Field(None, description="API key for authentication (if applicable)")
embedding_model_dims: int = Field(1536, description="Dimension of the embedding vector")
verify_certs: bool = Field(False, description="Verify SSL certificates (default False for OpenSearch)")
use_ssl: bool = Field(False, description="Use SSL for connection (default False for OpenSearch)")
auto_create_index: bool = Field(True, description="Automatically create index during initialization")

@model_validator(mode="before")
@classmethod
def validate_auth(cls, values: Dict[str, Any]) -> Dict[str, Any]:
# Check if host is provided
if not values.get("host"):
raise ValueError("Host must be provided for OpenSearch")

# Authentication: Either API key or user/password must be provided
if not any([values.get("api_key"), (values.get("user") and values.get("password"))]):
raise ValueError("Either api_key or user/password must be provided for OpenSearch authentication")

return values

@model_validator(mode="before")
@classmethod
def validate_extra_fields(cls, values: Dict[str, Any]) -> Dict[str, Any]:
allowed_fields = set(cls.model_fields.keys())
input_fields = set(values.keys())
extra_fields = input_fields - allowed_fields
if extra_fields:
raise ValueError(
f"Extra fields not allowed: {', '.join(extra_fields)}. "
f"Allowed fields: {', '.join(allowed_fields)}"
)
return values
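
For reference, a minimal sketch of how this config behaves under the two validators above (Pydantic wraps the errors in `ValidationError`, a `ValueError` subclass):

```python
from mem0.configs.vector_stores.opensearch import OpenSearchConfig

# Accepted: host plus basic-auth credentials
cfg = OpenSearchConfig(host="localhost", user="admin", password="secret")

# Rejected by validate_auth: neither api_key nor user/password supplied
try:
    OpenSearchConfig(host="localhost")
except ValueError as exc:
    print(exc)

# Rejected by validate_extra_fields: unknown field
try:
    OpenSearchConfig(host="localhost", api_key="abc", timeout=30)
except ValueError as exc:
    print(exc)
```
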
1 change: 1 addition & 0 deletions mem0/utils/factory.py
@@ -68,6 +68,7 @@ class VectorStoreFactory:
"azure_ai_search": "mem0.vector_stores.azure_ai_search.AzureAISearch",
"redis": "mem0.vector_stores.redis.RedisDB",
"elasticsearch": "mem0.vector_stores.elasticsearch.ElasticsearchDB",
"opensearch": "mem0.vector_stores.opensearch.OpenSearchDB"
}

@classmethod
1 change: 1 addition & 0 deletions mem0/vector_stores/configs.py
@@ -18,6 +18,7 @@ class VectorStoreConfig(BaseModel):
"azure_ai_search": "AzureAISearchConfig",
"redis": "RedisDBConfig",
"elasticsearch": "ElasticsearchConfig",
"opensearch": "OpenSearchConfig",
}

@model_validator(mode="after")
189 changes: 189 additions & 0 deletions mem0/vector_stores/opensearch.py
@@ -0,0 +1,189 @@
import logging
from typing import Any, Dict, List, Optional

try:
from opensearchpy import OpenSearch
from opensearchpy.helpers import bulk
except ImportError:
raise ImportError("OpenSearch requires extra dependencies. Install with `pip install opensearch-py`") from None

from pydantic import BaseModel

from mem0.configs.vector_stores.opensearch import OpenSearchConfig
from mem0.vector_stores.base import VectorStoreBase

logger = logging.getLogger(__name__)


class OutputData(BaseModel):
id: str
score: float
payload: Dict


class OpenSearchDB(VectorStoreBase):
def __init__(self, **kwargs):
config = OpenSearchConfig(**kwargs)

# Initialize OpenSearch client
self.client = OpenSearch(
hosts=[{"host": config.host, "port": config.port or 9200}],
http_auth=(config.user, config.password) if (config.user and config.password) else None,
use_ssl=config.use_ssl,
verify_certs=config.verify_certs,
)

self.collection_name = config.collection_name
self.vector_dim = config.embedding_model_dims

# Create index only if auto_create_index is True
if config.auto_create_index:
self.create_index()

def create_index(self) -> None:
"""Create OpenSearch index with proper mappings if it doesn't exist."""
index_settings = {
            # Default index settings: 1 replica, 5 shards, 1s refresh, kNN enabled
"settings": {
"index": {"number_of_replicas": 1, "number_of_shards": 5, "refresh_interval": "1s", "knn": True}
},
"mappings": {
"properties": {
"text": {"type": "text"},
"vector": {
"type": "knn_vector",
"dimension": self.vector_dim
},
"metadata": {"type": "object", "properties": {"user_id": {"type": "keyword"}}},
}
},
}

if not self.client.indices.exists(index=self.collection_name):
self.client.indices.create(index=self.collection_name, body=index_settings)
logger.info(f"Created index {self.collection_name}")
else:
logger.info(f"Index {self.collection_name} already exists")

def create_col(self, name: str, vector_size: int) -> None:
"""Create a new collection (index in OpenSearch)."""
index_settings = {
"mappings": {
"properties": {
"vector": {
"type": "knn_vector",
"dimension": vector_size,
"method": { "engine": "lucene", "name": "hnsw", "space_type": "cosinesimil"},
},
"payload": {"type": "object"},
"id": {"type": "keyword"},
}
}
}

if not self.client.indices.exists(index=name):
self.client.indices.create(index=name, body=index_settings)
logger.info(f"Created index {name}")

def insert(
self, vectors: List[List[float]], payloads: Optional[List[Dict]] = None, ids: Optional[List[str]] = None
) -> List[OutputData]:
"""Insert vectors into the index."""
if not ids:
ids = [str(i) for i in range(len(vectors))]

if payloads is None:
payloads = [{} for _ in range(len(vectors))]

actions = []
for i, (vec, id_) in enumerate(zip(vectors, ids)):
action = {
"_index": self.collection_name,
"_id": id_,
"_source": {
"vector": vec,
"metadata": payloads[i], # Store metadata in the metadata field
},
}
actions.append(action)

bulk(self.client, actions)

results = []
for i, id_ in enumerate(ids):
results.append(OutputData(id=id_, score=1.0, payload=payloads[i]))
return results

def search(self, query: List[float], limit: int = 5, filters: Optional[Dict] = None) -> List[OutputData]:
"""Search for similar vectors using OpenSearch k-NN search with pre-filtering."""
search_query = {
"size": limit,
"query": {
"knn": {
"vector": {
"vector": query,
"k": limit,
}
}
}
}

if filters:
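            # Pre-filtering: the filter is attached inside the knn clause so documents are
            # narrowed down before the vector scan (filtered k-NN requires engine support).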
filter_conditions = [{"term": {f"metadata.{key}": value}} for key, value in filters.items()]
search_query["query"]["knn"]["vector"]["filter"] = { "bool": {"filter": filter_conditions} }

response = self.client.search(index=self.collection_name, body=search_query)

results = [
OutputData(id=hit["_id"], score=hit["_score"], payload=hit["_source"].get("metadata", {}))
for hit in response["hits"]["hits"]
]
return results

def delete(self, vector_id: str) -> None:
"""Delete a vector by ID."""
self.client.delete(index=self.collection_name, id=vector_id)

def update(self, vector_id: str, vector: Optional[List[float]] = None, payload: Optional[Dict] = None) -> None:
"""Update a vector and its payload."""
doc = {}
if vector is not None:
doc["vector"] = vector
if payload is not None:
doc["metadata"] = payload

self.client.update(index=self.collection_name, id=vector_id, body={"doc": doc})

def get(self, vector_id: str) -> Optional[OutputData]:
"""Retrieve a vector by ID."""
try:
response = self.client.get(index=self.collection_name, id=vector_id)
return OutputData(id=response["_id"], score=1.0, payload=response["_source"].get("metadata", {}))
except Exception as e:
logger.error(f"Error retrieving vector {vector_id}: {e}")
return None

def list_cols(self) -> List[str]:
"""List all collections (indices)."""
return list(self.client.indices.get_alias().keys())

def delete_col(self) -> None:
"""Delete a collection (index)."""
self.client.indices.delete(index=self.collection_name)

def col_info(self, name: str) -> Any:
"""Get information about a collection (index)."""
return self.client.indices.get(index=name)

def list(self, filters: Optional[Dict] = None, limit: Optional[int] = None) -> List[List[OutputData]]:
"""List all memories."""
query = {"query": {"match_all": {}}}

if filters:
query["query"] = {"bool": {"must": [{"term": {f"metadata.{key}": value}} for key, value in filters.items()]}}

if limit:
query["size"] = limit

response = self.client.search(index=self.collection_name, body=query)
return [[OutputData(id=hit["_id"], score=1.0, payload=hit["_source"].get("metadata", {})) for hit in response["hits"]["hits"]]]
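
As a closing sketch, here is how the new store could be exercised directly against a local node (placeholder credentials; a tiny embedding dimension is used purely for the demo):

```python
from mem0.vector_stores.opensearch import OpenSearchDB

store = OpenSearchDB(
    collection_name="mem0_demo",
    host="localhost",
    port=9200,
    user="admin",
    password="<your-opensearch-password>",
    embedding_model_dims=3,  # demo-sized vectors
)

# Bulk-insert two vectors with their payload metadata
store.insert(
    vectors=[[0.1, 0.2, 0.3], [0.9, 0.8, 0.7]],
    payloads=[{"user_id": "alice", "data": "cricket"}, {"user_id": "bob", "data": "chess"}],
    ids=["vec-1", "vec-2"],
)

# The index refreshes every 1s; force a refresh so the search below sees the new docs
store.client.indices.refresh(index="mem0_demo")

# k-NN search; filters={"user_id": "alice"} could be added for pre-filtered search (engine permitting)
hits = store.search(query=[0.1, 0.2, 0.25], limit=2)
print(hits[0].id, hits[0].payload)

# Clean up the demo index
store.delete_col()
```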