diff --git a/backend/api_v2/api_deployment_views.py b/backend/api_v2/api_deployment_views.py index cd1ac0e5a2..4178b5f2ea 100644 --- a/backend/api_v2/api_deployment_views.py +++ b/backend/api_v2/api_deployment_views.py @@ -81,6 +81,7 @@ def post( tag_names = serializer.validated_data.get(ApiExecution.TAGS) llm_profile_id = serializer.validated_data.get(ApiExecution.LLM_PROFILE_ID) hitl_queue_name = serializer.validated_data.get(ApiExecution.HITL_QUEUE_NAME) + hitl_packet_id = serializer.validated_data.get(ApiExecution.HITL_PACKET_ID) custom_data = serializer.validated_data.get(ApiExecution.CUSTOM_DATA) if presigned_urls: @@ -97,6 +98,7 @@ def post( tag_names=tag_names, llm_profile_id=llm_profile_id, hitl_queue_name=hitl_queue_name, + hitl_packet_id=hitl_packet_id, custom_data=custom_data, request_headers=dict(request.headers), ) diff --git a/backend/api_v2/constants.py b/backend/api_v2/constants.py index a8f5cc559a..88c85ac617 100644 --- a/backend/api_v2/constants.py +++ b/backend/api_v2/constants.py @@ -10,5 +10,6 @@ class ApiExecution: TAGS: str = "tags" LLM_PROFILE_ID: str = "llm_profile_id" HITL_QUEUE_NAME: str = "hitl_queue_name" + HITL_PACKET_ID: str = "hitl_packet_id" PRESIGNED_URLS: str = "presigned_urls" CUSTOM_DATA: str = "custom_data" diff --git a/backend/api_v2/deployment_helper.py b/backend/api_v2/deployment_helper.py index a0e059111f..f808ffc4f3 100644 --- a/backend/api_v2/deployment_helper.py +++ b/backend/api_v2/deployment_helper.py @@ -155,6 +155,7 @@ def execute_workflow( tag_names: list[str] = [], llm_profile_id: str | None = None, hitl_queue_name: str | None = None, + hitl_packet_id: str | None = None, custom_data: dict[str, Any] | None = None, request_headers=None, ) -> ReturnDict: @@ -169,6 +170,7 @@ def execute_workflow( tag_names (list(str)): list of tag names llm_profile_id (str, optional): LLM profile ID for overriding tool settings hitl_queue_name (str, optional): Custom queue name for manual review + hitl_packet_id (str, optional): Packet ID for packet-based review custom_data (dict[str, Any], optional): JSON data for custom_data variable replacement in prompts Returns: @@ -236,6 +238,7 @@ def execute_workflow( use_file_history=use_file_history, llm_profile_id=llm_profile_id, hitl_queue_name=hitl_queue_name, + hitl_packet_id=hitl_packet_id, custom_data=custom_data, ) result.status_api = DeploymentHelper.construct_status_endpoint( diff --git a/backend/api_v2/serializers.py b/backend/api_v2/serializers.py index 4f46784b6e..1f5c102690 100644 --- a/backend/api_v2/serializers.py +++ b/backend/api_v2/serializers.py @@ -4,6 +4,7 @@ from typing import Any from urllib.parse import urlparse +from django.apps import apps from django.core.validators import RegexValidator from pipeline_v2.models import Pipeline from prompt_studio.prompt_profile_manager_v2.models import ProfileManager @@ -227,6 +228,7 @@ class ExecutionRequestSerializer(TagParamsSerializer): presigned_urls = ListField(child=URLField(), required=False) llm_profile_id = CharField(required=False, allow_null=True, allow_blank=True) hitl_queue_name = CharField(required=False, allow_null=True, allow_blank=True) + hitl_packet_id = CharField(required=False, allow_null=True, allow_blank=True) custom_data = JSONField(required=False, allow_null=True) def validate_hitl_queue_name(self, value: str | None) -> str | None: @@ -248,6 +250,36 @@ def validate_hitl_queue_name(self, value: str | None) -> str | None: ) return value + def validate_hitl_packet_id(self, value: str | None) -> str | None: + """Validate packet ID format 
using enterprise validation if available."""
+        if not value:
+            return value
+
+        # Check if HITL feature is available
+        if not apps.is_installed("pluggable_apps.manual_review_v2"):
+            raise ValidationError(
+                "Packet-based HITL processing requires Unstract Enterprise. "
+                "This advanced workflow feature is available in our enterprise version. "
+                "Learn more at https://docs.unstract.com/unstract/unstract_platform/features/workflows/hqr_deployment_workflows/ or "
+                "contact our sales team at https://unstract.com/contact/"
+            )
+
+        # Validate packet ID format (alphanumeric string, typically 8-character hex)
+        value = value.strip()
+        if not value:
+            raise ValidationError("Packet ID cannot be empty or whitespace only.")
+
+        # Basic format validation: alphanumeric, reasonable length
+        if not re.match(r"^[a-zA-Z0-9_-]+$", value):
+            raise ValidationError(
+                "Invalid packet ID format. Packet ID must contain only letters, numbers, hyphens, or underscores."
+            )
+
+        if len(value) > 16:  # Reasonable max length
+            raise ValidationError("Packet ID is too long (maximum 16 characters).")
+
+        return value
+
     def validate_custom_data(self, value):
         """Validate custom_data is a valid JSON object."""
         if value is None:
diff --git a/backend/workflow_manager/endpoint_v2/destination.py b/backend/workflow_manager/endpoint_v2/destination.py
index 83917a08f9..4965c946ce 100644
--- a/backend/workflow_manager/endpoint_v2/destination.py
+++ b/backend/workflow_manager/endpoint_v2/destination.py
@@ -70,6 +70,7 @@ def __init__(
         use_file_history: bool,
         file_execution_id: str | None = None,
         hitl_queue_name: str | None = None,
+        packet_id: str | None = None,
     ) -> None:
         """Initialize a DestinationConnector object.

@@ -88,6 +89,7 @@
         self.workflow_log = workflow_log
         self.use_file_history = use_file_history
         self.hitl_queue_name = hitl_queue_name
+        self.packet_id = packet_id
         self.workflow = workflow

     def _get_endpoint_for_workflow(
@@ -166,6 +168,19 @@ def _should_handle_hitl(
         file_execution_id: str,
     ) -> bool:
         """Determines if HITL processing should be performed, returning True if data was pushed to the queue."""
+        # Check packet_id first - it takes precedence over hitl_queue_name
+        if self.packet_id:
+            logger.info(
+                f"API packet override: pushing to packet queue for file {file_name}"
+            )
+            self._push_data_to_queue(
+                file_name=file_name,
+                workflow=workflow,
+                input_file_path=input_file_path,
+                file_execution_id=file_execution_id,
+            )
+            return True
+
         # Check if API deployment requested HITL override
         if self.hitl_queue_name:
             logger.info(f"API HITL override: pushing to queue for file {file_name}")
@@ -175,7 +190,6 @@
                 input_file_path=input_file_path,
                 file_execution_id=file_execution_id,
             )
-            logger.info(f"Successfully pushed {file_name} to HITL queue")
             return True

         # Skip HITL validation if we're using file_history and no execution result is available
@@ -753,6 +767,7 @@ def get_config(self) -> DestinationConfig:
             execution_id=self.execution_id,
             use_file_history=self.use_file_history,
             hitl_queue_name=self.hitl_queue_name,
+            packet_id=self.packet_id,
         )

     @classmethod
@@ -781,6 +796,7 @@ def from_config(
             use_file_history=config.use_file_history,
             file_execution_id=config.file_execution_id,
             hitl_queue_name=config.hitl_queue_name,
+            packet_id=config.packet_id,
         )
         return destination

@@ -839,99 +855,192 @@ def _push_to_queue(
         Returns:
             None
         """
+        # Handle missing result for packet processing
         if not result:
-            return
-        connector: ConnectorInstance = self.source_endpoint.connector_instance
-        # For API deployments, 
use workflow execution storage instead of connector + if not self.packet_id: + return + result = json.dumps({"status": "pending", "message": "Awaiting processing"}) + + # Delegate to appropriate handler based on deployment type if self.is_api: - logger.debug( - f"API deployment detected for {file_name}, using workflow execution file system" + self._push_to_queue_for_api_deployment( + file_name, result, input_file_path, file_execution_id, meta_data ) - # For API deployments, read file content from workflow execution storage - file_content_base64 = self._read_file_content_for_queue( - input_file_path, file_name + else: + self._push_to_queue_for_connector( + file_name, + workflow, + result, + input_file_path, + file_execution_id, + meta_data, ) - # Use common queue naming method - q_name = self._get_review_queue_name() - whisper_hash = meta_data.get("whisper-hash") if meta_data else None + def _enqueue_to_packet_or_regular_queue( + self, + file_name: str, + queue_result: dict[str, Any], + queue_result_json: str, + q_name: str, + ttl_seconds: int | None = None, + ) -> None: + """Route to packet queue or regular queue based on packet_id. - # Get extracted text from metadata (added by structure tool) - extracted_text = meta_data.get("extracted_text") if meta_data else None + Args: + file_name: Name of the file being queued + queue_result: Queue result dictionary + queue_result_json: JSON string of queue result + q_name: Queue name for regular queue + ttl_seconds: TTL in seconds (optional, for regular queue) + """ + # Get queue instance + conn = QueueUtils.get_queue_inst() - queue_result = QueueResult( - file=file_name, - status=QueueResultStatus.SUCCESS, - result=result, - workflow_id=str(self.workflow_id), - file_content=file_content_base64, - whisper_hash=whisper_hash, - file_execution_id=file_execution_id, - extracted_text=extracted_text, - ).to_dict() + if self.packet_id: + # Route to packet queue + success = conn.enqueue_to_packet( + packet_id=self.packet_id, queue_result=queue_result + ) + if not success: + error_msg = f"Failed to push {file_name} to packet {self.packet_id}" + logger.error(error_msg) + raise RuntimeError(error_msg) + return - queue_result_json = json.dumps(queue_result) - conn = QueueUtils.get_queue_inst() + # Route to regular queue + if ttl_seconds: + conn.enqueue_with_ttl( + queue_name=q_name, message=queue_result_json, ttl_seconds=ttl_seconds + ) + else: conn.enqueue(queue_name=q_name, message=queue_result_json) - logger.info(f"Pushed {file_name} to queue {q_name} with file content") - return - connector_settings: dict[str, Any] = connector.connector_metadata + logger.info(f"Pushed {file_name} to queue {q_name} with file content") + + def _create_queue_result( + self, + file_name: str, + result: str, + file_content_base64: str, + file_execution_id: str, + meta_data: dict[str, Any] | None = None, + ttl_seconds: int | None = None, + ) -> dict[str, Any]: + """Create QueueResult dictionary. 
+ + Args: + file_name: Name of the file + result: Processing result + file_content_base64: Base64 encoded file content + file_execution_id: File execution ID + meta_data: Optional metadata + ttl_seconds: Optional TTL in seconds + + Returns: + QueueResult as dictionary + """ + whisper_hash = meta_data.get("whisper-hash") if meta_data else None + extracted_text = meta_data.get("extracted_text") if meta_data else None + + queue_result_obj = QueueResult( + file=file_name, + status=QueueResultStatus.SUCCESS, + result=result, + workflow_id=str(self.workflow_id), + file_content=file_content_base64, + whisper_hash=whisper_hash, + file_execution_id=file_execution_id, + extracted_text=extracted_text, + ttl_seconds=ttl_seconds, + ) + return queue_result_obj.to_dict() + + def _push_to_queue_for_api_deployment( + self, + file_name: str, + result: str, + input_file_path: str, + file_execution_id: str, + meta_data: dict[str, Any] | None = None, + ) -> None: + """Handle queue push for API deployments. + + Args: + file_name: Name of the file + result: Processing result + input_file_path: Path to input file + file_execution_id: File execution ID + meta_data: Optional metadata + """ + file_content_base64 = self._read_file_content_for_queue( + input_file_path, file_name + ) + q_name = self._get_review_queue_name() + + queue_result = self._create_queue_result( + file_name=file_name, + result=result, + file_content_base64=file_content_base64, + file_execution_id=file_execution_id, + meta_data=meta_data, + ) + + queue_result_json = json.dumps(queue_result) + self._enqueue_to_packet_or_regular_queue( + file_name, queue_result, queue_result_json, q_name + ) + + def _push_to_queue_for_connector( + self, + file_name: str, + workflow: Workflow, + result: str, + input_file_path: str, + file_execution_id: str, + meta_data: dict[str, Any] | None = None, + ) -> None: + """Handle queue push for connector-based deployments. 
+ + Args: + file_name: Name of the file + workflow: Workflow object + result: Processing result + input_file_path: Path to input file + file_execution_id: File execution ID + meta_data: Optional metadata + """ + connector = self.source_endpoint.connector_instance + connector_settings = connector.connector_metadata source_fs = self.get_fsspec( settings=connector_settings, connector_id=connector.connector_id ) + with source_fs.open(input_file_path, "rb") as remote_file: - whisper_hash = None file_content = remote_file.read() - # Convert file content to a base64 encoded string file_content_base64 = base64.b64encode(file_content).decode("utf-8") - # Use common queue naming method - q_name = self._get_review_queue_name() - if meta_data: - whisper_hash = meta_data.get("whisper-hash") - extracted_text = meta_data.get("extracted_text") - else: - whisper_hash = None - extracted_text = None - - # Get TTL from workflow settings - ttl_seconds = WorkflowUtil.get_hitl_ttl_seconds(workflow) - - # Create QueueResult with TTL metadata - queue_result_obj = QueueResult( - file=file_name, - status=QueueResultStatus.SUCCESS, - result=result, - workflow_id=str(self.workflow_id), - file_content=file_content_base64, - whisper_hash=whisper_hash, - file_execution_id=file_execution_id, - extracted_text=extracted_text, - ttl_seconds=ttl_seconds, - ) - # Add TTL metadata based on HITLSettings - queue_result_obj.ttl_seconds = WorkflowUtil.get_hitl_ttl_seconds(workflow) - - queue_result = queue_result_obj.to_dict() - queue_result_json = json.dumps(queue_result) + q_name = self._get_review_queue_name() + ttl_seconds = WorkflowUtil.get_hitl_ttl_seconds(workflow) - # Validate the JSON is not empty before enqueuing - if not queue_result_json or queue_result_json.strip() == "": - logger.error( - f"Attempted to enqueue empty JSON with TTL for file {file_name}" - ) - raise ValueError("Cannot enqueue empty JSON message") + queue_result = self._create_queue_result( + file_name=file_name, + result=result, + file_content_base64=file_content_base64, + file_execution_id=file_execution_id, + meta_data=meta_data, + ttl_seconds=ttl_seconds, + ) - conn = QueueUtils.get_queue_inst() + queue_result_json = json.dumps(queue_result) - # Use the TTL metadata that was already set in the QueueResult object - ttl_seconds = queue_result_obj.ttl_seconds + # Validate JSON is not empty + if not queue_result_json or queue_result_json.strip() == "": + logger.error(f"Attempted to enqueue empty JSON with TTL for file {file_name}") + raise ValueError("Cannot enqueue empty JSON message") - conn.enqueue_with_ttl( - queue_name=q_name, message=queue_result_json, ttl_seconds=ttl_seconds - ) - logger.info(f"Pushed {file_name} to queue {q_name} with file content") + self._enqueue_to_packet_or_regular_queue( + file_name, queue_result, queue_result_json, q_name, ttl_seconds + ) def _read_file_content_for_queue(self, input_file_path: str, file_name: str) -> str: """Read and encode file content for queue message. 
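The routing precedence introduced above — `packet_id` over `hitl_queue_name` over the workflow's default review queue — can be summarised in a small standalone sketch. The class and function below are illustrative stand-ins, not code from this change; the real `DestinationConnector` carries this state on `self` and enqueues via `QueueUtils.get_queue_inst()`.

```python
from dataclasses import dataclass


@dataclass
class HitlRouting:
    """Illustrative stand-in for the HITL-related state on the destination connector."""

    packet_id: str | None = None
    hitl_queue_name: str | None = None


def resolve_review_target(routing: HitlRouting, default_queue: str) -> tuple[str, str]:
    """Mirror the precedence used by _should_handle_hitl / _enqueue_to_packet_or_regular_queue."""
    if routing.packet_id:
        # Packet ID wins: the result is pushed to the packet (enqueue_to_packet).
        return ("packet", routing.packet_id)
    if routing.hitl_queue_name:
        # Explicit queue override supplied by the API request.
        return ("queue", routing.hitl_queue_name)
    # Otherwise fall back to the workflow's normal review queue.
    return ("queue", default_queue)


# Packet ID takes precedence even when a queue name is also supplied.
assert resolve_review_target(
    HitlRouting(packet_id="a1b2c3d4", hitl_queue_name="review_q"), "default_q"
) == ("packet", "a1b2c3d4")
```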
diff --git a/backend/workflow_manager/endpoint_v2/dto.py b/backend/workflow_manager/endpoint_v2/dto.py index cbe990a3fe..192460d397 100644 --- a/backend/workflow_manager/endpoint_v2/dto.py +++ b/backend/workflow_manager/endpoint_v2/dto.py @@ -92,6 +92,7 @@ class DestinationConfig: use_file_history: bool file_execution_id: str | None = None hitl_queue_name: str | None = None + packet_id: str | None = None def to_json(self) -> dict[str, Any]: """Serialize the DestinationConfig instance to a JSON string.""" @@ -104,6 +105,7 @@ def to_json(self) -> dict[str, Any]: "use_file_history": self.use_file_history, "file_execution_id": file_execution_id, "hitl_queue_name": self.hitl_queue_name, + "packet_id": self.packet_id, } @staticmethod diff --git a/backend/workflow_manager/workflow_v2/workflow_helper.py b/backend/workflow_manager/workflow_v2/workflow_helper.py index 88f8d71101..e106dde2d4 100644 --- a/backend/workflow_manager/workflow_v2/workflow_helper.py +++ b/backend/workflow_manager/workflow_v2/workflow_helper.py @@ -64,6 +64,7 @@ EXECUTION_EXCLUDED_PARAMS = { "llm_profile_id", "hitl_queue_name", + "hitl_packet_id", "custom_data", } @@ -268,6 +269,7 @@ def run_workflow( use_file_history: bool = True, llm_profile_id: str | None = None, hitl_queue_name: str | None = None, + packet_id: str | None = None, custom_data: dict[str, Any] | None = None, ) -> ExecutionResponse: tool_instances: list[ToolInstance] = ( @@ -297,6 +299,7 @@ def run_workflow( workflow_log=workflow_log, use_file_history=use_file_history, hitl_queue_name=hitl_queue_name, + packet_id=packet_id, ) try: # Validating endpoints @@ -439,6 +442,7 @@ def execute_workflow_async( use_file_history: bool = True, llm_profile_id: str | None = None, hitl_queue_name: str | None = None, + hitl_packet_id: str | None = None, custom_data: dict[str, Any] | None = None, ) -> ExecutionResponse: """Adding a workflow to the queue for execution. @@ -453,6 +457,7 @@ def execute_workflow_async( processed files. 
Defaults to True hitl_queue_name (str | None): Name of the HITL queue to push files to llm_profile_id (str, optional): LLM profile ID for overriding tool settings + hitl_packet_id (str | None): Packet ID for packet-based HITL workflows Returns: ExecutionResponse: Existing status of execution @@ -479,6 +484,7 @@ def execute_workflow_async( "use_file_history": use_file_history, "llm_profile_id": llm_profile_id, "hitl_queue_name": hitl_queue_name, + "hitl_packet_id": hitl_packet_id, "custom_data": custom_data, }, queue=queue, @@ -679,8 +685,9 @@ def execute_workflow( execution_id=execution_id, task_id=task_id ) try: + hitl_packet_id_from_kwargs = kwargs.get("hitl_packet_id") logger.info( - f"Starting workflow execution: workflow_id={workflow_id}, execution_id={execution_id}, hitl_queue_name={kwargs.get('hitl_queue_name')}" + f"Starting workflow execution: workflow_id={workflow_id}, execution_id={execution_id}, hitl_queue_name={kwargs.get('hitl_queue_name')}, hitl_packet_id={hitl_packet_id_from_kwargs}" ) execution_response = WorkflowHelper.run_workflow( workflow=workflow, @@ -693,6 +700,7 @@ def execute_workflow( use_file_history=use_file_history, llm_profile_id=kwargs.get("llm_profile_id"), hitl_queue_name=kwargs.get("hitl_queue_name"), + packet_id=hitl_packet_id_from_kwargs, custom_data=kwargs.get("custom_data"), ) except Exception as error: diff --git a/unstract/core/src/unstract/core/data_models.py b/unstract/core/src/unstract/core/data_models.py index b12671f1ca..c8d23d375d 100644 --- a/unstract/core/src/unstract/core/data_models.py +++ b/unstract/core/src/unstract/core/data_models.py @@ -685,6 +685,7 @@ class FileHashData: use_file_history: bool = False # Whether to create file history entries for this file is_manualreview_required: bool = False # Whether this file requires manual review hitl_queue_name: str | None = None # HITL queue name for API deployments + hitl_packet_id: str | None = None # HITL packet ID for packet-based HITL processing def __post_init__(self): """Validate required fields.""" @@ -1383,6 +1384,7 @@ class WorkerFileData: source_config: dict[str, Any] = field(default_factory=dict) destination_config: dict[str, Any] = field(default_factory=dict) hitl_queue_name: str | None = field(default=None) + hitl_packet_id: str | None = field(default=None) manual_review_config: dict[str, Any] = field( default_factory=lambda: { "review_required": False, diff --git a/workers/api-deployment/tasks.py b/workers/api-deployment/tasks.py index 04ef2de8c4..31367066c0 100644 --- a/workers/api-deployment/tasks.py +++ b/workers/api-deployment/tasks.py @@ -838,6 +838,7 @@ def _create_file_data( f"No manual review rules configured for API deployment workflow {workflow_id}" ) hitl_queue_name = kwargs.get("hitl_queue_name") + hitl_packet_id = kwargs.get("hitl_packet_id") llm_profile_id = kwargs.get("llm_profile_id") custom_data = kwargs.get("custom_data") @@ -852,8 +853,9 @@ def _create_file_data( single_step=False, q_file_no_list=_calculate_q_file_no_list_api(manual_review_config, total_files), hitl_queue_name=hitl_queue_name, + hitl_packet_id=hitl_packet_id, manual_review_config=manual_review_config, - is_manualreview_required=bool(hitl_queue_name), + is_manualreview_required=bool(hitl_queue_name or hitl_packet_id), llm_profile_id=llm_profile_id, custom_data=custom_data, ) diff --git a/workers/file_processing/tasks.py b/workers/file_processing/tasks.py index 5339e228d1..78dbcc3ede 100644 --- a/workers/file_processing/tasks.py +++ b/workers/file_processing/tasks.py @@ -1454,6 +1454,15 @@ def 
_create_file_hash_from_dict( f"Applied HITL queue name '{file_data.hitl_queue_name}' to file {file_name}" ) + if file_data and file_data.hitl_packet_id: + file_hash.hitl_packet_id = file_data.hitl_packet_id + file_hash.is_manualreview_required = ( + True # Override manual review flag for packet processing + ) + logger.info( + f"Applied HITL packet ID '{file_data.hitl_packet_id}' to file {file_name}" + ) + return file_hash diff --git a/workers/general/tasks.py b/workers/general/tasks.py index f8b1a75000..addf18b9a6 100644 --- a/workers/general/tasks.py +++ b/workers/general/tasks.py @@ -816,6 +816,7 @@ def _orchestrate_file_processing_general( ) hitl_queue_name = kwargs.get("hitl_queue_name") + hitl_packet_id = kwargs.get("hitl_packet_id") llm_profile_id = kwargs.get("llm_profile_id") custom_data = kwargs.get("custom_data") @@ -831,6 +832,7 @@ def _orchestrate_file_processing_general( q_file_no_list=[], manual_review_config={}, hitl_queue_name=hitl_queue_name, + hitl_packet_id=hitl_packet_id, llm_profile_id=llm_profile_id, custom_data=custom_data, ) diff --git a/workers/shared/workflow/destination_connector.py b/workers/shared/workflow/destination_connector.py index 60cf9f5cfe..cc125a2d00 100644 --- a/workers/shared/workflow/destination_connector.py +++ b/workers/shared/workflow/destination_connector.py @@ -121,6 +121,7 @@ class DestinationConfig: connector_name: str | None = None # Manual review / HITL support hitl_queue_name: str | None = None + hitl_packet_id: str | None = None # Source connector configuration for reading files source_connector_id: str | None = None source_connector_settings: dict[str, Any] = None @@ -173,6 +174,7 @@ def from_dict(cls, data: dict[str, Any]) -> "DestinationConfig": connector_settings=connector_instance.get("connector_metadata", {}), connector_name=connector_instance.get("connector_name"), hitl_queue_name=data.get("hitl_queue_name"), + hitl_packet_id=data.get("hitl_packet_id"), source_connector_id=data.get("source_connector_id"), source_connector_settings=data.get("source_connector_settings", {}), file_execution_id=data.get("file_execution_id"), @@ -207,6 +209,7 @@ def __init__(self, config: DestinationConfig, workflow_log=None): # Manual review / HITL support self.hitl_queue_name = config.hitl_queue_name + self.hitl_packet_id = config.hitl_packet_id # Workflow and execution context (will be set when handling output) self.organization_id = None @@ -440,7 +443,10 @@ def _check_and_acquire_destination_lock( return True # Allow processing on infrastructure failure def _check_and_handle_hitl( - self, exec_ctx: ExecutionContext, file_ctx: FileContext, result: ProcessingResult + self, + exec_ctx: ExecutionContext, + file_ctx: FileContext, + result: ProcessingResult, ) -> bool: """Check HITL requirements and push to queue if needed.""" has_hitl = self._should_handle_hitl( @@ -464,7 +470,10 @@ def _check_and_handle_hitl( return has_hitl def _process_destination( - self, exec_ctx: ExecutionContext, file_ctx: FileContext, result: ProcessingResult + self, + exec_ctx: ExecutionContext, + file_ctx: FileContext, + result: ProcessingResult, ): """Route to appropriate destination handler.""" handlers = { @@ -481,7 +490,10 @@ def _process_destination( logger.warning(f"Unknown destination connection type: {self.connection_type}") def _handle_api_destination( - self, exec_ctx: ExecutionContext, file_ctx: FileContext, result: ProcessingResult + self, + exec_ctx: ExecutionContext, + file_ctx: FileContext, + result: ProcessingResult, ): """Handle API destination processing.""" 
         log_file_info(
@@ -503,7 +515,10 @@
         )

     def _handle_filesystem_destination(
-        self, exec_ctx: ExecutionContext, file_ctx: FileContext, result: ProcessingResult
+        self,
+        exec_ctx: ExecutionContext,
+        file_ctx: FileContext,
+        result: ProcessingResult,
     ):
         """Handle filesystem destination processing."""
         if not result.has_hitl:
@@ -513,7 +528,9 @@
                 f"📤 File '{file_ctx.file_name}' marked for FILESYSTEM processing - copying to destination",
             )
             self.copy_output_to_output_directory(
-                file_ctx.input_file_path, exec_ctx.file_execution_id, exec_ctx.api_client
+                file_ctx.input_file_path,
+                exec_ctx.file_execution_id,
+                exec_ctx.api_client,
             )
         else:
             logger.info(
@@ -521,7 +538,10 @@
             )

     def _handle_database_destination(
-        self, exec_ctx: ExecutionContext, file_ctx: FileContext, result: ProcessingResult
+        self,
+        exec_ctx: ExecutionContext,
+        file_ctx: FileContext,
+        result: ProcessingResult,
     ):
         """Handle database destination processing."""
         if not result.has_hitl:
@@ -549,7 +569,10 @@
             )

     def _handle_manual_review_destination(
-        self, exec_ctx: ExecutionContext, file_ctx: FileContext, result: ProcessingResult
+        self,
+        exec_ctx: ExecutionContext,
+        file_ctx: FileContext,
+        result: ProcessingResult,
     ):
         """Handle manual review destination processing."""
         log_file_info(
@@ -1365,6 +1388,13 @@
             )
             return False

+        # Check hitl_packet_id first - it takes precedence over everything else
+        if self.hitl_packet_id:
+            logger.info(
+                f"API packet override: pushing to packet queue for file {file_name}"
+            )
+            return True
+
         # Check if API deployment requested HITL override
         if self.hitl_queue_name:
             logger.info(f"{file_name}: Pushing to HITL queue")
@@ -1403,6 +1433,75 @@
                 return True
         return False

+    def _enqueue_to_packet_or_regular_queue(
+        self,
+        file_name: str,
+        queue_result: dict[str, Any],
+        queue_name: str,
+        workflow_util: Any,
+        ttl_seconds: int | None = None,
+    ) -> None:
+        """Route to packet queue or regular queue based on hitl_packet_id.
+
+        Args:
+            file_name: Name of the file being queued
+            queue_result: Queue result dictionary
+            queue_name: Queue name for regular queue
+            workflow_util: Workflow utility instance for queue operations
+            ttl_seconds: TTL in seconds (optional, for regular queue)
+        """
+        if self.hitl_packet_id:
+            # Route to packet queue via backend API (enterprise only)
+            logger.info(f"Routing {file_name} to packet queue {self.hitl_packet_id}")
+
+            # Access the manual review client from workflow_util
+            # Enterprise: workflow_util.client is ManualReviewAPIClient
+            # OSS: workflow_util is null implementation without client attribute
+            manual_review_client = getattr(workflow_util, "client", None)
+
+            # Check if client exists and has enqueue_to_packet method (enterprise only)
+            if not manual_review_client or not hasattr(
+                manual_review_client, "enqueue_to_packet"
+            ):
+                error_msg = (
+                    f"Packet queues are not available. "
+                    f"Cannot enqueue file '{file_name}' to packet '{self.hitl_packet_id}'. "
+                    f"This is an Enterprise-only feature."
+ ) + logger.error(error_msg) + raise NotImplementedError(error_msg) + + # Call backend API to enqueue to packet queue + result = manual_review_client.enqueue_to_packet( + hitl_packet_id=self.hitl_packet_id, + queue_result=queue_result, + organization_id=self.organization_id, + ) + + if not result.get("success", False): + error_msg = ( + f"Failed to push {file_name} to packet {self.hitl_packet_id}: " + f"{result.get('message', 'Unknown error')}" + ) + logger.error(error_msg) + raise RuntimeError(error_msg) + + logger.info( + f"✅ MANUAL REVIEW: File '{file_name}' sent to packet queue '{self.hitl_packet_id}' successfully" + ) + return + + # Route to regular queue + logger.info(f"Routing {file_name} to regular queue {queue_name}") + workflow_util.enqueue_manual_review( + queue_name=queue_name, + message=queue_result, + organization_id=self.organization_id, + ) + logger.info( + f"✅ MANUAL REVIEW: File '{file_name}' sent to manual review queue '{queue_name}' successfully" + ) + def _push_data_to_queue( self, file_name: str, @@ -1487,21 +1586,23 @@ def _push_data_to_queue( if file_content_base64 is not None: queue_result.file_content = file_content_base64 - workflow_util.enqueue_manual_review( + # Route to packet queue or regular queue based on hitl_packet_id + self._enqueue_to_packet_or_regular_queue( + file_name=file_name, + queue_result=queue_result.to_dict(), queue_name=queue_name, - message=queue_result.to_dict(), - organization_id=self.organization_id, + workflow_util=workflow_util, + ttl_seconds=ttl_seconds, ) # Log successful enqueue (common for both paths) + queue_display_name = ( + self.hitl_packet_id if self.hitl_packet_id else queue_name + ) log_file_info( self.workflow_log, file_execution_id, - f"✅ File '{file_name}' sent to manual review queue '{queue_name}'", - ) - - logger.info( - f"✅ MANUAL REVIEW: File '{file_name}' sent to manual review queue '{queue_name}' successfully" + f"✅ File '{file_name}' sent to manual review queue '{queue_display_name}'", ) except Exception as e: diff --git a/workers/shared/workflow/execution/service.py b/workers/shared/workflow/execution/service.py index 7515da9457..79412f5bbf 100644 --- a/workers/shared/workflow/execution/service.py +++ b/workers/shared/workflow/execution/service.py @@ -1327,8 +1327,9 @@ def _handle_destination_processing( source_connection_type = workflow.source_config.connection_type - # Add HITL queue name from file_data if present (for API deployments) + # Add HITL queue name and packet ID from file_data if present (for API deployments) hitl_queue_name = file_data.hitl_queue_name + hitl_packet_id = file_data.hitl_packet_id destination_config["use_file_history"] = use_file_history destination_config["file_execution_id"] = workflow_file_execution_id if hitl_queue_name: @@ -1336,9 +1337,14 @@ def _handle_destination_processing( logger.info( f"Added HITL queue name to destination config: {hitl_queue_name}" ) - else: + if hitl_packet_id: + destination_config["hitl_packet_id"] = hitl_packet_id + logger.info( + f"Added HITL packet ID to destination config: {hitl_packet_id}" + ) + if not hitl_queue_name and not hitl_packet_id: logger.info( - "No hitl_queue_name found in file_data, proceeding with normal processing" + "No hitl_queue_name or hitl_packet_id found in file_data, proceeding with normal processing" ) # Import destination connector
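A minimal client-side sketch of how the new parameter is expected to be exercised end-to-end against an API deployment. The host, organization/deployment path, API key, file name, and multipart layout below are placeholders and assumptions; only the `hitl_packet_id` field (alongside the existing `hitl_queue_name`) comes from this change.

```python
import requests

# Placeholders -- substitute a real API deployment URL and key.
API_URL = "https://<unstract-host>/deployment/api/<org_id>/<api_name>/"
API_KEY = "<api-deployment-key>"

with open("invoice.pdf", "rb") as pdf:
    response = requests.post(
        API_URL,
        headers={"Authorization": f"Bearer {API_KEY}"},
        files={"files": pdf},
        data={
            # New in this change: route the extraction result into an existing
            # review packet (Enterprise only; validated server-side).
            "hitl_packet_id": "a1b2c3d4",
        },
        timeout=300,
    )

print(response.status_code, response.json())
```

On an OSS install the serializer rejects such a request with the enterprise guidance added in `validate_hitl_packet_id`, rather than silently dropping the field.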