Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core[patch]: throw exception indexing code if deletion fails in vectorstore #28103

Merged
merged 5 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 53 additions & 4 deletions libs/core/langchain_core/indexing/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from langchain_core.document_loaders.base import BaseLoader
from langchain_core.documents import Document
from langchain_core.exceptions import LangChainException
from langchain_core.indexing.base import DocumentIndex, RecordManager
from langchain_core.vectorstores import VectorStore

Expand Down Expand Up @@ -175,6 +176,32 @@ def _deduplicate_in_order(
yield hashed_doc


class IndexingException(LangChainException):
"""Raised when an indexing operation fails."""


def _delete(
vector_store: Union[VectorStore, DocumentIndex],
ids: list[str],
) -> None:
if isinstance(vector_store, VectorStore):
delete_ok = vector_store.delete(ids)
if delete_ok is not None and delete_ok is False:
msg = "The delete operation to VectorStore failed."
raise IndexingException(msg)
elif isinstance(vector_store, DocumentIndex):
delete_response = vector_store.delete(ids)
if "num_failed" in delete_response and delete_response["num_failed"] > 0:
msg = "The delete operation to DocumentIndex failed."
raise IndexingException(msg)
else:
msg = (
f"Vectorstore should be either a VectorStore or a DocumentIndex. "
f"Got {type(vector_store)}."
)
raise TypeError(msg)


# PUBLIC API


Expand Down Expand Up @@ -441,7 +468,7 @@ def index(
)
if uids_to_delete:
# Then delete from vector store.
destination.delete(uids_to_delete)
_delete(destination, uids_to_delete)
# First delete from record store.
record_manager.delete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)
Expand All @@ -454,7 +481,7 @@ def index(
group_ids=delete_group_ids, before=index_start_dt, limit=cleanup_batch_size
):
# First delete from record store.
destination.delete(uids_to_delete)
_delete(destination, uids_to_delete)
# Then delete from record manager.
record_manager.delete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)
Expand All @@ -474,6 +501,28 @@ async def _to_async_iterator(iterator: Iterable[T]) -> AsyncIterator[T]:
yield item


async def _adelete(
vector_store: Union[VectorStore, DocumentIndex],
ids: list[str],
) -> None:
if isinstance(vector_store, VectorStore):
delete_ok = await vector_store.adelete(ids)
if delete_ok is not None and delete_ok is False:
msg = "The delete operation to VectorStore failed."
raise IndexingException(msg)
elif isinstance(vector_store, DocumentIndex):
delete_response = await vector_store.adelete(ids)
if "num_failed" in delete_response and delete_response["num_failed"] > 0:
msg = "The delete operation to DocumentIndex failed."
raise IndexingException(msg)
else:
msg = (
f"Vectorstore should be either a VectorStore or a DocumentIndex. "
f"Got {type(vector_store)}."
)
raise TypeError(msg)


async def aindex(
docs_source: Union[BaseLoader, Iterable[Document], AsyncIterator[Document]],
record_manager: RecordManager,
Expand Down Expand Up @@ -733,7 +782,7 @@ async def aindex(
)
if uids_to_delete:
# Then delete from vector store.
await destination.adelete(uids_to_delete)
await _adelete(destination, uids_to_delete)
# First delete from record store.
await record_manager.adelete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)
Expand All @@ -746,7 +795,7 @@ async def aindex(
group_ids=delete_group_ids, before=index_start_dt, limit=cleanup_batch_size
):
# First delete from record store.
await destination.adelete(uids_to_delete)
await _adelete(destination, uids_to_delete)
# Then delete from record manager.
await record_manager.adelete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)
Expand Down
160 changes: 159 additions & 1 deletion libs/core/tests/unit_tests/indexing/test_indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from langchain_core.documents import Document
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.indexing import InMemoryRecordManager, aindex, index
from langchain_core.indexing.api import _abatch, _HashedDocument
from langchain_core.indexing.api import IndexingException, _abatch, _HashedDocument
from langchain_core.indexing.in_memory import InMemoryDocumentIndex
from langchain_core.vectorstores import InMemoryVectorStore, VectorStore

Expand Down Expand Up @@ -287,6 +287,164 @@ async def test_aindex_simple_delete_full(
}


def test_index_delete_full_recovery_after_deletion_failure(
record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None:
"""Indexing some content to confirm it gets added only once."""
loader = ToyLoader(
documents=[
Document(
page_content="This is a test document.",
),
Document(
page_content="This is another document.",
),
]
)

with patch.object(
record_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp()
):
assert index(loader, record_manager, vector_store, cleanup="full") == {
"num_added": 2,
"num_deleted": 0,
"num_skipped": 0,
"num_updated": 0,
}

loader = ToyLoader(
documents=[
Document(
page_content="mutated document 1",
),
Document(
page_content="This is another document.", # <-- Same as original
),
]
)

with (
patch.object(
record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
),
patch.object(vector_store, "delete", return_value=False),
pytest.raises(IndexingException),
):
indexing_result = index(loader, record_manager, vector_store, cleanup="full")

# At this point, there should be 3 records in both the record manager
# and the vector store
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {
"This is a test document.",
"mutated document 1",
"This is another document.",
}

with patch.object(
record_manager, "get_time", return_value=datetime(2021, 1, 3).timestamp()
):
indexing_result = index(loader, record_manager, vector_store, cleanup="full")
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {"mutated document 1", "This is another document."}

assert indexing_result == {
"num_added": 0,
"num_deleted": 1,
"num_skipped": 2,
"num_updated": 0,
}


async def test_aindex_delete_full_recovery_after_deletion_failure(
arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None:
"""Indexing some content to confirm it gets added only once."""
loader = ToyLoader(
documents=[
Document(
page_content="This is a test document.",
),
Document(
page_content="This is another document.",
),
]
)

with patch.object(
arecord_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp()
):
assert await aindex(loader, arecord_manager, vector_store, cleanup="full") == {
"num_added": 2,
"num_deleted": 0,
"num_skipped": 0,
"num_updated": 0,
}

loader = ToyLoader(
documents=[
Document(
page_content="mutated document 1",
),
Document(
page_content="This is another document.", # <-- Same as original
),
]
)

with (
patch.object(
arecord_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
),
patch.object(vector_store, "adelete", return_value=False),
pytest.raises(IndexingException),
):
indexing_result = await aindex(
loader, arecord_manager, vector_store, cleanup="full"
)

# At this point, there should be 3 records in both the record manager
# and the vector store
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {
"This is a test document.",
"mutated document 1",
"This is another document.",
}

with patch.object(
arecord_manager, "get_time", return_value=datetime(2021, 1, 3).timestamp()
):
indexing_result = await aindex(
loader, arecord_manager, vector_store, cleanup="full"
)
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {"mutated document 1", "This is another document."}

assert indexing_result == {
"num_added": 0,
"num_deleted": 1,
"num_skipped": 2,
"num_updated": 0,
}


def test_incremental_fails_with_bad_source_ids(
record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None:
Expand Down
Loading