core[patch]: improve index/aindex api when batch_size<n_docs (#25754)
- **Description:** prevent the `index` function from re-indexing the entire source
document even if nothing has changed.
- **Issue:** #22135

I worked on a solution to this issue that is a compromise between being
cheap and being fast.
In the previous code, when `batch_size` is smaller than the number of docs
from a given source, almost the entire source is deleted and re-indexed (all
documents from that source except those in the first batch).
My solution deletes documents from the vector store and record manager only
if at least one document from that source has changed.
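
To make the scenario concrete, here is a minimal sketch (not part of this commit) that re-indexes an unchanged source whose document count exceeds `batch_size`; the in-memory classes are real `langchain_core` components, but the namespace, source name, and sizes are illustrative assumptions:

```python
# Hypothetical reproduction sketch: five unchanged chunks from one source,
# indexed twice with batch_size=2 and cleanup="incremental".
from langchain_core.documents import Document
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.indexing import InMemoryRecordManager, index
from langchain_core.vectorstores import InMemoryVectorStore

record_manager = InMemoryRecordManager(namespace="demo")  # namespace is arbitrary
record_manager.create_schema()
vector_store = InMemoryVectorStore(DeterministicFakeEmbedding(size=8))

docs = [
    Document(page_content=f"chunk {i}", metadata={"source": "report.txt"})
    for i in range(5)
]

for run in range(2):
    result = index(
        docs,
        record_manager,
        vector_store,
        cleanup="incremental",
        source_id_key="source",
        batch_size=2,
    )
    print(run, result)

# Before this patch the second run deleted and re-added the unchanged chunks
# outside the first batch; with the patch it should report num_skipped=5 and
# num_deleted=0 on the second run.
```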

Hope this can help!

---------

Co-authored-by: Eugene Yurtsev <[email protected]>
federico-pisanu and eyurtsev authored Sep 30, 2024
1 parent 7fde279 commit 2538963
Showing 2 changed files with 342 additions and 16 deletions.
26 changes: 14 additions & 12 deletions libs/core/langchain_core/indexing/api.py
@@ -386,16 +386,17 @@ def index(

     # mypy isn't good enough to determine that source ids cannot be None
     # here due to a check that's happening above, so we check again.
-    for source_id in source_ids:
-        if source_id is None:
-            raise AssertionError("Source ids cannot be None here.")
+    if any(source_id is None for source_id in source_ids):
+        raise AssertionError("Source ids cannot be None if cleanup=='incremental'.")

-    _source_ids = cast(Sequence[str], source_ids)
+    indexed_source_ids = cast(
+        Sequence[str], [source_id_assigner(doc) for doc in docs_to_index]
+    )

     uids_to_delete = record_manager.list_keys(
-        group_ids=_source_ids, before=index_start_dt
+        group_ids=indexed_source_ids, before=index_start_dt
     )
-    if uids_to_delete:
+    if indexed_source_ids and uids_to_delete:
         # Then delete from vector store.
         destination.delete(uids_to_delete)
         # First delete from record store.
@@ -626,16 +627,17 @@ async def aindex(

     # mypy isn't good enough to determine that source ids cannot be None
     # here due to a check that's happening above, so we check again.
-    for source_id in source_ids:
-        if source_id is None:
-            raise AssertionError("Source ids cannot be None here.")
+    if any(source_id is None for source_id in source_ids):
+        raise AssertionError("Source ids cannot be None if cleanup=='incremental'.")

-    _source_ids = cast(Sequence[str], source_ids)
+    indexed_source_ids = cast(
+        Sequence[str], [source_id_assigner(doc) for doc in docs_to_index]
+    )

     uids_to_delete = await record_manager.alist_keys(
-        group_ids=_source_ids, before=index_start_dt
+        group_ids=indexed_source_ids, before=index_start_dt
     )
-    if uids_to_delete:
+    if indexed_source_ids and uids_to_delete:
         # Then delete from vector store.
         await destination.adelete(uids_to_delete)
         # First delete from record store.
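
To spell out why the rewritten deletion step no longer touches unchanged sources, here is a simplified, hypothetical trace (plain dicts stand in for documents; names mirror the diff but the data is invented):

```python
# One incremental-cleanup batch for source "report.txt" (five docs total,
# batch_size=2, nothing changed). Plain dicts are used instead of Documents.
docs_in_batch = [
    {"source": "report.txt", "content": f"chunk {i}"} for i in range(2)
]

# Nothing changed, so no document in this batch needs (re)indexing.
docs_to_index: list[dict] = []

# Old behaviour: group_ids came from every doc in the batch, so all records
# for "report.txt" older than index_start_dt (the chunks outside this batch)
# were deleted and later re-added by subsequent batches.
old_group_ids = [doc["source"] for doc in docs_in_batch]  # ["report.txt", "report.txt"]

# New behaviour: group_ids come only from docs that actually needed indexing,
# and the delete is skipped entirely when that list is empty.
new_group_ids = [doc["source"] for doc in docs_to_index]  # []

assert old_group_ids and not new_group_ids
```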