From 51843d4e22c68d798b8f4a1c8bc89fc60e76fe53 Mon Sep 17 00:00:00 2001
From: manu2
Date: Sun, 24 Nov 2024 14:46:51 +0800
Subject: [PATCH 1/4] let OpenSearchVectorSearch class functions accept the bulk_size param correctly

---
 .../vectorstores/opensearch_vector_search.py | 28 +++++++------------
 1 file changed, 10 insertions(+), 18 deletions(-)

diff --git a/libs/community/langchain_community/vectorstores/opensearch_vector_search.py b/libs/community/langchain_community/vectorstores/opensearch_vector_search.py
index f08620eef6383..60c26c2ef050a 100644
--- a/libs/community/langchain_community/vectorstores/opensearch_vector_search.py
+++ b/libs/community/langchain_community/vectorstores/opensearch_vector_search.py
@@ -402,6 +402,7 @@ def __init__(
         self.client = _get_opensearch_client(opensearch_url, **kwargs)
         self.async_client = _get_async_opensearch_client(opensearch_url, **kwargs)
         self.engine = kwargs.get("engine", "nmslib")
+        self.bulk_size = kwargs.get("bulk_size", 500)
 
     @property
     def embeddings(self) -> Embeddings:
@@ -413,10 +414,9 @@ def __add(
         embeddings: List[List[float]],
         metadatas: Optional[List[dict]] = None,
         ids: Optional[List[str]] = None,
-        bulk_size: int = 500,
         **kwargs: Any,
     ) -> List[str]:
-        _validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
+        _validate_embeddings_and_bulk_size(len(embeddings), self.bulk_size)
         index_name = kwargs.get("index_name", self.index_name)
         text_field = kwargs.get("text_field", "text")
         dim = len(embeddings[0])
@@ -454,10 +454,9 @@ async def __aadd(
         embeddings: List[List[float]],
         metadatas: Optional[List[dict]] = None,
         ids: Optional[List[str]] = None,
-        bulk_size: int = 500,
         **kwargs: Any,
     ) -> List[str]:
-        _validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
+        _validate_embeddings_and_bulk_size(len(embeddings), self.bulk_size)
         index_name = kwargs.get("index_name", self.index_name)
         text_field = kwargs.get("text_field", "text")
         dim = len(embeddings[0])
@@ -560,7 +559,6 @@ def add_texts(
         texts: Iterable[str],
         metadatas: Optional[List[dict]] = None,
         ids: Optional[List[str]] = None,
-        bulk_size: int = 500,
         **kwargs: Any,
     ) -> List[str]:
         """Run more texts through the embeddings and add to the vectorstore.
@@ -587,7 +585,7 @@ def add_texts(
             embeddings,
             metadatas=metadatas,
             ids=ids,
-            bulk_size=bulk_size,
+            bulk_size=self.bulk_size,
             **kwargs,
         )
 
@@ -596,7 +594,6 @@ async def aadd_texts(
         texts: Iterable[str],
         metadatas: Optional[List[dict]] = None,
         ids: Optional[List[str]] = None,
-        bulk_size: int = 500,
         **kwargs: Any,
     ) -> List[str]:
         """
@@ -609,7 +606,7 @@ async def aadd_texts(
             embeddings,
             metadatas=metadatas,
             ids=ids,
-            bulk_size=bulk_size,
+            bulk_size=self.bulk_size,
             **kwargs,
         )
 
@@ -618,7 +615,6 @@ def add_embeddings(
         text_embeddings: Iterable[Tuple[str, List[float]]],
         metadatas: Optional[List[dict]] = None,
         ids: Optional[List[str]] = None,
-        bulk_size: int = 500,
         **kwargs: Any,
     ) -> List[str]:
         """Add the given texts and embeddings to the vectorstore.
@@ -646,7 +642,7 @@ def add_embeddings(
             list(embeddings),
             metadatas=metadatas,
             ids=ids,
-            bulk_size=bulk_size,
+            bulk_size=self.bulk_size,
             **kwargs,
         )
 
@@ -1085,7 +1081,6 @@ def from_texts(
         texts: List[str],
         embedding: Embeddings,
         metadatas: Optional[List[dict]] = None,
-        bulk_size: int = 500,
         ids: Optional[List[str]] = None,
         **kwargs: Any,
     ) -> OpenSearchVectorSearch:
@@ -1139,7 +1134,7 @@ def from_texts(
             texts,
             embedding,
             metadatas=metadatas,
-            bulk_size=bulk_size,
+            bulk_size=self.bulk_size,
             ids=ids,
             **kwargs,
         )
@@ -1150,7 +1145,6 @@ async def afrom_texts(
         texts: List[str],
         embedding: Embeddings,
         metadatas: Optional[List[dict]] = None,
-        bulk_size: int = 500,
         ids: Optional[List[str]] = None,
         **kwargs: Any,
     ) -> OpenSearchVectorSearch:
@@ -1204,7 +1198,7 @@ async def afrom_texts(
             texts,
             embedding,
             metadatas=metadatas,
-            bulk_size=bulk_size,
+            bulk_size=self.bulk_size,
             ids=ids,
             **kwargs,
         )
@@ -1216,7 +1210,6 @@ def from_embeddings(
         texts: List[str],
         embedding: Embeddings,
         metadatas: Optional[List[dict]] = None,
-        bulk_size: int = 500,
         ids: Optional[List[str]] = None,
         **kwargs: Any,
     ) -> OpenSearchVectorSearch:
@@ -1285,7 +1278,7 @@ def from_embeddings(
             "max_chunk_bytes",
             "is_aoss",
         ]
-        _validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
+        _validate_embeddings_and_bulk_size(len(embeddings), self.bulk_size)
         dim = len(embeddings[0])
         # Get the index name from either from kwargs or ENV Variable
         # before falling back to random generation
@@ -1346,7 +1339,6 @@ async def afrom_embeddings(
         texts: List[str],
         embedding: Embeddings,
         metadatas: Optional[List[dict]] = None,
-        bulk_size: int = 500,
         ids: Optional[List[str]] = None,
         **kwargs: Any,
     ) -> OpenSearchVectorSearch:
@@ -1417,7 +1409,7 @@ async def afrom_embeddings(
             "max_chunk_bytes",
             "is_aoss",
         ]
-        _validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
+        _validate_embeddings_and_bulk_size(len(embeddings), self.bulk_size)
         dim = len(embeddings[0])
         # Get the index name from either from kwargs or ENV Variable
         # before falling back to random generation

From 9fbccecf021b5c2ddbf403c013529478ea0109e0 Mon Sep 17 00:00:00 2001
From: manu2
Date: Sun, 24 Nov 2024 15:39:10 +0800
Subject: [PATCH 2/4] fix classmethod self.bulk_size bug

---
 .../vectorstores/opensearch_vector_search.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/libs/community/langchain_community/vectorstores/opensearch_vector_search.py b/libs/community/langchain_community/vectorstores/opensearch_vector_search.py
index 60c26c2ef050a..22208696db3c6 100644
--- a/libs/community/langchain_community/vectorstores/opensearch_vector_search.py
+++ b/libs/community/langchain_community/vectorstores/opensearch_vector_search.py
@@ -1134,7 +1134,7 @@ def from_texts(
             texts,
             embedding,
             metadatas=metadatas,
-            bulk_size=self.bulk_size,
+            bulk_size=cls.bulk_size,
             ids=ids,
             **kwargs,
         )
@@ -1198,7 +1198,7 @@ async def afrom_texts(
             texts,
             embedding,
             metadatas=metadatas,
-            bulk_size=self.bulk_size,
+            bulk_size=cls.bulk_size,
             ids=ids,
             **kwargs,
         )
@@ -1278,7 +1278,7 @@ def from_embeddings(
             "max_chunk_bytes",
             "is_aoss",
         ]
-        _validate_embeddings_and_bulk_size(len(embeddings), self.bulk_size)
+        _validate_embeddings_and_bulk_size(len(embeddings), cls.bulk_size)
         dim = len(embeddings[0])
         # Get the index name from either from kwargs or ENV Variable
         # before falling back to random generation
@@ -1409,7 +1409,7 @@ async def afrom_embeddings(
             "max_chunk_bytes",
             "is_aoss",
         ]
-        _validate_embeddings_and_bulk_size(len(embeddings), self.bulk_size)
+        _validate_embeddings_and_bulk_size(len(embeddings), cls.bulk_size)
         dim = len(embeddings[0])
         # Get the index name from either from kwargs or ENV Variable
         # before falling back to random generation

From 11383f47c783bd9312d2447fd92e1d824edd5c14 Mon Sep 17 00:00:00 2001
From: manu2
Date: Tue, 10 Dec 2024 22:52:54 +0800
Subject: [PATCH 3/4] allow bulk_size to be passed in as a per-call override

---
 .../vectorstores/opensearch_vector_search.py | 36 ++++++++++++++-----
 1 file changed, 27 insertions(+), 9 deletions(-)

diff --git a/libs/community/langchain_community/vectorstores/opensearch_vector_search.py b/libs/community/langchain_community/vectorstores/opensearch_vector_search.py
index 22208696db3c6..19cd64cfcab34 100644
--- a/libs/community/langchain_community/vectorstores/opensearch_vector_search.py
+++ b/libs/community/langchain_community/vectorstores/opensearch_vector_search.py
@@ -414,9 +414,11 @@ def __add(
         embeddings: List[List[float]],
         metadatas: Optional[List[dict]] = None,
         ids: Optional[List[str]] = None,
+        bulk_size: Optional[int] = None,
         **kwargs: Any,
     ) -> List[str]:
-        _validate_embeddings_and_bulk_size(len(embeddings), self.bulk_size)
+        bulk_size = bulk_size if bulk_size is not None else self.bulk_size
+        _validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
         index_name = kwargs.get("index_name", self.index_name)
         text_field = kwargs.get("text_field", "text")
         dim = len(embeddings[0])
@@ -454,9 +456,11 @@ async def __aadd(
         embeddings: List[List[float]],
         metadatas: Optional[List[dict]] = None,
         ids: Optional[List[str]] = None,
+        bulk_size: Optional[int] = None,
         **kwargs: Any,
     ) -> List[str]:
-        _validate_embeddings_and_bulk_size(len(embeddings), self.bulk_size)
+        bulk_size = bulk_size if bulk_size is not None else self.bulk_size
+        _validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
         index_name = kwargs.get("index_name", self.index_name)
         text_field = kwargs.get("text_field", "text")
         dim = len(embeddings[0])
@@ -558,6 +562,7 @@ def add_texts(
         self,
         texts: Iterable[str],
         metadatas: Optional[List[dict]] = None,
+        bulk_size: Optional[int] = None,
         ids: Optional[List[str]] = None,
         **kwargs: Any,
     ) -> List[str]:
         """Run more texts through the embeddings and add to the vectorstore.
@@ -580,12 +585,13 @@ def add_texts(
             to "text".
         """
         embeddings = self.embedding_function.embed_documents(list(texts))
+        bulk_size = bulk_size if bulk_size is not None else self.bulk_size
         return self.__add(
             texts,
             embeddings,
             metadatas=metadatas,
             ids=ids,
-            bulk_size=self.bulk_size,
+            bulk_size=bulk_size,
             **kwargs,
         )
 
@@ -594,6 +600,7 @@ async def aadd_texts(
         texts: Iterable[str],
         metadatas: Optional[List[dict]] = None,
         ids: Optional[List[str]] = None,
+        bulk_size: Optional[int] = None,
         **kwargs: Any,
     ) -> List[str]:
         """
@@ -601,12 +608,13 @@ async def aadd_texts(
         and add to the vectorstore.
         """
         embeddings = await self.embedding_function.aembed_documents(list(texts))
+        bulk_size = bulk_size if bulk_size is not None else self.bulk_size
         return await self.__aadd(
             texts,
             embeddings,
             metadatas=metadatas,
             ids=ids,
-            bulk_size=self.bulk_size,
+            bulk_size=bulk_size,
             **kwargs,
         )
 
@@ -615,6 +623,7 @@ def add_embeddings(
         text_embeddings: Iterable[Tuple[str, List[float]]],
         metadatas: Optional[List[dict]] = None,
         ids: Optional[List[str]] = None,
+        bulk_size: Optional[int] = None,
         **kwargs: Any,
     ) -> List[str]:
         """Add the given texts and embeddings to the vectorstore.
@@ -637,12 +646,13 @@ def add_embeddings(
             to "text".
         """
         texts, embeddings = zip(*text_embeddings)
+        bulk_size = bulk_size if bulk_size is not None else self.bulk_size
         return self.__add(
             list(texts),
             list(embeddings),
             metadatas=metadatas,
             ids=ids,
-            bulk_size=self.bulk_size,
+            bulk_size=bulk_size,
             **kwargs,
         )
 
@@ -1082,6 +1092,7 @@ def from_texts(
         embedding: Embeddings,
         metadatas: Optional[List[dict]] = None,
         ids: Optional[List[str]] = None,
+        bulk_size: Optional[int] = None,
         **kwargs: Any,
     ) -> OpenSearchVectorSearch:
         """Construct OpenSearchVectorSearch wrapper from raw texts.
@@ -1129,12 +1140,13 @@ def from_texts(
 
         """
         embeddings = embedding.embed_documents(texts)
+        bulk_size = bulk_size if bulk_size is not None else cls.bulk_size
         return cls.from_embeddings(
             embeddings,
             texts,
             embedding,
             metadatas=metadatas,
-            bulk_size=cls.bulk_size,
+            bulk_size=bulk_size,
             ids=ids,
             **kwargs,
         )
@@ -1146,6 +1158,7 @@ async def afrom_texts(
         embedding: Embeddings,
         metadatas: Optional[List[dict]] = None,
         ids: Optional[List[str]] = None,
+        bulk_size: Optional[int] = None,
         **kwargs: Any,
     ) -> OpenSearchVectorSearch:
         """Asynchronously construct OpenSearchVectorSearch wrapper from raw texts.
@@ -1193,12 +1206,13 @@ async def afrom_texts(
 
         """
         embeddings = await embedding.aembed_documents(texts)
+        bulk_size = bulk_size if bulk_size is not None else cls.bulk_size
         return await cls.afrom_embeddings(
             embeddings,
             texts,
             embedding,
             metadatas=metadatas,
-            bulk_size=cls.bulk_size,
+            bulk_size=bulk_size,
             ids=ids,
             **kwargs,
         )
@@ -1210,6 +1224,7 @@ def from_embeddings(
         texts: List[str],
         embedding: Embeddings,
         metadatas: Optional[List[dict]] = None,
+        bulk_size: Optional[int] = None,
         ids: Optional[List[str]] = None,
         **kwargs: Any,
     ) -> OpenSearchVectorSearch:
@@ -1278,7 +1293,8 @@ def from_embeddings(
             "max_chunk_bytes",
             "is_aoss",
         ]
-        _validate_embeddings_and_bulk_size(len(embeddings), cls.bulk_size)
+        bulk_size = bulk_size if bulk_size is not None else cls.bulk_size
+        _validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
         dim = len(embeddings[0])
         # Get the index name from either from kwargs or ENV Variable
         # before falling back to random generation
@@ -1339,6 +1355,7 @@ async def afrom_embeddings(
         texts: List[str],
         embedding: Embeddings,
         metadatas: Optional[List[dict]] = None,
+        bulk_size: Optional[int] = None,
         ids: Optional[List[str]] = None,
         **kwargs: Any,
     ) -> OpenSearchVectorSearch:
@@ -1409,7 +1426,8 @@ async def afrom_embeddings(
             "max_chunk_bytes",
             "is_aoss",
         ]
-        _validate_embeddings_and_bulk_size(len(embeddings), cls.bulk_size)
+        bulk_size = bulk_size if bulk_size is not None else cls.bulk_size
+        _validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
         dim = len(embeddings[0])
         # Get the index name from either from kwargs or ENV Variable
         # before falling back to random generation

From 0ee5af5e9ddd09b6d81472108ed852432adb3e39 Mon Sep 17 00:00:00 2001
From: Erick Friis
Date: Wed, 11 Dec 2024 17:43:02 -0800
Subject: [PATCH 4/4] x

---
 .../vectorstores/opensearch_vector_search.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/libs/community/langchain_community/vectorstores/opensearch_vector_search.py b/libs/community/langchain_community/vectorstores/opensearch_vector_search.py
index 19cd64cfcab34..734153d989a9a 100644
--- a/libs/community/langchain_community/vectorstores/opensearch_vector_search.py
+++ b/libs/community/langchain_community/vectorstores/opensearch_vector_search.py
@@ -562,8 +562,8 @@ def add_texts(
         self,
         texts: Iterable[str],
         metadatas: Optional[List[dict]] = None,
-        bulk_size: Optional[int] = None,
         ids: Optional[List[str]] = None,
+        bulk_size: Optional[int] = None,
         **kwargs: Any,
     ) -> List[str]:
         """Run more texts through the embeddings and add to the vectorstore.
@@ -1091,8 +1091,8 @@ def from_texts(
         texts: List[str],
         embedding: Embeddings,
         metadatas: Optional[List[dict]] = None,
-        ids: Optional[List[str]] = None,
         bulk_size: Optional[int] = None,
+        ids: Optional[List[str]] = None,
         **kwargs: Any,
     ) -> OpenSearchVectorSearch:
         """Construct OpenSearchVectorSearch wrapper from raw texts.
@@ -1157,8 +1157,8 @@ async def afrom_texts(
         texts: List[str],
         embedding: Embeddings,
         metadatas: Optional[List[dict]] = None,
-        ids: Optional[List[str]] = None,
         bulk_size: Optional[int] = None,
+        ids: Optional[List[str]] = None,
         **kwargs: Any,
     ) -> OpenSearchVectorSearch:
         """Asynchronously construct OpenSearchVectorSearch wrapper from raw texts.
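
---

A minimal usage sketch of the behavior after this series: bulk_size is stored once at construction time (PATCH 1/4) and can still be overridden per call (PATCH 3/4 and 4/4). The OpenSearch URL, index name, and FakeEmbeddings stand-in below are placeholder assumptions, not part of the patches; any Embeddings implementation and a reachable OpenSearch cluster would do.

    from langchain_community.embeddings import FakeEmbeddings
    from langchain_community.vectorstores import OpenSearchVectorSearch

    embeddings = FakeEmbeddings(size=8)  # stand-in embedding model, for illustration only

    # bulk_size is read from kwargs at construction time and defaults to 500 if omitted
    docsearch = OpenSearchVectorSearch(
        opensearch_url="http://localhost:9200",  # placeholder endpoint
        index_name="test-index",                 # placeholder index name
        embedding_function=embeddings,
        bulk_size=1000,
    )

    # the stored default (self.bulk_size) is used when no override is given
    docsearch.add_texts(["foo", "bar"])

    # a per-call bulk_size overrides the stored default for just this request
    docsearch.add_texts(["baz"], bulk_size=2000)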