From 2399e67eeae671bf0031ba58bde292a6221d88b1 Mon Sep 17 00:00:00 2001 From: Keiichi Hirobe Date: Wed, 4 Dec 2024 17:21:26 +0900 Subject: [PATCH 1/6] add scoped_full_cleanup arg to index --- libs/core/langchain_core/indexing/api.py | 33 +++++++++++++++++------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/libs/core/langchain_core/indexing/api.py b/libs/core/langchain_core/indexing/api.py index c310deb241516..179d6c7b1971b 100644 --- a/libs/core/langchain_core/indexing/api.py +++ b/libs/core/langchain_core/indexing/api.py @@ -202,6 +202,7 @@ def index( cleanup_batch_size: int = 1_000, force_update: bool = False, upsert_kwargs: Optional[dict[str, Any]] = None, + scoped_full_cleanup: bool = False, ) -> IndexingResult: """Index data from the loader into the vector store. @@ -215,10 +216,6 @@ def index( are not able to specify the uid of the document. IMPORTANT: - * if auto_cleanup is set to True, the loader should be returning - the entire dataset, and not just a subset of the dataset. - Otherwise, the auto_cleanup will remove documents that it is not - supposed to. * In incremental mode, if documents associated with a particular source id appear across different batches, the indexing API will do some redundant work. This will still result in the @@ -259,6 +256,11 @@ def index( specify a custom vector_field: upsert_kwargs={"vector_field": "embedding"} .. versionadded:: 0.3.10 + scoped_full_cleanup: This argument will be valid only when `claneup` is Full. + If True, Full cleanup deletes all documents that haven't + been updated AND that are associated with source ids that + were seen during indexing. + Default is False. Returns: Indexing result which contains information about how many documents @@ -278,8 +280,15 @@ def index( ) raise ValueError(msg) - if cleanup == "incremental" and source_id_key is None: - msg = "Source id key is required when cleanup mode is incremental." + if scoped_full_cleanup and cleanup != "full": + msg = "scoped_full_cleanup is valid only when cleanup mode is 'full'." + raise ValueError(msg) + + if (cleanup == "incremental" or scoped_full_cleanup) and source_id_key is None: + msg = ( + "Source id key is required when cleanup mode is incremental" + "or scoped_full_clenup is True." + ) raise ValueError(msg) destination = vector_store # Renaming internally for clarity @@ -326,6 +335,7 @@ def index( num_skipped = 0 num_updated = 0 num_deleted = 0 + scoped_full_cleanup_source_ids: set[str] = set() for doc_batch in _batch(batch_size, doc_iterator): hashed_docs = list( @@ -338,8 +348,8 @@ def index( source_id_assigner(doc) for doc in hashed_docs ] - if cleanup == "incremental": - # If the cleanup mode is incremental, source ids are required. + if cleanup == "incremental" or scoped_full_cleanup: + # source ids are required. for source_id, hashed_doc in zip(source_ids, hashed_docs): if source_id is None: msg = ( @@ -349,6 +359,8 @@ def index( f"as source id." ) raise ValueError(msg) + if cleanup == "full": + scoped_full_cleanup_source_ids.add(source_id) # source ids cannot be None after for loop above. 
source_ids = cast(Sequence[str], source_ids) # type: ignore[assignment] @@ -427,8 +439,11 @@ def index( num_deleted += len(uids_to_delete) if cleanup == "full": + delete_group_ids: Optional[Sequence[str]] = None + if scoped_full_cleanup: + delete_group_ids = list(scoped_full_cleanup_source_ids) while uids_to_delete := record_manager.list_keys( - before=index_start_dt, limit=cleanup_batch_size + group_ids=delete_group_ids, before=index_start_dt, limit=cleanup_batch_size ): # First delete from record store. destination.delete(uids_to_delete) From a474efbe01ffc6ed457bfb4109ef580ffe60fa85 Mon Sep 17 00:00:00 2001 From: Keiichi Hirobe Date: Fri, 13 Dec 2024 18:01:02 +0900 Subject: [PATCH 2/6] introduce new clenn up option scoped_full --- libs/core/langchain_core/indexing/api.py | 95 ++++-- .../unit_tests/indexing/test_indexing.py | 300 ++++++++++++++++++ 2 files changed, 361 insertions(+), 34 deletions(-) diff --git a/libs/core/langchain_core/indexing/api.py b/libs/core/langchain_core/indexing/api.py index 179d6c7b1971b..dcb2baff28d75 100644 --- a/libs/core/langchain_core/indexing/api.py +++ b/libs/core/langchain_core/indexing/api.py @@ -197,12 +197,11 @@ def index( vector_store: Union[VectorStore, DocumentIndex], *, batch_size: int = 100, - cleanup: Literal["incremental", "full", None] = None, + cleanup: Literal["incremental", "full", "scoped_full", None] = None, source_id_key: Union[str, Callable[[Document], str], None] = None, cleanup_batch_size: int = 1_000, force_update: bool = False, upsert_kwargs: Optional[dict[str, Any]] = None, - scoped_full_cleanup: bool = False, ) -> IndexingResult: """Index data from the loader into the vector store. @@ -216,6 +215,10 @@ def index( are not able to specify the uid of the document. IMPORTANT: + * In full mode, the loader should be returning + the entire dataset, and not just a subset of the dataset. + Otherwise, the auto_cleanup will remove documents that it is not + supposed to. * In incremental mode, if documents associated with a particular source id appear across different batches, the indexing API will do some redundant work. This will still result in the @@ -224,6 +227,11 @@ def index( chunks, and we index them using a batch size of 5, we'll have 3 batches all with the same source id. In general, to avoid doing too much redundant work select as big a batch size as possible. + * scoped_full mode may be a good option if you have difficulty decidng + on the appropriate batch size and your loader is not able to + return the entire dataset. This solution keeps track of the source ids + in memory. It would probably be fine for most use cases in terms of + memory consumption, but would require parallelizing for 10M+ docs anyway Args: docs_source: Data loader or iterable of documents to index. @@ -242,6 +250,9 @@ def index( during this run of indexing. Clean up runs after all documents have been indexed. This means that users may see duplicated content during indexing. + - Scoped_Full: Similar to Full, but only deletes all documents + that haven't been updated AND that are associated with + source ids that were seen during indexing. - None: Do not delete any documents. source_id_key: Optional key that helps identify the original source of the document. Default is None. @@ -256,11 +267,6 @@ def index( specify a custom vector_field: upsert_kwargs={"vector_field": "embedding"} .. versionadded:: 0.3.10 - scoped_full_cleanup: This argument will be valid only when `claneup` is Full. 
- If True, Full cleanup deletes all documents that haven't - been updated AND that are associated with source ids that - were seen during indexing. - Default is False. Returns: Indexing result which contains information about how many documents @@ -273,21 +279,16 @@ def index( "delete" and "add_documents" required methods. ValueError: If source_id_key is not None, but is not a string or callable. """ - if cleanup not in {"incremental", "full", None}: + if cleanup not in {"incremental", "full", "scoped_full", None}: msg = ( - f"cleanup should be one of 'incremental', 'full' or None. " + f"cleanup should be one of 'incremental', 'full', 'scoped_full' or None. " f"Got {cleanup}." ) raise ValueError(msg) - if scoped_full_cleanup and cleanup != "full": - msg = "scoped_full_cleanup is valid only when cleanup mode is 'full'." - raise ValueError(msg) - - if (cleanup == "incremental" or scoped_full_cleanup) and source_id_key is None: + if (cleanup == "incremental" or cleanup == "scoped_full") and source_id_key is None: msg = ( - "Source id key is required when cleanup mode is incremental" - "or scoped_full_clenup is True." + "Source id key is required when cleanup mode is incremental or scoped_full." ) raise ValueError(msg) @@ -348,18 +349,19 @@ def index( source_id_assigner(doc) for doc in hashed_docs ] - if cleanup == "incremental" or scoped_full_cleanup: + if cleanup == "incremental" or cleanup == "scoped_full": # source ids are required. for source_id, hashed_doc in zip(source_ids, hashed_docs): if source_id is None: msg = ( - "Source ids are required when cleanup mode is incremental. " + f"Source ids are required when cleanup mode is " + f"incremental or scoped_full. " f"Document that starts with " f"content: {hashed_doc.page_content[:100]} was not assigned " f"as source id." ) raise ValueError(msg) - if cleanup == "full": + if cleanup == "scoped_full": scoped_full_cleanup_source_ids.add(source_id) # source ids cannot be None after for loop above. source_ids = cast(Sequence[str], source_ids) # type: ignore[assignment] @@ -438,9 +440,9 @@ def index( record_manager.delete_keys(uids_to_delete) num_deleted += len(uids_to_delete) - if cleanup == "full": + if cleanup == "full" or cleanup == "scoped_full": delete_group_ids: Optional[Sequence[str]] = None - if scoped_full_cleanup: + if cleanup == "scoped_full": delete_group_ids = list(scoped_full_cleanup_source_ids) while uids_to_delete := record_manager.list_keys( group_ids=delete_group_ids, before=index_start_dt, limit=cleanup_batch_size @@ -472,7 +474,7 @@ async def aindex( vector_store: Union[VectorStore, DocumentIndex], *, batch_size: int = 100, - cleanup: Literal["incremental", "full", None] = None, + cleanup: Literal["incremental", "full", "scoped_full", None] = None, source_id_key: Union[str, Callable[[Document], str], None] = None, cleanup_batch_size: int = 1_000, force_update: bool = False, @@ -490,10 +492,23 @@ async def aindex( are not able to specify the uid of the document. IMPORTANT: - if auto_cleanup is set to True, the loader should be returning - the entire dataset, and not just a subset of the dataset. - Otherwise, the auto_cleanup will remove documents that it is not - supposed to. + * In full mode, the loader should be returning + the entire dataset, and not just a subset of the dataset. + Otherwise, the auto_cleanup will remove documents that it is not + supposed to. + * In incremental mode, if documents associated with a particular + source id appear across different batches, the indexing API + will do some redundant work. 
This will still result in the + correct end state of the index, but will unfortunately not be + 100% efficient. For example, if a given document is split into 15 + chunks, and we index them using a batch size of 5, we'll have 3 batches + all with the same source id. In general, to avoid doing too much + redundant work select as big a batch size as possible. + * scoped_full mode may be a good option if you have difficulty decidng + on the appropriate batch size and your loader is not able to + return the entire dataset. This solution keeps track of the source ids + in memory. It would probably be fine for most use cases in terms of + memory consumption, but would require parallelizing for 10M+ docs anyway Args: docs_source: Data loader or iterable of documents to index. @@ -511,6 +526,9 @@ async def aindex( - Full: Delete all documents that haven to been returned by the loader. Clean up runs after all documents have been indexed. This means that users may see duplicated content during indexing. + - Scoped_Full: Similar to Full, but only deletes all documents + that haven't been updated AND that are associated with + source ids that were seen during indexing. - None: Do not delete any documents. source_id_key: Optional key that helps identify the original source of the document. Default is None. @@ -538,15 +556,17 @@ async def aindex( ValueError: If source_id_key is not None, but is not a string or callable. """ - if cleanup not in {"incremental", "full", None}: + if cleanup not in {"incremental", "full", "scoped_full", None}: msg = ( - f"cleanup should be one of 'incremental', 'full' or None. " + f"cleanup should be one of 'incremental', 'full', 'scoped_full' or None. " f"Got {cleanup}." ) raise ValueError(msg) - if cleanup == "incremental" and source_id_key is None: - msg = "Source id key is required when cleanup mode is incremental." + if (cleanup == "incremental" or cleanup == "scoped_full") and source_id_key is None: + msg = ( + "Source id key is required when cleanup mode is incremental or scoped_full." + ) raise ValueError(msg) destination = vector_store # Renaming internally for clarity @@ -602,6 +622,7 @@ async def aindex( num_skipped = 0 num_updated = 0 num_deleted = 0 + scoped_full_cleanup_source_ids: set[str] = set() async for doc_batch in _abatch(batch_size, async_doc_iterator): hashed_docs = list( @@ -614,17 +635,20 @@ async def aindex( source_id_assigner(doc) for doc in hashed_docs ] - if cleanup == "incremental": + if cleanup == "incremental" or cleanup == "scoped_full": # If the cleanup mode is incremental, source ids are required. for source_id, hashed_doc in zip(source_ids, hashed_docs): if source_id is None: msg = ( - "Source ids are required when cleanup mode is incremental. " + f"Source ids are required when cleanup mode is " + f"incremental or scoped_full. " f"Document that starts with " f"content: {hashed_doc.page_content[:100]} was not assigned " f"as source id." ) raise ValueError(msg) + if cleanup == "scoped_full": + scoped_full_cleanup_source_ids.add(source_id) # source ids cannot be None after for loop above. 
source_ids = cast(Sequence[str], source_ids) @@ -702,9 +726,12 @@ async def aindex( await record_manager.adelete_keys(uids_to_delete) num_deleted += len(uids_to_delete) - if cleanup == "full": + if cleanup == "full" or cleanup == "scoped_full": + delete_group_ids: Optional[Sequence[str]] = None + if cleanup == "scoped_full": + delete_group_ids = list(scoped_full_cleanup_source_ids) while uids_to_delete := await record_manager.alist_keys( - before=index_start_dt, limit=cleanup_batch_size + group_ids=delete_group_ids, before=index_start_dt, limit=cleanup_batch_size ): # First delete from record store. await destination.adelete(uids_to_delete) diff --git a/libs/core/tests/unit_tests/indexing/test_indexing.py b/libs/core/tests/unit_tests/indexing/test_indexing.py index 287b6b49f66a4..d2df0f6f80385 100644 --- a/libs/core/tests/unit_tests/indexing/test_indexing.py +++ b/libs/core/tests/unit_tests/indexing/test_indexing.py @@ -364,6 +364,306 @@ async def test_aincremental_fails_with_bad_source_ids( ) +def test_index_simple_delete_scoped_full( + record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore +) -> None: + """Test Indexing with scoped_full strategy.""" + loader = ToyLoader( + documents=[ + Document( + page_content="This is a test document.", + metadata={"source": "1"}, + ), + Document( + page_content="This is another document.", + metadata={"source": "1"}, + ), + Document( + page_content="This is yet another document.", + metadata={"source": "1"}, + ), + Document( + page_content="This is a test document from another source.", + metadata={"source": "2"}, + ), + ] + ) + + with patch.object( + record_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp() + ): + assert index( + loader, + record_manager, + vector_store, + cleanup="scoped_full", + source_id_key="source", + ) == { + "num_added": 4, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + with patch.object( + record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + assert index( + loader, + record_manager, + vector_store, + cleanup="scoped_full", + source_id_key="source", + ) == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 4, + "num_updated": 0, + } + + loader = ToyLoader( + documents=[ + Document( + page_content="mutated document 1", + metadata={"source": "1"}, + ), + Document( + page_content="This is another document.", # <-- Same as original + metadata={"source": "1"}, + ), + ] + ) + + with patch.object( + record_manager, "get_time", return_value=datetime(2021, 1, 3).timestamp() + ): + assert index( + loader, + record_manager, + vector_store, + cleanup="scoped_full", + source_id_key="source", + ) == { + "num_added": 1, + "num_deleted": 2, + "num_skipped": 1, + "num_updated": 0, + } + doc_texts = { + # Ignoring type since doc should be in the store and not a None + vector_store.get_by_ids([uid])[0].page_content # type: ignore + for uid in vector_store.store + } + assert doc_texts == { + "mutated document 1", + "This is another document.", + "This is a test document from another source.", + } + + # Attempt to index again verify that nothing changes + with patch.object( + record_manager, "get_time", return_value=datetime(2021, 1, 4).timestamp() + ): + assert index( + loader, + record_manager, + vector_store, + cleanup="scoped_full", + source_id_key="source", + ) == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 2, + "num_updated": 0, + } + + +async def test_aindex_simple_delete_scoped_full( + arecord_manager: InMemoryRecordManager, vector_store: 
InMemoryVectorStore +) -> None: + """Test Indexing with scoped_full strategy.""" + loader = ToyLoader( + documents=[ + Document( + page_content="This is a test document.", + metadata={"source": "1"}, + ), + Document( + page_content="This is another document.", + metadata={"source": "1"}, + ), + Document( + page_content="This is yet another document.", + metadata={"source": "1"}, + ), + Document( + page_content="This is a test document from another source.", + metadata={"source": "2"}, + ), + ] + ) + + with patch.object( + arecord_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp() + ): + assert await aindex( + loader, + arecord_manager, + vector_store, + cleanup="scoped_full", + source_id_key="source", + ) == { + "num_added": 4, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + with patch.object( + arecord_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + assert await aindex( + loader, + arecord_manager, + vector_store, + cleanup="scoped_full", + source_id_key="source", + ) == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 4, + "num_updated": 0, + } + + loader = ToyLoader( + documents=[ + Document( + page_content="mutated document 1", + metadata={"source": "1"}, + ), + Document( + page_content="This is another document.", # <-- Same as original + metadata={"source": "1"}, + ), + ] + ) + + with patch.object( + arecord_manager, "get_time", return_value=datetime(2021, 1, 3).timestamp() + ): + assert await aindex( + loader, + arecord_manager, + vector_store, + cleanup="scoped_full", + source_id_key="source", + ) == { + "num_added": 1, + "num_deleted": 2, + "num_skipped": 1, + "num_updated": 0, + } + doc_texts = { + # Ignoring type since doc should be in the store and not a None + vector_store.get_by_ids([uid])[0].page_content # type: ignore + for uid in vector_store.store + } + assert doc_texts == { + "mutated document 1", + "This is another document.", + "This is a test document from another source.", + } + + # Attempt to index again verify that nothing changes + with patch.object( + arecord_manager, "get_time", return_value=datetime(2021, 1, 4).timestamp() + ): + assert await aindex( + loader, + arecord_manager, + vector_store, + cleanup="scoped_full", + source_id_key="source", + ) == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 2, + "num_updated": 0, + } + + +def test_scoped_full_fails_with_bad_source_ids( + record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore +) -> None: + """Test Indexing with scoped_full strategy.""" + loader = ToyLoader( + documents=[ + Document( + page_content="This is a test document.", + metadata={"source": "1"}, + ), + Document( + page_content="This is another document.", + metadata={"source": "2"}, + ), + Document( + page_content="This is yet another document.", + metadata={"source": None}, + ), + ] + ) + + with pytest.raises(ValueError): + # Should raise an error because no source id function was specified + index(loader, record_manager, vector_store, cleanup="scoped_full") + + with pytest.raises(ValueError): + # Should raise an error because no source id function was specified + index( + loader, + record_manager, + vector_store, + cleanup="scoped_full", + source_id_key="source", + ) + + +async def test_ascoped_full_fails_with_bad_source_ids( + arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore +) -> None: + """Test Indexing with scoped_full strategy.""" + loader = ToyLoader( + documents=[ + Document( + page_content="This is a test document.", 
+ metadata={"source": "1"}, + ), + Document( + page_content="This is another document.", + metadata={"source": "2"}, + ), + Document( + page_content="This is yet another document.", + metadata={"source": None}, + ), + ] + ) + + with pytest.raises(ValueError): + # Should raise an error because no source id function was specified + await aindex(loader, arecord_manager, vector_store, cleanup="scoped_full") + + with pytest.raises(ValueError): + # Should raise an error because no source id function was specified + await aindex( + loader, + arecord_manager, + vector_store, + cleanup="scoped_full", + source_id_key="source", + ) + + def test_no_delete( record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore ) -> None: From 7d68a6685d7458cd90aec258fd36c568b9e077f5 Mon Sep 17 00:00:00 2001 From: Eugene Yurtsev Date: Fri, 13 Dec 2024 11:16:40 -0500 Subject: [PATCH 3/6] Update libs/core/langchain_core/indexing/api.py --- libs/core/langchain_core/indexing/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/core/langchain_core/indexing/api.py b/libs/core/langchain_core/indexing/api.py index dcb2baff28d75..dc353f7bd5c7b 100644 --- a/libs/core/langchain_core/indexing/api.py +++ b/libs/core/langchain_core/indexing/api.py @@ -250,7 +250,7 @@ def index( during this run of indexing. Clean up runs after all documents have been indexed. This means that users may see duplicated content during indexing. - - Scoped_Full: Similar to Full, but only deletes all documents + - scoped_full: Similar to Full, but only deletes all documents that haven't been updated AND that are associated with source ids that were seen during indexing. - None: Do not delete any documents. From decf48e1e52a20093a9cdda110699fe1d1921f4f Mon Sep 17 00:00:00 2001 From: Eugene Yurtsev Date: Fri, 13 Dec 2024 11:17:17 -0500 Subject: [PATCH 4/6] Update libs/core/langchain_core/indexing/api.py --- libs/core/langchain_core/indexing/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/core/langchain_core/indexing/api.py b/libs/core/langchain_core/indexing/api.py index dc353f7bd5c7b..d7b354e8f98db 100644 --- a/libs/core/langchain_core/indexing/api.py +++ b/libs/core/langchain_core/indexing/api.py @@ -504,7 +504,7 @@ async def aindex( chunks, and we index them using a batch size of 5, we'll have 3 batches all with the same source id. In general, to avoid doing too much redundant work select as big a batch size as possible. - * scoped_full mode may be a good option if you have difficulty decidng + * scoped_full mode may be a good option if you have difficulty deciding on the appropriate batch size and your loader is not able to return the entire dataset. This solution keeps track of the source ids in memory. 
It would probably be fine for most use cases in terms of From 32c3f03a89fe2fb5de780ccf78d92f5d11982647 Mon Sep 17 00:00:00 2001 From: Keiichi Hirobe Date: Sat, 14 Dec 2024 01:23:27 +0900 Subject: [PATCH 5/6] update docs --- docs/docs/how_to/indexing.ipynb | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/docs/docs/how_to/indexing.ipynb b/docs/docs/how_to/indexing.ipynb index e3e6ec8aef6d7..4e6fed96abdcd 100644 --- a/docs/docs/how_to/indexing.ipynb +++ b/docs/docs/how_to/indexing.ipynb @@ -39,19 +39,20 @@ "| None | ✅ | ✅ | ❌ | ❌ | - |\n", "| Incremental | ✅ | ✅ | ❌ | ✅ | Continuously |\n", "| Full | ✅ | ❌ | ✅ | ✅ | At end of indexing |\n", + "| Scoped_Full | ✅ | ✅ | ❌ | ✅ | At end of indexing |\n", "\n", "\n", "`None` does not do any automatic clean up, allowing the user to manually do clean up of old content. \n", "\n", - "`incremental` and `full` offer the following automated clean up:\n", + "`incremental`, `full` and `scoped_full` offer the following automated clean up:\n", "\n", - "* If the content of the source document or derived documents has **changed**, both `incremental` or `full` modes will clean up (delete) previous versions of the content.\n", - "* If the source document has been **deleted** (meaning it is not included in the documents currently being indexed), the `full` cleanup mode will delete it from the vector store correctly, but the `incremental` mode will not.\n", + "* If the content of the source document or derived documents has **changed**, all 3 modes will clean up (delete) previous versions of the content.\n", + "* If the source document has been **deleted** (meaning it is not included in the documents currently being indexed), the `full` cleanup mode will delete it from the vector store correctly, but the `incremental` and `scoped_full` modes will not.\n", "\n", "When content is mutated (e.g., the source PDF file was revised) there will be a period of time during indexing when both the new and old versions may be returned to the user. 
This happens after the new content was written, but before the old version was deleted.\n", "\n", "* `incremental` indexing minimizes this period of time as it is able to do clean up continuously, as it writes.\n", - "`full` mode does the clean up after all batches have been written.\n", + "`full` and `scoped_full` modes do the clean up after all batches have been written.\n", "\n", "## Requirements\n", "\n", @@ -64,7 +65,7 @@ " \n", "## Caution\n", "\n", - "The record manager relies on a time-based mechanism to determine what content can be cleaned up (when using `full` or `incremental` cleanup modes).\n", + "The record manager relies on a time-based mechanism to determine what content can be cleaned up (when using `full`, `incremental` or `scoped_full` cleanup modes).\n", "\n", "If two tasks run back-to-back, and the first task finishes before the clock time changes, then the second task may not be able to clean up content.\n", "\n", From e45fa30ad5b0cb8ea130d53595d8e8e441e71148 Mon Sep 17 00:00:00 2001 From: Eugene Yurtsev Date: Fri, 13 Dec 2024 15:23:08 -0500 Subject: [PATCH 6/6] x --- libs/core/langchain_core/indexing/api.py | 38 ++++++++++++++---------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/libs/core/langchain_core/indexing/api.py b/libs/core/langchain_core/indexing/api.py index ef73acd26a658..e35ab9c68b42c 100644 --- a/libs/core/langchain_core/indexing/api.py +++ b/libs/core/langchain_core/indexing/api.py @@ -227,11 +227,11 @@ def index( chunks, and we index them using a batch size of 5, we'll have 3 batches all with the same source id. In general, to avoid doing too much redundant work select as big a batch size as possible. - * scoped_full mode may be a good option if you have difficulty decidng - on the appropriate batch size and your loader is not able to - return the entire dataset. This solution keeps track of the source ids - in memory. It would probably be fine for most use cases in terms of - memory consumption, but would require parallelizing for 10M+ docs anyway + * The `scoped_full` mode is suitable if determining an appropriate batch size is challenging or if your data loader cannot return the entire dataset at once. This mode keeps track of source IDs in memory, which should be fine for most use cases. If your dataset is large (10M+ docs), you will likely need to parallelize the indexing process regardless. Args: docs_source: Data loader or iterable of documents to index. @@ -240,13 +240,13 @@ def index( vector_store: VectorStore or DocumentIndex to index the documents into. batch_size: Batch size to use when indexing. Default is 100. cleanup: How to handle clean up of documents. Default is None. - - Incremental: Cleans up all documents that haven't been updated AND + - incremental: Cleans up all documents that haven't been updated AND that are associated with source ids that were seen during indexing. Clean up is done continuously during indexing helping to minimize the probability of users seeing duplicated content. - - Full: Delete all documents that have not been returned by the loader + - full: Delete all documents that have not been returned by the loader during this run of indexing. Clean up runs after all documents have been indexed. This means that users may see duplicated content during indexing. @@ -278,6 +278,10 @@ def index( ValueError: If vectorstore does not have "delete" and "add_documents" required methods. ValueError: If source_id_key is not None, but is not a string or callable. + + .. 
versionchanged:: 0.3.25 + + * Added `scoped_full` cleanup mode. """ if cleanup not in {"incremental", "full", "scoped_full", None}: msg = ( @@ -503,11 +507,11 @@ async def aindex( chunks, and we index them using a batch size of 5, we'll have 3 batches all with the same source id. In general, to avoid doing too much redundant work select as big a batch size as possible. - * scoped_full mode may be a good option if you have difficulty deciding - on the appropriate batch size and your loader is not able to - return the entire dataset. This solution keeps track of the source ids - in memory. It would probably be fine for most use cases in terms of - memory consumption, but would require parallelizing for 10M+ docs anyway + * The `scoped_full` mode is suitable if determining an appropriate batch size + is challenging or if your data loader cannot return the entire dataset at + once. This mode keeps track of source IDs in memory, which should be fine + for most use cases. If your dataset is large (10M+ docs), you will likely + need to parallelize the indexing process regardless. Args: docs_source: Data loader or iterable of documents to index. @@ -516,16 +520,16 @@ async def aindex( vector_store: VectorStore or DocumentIndex to index the documents into. batch_size: Batch size to use when indexing. Default is 100. cleanup: How to handle clean up of documents. Default is None. - - Incremental: Cleans up all documents that haven't been updated AND + - incremental: Cleans up all documents that haven't been updated AND that are associated with source ids that were seen during indexing. Clean up is done continuously during indexing helping to minimize the probability of users seeing duplicated content. - - Full: Delete all documents that haven to been returned by the loader. + - full: Delete all documents that have not been returned by the loader. Clean up runs after all documents have been indexed. This means that users may see duplicated content during indexing. - - Scoped_Full: Similar to Full, but only deletes all documents + - scoped_full: Similar to Full, but only deletes all documents that haven't been updated AND that are associated with source ids that were seen during indexing. - None: Do not delete any documents. source_id_key: Optional key that helps identify the original source of the document. Default is None. @@ -553,6 +557,10 @@ async def aindex( ValueError: If vectorstore does not have "adelete" and "aadd_documents" required methods. ValueError: If source_id_key is not None, but is not a string or callable. + + .. versionchanged:: 0.3.25 + + * Added `scoped_full` cleanup mode. """ if cleanup not in {"incremental", "full", "scoped_full", None}:
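
For reference, a minimal usage sketch of the `scoped_full` mode introduced by this series. It is illustrative only: it assumes a langchain-core build that includes this change, and it uses the in-memory record manager, in-memory vector store, and fake embeddings that ship with langchain-core so the snippet is self-contained; the document contents and the "source" metadata key are made up.

# Illustrative sketch of the scoped_full cleanup mode described above; not part of the patch.
from langchain_core.documents import Document
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.indexing import InMemoryRecordManager, index
from langchain_core.vectorstores import InMemoryVectorStore

# Toy persistence layer: a record manager (tracks what was indexed and when)
# and a vector store (holds the indexed documents).
record_manager = InMemoryRecordManager(namespace="scoped_full_demo")
record_manager.create_schema()
vector_store = InMemoryVectorStore(embedding=DeterministicFakeEmbedding(size=32))

# First run: two chunks come from source "1", one from source "2".
docs = [
    Document(page_content="chunk A", metadata={"source": "1"}),
    Document(page_content="chunk B", metadata={"source": "1"}),
    Document(page_content="chunk C", metadata={"source": "2"}),
]
result = index(
    docs,
    record_manager,
    vector_store,
    cleanup="scoped_full",
    source_id_key="source",  # required for scoped_full, just like incremental
)
print(result)  # 3 added, 0 deleted

# Second run: the loader only yields source "1", and "chunk B" is gone.
# scoped_full deletes the stale record for source "1" (chunk B) but leaves
# source "2" untouched, because source "2" was not seen during this run.
docs = [Document(page_content="chunk A", metadata={"source": "1"})]
result = index(
    docs,
    record_manager,
    vector_store,
    cleanup="scoped_full",
    source_id_key="source",
)
print(result)  # 0 added, 1 deleted, 1 skipped

With cleanup="full" the second run would also delete "chunk C", since source "2" was not returned at all; scoped_full leaves it in place because cleanup is scoped to the source ids seen during the run.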