Skip to content

Commit

Permalink
Community: Adding bulk_size as a settable param for OpenSearchVectorSe…
Browse files Browse the repository at this point in the history
…arch (#28325)

Description:
When using langchain.retrievers.parent_document_retriever.py with
OpenSearchVectorSearch as the vectorstore, I found that the bulk_size
param I passed into the OpenSearchVectorSearch class did not take effect
in ParentDocumentRetriever.add_documents(): it was overwritten by the
hard-coded default of 500 in the OpenSearchVectorSearch methods
(e.g., add_texts(), add_embeddings()...).

So I made this PR request to fix this, thanks!

---------

Co-authored-by: Erick Friis <[email protected]>
  • Loading branch information
manukychen and efriis authored Dec 12, 2024
1 parent 0af5ad8 commit ba9b95c
Showing 1 changed file with 19 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ def __init__(
self.client = _get_opensearch_client(opensearch_url, **kwargs)
self.async_client = _get_async_opensearch_client(opensearch_url, **kwargs)
self.engine = kwargs.get("engine", "nmslib")
self.bulk_size = kwargs.get("bulk_size", 500)

@property
def embeddings(self) -> Embeddings:
Expand All @@ -413,9 +414,10 @@ def __add(
embeddings: List[List[float]],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
bulk_size: int = 500,
bulk_size: Optional[int] = None,
**kwargs: Any,
) -> List[str]:
bulk_size = bulk_size if bulk_size is not None else self.bulk_size
_validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
index_name = kwargs.get("index_name", self.index_name)
text_field = kwargs.get("text_field", "text")
Expand Down Expand Up @@ -454,9 +456,10 @@ async def __aadd(
embeddings: List[List[float]],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
bulk_size: int = 500,
bulk_size: Optional[int] = None,
**kwargs: Any,
) -> List[str]:
bulk_size = bulk_size if bulk_size is not None else self.bulk_size
_validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
index_name = kwargs.get("index_name", self.index_name)
text_field = kwargs.get("text_field", "text")
Expand Down Expand Up @@ -560,7 +563,7 @@ def add_texts(
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
bulk_size: int = 500,
bulk_size: Optional[int] = None,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore.
Expand All @@ -582,6 +585,7 @@ def add_texts(
to "text".
"""
embeddings = self.embedding_function.embed_documents(list(texts))
bulk_size = bulk_size if bulk_size is not None else self.bulk_size
return self.__add(
texts,
embeddings,
Expand All @@ -596,14 +600,15 @@ async def aadd_texts(
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
bulk_size: int = 500,
bulk_size: Optional[int] = None,
**kwargs: Any,
) -> List[str]:
"""
Asynchronously run more texts through the embeddings
and add to the vectorstore.
"""
embeddings = await self.embedding_function.aembed_documents(list(texts))
bulk_size = bulk_size if bulk_size is not None else self.bulk_size
return await self.__aadd(
texts,
embeddings,
Expand All @@ -618,7 +623,7 @@ def add_embeddings(
text_embeddings: Iterable[Tuple[str, List[float]]],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
bulk_size: int = 500,
bulk_size: Optional[int] = None,
**kwargs: Any,
) -> List[str]:
"""Add the given texts and embeddings to the vectorstore.
Expand All @@ -641,6 +646,7 @@ def add_embeddings(
to "text".
"""
texts, embeddings = zip(*text_embeddings)
bulk_size = bulk_size if bulk_size is not None else self.bulk_size
return self.__add(
list(texts),
list(embeddings),
Expand Down Expand Up @@ -1085,7 +1091,7 @@ def from_texts(
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
bulk_size: int = 500,
bulk_size: Optional[int] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> OpenSearchVectorSearch:
Expand Down Expand Up @@ -1134,6 +1140,7 @@ def from_texts(
"""
embeddings = embedding.embed_documents(texts)
bulk_size = bulk_size if bulk_size is not None else cls.bulk_size
return cls.from_embeddings(
embeddings,
texts,
Expand All @@ -1150,7 +1157,7 @@ async def afrom_texts(
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
bulk_size: int = 500,
bulk_size: Optional[int] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> OpenSearchVectorSearch:
Expand Down Expand Up @@ -1199,6 +1206,7 @@ async def afrom_texts(
"""
embeddings = await embedding.aembed_documents(texts)
bulk_size = bulk_size if bulk_size is not None else cls.bulk_size
return await cls.afrom_embeddings(
embeddings,
texts,
Expand All @@ -1216,7 +1224,7 @@ def from_embeddings(
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
bulk_size: int = 500,
bulk_size: Optional[int] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> OpenSearchVectorSearch:
Expand Down Expand Up @@ -1285,6 +1293,7 @@ def from_embeddings(
"max_chunk_bytes",
"is_aoss",
]
bulk_size = bulk_size if bulk_size is not None else cls.bulk_size
_validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
dim = len(embeddings[0])
# Get the index name from either from kwargs or ENV Variable
Expand Down Expand Up @@ -1346,7 +1355,7 @@ async def afrom_embeddings(
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
bulk_size: int = 500,
bulk_size: Optional[int] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> OpenSearchVectorSearch:
Expand Down Expand Up @@ -1417,6 +1426,7 @@ async def afrom_embeddings(
"max_chunk_bytes",
"is_aoss",
]
bulk_size = bulk_size if bulk_size is not None else cls.bulk_size
_validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
dim = len(embeddings[0])
# Get the index name from either from kwargs or ENV Variable
Expand Down

0 comments on commit ba9b95c

Please sign in to comment.