Skip to content

Commit

Permalink
core[patch]: throw exception in indexing code if deletion fails in vecto…
Browse files Browse the repository at this point in the history
…rstore (#28103)

The delete methods in the VectorStore and DocumentIndex interfaces
return a status indicating the outcome, so their implementations may
report failure through the return value rather than by raising an
exception. The indexing code previously ignored this return value; it
now raises an exception when a delete operation reports failure.

---------

Co-authored-by: Eugene Yurtsev <[email protected]>
  • Loading branch information
KeiichiHirobe and eyurtsev authored Dec 13, 2024
1 parent 258b3be commit 67fd554
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 5 deletions.
57 changes: 53 additions & 4 deletions libs/core/langchain_core/indexing/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from langchain_core.document_loaders.base import BaseLoader
from langchain_core.documents import Document
from langchain_core.exceptions import LangChainException
from langchain_core.indexing.base import DocumentIndex, RecordManager
from langchain_core.vectorstores import VectorStore

Expand Down Expand Up @@ -175,6 +176,32 @@ def _deduplicate_in_order(
yield hashed_doc


class IndexingException(LangChainException):
    """Error raised to signal that an indexing operation did not complete successfully."""


def _delete(
    vector_store: Union[VectorStore, DocumentIndex],
    ids: list[str],
) -> None:
    """Delete the given ids from the destination, raising on reported failure.

    Args:
        vector_store: The VectorStore or DocumentIndex to delete from.
        ids: The ids to delete.

    Raises:
        IndexingException: If the destination reports that the delete failed.
        TypeError: If the destination is neither a VectorStore nor a
            DocumentIndex.
    """
    if isinstance(vector_store, VectorStore):
        delete_ok = vector_store.delete(ids)
        # VectorStore.delete returns Optional[bool]; None means the
        # implementation does not report a status, so only an explicit
        # False is treated as a failure.
        if delete_ok is False:
            msg = "The delete operation to VectorStore failed."
            raise IndexingException(msg)
    elif isinstance(vector_store, DocumentIndex):
        delete_response = vector_store.delete(ids)
        # The response may omit "num_failed"; treat a missing key as
        # zero failures.
        if delete_response.get("num_failed", 0) > 0:
            msg = "The delete operation to DocumentIndex failed."
            raise IndexingException(msg)
    else:
        msg = (
            f"Vectorstore should be either a VectorStore or a DocumentIndex. "
            f"Got {type(vector_store)}."
        )
        raise TypeError(msg)


# PUBLIC API


Expand Down Expand Up @@ -441,7 +468,7 @@ def index(
)
if uids_to_delete:
# Then delete from vector store.
destination.delete(uids_to_delete)
_delete(destination, uids_to_delete)
# First delete from record store.
record_manager.delete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)
Expand All @@ -454,7 +481,7 @@ def index(
group_ids=delete_group_ids, before=index_start_dt, limit=cleanup_batch_size
):
# First delete from record store.
destination.delete(uids_to_delete)
_delete(destination, uids_to_delete)
# Then delete from record manager.
record_manager.delete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)
Expand All @@ -474,6 +501,28 @@ async def _to_async_iterator(iterator: Iterable[T]) -> AsyncIterator[T]:
yield item


async def _adelete(
    vector_store: Union[VectorStore, DocumentIndex],
    ids: list[str],
) -> None:
    """Async delete the given ids from the destination, raising on reported failure.

    Args:
        vector_store: The VectorStore or DocumentIndex to delete from.
        ids: The ids to delete.

    Raises:
        IndexingException: If the destination reports that the delete failed.
        TypeError: If the destination is neither a VectorStore nor a
            DocumentIndex.
    """
    if isinstance(vector_store, VectorStore):
        delete_ok = await vector_store.adelete(ids)
        # VectorStore.adelete returns Optional[bool]; None means the
        # implementation does not report a status, so only an explicit
        # False is treated as a failure.
        if delete_ok is False:
            msg = "The delete operation to VectorStore failed."
            raise IndexingException(msg)
    elif isinstance(vector_store, DocumentIndex):
        delete_response = await vector_store.adelete(ids)
        # The response may omit "num_failed"; treat a missing key as
        # zero failures.
        if delete_response.get("num_failed", 0) > 0:
            msg = "The delete operation to DocumentIndex failed."
            raise IndexingException(msg)
    else:
        msg = (
            f"Vectorstore should be either a VectorStore or a DocumentIndex. "
            f"Got {type(vector_store)}."
        )
        raise TypeError(msg)


