From ef40b15e6483bcf1b294ee8bca3f5518ef8a0474 Mon Sep 17 00:00:00 2001 From: jagadeeswaran-zipstack Date: Thu, 18 Sep 2025 08:08:11 +0530 Subject: [PATCH 01/10] UN-2807 [FEAT] Add packet processing support for HITL workflows MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Added packet_id parameter to API execution flow - Integrated packet-based routing in destination connector - Enhanced queue management to support packet-based review alongside traditional HITL queues - Updated DTOs and serializers to include packet_id field This enables workflows to be grouped into packets for batch review in HITL scenarios. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- backend/api_v2/api_deployment_views.py | 2 + backend/api_v2/constants.py | 1 + backend/api_v2/deployment_helper.py | 7 ++++ .../endpoint_v2/destination.py | 42 +++++++++++++++++++ backend/workflow_manager/endpoint_v2/dto.py | 2 + 5 files changed, 54 insertions(+) diff --git a/backend/api_v2/api_deployment_views.py b/backend/api_v2/api_deployment_views.py index 9632a62475..7716afa15a 100644 --- a/backend/api_v2/api_deployment_views.py +++ b/backend/api_v2/api_deployment_views.py @@ -70,6 +70,7 @@ def post( tag_names = serializer.validated_data.get(ApiExecution.TAGS) llm_profile_id = serializer.validated_data.get(ApiExecution.LLM_PROFILE_ID) hitl_queue_name = serializer.validated_data.get(ApiExecution.HITL_QUEUE_NAME) + packet_id = serializer.validated_data.get(ApiExecution.PACKET_ID) if presigned_urls: DeploymentHelper.load_presigned_files(presigned_urls, file_objs) @@ -85,6 +86,7 @@ def post( tag_names=tag_names, llm_profile_id=llm_profile_id, hitl_queue_name=hitl_queue_name, + packet_id=packet_id, request_headers=dict(request.headers), ) if "error" in response and response["error"]: diff --git a/backend/api_v2/constants.py b/backend/api_v2/constants.py index 0495929fb9..30dd388bb7 100644 --- a/backend/api_v2/constants.py +++ b/backend/api_v2/constants.py @@ -10,4 +10,5 @@ class ApiExecution: TAGS: str = "tags" LLM_PROFILE_ID: str = "llm_profile_id" HITL_QUEUE_NAME: str = "hitl_queue_name" + PACKET_ID: str = "packet_id" PRESIGNED_URLS: str = "presigned_urls" diff --git a/backend/api_v2/deployment_helper.py b/backend/api_v2/deployment_helper.py index dfc0484d9c..a3f27616ab 100644 --- a/backend/api_v2/deployment_helper.py +++ b/backend/api_v2/deployment_helper.py @@ -155,6 +155,7 @@ def execute_workflow( tag_names: list[str] = [], llm_profile_id: str | None = None, hitl_queue_name: str | None = None, + packet_id: str | None = None, request_headers=None, ) -> ReturnDict: """Execute workflow by api. 
@@ -168,6 +169,7 @@ def execute_workflow( tag_names (list(str)): list of tag names llm_profile_id (str, optional): LLM profile ID for overriding tool settings hitl_queue_name (str, optional): Custom queue name for manual review + packet_id (str, optional): Packet ID for packet-based review Returns: ReturnDict: execution status/ result @@ -178,6 +180,10 @@ def execute_workflow( logger.info( f"API execution with HITL: hitl_queue_name={hitl_queue_name}, api_name={api.api_name}" ) + if packet_id: + logger.info( + f"API execution with Packet: packet_id={packet_id}, api_name={api.api_name}" + ) tags = Tag.bulk_get_or_create(tag_names=tag_names) workflow_execution = WorkflowExecutionServiceHelper.create_workflow_execution( workflow_id=workflow_id, @@ -234,6 +240,7 @@ def execute_workflow( use_file_history=use_file_history, llm_profile_id=llm_profile_id, hitl_queue_name=hitl_queue_name, + packet_id=packet_id, ) result.status_api = DeploymentHelper.construct_status_endpoint( api_endpoint=api.api_endpoint, execution_id=execution_id diff --git a/backend/workflow_manager/endpoint_v2/destination.py b/backend/workflow_manager/endpoint_v2/destination.py index 8c1ef03222..65405e1d2d 100644 --- a/backend/workflow_manager/endpoint_v2/destination.py +++ b/backend/workflow_manager/endpoint_v2/destination.py @@ -6,6 +6,7 @@ from typing import Any from connector_v2.models import ConnectorInstance +from pluggable_apps.manual_review_v2.packet_queue_utils import PacketQueueUtils from plugins.workflow_manager.workflow_v2.utils import WorkflowUtil from rest_framework.exceptions import APIException from usage_v2.helper import UsageHelper @@ -64,6 +65,7 @@ def __init__( use_file_history: bool, file_execution_id: str | None = None, hitl_queue_name: str | None = None, + packet_id: str | None = None, ) -> None: """Initialize a DestinationConnector object. 
@@ -82,6 +84,7 @@ def __init__( self.workflow_log = workflow_log self.use_file_history = use_file_history self.hitl_queue_name = hitl_queue_name + self.packet_id = packet_id self.workflow = workflow def _get_endpoint_for_workflow( @@ -172,6 +175,19 @@ def _should_handle_hitl( logger.info(f"Successfully pushed {file_name} to HITL queue") return True + if self.packet_id: + logger.info( + f"API packet override: pushing to packet queue for file {file_name}" + ) + self._push_data_to_queue( + file_name=file_name, + workflow=workflow, + input_file_path=input_file_path, + file_execution_id=file_execution_id, + ) + logger.info(f"Successfully pushed {file_name} to packet queue") + return True + # Skip HITL validation if we're using file_history and no execution result is available if self.is_api and self.use_file_history: return False @@ -749,6 +765,7 @@ def get_config(self) -> DestinationConfig: execution_id=self.execution_id, use_file_history=self.use_file_history, hitl_queue_name=self.hitl_queue_name, + packet_id=self.packet_id, ) @classmethod @@ -777,6 +794,7 @@ def from_config( use_file_history=config.use_file_history, file_execution_id=config.file_execution_id, hitl_queue_name=config.hitl_queue_name, + packet_id=config.packet_id, ) return destination @@ -868,6 +886,18 @@ def _push_to_queue( queue_result_json = json.dumps(queue_result) + # Check if this is a packet-based execution + if self.packet_id: + # Route to packet queue instead of regular HITL queue + success = PacketQueueUtils.enqueue_to_packet( + packet_id=self.packet_id, queue_result=queue_result + ) + if success: + logger.info(f"Pushed {file_name} to packet {self.packet_id}") + else: + logger.error(f"Failed to push {file_name} to packet {self.packet_id}") + return + conn = QueueUtils.get_queue_inst() conn.enqueue(queue_name=q_name, message=queue_result_json) logger.info(f"Pushed {file_name} to queue {q_name} with file content") @@ -918,6 +948,18 @@ def _push_to_queue( ) raise ValueError("Cannot enqueue empty JSON message") + # Check if this is a packet-based execution + if self.packet_id: + # Route to packet queue instead of regular HITL queue + success = PacketQueueUtils.enqueue_to_packet( + packet_id=self.packet_id, queue_result=queue_result_obj + ) + if success: + logger.info(f"Pushed {file_name} to packet {self.packet_id}") + else: + logger.error(f"Failed to push {file_name} to packet {self.packet_id}") + return + conn = QueueUtils.get_queue_inst() # Use the TTL metadata that was already set in the QueueResult object diff --git a/backend/workflow_manager/endpoint_v2/dto.py b/backend/workflow_manager/endpoint_v2/dto.py index cbe990a3fe..192460d397 100644 --- a/backend/workflow_manager/endpoint_v2/dto.py +++ b/backend/workflow_manager/endpoint_v2/dto.py @@ -92,6 +92,7 @@ class DestinationConfig: use_file_history: bool file_execution_id: str | None = None hitl_queue_name: str | None = None + packet_id: str | None = None def to_json(self) -> dict[str, Any]: """Serialize the DestinationConfig instance to a JSON string.""" @@ -104,6 +105,7 @@ def to_json(self) -> dict[str, Any]: "use_file_history": self.use_file_history, "file_execution_id": file_execution_id, "hitl_queue_name": self.hitl_queue_name, + "packet_id": self.packet_id, } @staticmethod From bc31ae5301d7e7dd48ed14d9f67e4135d5896b67 Mon Sep 17 00:00:00 2001 From: jagadeeswaran-zipstack Date: Fri, 19 Sep 2025 10:05:31 +0530 Subject: [PATCH 02/10] added missing references --- backend/workflow_manager/workflow_v2/workflow_helper.py | 6 ++++++ 1 file changed, 6 insertions(+) diff 
--git a/backend/workflow_manager/workflow_v2/workflow_helper.py b/backend/workflow_manager/workflow_v2/workflow_helper.py index 696a257e4e..1f53654fa5 100644 --- a/backend/workflow_manager/workflow_v2/workflow_helper.py +++ b/backend/workflow_manager/workflow_v2/workflow_helper.py @@ -64,6 +64,7 @@ EXECUTION_EXCLUDED_PARAMS = { "llm_profile_id", "hitl_queue_name", + "packet_id", } @@ -267,6 +268,7 @@ def run_workflow( use_file_history: bool = True, llm_profile_id: str | None = None, hitl_queue_name: str | None = None, + packet_id: str | None = None, ) -> ExecutionResponse: tool_instances: list[ToolInstance] = ( ToolInstanceHelper.get_tool_instances_by_workflow( @@ -295,6 +297,7 @@ def run_workflow( workflow_log=workflow_log, use_file_history=use_file_history, hitl_queue_name=hitl_queue_name, + packet_id=packet_id, ) try: # Validating endpoints @@ -436,6 +439,7 @@ def execute_workflow_async( use_file_history: bool = True, llm_profile_id: str | None = None, hitl_queue_name: str | None = None, + packet_id: str | None = None, ) -> ExecutionResponse: """Adding a workflow to the queue for execution. @@ -475,6 +479,7 @@ def execute_workflow_async( "use_file_history": use_file_history, "llm_profile_id": llm_profile_id, "hitl_queue_name": hitl_queue_name, + "packet_id": packet_id, }, queue=queue, ) @@ -692,6 +697,7 @@ def execute_workflow( use_file_history=use_file_history, llm_profile_id=kwargs.get("llm_profile_id"), hitl_queue_name=kwargs.get("hitl_queue_name"), + packet_id=kwargs.get("packet_id"), ) except Exception as error: error_message = traceback.format_exc() From 969e60c3ce42ff8bceef013dbab77c8c1feb9abb Mon Sep 17 00:00:00 2001 From: jagadeeswaran-zipstack Date: Wed, 1 Oct 2025 09:47:24 +0530 Subject: [PATCH 03/10] added hitl prefix to query params --- backend/api_v2/api_deployment_views.py | 4 ++-- backend/api_v2/constants.py | 2 +- backend/api_v2/deployment_helper.py | 10 +++----- backend/api_v2/serializers.py | 1 + .../endpoint_v2/destination.py | 24 +++++++++---------- .../workflow_v2/workflow_helper.py | 12 ++++++---- 6 files changed, 26 insertions(+), 27 deletions(-) diff --git a/backend/api_v2/api_deployment_views.py b/backend/api_v2/api_deployment_views.py index 7716afa15a..62fae9cec3 100644 --- a/backend/api_v2/api_deployment_views.py +++ b/backend/api_v2/api_deployment_views.py @@ -70,7 +70,7 @@ def post( tag_names = serializer.validated_data.get(ApiExecution.TAGS) llm_profile_id = serializer.validated_data.get(ApiExecution.LLM_PROFILE_ID) hitl_queue_name = serializer.validated_data.get(ApiExecution.HITL_QUEUE_NAME) - packet_id = serializer.validated_data.get(ApiExecution.PACKET_ID) + hitl_packet_id = serializer.validated_data.get(ApiExecution.HITL_PACKET_ID) if presigned_urls: DeploymentHelper.load_presigned_files(presigned_urls, file_objs) @@ -86,7 +86,7 @@ def post( tag_names=tag_names, llm_profile_id=llm_profile_id, hitl_queue_name=hitl_queue_name, - packet_id=packet_id, + hitl_packet_id=hitl_packet_id, request_headers=dict(request.headers), ) if "error" in response and response["error"]: diff --git a/backend/api_v2/constants.py b/backend/api_v2/constants.py index 30dd388bb7..434aaf1623 100644 --- a/backend/api_v2/constants.py +++ b/backend/api_v2/constants.py @@ -10,5 +10,5 @@ class ApiExecution: TAGS: str = "tags" LLM_PROFILE_ID: str = "llm_profile_id" HITL_QUEUE_NAME: str = "hitl_queue_name" - PACKET_ID: str = "packet_id" + HITL_PACKET_ID: str = "hitl_packet_id" PRESIGNED_URLS: str = "presigned_urls" diff --git a/backend/api_v2/deployment_helper.py 
b/backend/api_v2/deployment_helper.py index a3f27616ab..02153cb894 100644 --- a/backend/api_v2/deployment_helper.py +++ b/backend/api_v2/deployment_helper.py @@ -155,7 +155,7 @@ def execute_workflow( tag_names: list[str] = [], llm_profile_id: str | None = None, hitl_queue_name: str | None = None, - packet_id: str | None = None, + hitl_packet_id: str | None = None, request_headers=None, ) -> ReturnDict: """Execute workflow by api. @@ -169,7 +169,7 @@ def execute_workflow( tag_names (list(str)): list of tag names llm_profile_id (str, optional): LLM profile ID for overriding tool settings hitl_queue_name (str, optional): Custom queue name for manual review - packet_id (str, optional): Packet ID for packet-based review + hitl_packet_id (str, optional): Packet ID for packet-based review Returns: ReturnDict: execution status/ result @@ -180,10 +180,6 @@ def execute_workflow( logger.info( f"API execution with HITL: hitl_queue_name={hitl_queue_name}, api_name={api.api_name}" ) - if packet_id: - logger.info( - f"API execution with Packet: packet_id={packet_id}, api_name={api.api_name}" - ) tags = Tag.bulk_get_or_create(tag_names=tag_names) workflow_execution = WorkflowExecutionServiceHelper.create_workflow_execution( workflow_id=workflow_id, @@ -240,7 +236,7 @@ def execute_workflow( use_file_history=use_file_history, llm_profile_id=llm_profile_id, hitl_queue_name=hitl_queue_name, - packet_id=packet_id, + hitl_packet_id=hitl_packet_id, ) result.status_api = DeploymentHelper.construct_status_endpoint( api_endpoint=api.api_endpoint, execution_id=execution_id diff --git a/backend/api_v2/serializers.py b/backend/api_v2/serializers.py index 06ed566c32..b7c0ab023e 100644 --- a/backend/api_v2/serializers.py +++ b/backend/api_v2/serializers.py @@ -224,6 +224,7 @@ class ExecutionRequestSerializer(TagParamsSerializer): presigned_urls = ListField(child=URLField(), required=False) llm_profile_id = CharField(required=False, allow_null=True, allow_blank=True) hitl_queue_name = CharField(required=False, allow_null=True, allow_blank=True) + hitl_packet_id = CharField(required=False, allow_null=True, allow_blank=True) def validate_hitl_queue_name(self, value: str | None) -> str | None: """Validate queue name format using enterprise validation if available.""" diff --git a/backend/workflow_manager/endpoint_v2/destination.py b/backend/workflow_manager/endpoint_v2/destination.py index 65405e1d2d..5957e80c92 100644 --- a/backend/workflow_manager/endpoint_v2/destination.py +++ b/backend/workflow_manager/endpoint_v2/destination.py @@ -176,9 +176,6 @@ def _should_handle_hitl( return True if self.packet_id: - logger.info( - f"API packet override: pushing to packet queue for file {file_name}" - ) self._push_data_to_queue( file_name=file_name, workflow=workflow, @@ -854,7 +851,10 @@ def _push_to_queue( None """ if not result: - return + if not self.packet_id: + return + # For packet processing, use a placeholder result if none available + result = json.dumps({"status": "pending", "message": "Awaiting processing"}) connector: ConnectorInstance = self.source_endpoint.connector_instance # For API deployments, use workflow execution storage instead of connector if self.is_api: @@ -892,10 +892,10 @@ def _push_to_queue( success = PacketQueueUtils.enqueue_to_packet( packet_id=self.packet_id, queue_result=queue_result ) - if success: - logger.info(f"Pushed {file_name} to packet {self.packet_id}") - else: - logger.error(f"Failed to push {file_name} to packet {self.packet_id}") + if not success: + error_msg = f"Failed to push 
{file_name} to packet {self.packet_id}" + logger.error(error_msg) + raise RuntimeError(error_msg) return conn = QueueUtils.get_queue_inst() @@ -954,10 +954,10 @@ def _push_to_queue( success = PacketQueueUtils.enqueue_to_packet( packet_id=self.packet_id, queue_result=queue_result_obj ) - if success: - logger.info(f"Pushed {file_name} to packet {self.packet_id}") - else: - logger.error(f"Failed to push {file_name} to packet {self.packet_id}") + if not success: + error_msg = f"Failed to push {file_name} to packet {self.packet_id}" + logger.error(error_msg) + raise RuntimeError(error_msg) return conn = QueueUtils.get_queue_inst() diff --git a/backend/workflow_manager/workflow_v2/workflow_helper.py b/backend/workflow_manager/workflow_v2/workflow_helper.py index 1f53654fa5..dcff5f6ed7 100644 --- a/backend/workflow_manager/workflow_v2/workflow_helper.py +++ b/backend/workflow_manager/workflow_v2/workflow_helper.py @@ -64,7 +64,7 @@ EXECUTION_EXCLUDED_PARAMS = { "llm_profile_id", "hitl_queue_name", - "packet_id", + "hitl_packet_id", } @@ -439,7 +439,7 @@ def execute_workflow_async( use_file_history: bool = True, llm_profile_id: str | None = None, hitl_queue_name: str | None = None, - packet_id: str | None = None, + hitl_packet_id: str | None = None, ) -> ExecutionResponse: """Adding a workflow to the queue for execution. @@ -453,6 +453,7 @@ def execute_workflow_async( processed files. Defaults to True hitl_queue_name (str | None): Name of the HITL queue to push files to llm_profile_id (str, optional): LLM profile ID for overriding tool settings + hitl_packet_id (str | None): Packet ID for packet-based HITL workflows Returns: ExecutionResponse: Existing status of execution @@ -479,7 +480,7 @@ def execute_workflow_async( "use_file_history": use_file_history, "llm_profile_id": llm_profile_id, "hitl_queue_name": hitl_queue_name, - "packet_id": packet_id, + "hitl_packet_id": hitl_packet_id, }, queue=queue, ) @@ -683,8 +684,9 @@ def execute_workflow( execution_id=execution_id, task_id=task_id ) try: + hitl_packet_id_from_kwargs = kwargs.get("hitl_packet_id") logger.info( - f"Starting workflow execution: workflow_id={workflow_id}, execution_id={execution_id}, hitl_queue_name={kwargs.get('hitl_queue_name')}" + f"Starting workflow execution: workflow_id={workflow_id}, execution_id={execution_id}, hitl_queue_name={kwargs.get('hitl_queue_name')}, hitl_packet_id={hitl_packet_id_from_kwargs}" ) execution_response = WorkflowHelper.run_workflow( workflow=workflow, @@ -697,7 +699,7 @@ def execute_workflow( use_file_history=use_file_history, llm_profile_id=kwargs.get("llm_profile_id"), hitl_queue_name=kwargs.get("hitl_queue_name"), - packet_id=kwargs.get("packet_id"), + packet_id=hitl_packet_id_from_kwargs, ) except Exception as error: error_message = traceback.format_exc() From f10b5609c2886214aab3cec34dbfe59cef2b7cc0 Mon Sep 17 00:00:00 2001 From: jagadeeswaran-zipstack Date: Fri, 3 Oct 2025 07:49:53 +0530 Subject: [PATCH 04/10] added try-catch on enterprise import --- backend/api_v2/serializers.py | 18 +++++++++ .../endpoint_v2/destination.py | 37 +++++++++++++++---- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/backend/api_v2/serializers.py b/backend/api_v2/serializers.py index b7c0ab023e..663ce4a6bc 100644 --- a/backend/api_v2/serializers.py +++ b/backend/api_v2/serializers.py @@ -245,6 +245,24 @@ def validate_hitl_queue_name(self, value: str | None) -> str | None: ) return value + def validate_hitl_packet_id(self, value: str | None) -> str | None: + """Validate packet ID format using 
enterprise validation if available.""" + if not value: + return value + + # Packet-based processing requires enterprise features + try: + # If import succeeds, enterprise features are available + return value + except (ModuleNotFoundError, ImportError): + # Fallback to error if enterprise features not available + raise ValidationError( + "Packet-based HITL processing requires Unstract Enterprise. " + "This advanced workflow feature is available in our enterprise version. " + "Learn more at https://docs.unstract.com/unstract/unstract_platform/features/workflows/hqr_deployment_workflows/ or " + "contact our sales team at https://unstract.com/contact/" + ) + files = ListField( child=FileField(), required=False, diff --git a/backend/workflow_manager/endpoint_v2/destination.py b/backend/workflow_manager/endpoint_v2/destination.py index 5957e80c92..2eb5ee74be 100644 --- a/backend/workflow_manager/endpoint_v2/destination.py +++ b/backend/workflow_manager/endpoint_v2/destination.py @@ -6,7 +6,16 @@ from typing import Any from connector_v2.models import ConnectorInstance -from pluggable_apps.manual_review_v2.packet_queue_utils import PacketQueueUtils + +# Import PacketQueueUtils conditionally to support OSS (where pluggable_apps doesn't exist) +try: + from pluggable_apps.manual_review_v2.packet_queue_utils import PacketQueueUtils + + PACKET_QUEUE_AVAILABLE = True +except (ImportError, ModuleNotFoundError): + PacketQueueUtils = None + PACKET_QUEUE_AVAILABLE = False + from plugins.workflow_manager.workflow_v2.utils import WorkflowUtil from rest_framework.exceptions import APIException from usage_v2.helper import UsageHelper @@ -163,26 +172,28 @@ def _should_handle_hitl( file_execution_id: str, ) -> bool: """Determines if HITL processing should be performed, returning True if data was pushed to the queue.""" - # Check if API deployment requested HITL override - if self.hitl_queue_name: - logger.info(f"API HITL override: pushing to queue for file {file_name}") + # Check packet_id first - it takes precedence over hitl_queue_name + if self.packet_id: + logger.info( + f"API packet override: pushing to packet queue for file {file_name}" + ) self._push_data_to_queue( file_name=file_name, workflow=workflow, input_file_path=input_file_path, file_execution_id=file_execution_id, ) - logger.info(f"Successfully pushed {file_name} to HITL queue") return True - if self.packet_id: + # Check if API deployment requested HITL override + if self.hitl_queue_name: + logger.info(f"API HITL override: pushing to queue for file {file_name}") self._push_data_to_queue( file_name=file_name, workflow=workflow, input_file_path=input_file_path, file_execution_id=file_execution_id, ) - logger.info(f"Successfully pushed {file_name} to packet queue") return True # Skip HITL validation if we're using file_history and no execution result is available @@ -888,6 +899,11 @@ def _push_to_queue( # Check if this is a packet-based execution if self.packet_id: + if not PACKET_QUEUE_AVAILABLE: + raise ValueError( + "Packet-based HITL processing requires Unstract Enterprise. " + "This feature is not available in the OSS version." + ) # Route to packet queue instead of regular HITL queue success = PacketQueueUtils.enqueue_to_packet( packet_id=self.packet_id, queue_result=queue_result @@ -950,9 +966,14 @@ def _push_to_queue( # Check if this is a packet-based execution if self.packet_id: + if not PACKET_QUEUE_AVAILABLE: + raise ValueError( + "Packet-based HITL processing requires Unstract Enterprise. 
" + "This feature is not available in the OSS version." + ) # Route to packet queue instead of regular HITL queue success = PacketQueueUtils.enqueue_to_packet( - packet_id=self.packet_id, queue_result=queue_result_obj + packet_id=self.packet_id, queue_result=queue_result ) if not success: error_msg = f"Failed to push {file_name} to packet {self.packet_id}" From f83a9a98efc08b3248446ac50eba761a3ab52e47 Mon Sep 17 00:00:00 2001 From: jagadeeswaran-zipstack Date: Fri, 3 Oct 2025 20:01:48 +0530 Subject: [PATCH 05/10] used app registry for plugin check --- backend/api_v2/serializers.py | 11 ++++----- .../endpoint_v2/destination.py | 23 +++++++++---------- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/backend/api_v2/serializers.py b/backend/api_v2/serializers.py index 663ce4a6bc..b872af57d1 100644 --- a/backend/api_v2/serializers.py +++ b/backend/api_v2/serializers.py @@ -6,6 +6,7 @@ from django.core.validators import RegexValidator from pipeline_v2.models import Pipeline +from pluggable_apps.feature_registry import FeatureRegistry from prompt_studio.prompt_profile_manager_v2.models import ProfileManager from rest_framework.serializers import ( BooleanField, @@ -250,12 +251,8 @@ def validate_hitl_packet_id(self, value: str | None) -> str | None: if not value: return value - # Packet-based processing requires enterprise features - try: - # If import succeeds, enterprise features are available - return value - except (ModuleNotFoundError, ImportError): - # Fallback to error if enterprise features not available + # Check if HITL feature is available using FeatureRegistry + if not FeatureRegistry.is_hitl_available(): raise ValidationError( "Packet-based HITL processing requires Unstract Enterprise. " "This advanced workflow feature is available in our enterprise version. " @@ -263,6 +260,8 @@ def validate_hitl_packet_id(self, value: str | None) -> str | None: "contact our sales team at https://unstract.com/contact/" ) + return value + files = ListField( child=FileField(), required=False, diff --git a/backend/workflow_manager/endpoint_v2/destination.py b/backend/workflow_manager/endpoint_v2/destination.py index 2eb5ee74be..0fd489d72b 100644 --- a/backend/workflow_manager/endpoint_v2/destination.py +++ b/backend/workflow_manager/endpoint_v2/destination.py @@ -6,16 +6,7 @@ from typing import Any from connector_v2.models import ConnectorInstance - -# Import PacketQueueUtils conditionally to support OSS (where pluggable_apps doesn't exist) -try: - from pluggable_apps.manual_review_v2.packet_queue_utils import PacketQueueUtils - - PACKET_QUEUE_AVAILABLE = True -except (ImportError, ModuleNotFoundError): - PacketQueueUtils = None - PACKET_QUEUE_AVAILABLE = False - +from pluggable_apps.feature_registry import FeatureRegistry from plugins.workflow_manager.workflow_v2.utils import WorkflowUtil from rest_framework.exceptions import APIException from usage_v2.helper import UsageHelper @@ -899,12 +890,16 @@ def _push_to_queue( # Check if this is a packet-based execution if self.packet_id: - if not PACKET_QUEUE_AVAILABLE: + if not FeatureRegistry.is_hitl_available(): raise ValueError( "Packet-based HITL processing requires Unstract Enterprise. " "This feature is not available in the OSS version." 
) # Route to packet queue instead of regular HITL queue + from pluggable_apps.manual_review_v2.packet_queue_utils import ( + PacketQueueUtils, + ) + success = PacketQueueUtils.enqueue_to_packet( packet_id=self.packet_id, queue_result=queue_result ) @@ -966,12 +961,16 @@ def _push_to_queue( # Check if this is a packet-based execution if self.packet_id: - if not PACKET_QUEUE_AVAILABLE: + if not FeatureRegistry.is_hitl_available(): raise ValueError( "Packet-based HITL processing requires Unstract Enterprise. " "This feature is not available in the OSS version." ) # Route to packet queue instead of regular HITL queue + from pluggable_apps.manual_review_v2.packet_queue_utils import ( + PacketQueueUtils, + ) + success = PacketQueueUtils.enqueue_to_packet( packet_id=self.packet_id, queue_result=queue_result ) From 028438ba4adfcef1f27dd83f992b3616296a7fc3 Mon Sep 17 00:00:00 2001 From: jagadeeswaran-zipstack Date: Sat, 4 Oct 2025 03:11:59 +0530 Subject: [PATCH 06/10] code refactor --- .../endpoint_v2/destination.py | 275 +++++++++++------- 1 file changed, 162 insertions(+), 113 deletions(-) diff --git a/backend/workflow_manager/endpoint_v2/destination.py b/backend/workflow_manager/endpoint_v2/destination.py index 70f1d49feb..d14bf7bd3d 100644 --- a/backend/workflow_manager/endpoint_v2/destination.py +++ b/backend/workflow_manager/endpoint_v2/destination.py @@ -856,145 +856,194 @@ def _push_to_queue( Returns: None """ + # Handle missing result for packet processing if not result: if not self.packet_id: return - # For packet processing, use a placeholder result if none available result = json.dumps({"status": "pending", "message": "Awaiting processing"}) - connector: ConnectorInstance = self.source_endpoint.connector_instance - # For API deployments, use workflow execution storage instead of connector + + # Delegate to appropriate handler based on deployment type if self.is_api: - logger.debug( - f"API deployment detected for {file_name}, using workflow execution file system" + self._push_to_queue_for_api_deployment( + file_name, result, input_file_path, file_execution_id, meta_data ) - # For API deployments, read file content from workflow execution storage - file_content_base64 = self._read_file_content_for_queue( - input_file_path, file_name + else: + self._push_to_queue_for_connector( + file_name, workflow, result, input_file_path, file_execution_id, meta_data ) - # Use common queue naming method - q_name = self._get_review_queue_name() - whisper_hash = meta_data.get("whisper-hash") if meta_data else None + def _enqueue_to_packet_or_regular_queue( + self, + file_name: str, + queue_result: dict[str, Any], + queue_result_json: str, + q_name: str, + ttl_seconds: int | None = None, + ) -> None: + """Route to packet queue or regular queue based on packet_id. - # Get extracted text from metadata (added by structure tool) - extracted_text = meta_data.get("extracted_text") if meta_data else None + Args: + file_name: Name of the file being queued + queue_result: Queue result dictionary + queue_result_json: JSON string of queue result + q_name: Queue name for regular queue + ttl_seconds: TTL in seconds (optional, for regular queue) + """ + if self.packet_id: + if not FeatureRegistry.is_hitl_available(): + raise ValueError( + "Packet-based HITL processing requires Unstract Enterprise. " + "This feature is not available in the OSS version." 
+ ) + # Route to packet queue + from pluggable_apps.manual_review_v2.packet_queue_utils import ( + PacketQueueUtils, + ) - queue_result = QueueResult( - file=file_name, - status=QueueResultStatus.SUCCESS, - result=result, - workflow_id=str(self.workflow_id), - file_content=file_content_base64, - whisper_hash=whisper_hash, - file_execution_id=file_execution_id, - extracted_text=extracted_text, - ).to_dict() + success = PacketQueueUtils.enqueue_to_packet( + packet_id=self.packet_id, queue_result=queue_result + ) + if not success: + error_msg = f"Failed to push {file_name} to packet {self.packet_id}" + logger.error(error_msg) + raise RuntimeError(error_msg) + return - queue_result_json = json.dumps(queue_result) + # Route to regular queue + conn = QueueUtils.get_queue_inst() + if ttl_seconds: + conn.enqueue_with_ttl( + queue_name=q_name, message=queue_result_json, ttl_seconds=ttl_seconds + ) + else: + conn.enqueue(queue_name=q_name, message=queue_result_json) + logger.info(f"Pushed {file_name} to queue {q_name} with file content") - # Check if this is a packet-based execution - if self.packet_id: - if not FeatureRegistry.is_hitl_available(): - raise ValueError( - "Packet-based HITL processing requires Unstract Enterprise. " - "This feature is not available in the OSS version." - ) - # Route to packet queue instead of regular HITL queue - from pluggable_apps.manual_review_v2.packet_queue_utils import ( - PacketQueueUtils, - ) + def _create_queue_result( + self, + file_name: str, + result: str, + file_content_base64: str, + file_execution_id: str, + meta_data: dict[str, Any] | None = None, + ttl_seconds: int | None = None, + ) -> dict[str, Any]: + """Create QueueResult dictionary. - success = PacketQueueUtils.enqueue_to_packet( - packet_id=self.packet_id, queue_result=queue_result - ) - if not success: - error_msg = f"Failed to push {file_name} to packet {self.packet_id}" - logger.error(error_msg) - raise RuntimeError(error_msg) - return + Args: + file_name: Name of the file + result: Processing result + file_content_base64: Base64 encoded file content + file_execution_id: File execution ID + meta_data: Optional metadata + ttl_seconds: Optional TTL in seconds - conn = QueueUtils.get_queue_inst() - conn.enqueue(queue_name=q_name, message=queue_result_json) - logger.info(f"Pushed {file_name} to queue {q_name} with file content") - return - connector_settings: dict[str, Any] = connector.connector_metadata + Returns: + QueueResult as dictionary + """ + whisper_hash = meta_data.get("whisper-hash") if meta_data else None + extracted_text = meta_data.get("extracted_text") if meta_data else None + + queue_result_obj = QueueResult( + file=file_name, + status=QueueResultStatus.SUCCESS, + result=result, + workflow_id=str(self.workflow_id), + file_content=file_content_base64, + whisper_hash=whisper_hash, + file_execution_id=file_execution_id, + extracted_text=extracted_text, + ttl_seconds=ttl_seconds, + ) + return queue_result_obj.to_dict() + + def _push_to_queue_for_api_deployment( + self, + file_name: str, + result: str, + input_file_path: str, + file_execution_id: str, + meta_data: dict[str, Any] | None = None, + ) -> None: + """Handle queue push for API deployments. 
+ + Args: + file_name: Name of the file + result: Processing result + input_file_path: Path to input file + file_execution_id: File execution ID + meta_data: Optional metadata + """ + file_content_base64 = self._read_file_content_for_queue( + input_file_path, file_name + ) + q_name = self._get_review_queue_name() + + queue_result = self._create_queue_result( + file_name=file_name, + result=result, + file_content_base64=file_content_base64, + file_execution_id=file_execution_id, + meta_data=meta_data, + ) + + queue_result_json = json.dumps(queue_result) + self._enqueue_to_packet_or_regular_queue( + file_name, queue_result, queue_result_json, q_name + ) + + def _push_to_queue_for_connector( + self, + file_name: str, + workflow: Workflow, + result: str, + input_file_path: str, + file_execution_id: str, + meta_data: dict[str, Any] | None = None, + ) -> None: + """Handle queue push for connector-based deployments. + + Args: + file_name: Name of the file + workflow: Workflow object + result: Processing result + input_file_path: Path to input file + file_execution_id: File execution ID + meta_data: Optional metadata + """ + connector = self.source_endpoint.connector_instance + connector_settings = connector.connector_metadata source_fs = self.get_fsspec( settings=connector_settings, connector_id=connector.connector_id ) + with source_fs.open(input_file_path, "rb") as remote_file: - whisper_hash = None file_content = remote_file.read() - # Convert file content to a base64 encoded string file_content_base64 = base64.b64encode(file_content).decode("utf-8") - # Use common queue naming method - q_name = self._get_review_queue_name() - if meta_data: - whisper_hash = meta_data.get("whisper-hash") - extracted_text = meta_data.get("extracted_text") - else: - whisper_hash = None - extracted_text = None - - # Get TTL from workflow settings - ttl_seconds = WorkflowUtil.get_hitl_ttl_seconds(workflow) - - # Create QueueResult with TTL metadata - queue_result_obj = QueueResult( - file=file_name, - status=QueueResultStatus.SUCCESS, - result=result, - workflow_id=str(self.workflow_id), - file_content=file_content_base64, - whisper_hash=whisper_hash, - file_execution_id=file_execution_id, - extracted_text=extracted_text, - ttl_seconds=ttl_seconds, - ) - # Add TTL metadata based on HITLSettings - queue_result_obj.ttl_seconds = WorkflowUtil.get_hitl_ttl_seconds(workflow) - - queue_result = queue_result_obj.to_dict() - queue_result_json = json.dumps(queue_result) + q_name = self._get_review_queue_name() + ttl_seconds = WorkflowUtil.get_hitl_ttl_seconds(workflow) - # Validate the JSON is not empty before enqueuing - if not queue_result_json or queue_result_json.strip() == "": - logger.error( - f"Attempted to enqueue empty JSON with TTL for file {file_name}" - ) - raise ValueError("Cannot enqueue empty JSON message") - - # Check if this is a packet-based execution - if self.packet_id: - if not FeatureRegistry.is_hitl_available(): - raise ValueError( - "Packet-based HITL processing requires Unstract Enterprise. " - "This feature is not available in the OSS version." 
- ) - # Route to packet queue instead of regular HITL queue - from pluggable_apps.manual_review_v2.packet_queue_utils import ( - PacketQueueUtils, - ) - - success = PacketQueueUtils.enqueue_to_packet( - packet_id=self.packet_id, queue_result=queue_result - ) - if not success: - error_msg = f"Failed to push {file_name} to packet {self.packet_id}" - logger.error(error_msg) - raise RuntimeError(error_msg) - return + queue_result = self._create_queue_result( + file_name=file_name, + result=result, + file_content_base64=file_content_base64, + file_execution_id=file_execution_id, + meta_data=meta_data, + ttl_seconds=ttl_seconds, + ) - conn = QueueUtils.get_queue_inst() + queue_result_json = json.dumps(queue_result) - # Use the TTL metadata that was already set in the QueueResult object - ttl_seconds = queue_result_obj.ttl_seconds + # Validate JSON is not empty + if not queue_result_json or queue_result_json.strip() == "": + logger.error(f"Attempted to enqueue empty JSON with TTL for file {file_name}") + raise ValueError("Cannot enqueue empty JSON message") - conn.enqueue_with_ttl( - queue_name=q_name, message=queue_result_json, ttl_seconds=ttl_seconds - ) - logger.info(f"Pushed {file_name} to queue {q_name} with file content") + self._enqueue_to_packet_or_regular_queue( + file_name, queue_result, queue_result_json, q_name, ttl_seconds + ) def _read_file_content_for_queue(self, input_file_path: str, file_name: str) -> str: """Read and encode file content for queue message. From 1c2f26e24ce7bd0953b5a24d1624ab5b714ba186 Mon Sep 17 00:00:00 2001 From: jagadeeswaran-zipstack Date: Mon, 6 Oct 2025 04:44:19 +0530 Subject: [PATCH 07/10] used registry to find installed apps --- backend/api_v2/serializers.py | 8 +++++--- backend/workflow_manager/endpoint_v2/destination.py | 11 ++++++++--- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/backend/api_v2/serializers.py b/backend/api_v2/serializers.py index 05c9b5a9ce..d708581dfe 100644 --- a/backend/api_v2/serializers.py +++ b/backend/api_v2/serializers.py @@ -4,9 +4,9 @@ from typing import Any from urllib.parse import urlparse +from django.apps import apps from django.core.validators import RegexValidator from pipeline_v2.models import Pipeline -from pluggable_apps.feature_registry import FeatureRegistry from prompt_studio.prompt_profile_manager_v2.models import ProfileManager from rest_framework.serializers import ( BooleanField, @@ -255,8 +255,8 @@ def validate_hitl_packet_id(self, value: str | None) -> str | None: if not value: return value - # Check if HITL feature is available using FeatureRegistry - if not FeatureRegistry.is_hitl_available(): + # Check if HITL feature is available + if not apps.is_installed("pluggable_apps.manual_review_v2"): raise ValidationError( "Packet-based HITL processing requires Unstract Enterprise. " "This advanced workflow feature is available in our enterprise version. 
" @@ -264,6 +264,8 @@ def validate_hitl_packet_id(self, value: str | None) -> str | None: "contact our sales team at https://unstract.com/contact/" ) + return value + def validate_custom_data(self, value): """Validate custom_data is a valid JSON object.""" if value is None: diff --git a/backend/workflow_manager/endpoint_v2/destination.py b/backend/workflow_manager/endpoint_v2/destination.py index d14bf7bd3d..1e1d778d61 100644 --- a/backend/workflow_manager/endpoint_v2/destination.py +++ b/backend/workflow_manager/endpoint_v2/destination.py @@ -6,7 +6,7 @@ from typing import Any from connector_v2.models import ConnectorInstance -from pluggable_apps.feature_registry import FeatureRegistry +from django.apps import apps from plugins.workflow_manager.workflow_v2.utils import WorkflowUtil from rest_framework.exceptions import APIException from usage_v2.helper import UsageHelper @@ -869,7 +869,12 @@ def _push_to_queue( ) else: self._push_to_queue_for_connector( - file_name, workflow, result, input_file_path, file_execution_id, meta_data + file_name, + workflow, + result, + input_file_path, + file_execution_id, + meta_data, ) def _enqueue_to_packet_or_regular_queue( @@ -890,7 +895,7 @@ def _enqueue_to_packet_or_regular_queue( ttl_seconds: TTL in seconds (optional, for regular queue) """ if self.packet_id: - if not FeatureRegistry.is_hitl_available(): + if not apps.is_installed("pluggable_apps.manual_review_v2"): raise ValueError( "Packet-based HITL processing requires Unstract Enterprise. " "This feature is not available in the OSS version." From 0f610b50d7b518972486d5d52115a2a286bc8e1c Mon Sep 17 00:00:00 2001 From: jagadeeswaran-zipstack Date: Mon, 6 Oct 2025 04:51:48 +0530 Subject: [PATCH 08/10] sonar issue fix --- backend/api_v2/serializers.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/backend/api_v2/serializers.py b/backend/api_v2/serializers.py index d708581dfe..1f5c102690 100644 --- a/backend/api_v2/serializers.py +++ b/backend/api_v2/serializers.py @@ -264,6 +264,20 @@ def validate_hitl_packet_id(self, value: str | None) -> str | None: "contact our sales team at https://unstract.com/contact/" ) + # Validate packet ID format (alphanumeric string, typically 8-character hex) + value = value.strip() + if not value: + raise ValidationError("Packet ID cannot be empty or whitespace only.") + + # Basic format validation: alphanumeric, reasonable length + if not re.match(r"^[a-zA-Z0-9_-]+$", value): + raise ValidationError( + "Invalid packet ID format. Packet ID must contain only letters, numbers, hyphens, or underscores." 
+            )
+
+        if len(value) > 16:  # Reasonable max length
+            raise ValidationError("Packet ID is too long (maximum 16 characters).")
+
         return value
 
     def validate_custom_data(self, value):

From 74e3260431c728b5e506eb67c14f02f5a1f3f9a1 Mon Sep 17 00:00:00 2001
From: jagadeeswaran-zipstack
Date: Mon, 13 Oct 2025 15:22:52 +0530
Subject: [PATCH 09/10] changes for new worker implementation

---
 .../endpoint_v2/destination.py                |  16 +--
 .../core/src/unstract/core/data_models.py     |   2 +
 workers/api-deployment/tasks.py               |   6 +-
 workers/file_processing/tasks.py              |  15 +-
 workers/general/tasks.py                      |  10 +-
 .../shared/workflow/destination_connector.py  | 131 ++++++++++++++++--
 workers/shared/workflow/execution/service.py  |  12 +-
 7 files changed, 153 insertions(+), 39 deletions(-)

diff --git a/backend/workflow_manager/endpoint_v2/destination.py b/backend/workflow_manager/endpoint_v2/destination.py
index 1e1d778d61..4965c946ce 100644
--- a/backend/workflow_manager/endpoint_v2/destination.py
+++ b/backend/workflow_manager/endpoint_v2/destination.py
@@ -6,7 +6,6 @@
 from typing import Any
 
 from connector_v2.models import ConnectorInstance
-from django.apps import apps
 from plugins.workflow_manager.workflow_v2.utils import WorkflowUtil
 from rest_framework.exceptions import APIException
 from usage_v2.helper import UsageHelper
@@ -894,18 +893,12 @@ def _enqueue_to_packet_or_regular_queue(
             q_name: Queue name for regular queue
             ttl_seconds: TTL in seconds (optional, for regular queue)
         """
+        # Get queue instance
+        conn = QueueUtils.get_queue_inst()
+
         if self.packet_id:
-            if not apps.is_installed("pluggable_apps.manual_review_v2"):
-                raise ValueError(
-                    "Packet-based HITL processing requires Unstract Enterprise. "
-                    "This feature is not available in the OSS version."
-                )
             # Route to packet queue
-            from pluggable_apps.manual_review_v2.packet_queue_utils import (
-                PacketQueueUtils,
-            )
-
-            success = PacketQueueUtils.enqueue_to_packet(
+            success = conn.enqueue_to_packet(
                 packet_id=self.packet_id, queue_result=queue_result
             )
             if not success:
@@ -915,7 +908,6 @@ def _enqueue_to_packet_or_regular_queue(
             return
 
         # Route to regular queue
-        conn = QueueUtils.get_queue_inst()
         if ttl_seconds:
             conn.enqueue_with_ttl(
                 queue_name=q_name, message=queue_result_json, ttl_seconds=ttl_seconds
diff --git a/unstract/core/src/unstract/core/data_models.py b/unstract/core/src/unstract/core/data_models.py
index eb26ed7498..2cc0505f14 100644
--- a/unstract/core/src/unstract/core/data_models.py
+++ b/unstract/core/src/unstract/core/data_models.py
@@ -689,6 +689,7 @@ class FileHashData:
     use_file_history: bool = False  # Whether to create file history entries for this file
     is_manualreview_required: bool = False  # Whether this file requires manual review
     hitl_queue_name: str | None = None  # HITL queue name for API deployments
+    hitl_packet_id: str | None = None  # HITL packet ID for packet-based HITL processing
 
     def __post_init__(self):
         """Validate required fields."""
@@ -1388,6 +1389,7 @@ class WorkerFileData:
     source_config: dict[str, Any] = field(default_factory=dict)
     destination_config: dict[str, Any] = field(default_factory=dict)
     hitl_queue_name: str | None = field(default=None)
+    hitl_packet_id: str | None = field(default=None)
     manual_review_config: dict[str, Any] = field(
         default_factory=lambda: {
             "review_required": False,
diff --git a/workers/api-deployment/tasks.py b/workers/api-deployment/tasks.py
index 04ef2de8c4..7fb1ec5a85 100644
--- a/workers/api-deployment/tasks.py
+++ b/workers/api-deployment/tasks.py
@@ -22,9 +22,9 @@ from shared.processing.files import 
FileProcessingUtils from shared.workflow.execution import WorkerExecutionContext, WorkflowOrchestrationUtils from shared.workflow.execution.tool_validation import validate_workflow_tool_instances -from worker import app from unstract.core.data_models import ExecutionStatus, FileHashData, WorkerFileData +from worker import app logger = WorkerLogger.get_logger(__name__) @@ -838,6 +838,7 @@ def _create_file_data( f"No manual review rules configured for API deployment workflow {workflow_id}" ) hitl_queue_name = kwargs.get("hitl_queue_name") + hitl_packet_id = kwargs.get("hitl_packet_id") llm_profile_id = kwargs.get("llm_profile_id") custom_data = kwargs.get("custom_data") @@ -852,8 +853,9 @@ def _create_file_data( single_step=False, q_file_no_list=_calculate_q_file_no_list_api(manual_review_config, total_files), hitl_queue_name=hitl_queue_name, + hitl_packet_id=hitl_packet_id, manual_review_config=manual_review_config, - is_manualreview_required=bool(hitl_queue_name), + is_manualreview_required=bool(hitl_queue_name or hitl_packet_id), llm_profile_id=llm_profile_id, custom_data=custom_data, ) diff --git a/workers/file_processing/tasks.py b/workers/file_processing/tasks.py index 2a0fc15ed0..9941027ef8 100644 --- a/workers/file_processing/tasks.py +++ b/workers/file_processing/tasks.py @@ -40,9 +40,6 @@ ) from shared.processing.files.processor import FileProcessor -# Import manual review service with WorkflowUtil access -from worker import app - from unstract.core.data_models import ( ExecutionStatus, FileBatchData, @@ -53,6 +50,9 @@ ) from unstract.core.worker_models import FileProcessingResult +# Import manual review service with WorkflowUtil access +from worker import app + logger = WorkerLogger.get_logger(__name__) # Constants @@ -1056,6 +1056,15 @@ def _create_file_hash_from_dict( f"Applied HITL queue name '{file_data.hitl_queue_name}' to file {file_name}" ) + if file_data and file_data.hitl_packet_id: + file_hash.hitl_packet_id = file_data.hitl_packet_id + file_hash.is_manualreview_required = ( + True # Override manual review flag for packet processing + ) + logger.info( + f"Applied HITL packet ID '{file_data.hitl_packet_id}' to file {file_name}" + ) + return file_hash diff --git a/workers/general/tasks.py b/workers/general/tasks.py index bbc0d7a1c8..b134082fd7 100644 --- a/workers/general/tasks.py +++ b/workers/general/tasks.py @@ -45,10 +45,6 @@ ) from shared.workflow.execution.tool_validation import validate_workflow_tool_instances -# File management handled by StreamingFileDiscovery -# Import from local worker module (avoid circular import) -from worker import app, config - # Import shared data models for type safety from unstract.core.data_models import ( ExecutionStatus, @@ -60,6 +56,10 @@ # Import common workflow utilities from unstract.core.workflow_utils import WorkflowTypeDetector +# File management handled by StreamingFileDiscovery +# Import from local worker module (avoid circular import) +from worker import app, config + logger = WorkerLogger.get_logger(__name__) @@ -827,6 +827,7 @@ def _orchestrate_file_processing_general( ) hitl_queue_name = kwargs.get("hitl_queue_name") + hitl_packet_id = kwargs.get("hitl_packet_id") llm_profile_id = kwargs.get("llm_profile_id") custom_data = kwargs.get("custom_data") @@ -842,6 +843,7 @@ def _orchestrate_file_processing_general( q_file_no_list=[], manual_review_config={}, hitl_queue_name=hitl_queue_name, + hitl_packet_id=hitl_packet_id, llm_profile_id=llm_profile_id, custom_data=custom_data, ) diff --git 
a/workers/shared/workflow/destination_connector.py b/workers/shared/workflow/destination_connector.py index a0bcc3c1c3..9f67bc77aa 100644 --- a/workers/shared/workflow/destination_connector.py +++ b/workers/shared/workflow/destination_connector.py @@ -113,6 +113,7 @@ class DestinationConfig: connector_name: str | None = None # Manual review / HITL support hitl_queue_name: str | None = None + hitl_packet_id: str | None = None # Source connector configuration for reading files source_connector_id: str | None = None source_connector_settings: dict[str, Any] = None @@ -165,6 +166,7 @@ def from_dict(cls, data: dict[str, Any]) -> "DestinationConfig": connector_settings=connector_instance.get("connector_metadata", {}), connector_name=connector_instance.get("connector_name"), hitl_queue_name=data.get("hitl_queue_name"), + hitl_packet_id=data.get("hitl_packet_id"), source_connector_id=data.get("source_connector_id"), source_connector_settings=data.get("source_connector_settings", {}), file_execution_id=data.get("file_execution_id"), @@ -199,6 +201,7 @@ def __init__(self, config: DestinationConfig, workflow_log=None): # Manual review / HITL support self.hitl_queue_name = config.hitl_queue_name + self.hitl_packet_id = config.hitl_packet_id # Workflow and execution context (will be set when handling output) self.organization_id = None @@ -305,7 +308,10 @@ def _extract_processing_data( return ProcessingResult(tool_execution_result=tool_result, metadata=metadata) def _check_and_handle_hitl( - self, exec_ctx: ExecutionContext, file_ctx: FileContext, result: ProcessingResult + self, + exec_ctx: ExecutionContext, + file_ctx: FileContext, + result: ProcessingResult, ) -> bool: """Check HITL requirements and push to queue if needed.""" has_hitl = self._should_handle_hitl( @@ -329,7 +335,10 @@ def _check_and_handle_hitl( return has_hitl def _process_destination( - self, exec_ctx: ExecutionContext, file_ctx: FileContext, result: ProcessingResult + self, + exec_ctx: ExecutionContext, + file_ctx: FileContext, + result: ProcessingResult, ): """Route to appropriate destination handler.""" handlers = { @@ -346,7 +355,10 @@ def _process_destination( logger.warning(f"Unknown destination connection type: {self.connection_type}") def _handle_api_destination( - self, exec_ctx: ExecutionContext, file_ctx: FileContext, result: ProcessingResult + self, + exec_ctx: ExecutionContext, + file_ctx: FileContext, + result: ProcessingResult, ): """Handle API destination processing.""" log_file_info( @@ -368,7 +380,10 @@ def _handle_api_destination( ) def _handle_filesystem_destination( - self, exec_ctx: ExecutionContext, file_ctx: FileContext, result: ProcessingResult + self, + exec_ctx: ExecutionContext, + file_ctx: FileContext, + result: ProcessingResult, ): """Handle filesystem destination processing.""" if not result.has_hitl: @@ -378,7 +393,9 @@ def _handle_filesystem_destination( f"📤 File '{file_ctx.file_name}' marked for FILESYSTEM processing - copying to destination", ) self.copy_output_to_output_directory( - file_ctx.input_file_path, exec_ctx.file_execution_id, exec_ctx.api_client + file_ctx.input_file_path, + exec_ctx.file_execution_id, + exec_ctx.api_client, ) else: logger.info( @@ -386,7 +403,10 @@ def _handle_filesystem_destination( ) def _handle_database_destination( - self, exec_ctx: ExecutionContext, file_ctx: FileContext, result: ProcessingResult + self, + exec_ctx: ExecutionContext, + file_ctx: FileContext, + result: ProcessingResult, ): """Handle database destination processing.""" if not result.has_hitl: @@ 
-414,7 +434,10 @@ def _handle_database_destination(
         )
 
     def _handle_manual_review_destination(
-        self, exec_ctx: ExecutionContext, file_ctx: FileContext, result: ProcessingResult
+        self,
+        exec_ctx: ExecutionContext,
+        file_ctx: FileContext,
+        result: ProcessingResult,
     ):
         """Handle manual review destination processing."""
         log_file_info(
@@ -1198,6 +1221,13 @@ def _should_handle_hitl(
             )
             return False
 
+        # Check hitl_packet_id first - it takes precedence over everything else
+        if self.hitl_packet_id:
+            logger.info(
+                f"API packet override: pushing to packet queue for file {file_name}"
+            )
+            return True
+
         # Check if API deployment requested HITL override
         if self.hitl_queue_name:
             logger.info(f"{file_name}: Pushing to HITL queue")
@@ -1236,6 +1266,75 @@ def _should_handle_hitl(
                 return True
         return False
 
+    def _enqueue_to_packet_or_regular_queue(
+        self,
+        file_name: str,
+        queue_result: dict[str, Any],
+        queue_name: str,
+        workflow_util: Any,
+        ttl_seconds: int | None = None,
+    ) -> None:
+        """Route to packet queue or regular queue based on hitl_packet_id.
+
+        Args:
+            file_name: Name of the file being queued
+            queue_result: Queue result dictionary
+            queue_name: Queue name for regular queue
+            workflow_util: Workflow utility instance for queue operations
+            ttl_seconds: TTL in seconds (optional, for regular queue)
+        """
+        if self.hitl_packet_id:
+            # Route to packet queue via backend API (enterprise only)
+            logger.info(f"Routing {file_name} to packet queue {self.hitl_packet_id}")
+
+            # Access the manual review client from workflow_util
+            # Enterprise: workflow_util.client is ManualReviewAPIClient
+            # OSS: workflow_util is null implementation without client attribute
+            manual_review_client = getattr(workflow_util, "client", None)
+
+            # Check if client exists and has enqueue_to_packet method (enterprise only)
+            if not manual_review_client or not hasattr(
+                manual_review_client, "enqueue_to_packet"
+            ):
+                error_msg = (
+                    f"Packet queues are not available. "
+                    f"Cannot enqueue file '{file_name}' to packet '{self.hitl_packet_id}'. "
+                    f"This is an Enterprise-only feature."
+ ) + logger.error(error_msg) + raise NotImplementedError(error_msg) + + # Call backend API to enqueue to packet queue + result = manual_review_client.enqueue_to_packet( + hitl_packet_id=self.hitl_packet_id, + queue_result=queue_result, + organization_id=self.organization_id, + ) + + if not result.get("success", False): + error_msg = ( + f"Failed to push {file_name} to packet {self.hitl_packet_id}: " + f"{result.get('message', 'Unknown error')}" + ) + logger.error(error_msg) + raise RuntimeError(error_msg) + + logger.info( + f"✅ MANUAL REVIEW: File '{file_name}' sent to packet queue '{self.hitl_packet_id}' successfully" + ) + return + + # Route to regular queue + logger.info(f"Routing {file_name} to regular queue {queue_name}") + workflow_util.enqueue_manual_review( + queue_name=queue_name, + message=queue_result, + organization_id=self.organization_id, + ) + logger.info( + f"✅ MANUAL REVIEW: File '{file_name}' sent to manual review queue '{queue_name}' successfully" + ) + def _push_data_to_queue( self, file_name: str, @@ -1320,21 +1419,23 @@ def _push_data_to_queue( if file_content_base64 is not None: queue_result.file_content = file_content_base64 - workflow_util.enqueue_manual_review( + # Route to packet queue or regular queue based on hitl_packet_id + self._enqueue_to_packet_or_regular_queue( + file_name=file_name, + queue_result=queue_result.to_dict(), queue_name=queue_name, - message=queue_result.to_dict(), - organization_id=self.organization_id, + workflow_util=workflow_util, + ttl_seconds=ttl_seconds, ) # Log successful enqueue (common for both paths) + queue_display_name = ( + self.hitl_packet_id if self.hitl_packet_id else queue_name + ) log_file_info( self.workflow_log, file_execution_id, - f"✅ File '{file_name}' sent to manual review queue '{queue_name}'", - ) - - logger.info( - f"✅ MANUAL REVIEW: File '{file_name}' sent to manual review queue '{queue_name}' successfully" + f"✅ File '{file_name}' sent to manual review queue '{queue_display_name}'", ) except Exception as e: diff --git a/workers/shared/workflow/execution/service.py b/workers/shared/workflow/execution/service.py index f2f93e89f1..71c8d695e5 100644 --- a/workers/shared/workflow/execution/service.py +++ b/workers/shared/workflow/execution/service.py @@ -1095,8 +1095,9 @@ def _handle_destination_processing( source_connection_type = workflow.source_config.connection_type - # Add HITL queue name from file_data if present (for API deployments) + # Add HITL queue name and packet ID from file_data if present (for API deployments) hitl_queue_name = file_data.hitl_queue_name + hitl_packet_id = file_data.hitl_packet_id destination_config["use_file_history"] = use_file_history destination_config["file_execution_id"] = workflow_file_execution_id if hitl_queue_name: @@ -1104,9 +1105,14 @@ def _handle_destination_processing( logger.info( f"Added HITL queue name to destination config: {hitl_queue_name}" ) - else: + if hitl_packet_id: + destination_config["hitl_packet_id"] = hitl_packet_id + logger.info( + f"Added HITL packet ID to destination config: {hitl_packet_id}" + ) + if not hitl_queue_name and not hitl_packet_id: logger.info( - "No hitl_queue_name found in file_data, proceeding with normal processing" + "No hitl_queue_name or hitl_packet_id found in file_data, proceeding with normal processing" ) # Import destination connector From 217026b1c4ff585bdce07ad9c1bbbe30f4bdd33c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 13 Oct 2025 09:54:01 +0000 Subject: 
[PATCH 10/10] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- workers/api-deployment/tasks.py | 2 +- workers/file_processing/tasks.py | 6 +++--- workers/general/tasks.py | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/workers/api-deployment/tasks.py b/workers/api-deployment/tasks.py index 7fb1ec5a85..31367066c0 100644 --- a/workers/api-deployment/tasks.py +++ b/workers/api-deployment/tasks.py @@ -22,9 +22,9 @@ from shared.processing.files import FileProcessingUtils from shared.workflow.execution import WorkerExecutionContext, WorkflowOrchestrationUtils from shared.workflow.execution.tool_validation import validate_workflow_tool_instances +from worker import app from unstract.core.data_models import ExecutionStatus, FileHashData, WorkerFileData -from worker import app logger = WorkerLogger.get_logger(__name__) diff --git a/workers/file_processing/tasks.py b/workers/file_processing/tasks.py index 9941027ef8..6fd097a254 100644 --- a/workers/file_processing/tasks.py +++ b/workers/file_processing/tasks.py @@ -40,6 +40,9 @@ ) from shared.processing.files.processor import FileProcessor +# Import manual review service with WorkflowUtil access +from worker import app + from unstract.core.data_models import ( ExecutionStatus, FileBatchData, @@ -50,9 +53,6 @@ ) from unstract.core.worker_models import FileProcessingResult -# Import manual review service with WorkflowUtil access -from worker import app - logger = WorkerLogger.get_logger(__name__) # Constants diff --git a/workers/general/tasks.py b/workers/general/tasks.py index b134082fd7..7990c52631 100644 --- a/workers/general/tasks.py +++ b/workers/general/tasks.py @@ -45,6 +45,10 @@ ) from shared.workflow.execution.tool_validation import validate_workflow_tool_instances +# File management handled by StreamingFileDiscovery +# Import from local worker module (avoid circular import) +from worker import app, config + # Import shared data models for type safety from unstract.core.data_models import ( ExecutionStatus, @@ -56,10 +60,6 @@ # Import common workflow utilities from unstract.core.workflow_utils import WorkflowTypeDetector -# File management handled by StreamingFileDiscovery -# Import from local worker module (avoid circular import) -from worker import app, config - logger = WorkerLogger.get_logger(__name__)
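Note on usage (an illustrative sketch, not part of the patch series): the patches above thread a new hitl_packet_id request parameter through the API deployment flow. The snippet below shows how a client might exercise it. The host, endpoint path, and Bearer auth header are assumptions for illustration only; the files, hitl_queue_name, and hitl_packet_id form fields come from ExecutionRequestSerializer in these patches. Per _should_handle_hitl, hitl_packet_id takes precedence over hitl_queue_name when both are supplied, and per the serializer validation a packet ID must match ^[a-zA-Z0-9_-]+$ and be at most 16 characters.

    import requests

    # Hypothetical deployment URL and API key; adjust to your installation.
    API_URL = "https://unstract.example.com/deployment/api/my_org/invoice_extractor/"
    API_KEY = "replace-with-deployment-api-key"

    with open("invoice.pdf", "rb") as pdf:
        response = requests.post(
            API_URL,
            headers={"Authorization": f"Bearer {API_KEY}"},
            files={"files": ("invoice.pdf", pdf, "application/pdf")},
            data={
                # Routes the extraction result into an existing review packet
                # (Enterprise-only); validated against ^[a-zA-Z0-9_-]+$ with a
                # 16-character limit by validate_hitl_packet_id().
                "hitl_packet_id": "pkt_1a2b3c4d",
                # A hitl_queue_name sent alongside would be ignored, since
                # _should_handle_hitl() checks hitl_packet_id first.
            },
            timeout=60,
        )
    response.raise_for_status()
    print(response.json())

On an OSS build the serializer rejects hitl_packet_id outright with a ValidationError pointing at the Enterprise documentation, so a request like the one above fails fast rather than silently skipping packet routing.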