diff --git a/libs/core/langchain_core/indexing/api.py b/libs/core/langchain_core/indexing/api.py
index 87baed948c78d..814356b17c3d7 100644
--- a/libs/core/langchain_core/indexing/api.py
+++ b/libs/core/langchain_core/indexing/api.py
@@ -386,16 +386,17 @@ def index(
 
             # mypy isn't good enough to determine that source ids cannot be None
             # here due to a check that's happening above, so we check again.
-            for source_id in source_ids:
-                if source_id is None:
-                    raise AssertionError("Source ids cannot be None here.")
+            if any(source_id is None for source_id in source_ids):
+                raise AssertionError("Source ids cannot be None if cleanup=='incremental'.")
 
-            _source_ids = cast(Sequence[str], source_ids)
+            indexed_source_ids = cast(
+                Sequence[str], [source_id_assigner(doc) for doc in docs_to_index]
+            )
 
             uids_to_delete = record_manager.list_keys(
-                group_ids=_source_ids, before=index_start_dt
+                group_ids=indexed_source_ids, before=index_start_dt
             )
-            if uids_to_delete:
+            if indexed_source_ids and uids_to_delete:
                 # Then delete from vector store.
                 destination.delete(uids_to_delete)
                 # First delete from record store.
@@ -626,16 +627,17 @@ async def aindex(
 
             # mypy isn't good enough to determine that source ids cannot be None
             # here due to a check that's happening above, so we check again.
-            for source_id in source_ids:
-                if source_id is None:
-                    raise AssertionError("Source ids cannot be None here.")
+            if any(source_id is None for source_id in source_ids):
+                raise AssertionError("Source ids cannot be None if cleanup=='incremental'.")
 
-            _source_ids = cast(Sequence[str], source_ids)
+            indexed_source_ids = cast(
+                Sequence[str], [source_id_assigner(doc) for doc in docs_to_index]
+            )
 
             uids_to_delete = await record_manager.alist_keys(
-                group_ids=_source_ids, before=index_start_dt
+                group_ids=indexed_source_ids, before=index_start_dt
             )
-            if uids_to_delete:
+            if indexed_source_ids and uids_to_delete:
                 # Then delete from vector store.
                 await destination.adelete(uids_to_delete)
                 # First delete from record store.
diff --git a/libs/core/tests/unit_tests/indexing/test_indexing.py b/libs/core/tests/unit_tests/indexing/test_indexing.py
index 0909176140602..96d3584dad88c 100644
--- a/libs/core/tests/unit_tests/indexing/test_indexing.py
+++ b/libs/core/tests/unit_tests/indexing/test_indexing.py
@@ -655,7 +655,7 @@ def test_incremental_indexing_with_batch_size(
     )
 
     with patch.object(
-        record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
+        record_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp()
     ):
         assert index(
             loader,
@@ -671,6 +671,16 @@ def test_incremental_indexing_with_batch_size(
             "num_updated": 0,
         }
+    doc_texts = {
+        # Ignoring type since doc should be in the store and not None
+        vector_store.get_by_ids([uid])[0].page_content  # type: ignore
+        for uid in vector_store.store
+    }
+    assert doc_texts == {"1", "2", "3", "4"}
+
+    with patch.object(
+        record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
+    ):
         assert index(
             loader,
             record_manager,
             vector_store,
@@ -693,6 +703,149 @@ def test_incremental_indexing_with_batch_size(
     assert doc_texts == {"1", "2", "3", "4"}
 
 
+def test_incremental_indexing_with_batch_size_with_optimization(
+    record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
+) -> None:
+    """Test case when batch_size < num of docs.
+
+    Here, we'll verify that an indexing optimization works as expected.
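+
+    The optimization (see the change to ``index`` above): incremental cleanup
+    now lists and deletes stale records only for the source ids of documents
+    that were actually written in the current batch, so a batch whose
+    documents are all unchanged triggers no deletions at all.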
+ """ + documents = [ + Document( + page_content="1", + metadata={"source": "1"}, + ), + Document( + page_content="2", + metadata={"source": "1"}, + ), + Document( + page_content="3", + metadata={"source": "1"}, + ), + Document( + page_content="4", + metadata={"source": "1"}, + ), + ] + + with patch.object( + record_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp() + ): + assert index( + documents, + record_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + batch_size=2, + ) == { + "num_added": 4, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + doc_texts = { + # Ignoring type since doc should be in the store and not a None + vector_store.get_by_ids([uid])[0].page_content # type: ignore + for uid in vector_store.store + } + assert doc_texts == {"1", "2", "3", "4"} + + # Mutate content in first batch + doc_first_batch_mutation = [ + Document( + page_content="updated 1", + metadata={"source": "1"}, + ), + Document( + page_content="2", + metadata={"source": "1"}, + ), + Document( + page_content="3", + metadata={"source": "1"}, + ), + Document( + page_content="4", + metadata={"source": "1"}, + ), + ] + + with patch.object( + record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + assert index( + doc_first_batch_mutation, + record_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + batch_size=2, + ) == { + # Cannot optimize here since the first batch was changed + # So only skpping indexing the document with content "2" + "num_added": 3, + "num_deleted": 3, + "num_skipped": 1, + "num_updated": 0, + } + + doc_texts = { + # Ignoring type since doc should be in the store and not a None + vector_store.get_by_ids([uid])[0].page_content # type: ignore + for uid in vector_store.store + } + assert doc_texts == {"updated 1", "2", "3", "4"} + + # Mutate content in second batch + doc_second_batch_mutation = [ + Document( + page_content="updated 1", # <-- This was already previously updated + metadata={"source": "1"}, + ), + Document( + page_content="2", + metadata={"source": "1"}, + ), + Document( + page_content="3", + metadata={"source": "1"}, + ), + Document( + page_content="updated 4", + metadata={"source": "1"}, + ), + ] + + with patch.object( + record_manager, "get_time", return_value=datetime(2021, 1, 3).timestamp() + ): + assert index( + doc_second_batch_mutation, + record_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + batch_size=2, + ) == { + # Skips updating content from the first batch, only updates + # the `updated 4` content + "num_added": 1, + "num_deleted": 1, + "num_skipped": 3, + "num_updated": 0, + } + + doc_texts = { + # Ignoring type since doc should be in the store and not a None + vector_store.get_by_ids([uid])[0].page_content # type: ignore + for uid in vector_store.store + } + assert doc_texts == {"updated 1", "2", "3", "updated 4"} + + def test_incremental_delete_with_batch_size( record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore ) -> None: @@ -719,7 +872,7 @@ def test_incremental_delete_with_batch_size( ) with patch.object( - record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + record_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp() ): assert index( loader, @@ -760,6 +913,13 @@ def test_incremental_delete_with_batch_size( "num_updated": 0, } + doc_texts = { + # Ignoring type since doc should be in the store and not a None + vector_store.get_by_ids([uid])[0].page_content # 
+        for uid in vector_store.store
+    }
+    assert doc_texts == {"1", "2", "3", "4"}
+
     # Attempt to index again verify that nothing changes
     with patch.object(
         record_manager, "get_time", return_value=datetime(2022, 1, 3).timestamp()
     ):
@@ -789,9 +949,16 @@ def test_incremental_delete_with_batch_size(
             "num_updated": 0,
         }
 
+    doc_texts = {
+        # Ignoring type since doc should be in the store and not None
+        vector_store.get_by_ids([uid])[0].page_content  # type: ignore
+        for uid in vector_store.store
+    }
+    assert doc_texts == {"1", "2", "3", "4"}
+
     # Attempt to index again verify that nothing changes
     with patch.object(
-        record_manager, "get_time", return_value=datetime(2023, 1, 3).timestamp()
+        record_manager, "get_time", return_value=datetime(2023, 1, 4).timestamp()
     ):
         # Docs with same content
         docs = [
@@ -818,9 +985,16 @@ def test_incremental_delete_with_batch_size(
             "num_updated": 0,
         }
 
+    doc_texts = {
+        # Ignoring type since doc should be in the store and not None
+        vector_store.get_by_ids([uid])[0].page_content  # type: ignore
+        for uid in vector_store.store
+    }
+    assert doc_texts == {"1", "2", "3", "4"}
+
     # Try to index with changed docs now
     with patch.object(
-        record_manager, "get_time", return_value=datetime(2024, 1, 3).timestamp()
+        record_manager, "get_time", return_value=datetime(2024, 1, 5).timestamp()
     ):
         # Docs with same content
         docs = [
@@ -846,6 +1020,13 @@ def test_incremental_delete_with_batch_size(
             "num_updated": 0,
         }
+    doc_texts = {
+        # Ignoring type since doc should be in the store and not None
+        vector_store.get_by_ids([uid])[0].page_content  # type: ignore
+        for uid in vector_store.store
+    }
+    assert doc_texts == {"changed 1", "changed 2", "3", "4"}
+
 
 async def test_aincremental_delete(
     arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
 ) -> None:
@@ -1404,3 +1585,146 @@ async def test_aindex_into_document_index(
         "num_skipped": 0,
         "num_updated": 0,
     }
+
+
+async def test_incremental_aindexing_with_batch_size_with_optimization(
+    arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
+) -> None:
+    """Test case when batch_size < num of docs.
+
+    Here, we'll verify that an indexing optimization works as expected.
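+
+    The optimization (see the change to ``aindex`` above): incremental cleanup
+    now lists and deletes stale records only for the source ids of documents
+    that were actually written in the current batch, so a batch whose
+    documents are all unchanged triggers no deletions at all.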
+ """ + documents = [ + Document( + page_content="1", + metadata={"source": "1"}, + ), + Document( + page_content="2", + metadata={"source": "1"}, + ), + Document( + page_content="3", + metadata={"source": "1"}, + ), + Document( + page_content="4", + metadata={"source": "1"}, + ), + ] + + with patch.object( + arecord_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp() + ): + assert await aindex( + documents, + arecord_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + batch_size=2, + ) == { + "num_added": 4, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + doc_texts = { + # Ignoring type since doc should be in the store and not a None + vector_store.get_by_ids([uid])[0].page_content # type: ignore + for uid in vector_store.store + } + assert doc_texts == {"1", "2", "3", "4"} + + # Mutate content in first batch + doc_first_batch_mutation = [ + Document( + page_content="updated 1", + metadata={"source": "1"}, + ), + Document( + page_content="2", + metadata={"source": "1"}, + ), + Document( + page_content="3", + metadata={"source": "1"}, + ), + Document( + page_content="4", + metadata={"source": "1"}, + ), + ] + + with patch.object( + arecord_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + assert await aindex( + doc_first_batch_mutation, + arecord_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + batch_size=2, + ) == { + # Cannot optimize here since the first batch was changed + # So only skpping indexing the document with content "2" + "num_added": 3, + "num_deleted": 3, + "num_skipped": 1, + "num_updated": 0, + } + + doc_texts = { + # Ignoring type since doc should be in the store and not a None + vector_store.get_by_ids([uid])[0].page_content # type: ignore + for uid in vector_store.store + } + assert doc_texts == {"updated 1", "2", "3", "4"} + + # Mutate content in second batch + doc_second_batch_mutation = [ + Document( + page_content="updated 1", # <-- This was already previously updated + metadata={"source": "1"}, + ), + Document( + page_content="2", + metadata={"source": "1"}, + ), + Document( + page_content="3", + metadata={"source": "1"}, + ), + Document( + page_content="updated 4", + metadata={"source": "1"}, + ), + ] + + with patch.object( + arecord_manager, "get_time", return_value=datetime(2021, 1, 3).timestamp() + ): + assert await aindex( + doc_second_batch_mutation, + arecord_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + batch_size=2, + ) == { + # Skips updating content from the first batch, only updates + # the `updated 4` content + "num_added": 1, + "num_deleted": 1, + "num_skipped": 3, + "num_updated": 0, + } + + doc_texts = { + # Ignoring type since doc should be in the store and not a None + vector_store.get_by_ids([uid])[0].page_content # type: ignore + for uid in vector_store.store + } + assert doc_texts == {"updated 1", "2", "3", "updated 4"}