From ba9b95cd23c96ac071090efd77be4d84a9b73223 Mon Sep 17 00:00:00 2001
From: manukychen <81044487+manukychen@users.noreply.github.com>
Date: Thu, 12 Dec 2024 09:45:22 +0800
Subject: [PATCH] Community: Adding bulk_size as a settable param for
 OpenSearchVectorSearch (#28325)

Description: When using langchain.retrievers.parent_document_retriever with
OpenSearchVectorSearch as the vectorstore, I found that the bulk_size param I
passed to the OpenSearchVectorSearch constructor was not respected by
ParentDocumentRetriever.add_documents(): it was overwritten by the hard-coded
default of 500 in the OpenSearchVectorSearch methods that call path uses
(e.g., add_texts(), add_embeddings(), ...). So I made this PR to fix that,
thanks!

---------

Co-authored-by: Erick Friis
---
 .../vectorstores/opensearch_vector_search.py | 28 +++++++++++++------
 1 file changed, 19 insertions(+), 9 deletions(-)

diff --git a/libs/community/langchain_community/vectorstores/opensearch_vector_search.py b/libs/community/langchain_community/vectorstores/opensearch_vector_search.py
index f08620eef6383..734153d989a9a 100644
--- a/libs/community/langchain_community/vectorstores/opensearch_vector_search.py
+++ b/libs/community/langchain_community/vectorstores/opensearch_vector_search.py
@@ -402,6 +402,7 @@ def __init__(
         self.client = _get_opensearch_client(opensearch_url, **kwargs)
         self.async_client = _get_async_opensearch_client(opensearch_url, **kwargs)
         self.engine = kwargs.get("engine", "nmslib")
+        self.bulk_size = kwargs.get("bulk_size", 500)

     @property
     def embeddings(self) -> Embeddings:
@@ -413,9 +414,10 @@ def __add(
         embeddings: List[List[float]],
         metadatas: Optional[List[dict]] = None,
         ids: Optional[List[str]] = None,
-        bulk_size: int = 500,
+        bulk_size: Optional[int] = None,
         **kwargs: Any,
     ) -> List[str]:
+        bulk_size = bulk_size if bulk_size is not None else self.bulk_size
         _validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
         index_name = kwargs.get("index_name", self.index_name)
         text_field = kwargs.get("text_field", "text")
@@ -454,9 +456,10 @@ async def __aadd(
         embeddings: List[List[float]],
         metadatas: Optional[List[dict]] = None,
         ids: Optional[List[str]] = None,
-        bulk_size: int = 500,
+        bulk_size: Optional[int] = None,
         **kwargs: Any,
     ) -> List[str]:
+        bulk_size = bulk_size if bulk_size is not None else self.bulk_size
         _validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
         index_name = kwargs.get("index_name", self.index_name)
         text_field = kwargs.get("text_field", "text")
@@ -560,7 +563,7 @@ def add_texts(
         texts: Iterable[str],
         metadatas: Optional[List[dict]] = None,
         ids: Optional[List[str]] = None,
-        bulk_size: int = 500,
+        bulk_size: Optional[int] = None,
         **kwargs: Any,
     ) -> List[str]:
         """Run more texts through the embeddings and add to the vectorstore.
@@ -582,6 +585,7 @@ def add_texts(
             to "text".
         """
         embeddings = self.embedding_function.embed_documents(list(texts))
+        bulk_size = bulk_size if bulk_size is not None else self.bulk_size
         return self.__add(
             texts,
             embeddings,
@@ -596,7 +600,7 @@ def add_texts(
         texts: Iterable[str],
         metadatas: Optional[List[dict]] = None,
         ids: Optional[List[str]] = None,
-        bulk_size: int = 500,
+        bulk_size: Optional[int] = None,
         **kwargs: Any,
     ) -> List[str]:
         """
@@ -604,6 +608,7 @@ async def aadd_texts(
         and add to the vectorstore.
         """
         embeddings = await self.embedding_function.aembed_documents(list(texts))
+        bulk_size = bulk_size if bulk_size is not None else self.bulk_size
         return await self.__aadd(
             texts,
             embeddings,
@@ -618,7 +623,7 @@ def add_embeddings(
         text_embeddings: Iterable[Tuple[str, List[float]]],
         metadatas: Optional[List[dict]] = None,
         ids: Optional[List[str]] = None,
-        bulk_size: int = 500,
+        bulk_size: Optional[int] = None,
         **kwargs: Any,
     ) -> List[str]:
         """Add the given texts and embeddings to the vectorstore.
@@ -641,6 +646,7 @@ def add_embeddings(
             to "text".
         """
         texts, embeddings = zip(*text_embeddings)
+        bulk_size = bulk_size if bulk_size is not None else self.bulk_size
         return self.__add(
             list(texts),
             list(embeddings),
@@ -1085,7 +1091,7 @@ def from_texts(
         texts: List[str],
         embedding: Embeddings,
         metadatas: Optional[List[dict]] = None,
-        bulk_size: int = 500,
+        bulk_size: Optional[int] = None,
         ids: Optional[List[str]] = None,
         **kwargs: Any,
     ) -> OpenSearchVectorSearch:
@@ -1134,6 +1140,7 @@ def from_texts(

         """
         embeddings = embedding.embed_documents(texts)
+        bulk_size = bulk_size if bulk_size is not None else cls.bulk_size
         return cls.from_embeddings(
             embeddings,
             texts,
@@ -1150,7 +1157,7 @@ async def afrom_texts(
         texts: List[str],
         embedding: Embeddings,
         metadatas: Optional[List[dict]] = None,
-        bulk_size: int = 500,
+        bulk_size: Optional[int] = None,
         ids: Optional[List[str]] = None,
         **kwargs: Any,
     ) -> OpenSearchVectorSearch:
@@ -1199,6 +1206,7 @@ async def afrom_texts(

         """
         embeddings = await embedding.aembed_documents(texts)
+        bulk_size = bulk_size if bulk_size is not None else cls.bulk_size
         return await cls.afrom_embeddings(
             embeddings,
             texts,
@@ -1216,7 +1224,7 @@ def from_embeddings(
         texts: List[str],
         embedding: Embeddings,
         metadatas: Optional[List[dict]] = None,
-        bulk_size: int = 500,
+        bulk_size: Optional[int] = None,
         ids: Optional[List[str]] = None,
         **kwargs: Any,
     ) -> OpenSearchVectorSearch:
@@ -1285,6 +1293,7 @@ def from_embeddings(
             "max_chunk_bytes",
             "is_aoss",
         ]
+        bulk_size = bulk_size if bulk_size is not None else cls.bulk_size
         _validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
         dim = len(embeddings[0])
         # Get the index name from either from kwargs or ENV Variable
@@ -1346,7 +1355,7 @@ async def afrom_embeddings(
         texts: List[str],
         embedding: Embeddings,
         metadatas: Optional[List[dict]] = None,
-        bulk_size: int = 500,
+        bulk_size: Optional[int] = None,
         ids: Optional[List[str]] = None,
         **kwargs: Any,
     ) -> OpenSearchVectorSearch:
@@ -1417,6 +1426,7 @@ async def afrom_embeddings(
             "max_chunk_bytes",
             "is_aoss",
         ]
+        bulk_size = bulk_size if bulk_size is not None else cls.bulk_size
         _validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
         dim = len(embeddings[0])
         # Get the index name from either from kwargs or ENV Variable