Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 31 additions & 20 deletions backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,14 @@
from unstract.sdk1.file_storage.env_helper import EnvHelper
from unstract.sdk1.prompt import PromptTool
from unstract.sdk1.utils.indexing import IndexingUtils
from unstract.sdk1.utils.tool import ToolUtils
else:
from unstract.sdk.constants import LogLevel
from unstract.sdk.exceptions import IndexingError, SdkError
from unstract.sdk.file_storage.constants import StorageType
from unstract.sdk.file_storage.env_helper import EnvHelper
from unstract.sdk.prompt import PromptTool
from unstract.sdk.utils import ToolUtils
from unstract.sdk.utils.indexing_utils import IndexingUtils

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -421,11 +423,10 @@ def index_document(
document_id=document_id,
run_id=run_id,
enable_highlight=tool.enable_highlight,
doc_id=doc_id,
)
if tool.summarize_context:
summarize_file_path = PromptStudioHelper.summarize(
file_name, org_id, document_id, run_id, tool, doc_id
file_name, org_id, run_id, tool
)
summarize_doc_id = IndexingUtils.generate_index_key(
vector_db=str(summary_profile.vector_store.id),
Expand Down Expand Up @@ -481,7 +482,7 @@ def index_document(
return doc_id

@staticmethod
def summarize(file_name, org_id, document_id, run_id, tool, doc_id) -> str:
def summarize(file_name, org_id, run_id, tool) -> str:
summarizer_plugin = get_plugin("summarizer")
usage_kwargs: dict[Any, Any] = dict()
usage_kwargs[ToolStudioPromptKeys.RUN_ID] = run_id
Expand Down Expand Up @@ -880,7 +881,6 @@ def _fetch_response(
document_id=document_id,
run_id=run_id,
enable_highlight=tool.enable_highlight,
doc_id=doc_id,
)
logger.info(f"Extracted text from {file_path} for {doc_id}")
if is_summary:
Expand Down Expand Up @@ -1232,25 +1232,13 @@ def _fetch_single_pass_response(
file_path = os.path.join(
directory, "extract", os.path.splitext(filename)[0] + ".txt"
)
doc_id = IndexingUtils.generate_index_key(
vector_db=str(default_profile.vector_store.id),
embedding=str(default_profile.embedding_model.id),
x2text=str(default_profile.x2text.id),
chunk_size=str(default_profile.chunk_size),
chunk_overlap=str(default_profile.chunk_overlap),
file_path=input_file_path,
file_hash=None,
fs=fs_instance,
tool=util,
)
PromptStudioHelper.dynamic_extractor(
profile_manager=default_profile,
file_path=input_file_path,
org_id=org_id,
document_id=document_id,
run_id=run_id,
enable_highlight=tool.enable_highlight,
doc_id=doc_id,
)
# Indexing is not needed as Single pass is always non chunked.
vector_db = str(default_profile.vector_store.id)
Expand Down Expand Up @@ -1326,8 +1314,11 @@ def dynamic_extractor(
org_id: str,
profile_manager: ProfileManager,
document_id: str,
doc_id: str,
) -> str:
# Guard against None metadata (when adapter_metadata_b is None)
metadata = profile_manager.x2text.metadata or {}
x2text_config_hash = ToolUtils.hash_str(json.dumps(metadata, sort_keys=True))

x2text = str(profile_manager.x2text.id)
is_extracted: bool = False
extract_file_path: str | None = None
Expand All @@ -1342,7 +1333,7 @@ def dynamic_extractor(
is_extracted = PromptStudioIndexHelper.check_extraction_status(
document_id=document_id,
profile_manager=profile_manager,
doc_id=doc_id,
x2text_config_hash=x2text_config_hash,
enable_highlight=enable_highlight,
)
if is_extracted:
Expand Down Expand Up @@ -1381,16 +1372,36 @@ def dynamic_extractor(
request_id=StateStore.get(Common.REQUEST_ID),
)
extracted_text = responder.extract(payload=payload)
PromptStudioIndexHelper.mark_extraction_status(
success = PromptStudioIndexHelper.mark_extraction_status(
document_id=document_id,
profile_manager=profile_manager,
doc_id=doc_id,
x2text_config_hash=x2text_config_hash,
enable_highlight=enable_highlight,
)
if not success:
logger.warning(
f"Failed to mark extraction success for document {document_id}. "
f"Extraction completed but status not saved."
)
except SdkError as e:
msg = str(e)
if e.actual_err and hasattr(e.actual_err, "response"):
msg = e.actual_err.response.json().get("error", str(e))

success = PromptStudioIndexHelper.mark_extraction_status(
document_id=document_id,
profile_manager=profile_manager,
x2text_config_hash=x2text_config_hash,
enable_highlight=enable_highlight,
extracted=False,
error_message=msg,
)
if not success:
logger.warning(
f"Failed to mark extraction failure for document {document_id}. "
f"Extraction failed but status not saved."
)

raise ExtractionAPIError(
f"Failed to extract '{filename}'. {msg}",
status_code=int(e.status_code or 500),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,23 @@ def handle_index_manager(
def mark_extraction_status(
document_id: str,
profile_manager: ProfileManager,
doc_id: str,
enable_highlight: bool = False,
x2text_config_hash: str,
enable_highlight: bool,
extracted: bool = True,
error_message: str | None = None,
) -> bool:
"""Marks the extraction status for a given document with highlight metadata.
"""Marks the extraction status for a given document.

Uses x2text_config_hash (hash of X2Text config metadata) as the key.
Handles both successful and failed extractions.

Args:
document_id (str): ID of the document in DocumentManager.
profile_manager (ProfileManager): ProfileManager instance for context.
doc_id (str): Unique identifier for the document within extraction status.
enable_highlight (bool): Whether highlight metadata was used during extraction.
x2text_config_hash (str): Hash of X2Text config metadata.
enable_highlight (bool): Whether highlight metadata was used/attempted.
extracted (bool): True for success, False for failure. Defaults to True.
error_message (str | None): Error message if extraction failed.

Returns:
bool: True if the status is successfully updated, False otherwise.
Expand All @@ -95,62 +102,78 @@ def mark_extraction_status(
"profile_manager": profile_manager,
}

index_manager, created = IndexManager.objects.get_or_create(**args)

index_manager.extraction_status = index_manager.extraction_status or {}

index_manager.extraction_status[doc_id] = {
"extracted": True,
# Build extraction status data
status_data = {
"extracted": extracted,
"enable_highlight": enable_highlight,
}

# Add error message if extraction failed
if not extracted and error_message:
status_data["error"] = error_message

defaults = {"extraction_status": {x2text_config_hash: status_data}}

index_manager, created = IndexManager.objects.update_or_create(
**args,
defaults=defaults,
)

logger.info(
f"Index manager {index_manager} {index_manager.index_ids_history}"
)
index_manager.save(update_fields=["extraction_status"])

if created:
logger.info(
f"IndexManager entry created "
f"for document: {document_id} with {doc_id} "
f"(highlight={enable_highlight})"
)
if extracted:
if created:
logger.info(
f"IndexManager entry created with SUCCESS "
f"for document: {document_id} "
f"with x2text_config_hash: {x2text_config_hash}"
)
else:
logger.info(
f"Extraction SUCCESS for document: {document_id} "
f"with x2text_config_hash: {x2text_config_hash}"
)
else:
logger.info(
f"Updated extraction status "
f"for document: {document_id} with {doc_id} "
f"(highlight={enable_highlight})"
logger.error(
f"Extraction FAILED for document: {document_id} "
f"with x2text_config_hash: {x2text_config_hash}. "
f"Error: {error_message}"
)

return True

except DocumentManager.DoesNotExist:
logger.error(f"Document with ID {document_id} does not exist.")
raise IndexingAPIError(
"Error occured while extracting. Please contact admin."
)
return False

except Exception as e:
logger.error(f"Unexpected error updating extraction status: {e}")
raise IndexingAPIError(f"Error updating indexing status {str(e)}") from e
logger.exception(
f"Unexpected error marking extraction status for document {document_id}: {e}"
)
return False

@staticmethod
def check_extraction_status(
document_id: str,
profile_manager: ProfileManager,
doc_id: str,
enable_highlight: bool = False,
x2text_config_hash: str,
enable_highlight: bool,
) -> bool:
"""Checks if the extraction status is already marked as complete
for the given document and doc_id with matching highlight setting.
"""Checks if the extraction status is already marked as complete.

Uses x2text_config_hash (hash of X2Text config metadata) as the key.
Also validates that enable_highlight setting matches.

Args:
document_id (str): ID of the document in DocumentManager.
profile_manager (ProfileManager): ProfileManager instance for context.
doc_id (str): Unique identifier for the document within extraction status.
x2text_config_hash (str): Hash of X2Text config metadata.
enable_highlight (bool): Whether highlight metadata is required.

Returns:
bool: True if extraction is complete with matching highlight setting,
False otherwise.
bool: True if extraction is complete with matching settings, False otherwise.
"""
try:
index_manager = IndexManager.objects.filter(
Expand All @@ -162,45 +185,44 @@ def check_extraction_status(
return False

extraction_status = index_manager.extraction_status or {}
status_entry = extraction_status.get(doc_id)
status_entry = extraction_status.get(x2text_config_hash)

if not status_entry:
logger.info(
f"Extraction is NOT yet marked as complete "
f"for document: {document_id} with {doc_id}"
f"Extraction NOT complete for document: {document_id} "
f"with x2text_config_hash: {x2text_config_hash}"
)
return False

# Backward compatibility: treat boolean True as non-highlighted
if isinstance(status_entry, bool):
is_extracted = status_entry
is_highlight_handled = False
else:
# New format: {"extracted": True, "enable_highlight": <bool>}
is_extracted = status_entry.get("extracted", False)
is_highlight_handled = status_entry.get("enable_highlight", False)
# {"extracted": True/False, "enable_highlight": bool, "error": str (optional)}
is_extracted = status_entry.get("extracted", False)
stored_highlight = status_entry.get("enable_highlight", False)

# Check if extraction exists AND highlight setting matches
if is_extracted and is_highlight_handled == enable_highlight:
# Check if previous extraction failed
if not is_extracted:
error_msg = status_entry.get("error", "Unknown error")
logger.info(
f"Extraction is already marked as complete "
f"for document: {document_id} with {doc_id} "
f"Previous extraction FAILED for {x2text_config_hash}. "
f"Error: {error_msg}. Will retry extraction."
)
return False # Allow retry

if is_extracted and stored_highlight == enable_highlight:
logger.info(
f"Extraction already complete for document: {document_id} "
f"with x2text_config_hash: {x2text_config_hash} "
f"(highlight={enable_highlight})"
)
return True
elif is_extracted and is_highlight_handled != enable_highlight:
elif is_extracted and stored_highlight != enable_highlight:
logger.info(
f"Extraction exists but highlight mismatch "
f"for document: {document_id} with {doc_id}. "
f"Stored: {is_highlight_handled}, Requested: {enable_highlight}. "
f"Extraction exists but highlight mismatch for {x2text_config_hash}. "
f"Stored: {stored_highlight}, Requested: {enable_highlight}. "
f"Re-extraction needed."
)
return False
else:
logger.info(
f"Extraction is NOT yet marked as complete "
f"for document: {document_id} with {doc_id}"
)
logger.info(f"Extraction NOT complete for document: {document_id}")
return False

except Exception as e:
Expand Down