Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix Astra DB collection indexing params #4255

Merged
merged 2 commits into from
Oct 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 28 additions & 27 deletions src/backend/base/langflow/components/vectorstores/astradb.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os

import orjson
from astrapy.admin import parse_api_endpoint
from loguru import logger

Expand Down Expand Up @@ -116,6 +117,7 @@ class AstraVectorStoreComponent(LCVectorStoreComponent):
display_name="Metric",
info="Optional distance metric for vector comparisons in the vector store.",
options=["cosine", "dot_product", "euclidean"],
value="cosine",
advanced=True,
),
IntInput(
Expand Down Expand Up @@ -145,8 +147,8 @@ class AstraVectorStoreComponent(LCVectorStoreComponent):
DropdownInput(
name="setup_mode",
display_name="Setup Mode",
info="Configuration mode for setting up the vector store, with options like 'Sync', 'Async', or 'Off'.",
options=["Sync", "Async", "Off"],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, why is Async going away?

Copy link
Collaborator Author

@cbornet cbornet Oct 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of the ASYNC mode is to not block when you instantiate the AstraDBVectorStore from an asyncio event loop. Here the component only uses it in a SYNC context.
When ASYNC is combined with sync operations, there is a high chance of errors: if a read/write operation happens before the collection creation has finished, it raises an exception (we could probably loop until the creation has finished, but that is not implemented at the moment). And in a flow, these operations generally happen within a short time.

info="Configuration mode for setting up the vector store, with options like 'Sync' or 'Off'.",
options=["Sync", "Off"],
advanced=True,
value="Sync",
),
Expand All @@ -160,18 +162,21 @@ class AstraVectorStoreComponent(LCVectorStoreComponent):
name="metadata_indexing_include",
display_name="Metadata Indexing Include",
info="Optional list of metadata fields to include in the indexing.",
is_list=True,
advanced=True,
),
StrInput(
name="metadata_indexing_exclude",
display_name="Metadata Indexing Exclude",
info="Optional list of metadata fields to exclude from the indexing.",
is_list=True,
advanced=True,
),
StrInput(
name="collection_indexing_policy",
display_name="Collection Indexing Policy",
info="Optional dictionary defining the indexing policy for the collection.",
info='Optional JSON string for the "indexing" field of the collection. '
"See https://docs.datastax.com/en/astra-db-serverless/api-reference/collections.html#the-indexing-option",
advanced=True,
),
IntInput(
Expand Down Expand Up @@ -401,31 +406,27 @@ def build_vector_store(self, vectorize_options=None):
),
}

vector_store_kwargs = {
**embedding_dict,
"collection_name": self.collection_name,
"token": self.token,
"api_endpoint": self.api_endpoint,
"namespace": self.namespace or None,
"environment": parse_api_endpoint(self.api_endpoint).environment,
"metric": self.metric or None,
"batch_size": self.batch_size or None,
"bulk_insert_batch_concurrency": self.bulk_insert_batch_concurrency or None,
"bulk_insert_overwrite_concurrency": self.bulk_insert_overwrite_concurrency or None,
"bulk_delete_concurrency": self.bulk_delete_concurrency or None,
"setup_mode": setup_mode_value,
"pre_delete_collection": self.pre_delete_collection or False,
}

if self.metadata_indexing_include:
vector_store_kwargs["metadata_indexing_include"] = self.metadata_indexing_include
elif self.metadata_indexing_exclude:
vector_store_kwargs["metadata_indexing_exclude"] = self.metadata_indexing_exclude
elif self.collection_indexing_policy:
vector_store_kwargs["collection_indexing_policy"] = self.collection_indexing_policy

try:
vector_store = AstraDBVectorStore(**vector_store_kwargs)
vector_store = AstraDBVectorStore(
collection_name=self.collection_name,
token=self.token,
api_endpoint=self.api_endpoint,
namespace=self.namespace or None,
environment=parse_api_endpoint(self.api_endpoint).environment,
metric=self.metric,
batch_size=self.batch_size or None,
bulk_insert_batch_concurrency=self.bulk_insert_batch_concurrency or None,
bulk_insert_overwrite_concurrency=self.bulk_insert_overwrite_concurrency or None,
bulk_delete_concurrency=self.bulk_delete_concurrency or None,
setup_mode=setup_mode_value,
pre_delete_collection=self.pre_delete_collection,
metadata_indexing_include=[s for s in self.metadata_indexing_include if s],
metadata_indexing_exclude=[s for s in self.metadata_indexing_exclude if s],
collection_indexing_policy=orjson.dumps(self.collection_indexing_policy)
if self.collection_indexing_policy
else None,
**embedding_dict,
)
except Exception as e:
msg = f"Error initializing AstraDBVectorStore: {e}"
raise ValueError(msg) from e
Expand Down
Loading
Loading