async def aindex(
docs_source: Union[BaseLoader, Iterable[Document], AsyncIterator[Document]],
record_manager: RecordManager,
Expand Down Expand Up @@ -733,7 +782,7 @@ async def aindex(
)
if uids_to_delete:
# Then delete from vector store.
await destination.adelete(uids_to_delete)
await _adelete(destination, uids_to_delete)
# First delete from record store.
await record_manager.adelete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)
Expand All @@ -746,7 +795,7 @@ async def aindex(
group_ids=delete_group_ids, before=index_start_dt, limit=cleanup_batch_size
):
# First delete from record store.
await destination.adelete(uids_to_delete)
await _adelete(destination, uids_to_delete)
# Then delete from record manager.
await record_manager.adelete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)
Expand Down
160 changes: 159 additions & 1 deletion libs/core/tests/unit_tests/indexing/test_indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from langchain_core.documents import Document
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.indexing import InMemoryRecordManager, aindex, index
from langchain_core.indexing.api import _abatch, _HashedDocument
from langchain_core.indexing.api import IndexingException, _abatch, _HashedDocument
from langchain_core.indexing.in_memory import InMemoryDocumentIndex
from langchain_core.vectorstores import InMemoryVectorStore, VectorStore

Expand Down Expand Up @@ -287,6 +287,164 @@ async def test_aindex_simple_delete_full(
}


def test_index_delete_full_recovery_after_deletion_failure(
    record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None:
    """Ensure a failed vector store deletion raises and a later run recovers."""
    initial_loader = ToyLoader(
        documents=[
            Document(page_content="This is a test document."),
            Document(page_content="This is another document."),
        ]
    )

    with patch.object(
        record_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp()
    ):
        first_result = index(
            initial_loader, record_manager, vector_store, cleanup="full"
        )
    assert first_result == {
        "num_added": 2,
        "num_deleted": 0,
        "num_skipped": 0,
        "num_updated": 0,
    }

    updated_loader = ToyLoader(
        documents=[
            # First document mutated; second unchanged from the initial run.
            Document(page_content="mutated document 1"),
            Document(page_content="This is another document."),
        ]
    )

    # Force the vector store delete to report failure; indexing must raise.
    with (
        patch.object(
            record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
        ),
        patch.object(vector_store, "delete", return_value=False),
        pytest.raises(IndexingException),
    ):
        index(updated_loader, record_manager, vector_store, cleanup="full")

    # Deletion failed midway, so all three documents remain in the store.
    stored_texts = {
        # Ignoring type since doc should be in the store and not a None
        vector_store.get_by_ids([doc_id])[0].page_content  # type: ignore
        for doc_id in vector_store.store
    }
    assert stored_texts == {
        "This is a test document.",
        "mutated document 1",
        "This is another document.",
    }

    # A later run with a working delete cleans up the stale document.
    with patch.object(
        record_manager, "get_time", return_value=datetime(2021, 1, 3).timestamp()
    ):
        recovery_result = index(
            updated_loader, record_manager, vector_store, cleanup="full"
        )
    stored_texts = {
        # Ignoring type since doc should be in the store and not a None
        vector_store.get_by_ids([doc_id])[0].page_content  # type: ignore
        for doc_id in vector_store.store
    }
    assert stored_texts == {"mutated document 1", "This is another document."}

    assert recovery_result == {
        "num_added": 0,
        "num_deleted": 1,
        "num_skipped": 2,
        "num_updated": 0,
    }


async def test_aindex_delete_full_recovery_after_deletion_failure(
    arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None:
    """Ensure a failed async vector store deletion raises and a later run recovers."""
    initial_loader = ToyLoader(
        documents=[
            Document(page_content="This is a test document."),
            Document(page_content="This is another document."),
        ]
    )

    with patch.object(
        arecord_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp()
    ):
        first_result = await aindex(
            initial_loader, arecord_manager, vector_store, cleanup="full"
        )
    assert first_result == {
        "num_added": 2,
        "num_deleted": 0,
        "num_skipped": 0,
        "num_updated": 0,
    }

    updated_loader = ToyLoader(
        documents=[
            # First document mutated; second unchanged from the initial run.
            Document(page_content="mutated document 1"),
            Document(page_content="This is another document."),
        ]
    )

    # Force the vector store delete to report failure; indexing must raise.
    with (
        patch.object(
            arecord_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
        ),
        patch.object(vector_store, "adelete", return_value=False),
        pytest.raises(IndexingException),
    ):
        await aindex(updated_loader, arecord_manager, vector_store, cleanup="full")

    # Deletion failed midway, so all three documents remain in the store.
    stored_texts = {
        # Ignoring type since doc should be in the store and not a None
        vector_store.get_by_ids([doc_id])[0].page_content  # type: ignore
        for doc_id in vector_store.store
    }
    assert stored_texts == {
        "This is a test document.",
        "mutated document 1",
        "This is another document.",
    }

    # A later run with a working delete cleans up the stale document.
    with patch.object(
        arecord_manager, "get_time", return_value=datetime(2021, 1, 3).timestamp()
    ):
        recovery_result = await aindex(
            updated_loader, arecord_manager, vector_store, cleanup="full"
        )
    stored_texts = {
        # Ignoring type since doc should be in the store and not a None
        vector_store.get_by_ids([doc_id])[0].page_content  # type: ignore
        for doc_id in vector_store.store
    }
    assert stored_texts == {"mutated document 1", "This is another document."}

    assert recovery_result == {
        "num_added": 0,
        "num_deleted": 1,
        "num_skipped": 2,
        "num_updated": 0,
    }


def test_incremental_fails_with_bad_source_ids(
record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None:
Expand Down

0 comments on commit 67fd554

Please sign in to comment.