1
1
import logging
2
+ import os
2
3
import subprocess
3
4
import time
4
5
from collections .abc import Generator
6
+ from datetime import datetime
5
7
from fnmatch import fnmatch as fnmatch_path
6
8
from logging import Filter
7
9
from pathlib import Path
@@ -176,7 +178,22 @@ def add_documents(self, documents: list[Document], batch_size: int = 10) -> None
176
178
documents: List of documents to add
177
179
batch_size: Number of documents to process in each batch
178
180
"""
179
- list (self .add_documents_progress (documents , batch_size = batch_size ))
181
+ # Process documents in batches
182
+ for i in range (0 , len (documents ), batch_size ):
183
+ batch = documents [i : i + batch_size ]
184
+ self ._add_documents (batch )
185
+
186
+ # Update stored timestamps after successful indexing
187
+ for doc in batch :
188
+ if "source" in doc .metadata :
189
+ abs_path = str (Path (doc .metadata ["source" ]).resolve ())
190
+ current_mtime = int (os .path .getmtime (abs_path ))
191
+ doc .metadata ["last_modified" ] = current_mtime
192
+ # Update the document in the collection
193
+ self .collection .update (
194
+ ids = [doc .doc_id ],
195
+ metadatas = [doc .metadata ],
196
+ )
180
197
181
198
def add_documents_progress (
182
199
self , documents : list [Document ], batch_size : int = 10
@@ -201,6 +218,14 @@ def _add_documents(self, documents: list[Document]) -> None:
201
218
doc = self ._generate_doc_id (doc )
202
219
assert doc .doc_id is not None
203
220
221
+ # Update timestamp in metadata to current time
222
+ if "source" in doc .metadata :
223
+ abs_path = str (Path (doc .metadata ["source" ]).resolve ())
224
+ current_mtime = os .path .getmtime (abs_path )
225
+ doc .metadata ["last_modified" ] = self ._normalize_timestamp (
226
+ current_mtime
227
+ )
228
+
204
229
contents .append (doc .content )
205
230
metadatas .append (doc .metadata )
206
231
ids .append (doc .doc_id )
@@ -869,14 +894,54 @@ def _get_valid_files(
869
894
870
895
return valid_files
871
896
897
+ def _normalize_timestamp (self , timestamp : str | float | int | None ) -> str :
898
+ """Normalize timestamp to ISO format string."""
899
+ if timestamp is None :
900
+ return datetime .fromtimestamp (0 ).isoformat ()
901
+ try :
902
+ if isinstance (timestamp , int | float ):
903
+ return datetime .fromtimestamp (float (timestamp )).isoformat ()
904
+ # If it's already an ISO string, validate and return
905
+ if isinstance (timestamp , str ):
906
+ datetime .fromisoformat (timestamp ) # Validate format
907
+ return timestamp
908
+ raise ValueError (f"Unsupported timestamp type: { type (timestamp )} " )
909
+ except (ValueError , TypeError ) as e :
910
+ logger .warning ("Invalid timestamp format: %s (%s)" , timestamp , e )
911
+ return datetime .fromtimestamp (0 ).isoformat ()
912
+
913
+ def _compare_timestamps (self , stored : str , current : float ) -> bool :
914
+ """Compare stored ISO timestamp with current Unix timestamp.
915
+
916
+ Returns True if current is newer than stored."""
917
+ try :
918
+ stored_ts = datetime .fromisoformat (stored ).timestamp ()
919
+ # Round to seconds for comparison
920
+ return int (current ) > int (stored_ts )
921
+ except (ValueError , TypeError ) as e :
922
+ logger .warning ("Error comparing timestamps: %s" , e )
923
+ return True # If we can't compare, assume modified
924
+
925
+ def _get_stored_timestamps (self ) -> dict [str , str ]:
926
+ """Get stored timestamps for all indexed files."""
927
+ stored = {}
928
+ for doc in self .get_all_documents ():
929
+ if "source" in doc .metadata :
930
+ abs_path = str (Path (doc .metadata ["source" ]).resolve ())
931
+ timestamp = self ._normalize_timestamp (doc .metadata .get ("last_modified" ))
932
+ stored [abs_path ] = timestamp
933
+ logger .debug ("Stored timestamp for %s: %s" , abs_path , timestamp )
934
+ return stored
935
+
872
936
def collect_documents (
873
- self , path : Path , glob_pattern : str = "**/*.*"
937
+ self , path : Path , glob_pattern : str = "**/*.*" , check_modified : bool = True
874
938
) -> list [Document ]:
875
939
"""Collect documents from a file or directory without processing them.
876
940
877
941
Args:
878
942
path: Path to collect documents from
879
943
glob_pattern: Pattern to match files (only used for directories)
944
+ check_modified: Whether to check for modifications (skip unchanged files)
880
945
881
946
Returns:
882
947
List of documents ready for processing
@@ -888,8 +953,33 @@ def collect_documents(
888
953
logger .debug (f"No valid files found in { path } " )
889
954
return documents
890
955
956
+ if check_modified :
957
+ stored_timestamps = self ._get_stored_timestamps ()
958
+
891
959
# Process files in order (least deep first)
892
960
for file_path in sorted (valid_files , key = lambda x : len (x .parts )):
961
+ abs_path = str (file_path .resolve ())
962
+
963
+ if check_modified :
964
+ current_mtime = os .path .getmtime (file_path )
965
+ stored_timestamp = stored_timestamps .get (abs_path )
966
+
967
+ if stored_timestamp and not self ._compare_timestamps (
968
+ stored_timestamp , current_mtime
969
+ ):
970
+ logger .debug ("Skipping unchanged file: %s" , abs_path )
971
+ continue
972
+
973
+ if not stored_timestamp :
974
+ logger .debug ("New file: %s" , abs_path )
975
+ else :
976
+ logger .debug (
977
+ "Modified file: %s (current: %s, stored: %s)" ,
978
+ abs_path ,
979
+ self ._normalize_timestamp (current_mtime ),
980
+ stored_timestamp ,
981
+ )
982
+
893
983
logger .debug (f"Processing { file_path } " )
894
984
documents .extend (Document .from_file (file_path , processor = self .processor ))
895
985
@@ -918,6 +1008,4 @@ def get_all_documents(self) -> list[Document]:
918
1008
"""
919
1009
logger .debug ("Getting all documents from index" )
920
1010
docs = self .list_documents (group_by_source = False )
921
- for doc in docs :
922
- logger .debug ("Retrieved document with metadata: %s" , doc .metadata )
923
1011
return docs
0 commit comments