diff --git a/backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py b/backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py index 91f8f0740..3c175e681 100644 --- a/backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py +++ b/backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py @@ -74,12 +74,14 @@ from unstract.sdk1.file_storage.env_helper import EnvHelper from unstract.sdk1.prompt import PromptTool from unstract.sdk1.utils.indexing import IndexingUtils + from unstract.sdk1.utils.tool import ToolUtils else: from unstract.sdk.constants import LogLevel from unstract.sdk.exceptions import IndexingError, SdkError from unstract.sdk.file_storage.constants import StorageType from unstract.sdk.file_storage.env_helper import EnvHelper from unstract.sdk.prompt import PromptTool + from unstract.sdk.utils import ToolUtils from unstract.sdk.utils.indexing_utils import IndexingUtils logger = logging.getLogger(__name__) @@ -421,11 +423,10 @@ def index_document( document_id=document_id, run_id=run_id, enable_highlight=tool.enable_highlight, - doc_id=doc_id, ) if tool.summarize_context: summarize_file_path = PromptStudioHelper.summarize( - file_name, org_id, document_id, run_id, tool, doc_id + file_name, org_id, run_id, tool ) summarize_doc_id = IndexingUtils.generate_index_key( vector_db=str(summary_profile.vector_store.id), @@ -481,7 +482,7 @@ def index_document( return doc_id @staticmethod - def summarize(file_name, org_id, document_id, run_id, tool, doc_id) -> str: + def summarize(file_name, org_id, run_id, tool) -> str: summarizer_plugin = get_plugin("summarizer") usage_kwargs: dict[Any, Any] = dict() usage_kwargs[ToolStudioPromptKeys.RUN_ID] = run_id @@ -880,7 +881,6 @@ def _fetch_response( document_id=document_id, run_id=run_id, enable_highlight=tool.enable_highlight, - doc_id=doc_id, ) logger.info(f"Extracted text from {file_path} for {doc_id}") if is_summary: @@ -1232,17 +1232,6 @@ def _fetch_single_pass_response( file_path = os.path.join( directory, "extract", os.path.splitext(filename)[0] + ".txt" ) - doc_id = IndexingUtils.generate_index_key( - vector_db=str(default_profile.vector_store.id), - embedding=str(default_profile.embedding_model.id), - x2text=str(default_profile.x2text.id), - chunk_size=str(default_profile.chunk_size), - chunk_overlap=str(default_profile.chunk_overlap), - file_path=input_file_path, - file_hash=None, - fs=fs_instance, - tool=util, - ) PromptStudioHelper.dynamic_extractor( profile_manager=default_profile, file_path=input_file_path, @@ -1250,7 +1239,6 @@ def _fetch_single_pass_response( document_id=document_id, run_id=run_id, enable_highlight=tool.enable_highlight, - doc_id=doc_id, ) # Indexing is not needed as Single pass is always non chunked. vector_db = str(default_profile.vector_store.id) @@ -1326,8 +1314,11 @@ def dynamic_extractor( org_id: str, profile_manager: ProfileManager, document_id: str, - doc_id: str, ) -> str: + # Guard against None metadata (when adapter_metadata_b is None) + metadata = profile_manager.x2text.metadata or {} + x2text_config_hash = ToolUtils.hash_str(json.dumps(metadata, sort_keys=True)) + x2text = str(profile_manager.x2text.id) is_extracted: bool = False extract_file_path: str | None = None @@ -1342,7 +1333,7 @@ def dynamic_extractor( is_extracted = PromptStudioIndexHelper.check_extraction_status( document_id=document_id, profile_manager=profile_manager, - doc_id=doc_id, + x2text_config_hash=x2text_config_hash, enable_highlight=enable_highlight, ) if is_extracted: @@ -1381,16 +1372,36 @@ def dynamic_extractor( request_id=StateStore.get(Common.REQUEST_ID), ) extracted_text = responder.extract(payload=payload) - PromptStudioIndexHelper.mark_extraction_status( + success = PromptStudioIndexHelper.mark_extraction_status( document_id=document_id, profile_manager=profile_manager, - doc_id=doc_id, + x2text_config_hash=x2text_config_hash, enable_highlight=enable_highlight, ) + if not success: + logger.warning( + f"Failed to mark extraction success for document {document_id}. " + f"Extraction completed but status not saved." + ) except SdkError as e: msg = str(e) if e.actual_err and hasattr(e.actual_err, "response"): msg = e.actual_err.response.json().get("error", str(e)) + + success = PromptStudioIndexHelper.mark_extraction_status( + document_id=document_id, + profile_manager=profile_manager, + x2text_config_hash=x2text_config_hash, + enable_highlight=enable_highlight, + extracted=False, + error_message=msg, + ) + if not success: + logger.warning( + f"Failed to mark extraction failure for document {document_id}. " + f"Extraction failed but status not saved." + ) + raise ExtractionAPIError( f"Failed to extract '{filename}'. {msg}", status_code=int(e.status_code or 500), diff --git a/backend/prompt_studio/prompt_studio_index_manager_v2/prompt_studio_index_helper.py b/backend/prompt_studio/prompt_studio_index_manager_v2/prompt_studio_index_helper.py index edf5be2d4..b4b5e4625 100644 --- a/backend/prompt_studio/prompt_studio_index_manager_v2/prompt_studio_index_helper.py +++ b/backend/prompt_studio/prompt_studio_index_manager_v2/prompt_studio_index_helper.py @@ -71,16 +71,23 @@ def handle_index_manager( def mark_extraction_status( document_id: str, profile_manager: ProfileManager, - doc_id: str, - enable_highlight: bool = False, + x2text_config_hash: str, + enable_highlight: bool, + extracted: bool = True, + error_message: str | None = None, ) -> bool: - """Marks the extraction status for a given document with highlight metadata. + """Marks the extraction status for a given document. + + Uses x2text_config_hash (hash of X2Text config metadata) as the key. + Handles both successful and failed extractions. Args: document_id (str): ID of the document in DocumentManager. profile_manager (ProfileManager): ProfileManager instance for context. - doc_id (str): Unique identifier for the document within extraction status. - enable_highlight (bool): Whether highlight metadata was used during extraction. + x2text_config_hash (str): Hash of X2Text config metadata. + enable_highlight (bool): Whether highlight metadata was used/attempted. + extracted (bool): True for success, False for failure. Defaults to True. + error_message (str | None): Error message if extraction failed. Returns: bool: True if the status is successfully updated, False otherwise. @@ -95,62 +102,78 @@ def mark_extraction_status( "profile_manager": profile_manager, } - index_manager, created = IndexManager.objects.get_or_create(**args) - - index_manager.extraction_status = index_manager.extraction_status or {} - - index_manager.extraction_status[doc_id] = { - "extracted": True, + # Build extraction status data + status_data = { + "extracted": extracted, "enable_highlight": enable_highlight, } + + # Add error message if extraction failed + if not extracted and error_message: + status_data["error"] = error_message + + defaults = {"extraction_status": {x2text_config_hash: status_data}} + + index_manager, created = IndexManager.objects.update_or_create( + **args, + defaults=defaults, + ) + logger.info( f"Index manager {index_manager} {index_manager.index_ids_history}" ) - index_manager.save(update_fields=["extraction_status"]) - if created: - logger.info( - f"IndexManager entry created " - f"for document: {document_id} with {doc_id} " - f"(highlight={enable_highlight})" - ) + if extracted: + if created: + logger.info( + f"IndexManager entry created with SUCCESS " + f"for document: {document_id} " + f"with x2text_config_hash: {x2text_config_hash}" + ) + else: + logger.info( + f"Extraction SUCCESS for document: {document_id} " + f"with x2text_config_hash: {x2text_config_hash}" + ) else: - logger.info( - f"Updated extraction status " - f"for document: {document_id} with {doc_id} " - f"(highlight={enable_highlight})" + logger.error( + f"Extraction FAILED for document: {document_id} " + f"with x2text_config_hash: {x2text_config_hash}. " + f"Error: {error_message}" ) + return True except DocumentManager.DoesNotExist: logger.error(f"Document with ID {document_id} does not exist.") - raise IndexingAPIError( - "Error occured while extracting. Please contact admin." - ) + return False except Exception as e: - logger.error(f"Unexpected error updating extraction status: {e}") - raise IndexingAPIError(f"Error updating indexing status {str(e)}") from e + logger.exception( + f"Unexpected error marking extraction status for document {document_id}: {e}" + ) + return False @staticmethod def check_extraction_status( document_id: str, profile_manager: ProfileManager, - doc_id: str, - enable_highlight: bool = False, + x2text_config_hash: str, + enable_highlight: bool, ) -> bool: - """Checks if the extraction status is already marked as complete - for the given document and doc_id with matching highlight setting. + """Checks if the extraction status is already marked as complete. + + Uses x2text_config_hash (hash of X2Text config metadata) as the key. + Also validates that enable_highlight setting matches. Args: document_id (str): ID of the document in DocumentManager. profile_manager (ProfileManager): ProfileManager instance for context. - doc_id (str): Unique identifier for the document within extraction status. + x2text_config_hash (str): Hash of X2Text config metadata. enable_highlight (bool): Whether highlight metadata is required. Returns: - bool: True if extraction is complete with matching highlight setting, - False otherwise. + bool: True if extraction is complete with matching settings, False otherwise. """ try: index_manager = IndexManager.objects.filter( @@ -162,45 +185,44 @@ def check_extraction_status( return False extraction_status = index_manager.extraction_status or {} - status_entry = extraction_status.get(doc_id) + status_entry = extraction_status.get(x2text_config_hash) if not status_entry: logger.info( - f"Extraction is NOT yet marked as complete " - f"for document: {document_id} with {doc_id}" + f"Extraction NOT complete for document: {document_id} " + f"with x2text_config_hash: {x2text_config_hash}" ) return False - # Backward compatibility: treat boolean True as non-highlighted - if isinstance(status_entry, bool): - is_extracted = status_entry - is_highlight_handled = False - else: - # New format: {"extracted": True, "enable_highlight": } - is_extracted = status_entry.get("extracted", False) - is_highlight_handled = status_entry.get("enable_highlight", False) + # {"extracted": True/False, "enable_highlight": bool, "error": str (optional)} + is_extracted = status_entry.get("extracted", False) + stored_highlight = status_entry.get("enable_highlight", False) - # Check if extraction exists AND highlight setting matches - if is_extracted and is_highlight_handled == enable_highlight: + # Check if previous extraction failed + if not is_extracted: + error_msg = status_entry.get("error", "Unknown error") logger.info( - f"Extraction is already marked as complete " - f"for document: {document_id} with {doc_id} " + f"Previous extraction FAILED for {x2text_config_hash}. " + f"Error: {error_msg}. Will retry extraction." + ) + return False # Allow retry + + if is_extracted and stored_highlight == enable_highlight: + logger.info( + f"Extraction already complete for document: {document_id} " + f"with x2text_config_hash: {x2text_config_hash} " f"(highlight={enable_highlight})" ) return True - elif is_extracted and is_highlight_handled != enable_highlight: + elif is_extracted and stored_highlight != enable_highlight: logger.info( - f"Extraction exists but highlight mismatch " - f"for document: {document_id} with {doc_id}. " - f"Stored: {is_highlight_handled}, Requested: {enable_highlight}. " + f"Extraction exists but highlight mismatch for {x2text_config_hash}. " + f"Stored: {stored_highlight}, Requested: {enable_highlight}. " f"Re-extraction needed." ) return False else: - logger.info( - f"Extraction is NOT yet marked as complete " - f"for document: {document_id} with {doc_id}" - ) + logger.info(f"Extraction NOT complete for document: {document_id}") return False except Exception as e: