Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
ef40b15
UN-2807 [FEAT] Add packet processing support for HITL workflows
jagadeeswaran-zipstack Sep 18, 2025
bc31ae5
added missing references
jagadeeswaran-zipstack Sep 19, 2025
969e60c
added hitl prefix to query params
jagadeeswaran-zipstack Oct 1, 2025
996e1a2
Merge branch 'main' into UN-2807-packet-processing-support-in-hitl
jagadeeswaran-zipstack Oct 1, 2025
f10b560
added try-catch on enterprise import
jagadeeswaran-zipstack Oct 3, 2025
f83a9a9
used app registry for plugin check
jagadeeswaran-zipstack Oct 3, 2025
c962b57
Merge branch 'main' into UN-2807-packet-processing-support-in-hitl
jagadeeswaran-zipstack Oct 3, 2025
c2c8ec8
Merge branch 'UN-2807-packet-processing-support-in-hitl' of github.co…
jagadeeswaran-zipstack Oct 3, 2025
028438b
code refactor
jagadeeswaran-zipstack Oct 3, 2025
1c2f26e
used registry to find installed apps
jagadeeswaran-zipstack Oct 5, 2025
0f610b5
sonar issue fix
jagadeeswaran-zipstack Oct 5, 2025
ee1a227
Merge branch 'main' into UN-2807-packet-processing-support-in-hitl
jagadeeswaran-zipstack Oct 6, 2025
3d9013f
Merge branch 'main' of github.com:Zipstack/unstract into UN-2807-pack…
jagadeeswaran-zipstack Oct 8, 2025
0b22c96
Merge branch 'main' of github.com:Zipstack/unstract into UN-2807-pack…
jagadeeswaran-zipstack Oct 8, 2025
74e3260
changes for new worker implementation
jagadeeswaran-zipstack Oct 13, 2025
217026b
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 13, 2025
015cd06
Merge branch 'main' into UN-2807-packet-processing-support-in-hitl
athul-rs Oct 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/api_v2/api_deployment_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def post(
tag_names = serializer.validated_data.get(ApiExecution.TAGS)
llm_profile_id = serializer.validated_data.get(ApiExecution.LLM_PROFILE_ID)
hitl_queue_name = serializer.validated_data.get(ApiExecution.HITL_QUEUE_NAME)
packet_id = serializer.validated_data.get(ApiExecution.PACKET_ID)

if presigned_urls:
DeploymentHelper.load_presigned_files(presigned_urls, file_objs)
Expand All @@ -85,6 +86,7 @@ def post(
tag_names=tag_names,
llm_profile_id=llm_profile_id,
hitl_queue_name=hitl_queue_name,
packet_id=packet_id,
request_headers=dict(request.headers),
)
if "error" in response and response["error"]:
Expand Down
1 change: 1 addition & 0 deletions backend/api_v2/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ class ApiExecution:
TAGS: str = "tags"
LLM_PROFILE_ID: str = "llm_profile_id"
HITL_QUEUE_NAME: str = "hitl_queue_name"
PACKET_ID: str = "packet_id"
PRESIGNED_URLS: str = "presigned_urls"
7 changes: 7 additions & 0 deletions backend/api_v2/deployment_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def execute_workflow(
tag_names: list[str] = [],
llm_profile_id: str | None = None,
hitl_queue_name: str | None = None,
packet_id: str | None = None,
request_headers=None,
) -> ReturnDict:
"""Execute workflow by api.
Expand All @@ -168,6 +169,7 @@ def execute_workflow(
tag_names (list(str)): list of tag names
llm_profile_id (str, optional): LLM profile ID for overriding tool settings
hitl_queue_name (str, optional): Custom queue name for manual review
packet_id (str, optional): Packet ID for packet-based review

Returns:
ReturnDict: execution status/ result
Expand All @@ -178,6 +180,10 @@ def execute_workflow(
logger.info(
f"API execution with HITL: hitl_queue_name={hitl_queue_name}, api_name={api.api_name}"
)
if packet_id:
logger.info(
f"API execution with Packet: packet_id={packet_id}, api_name={api.api_name}"
)
tags = Tag.bulk_get_or_create(tag_names=tag_names)
workflow_execution = WorkflowExecutionServiceHelper.create_workflow_execution(
workflow_id=workflow_id,
Expand Down Expand Up @@ -234,6 +240,7 @@ def execute_workflow(
use_file_history=use_file_history,
llm_profile_id=llm_profile_id,
hitl_queue_name=hitl_queue_name,
packet_id=packet_id,
)
result.status_api = DeploymentHelper.construct_status_endpoint(
api_endpoint=api.api_endpoint, execution_id=execution_id
Expand Down
42 changes: 42 additions & 0 deletions backend/workflow_manager/endpoint_v2/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any

from connector_v2.models import ConnectorInstance
from pluggable_apps.manual_review_v2.packet_queue_utils import PacketQueueUtils
from plugins.workflow_manager.workflow_v2.utils import WorkflowUtil
from rest_framework.exceptions import APIException
from usage_v2.helper import UsageHelper
Expand Down Expand Up @@ -64,6 +65,7 @@ def __init__(
use_file_history: bool,
file_execution_id: str | None = None,
hitl_queue_name: str | None = None,
packet_id: str | None = None,
) -> None:
"""Initialize a DestinationConnector object.

Expand All @@ -82,6 +84,7 @@ def __init__(
self.workflow_log = workflow_log
self.use_file_history = use_file_history
self.hitl_queue_name = hitl_queue_name
self.packet_id = packet_id
self.workflow = workflow

def _get_endpoint_for_workflow(
Expand Down Expand Up @@ -172,6 +175,19 @@ def _should_handle_hitl(
logger.info(f"Successfully pushed {file_name} to HITL queue")
return True

if self.packet_id:
logger.info(
f"API packet override: pushing to packet queue for file {file_name}"
)
self._push_data_to_queue(
file_name=file_name,
workflow=workflow,
input_file_path=input_file_path,
file_execution_id=file_execution_id,
)
logger.info(f"Successfully pushed {file_name} to packet queue")
return True

# Skip HITL validation if we're using file_history and no execution result is available
if self.is_api and self.use_file_history:
return False
Expand Down Expand Up @@ -749,6 +765,7 @@ def get_config(self) -> DestinationConfig:
execution_id=self.execution_id,
use_file_history=self.use_file_history,
hitl_queue_name=self.hitl_queue_name,
packet_id=self.packet_id,
)

@classmethod
Expand Down Expand Up @@ -777,6 +794,7 @@ def from_config(
use_file_history=config.use_file_history,
file_execution_id=config.file_execution_id,
hitl_queue_name=config.hitl_queue_name,
packet_id=config.packet_id,
)

return destination
Expand Down Expand Up @@ -868,6 +886,18 @@ def _push_to_queue(

queue_result_json = json.dumps(queue_result)

# Check if this is a packet-based execution
if self.packet_id:
# Route to packet queue instead of regular HITL queue
success = PacketQueueUtils.enqueue_to_packet(
packet_id=self.packet_id, queue_result=queue_result
)
if success:
logger.info(f"Pushed {file_name} to packet {self.packet_id}")
else:
logger.error(f"Failed to push {file_name} to packet {self.packet_id}")
return

conn = QueueUtils.get_queue_inst()
conn.enqueue(queue_name=q_name, message=queue_result_json)
logger.info(f"Pushed {file_name} to queue {q_name} with file content")
Expand Down Expand Up @@ -918,6 +948,18 @@ def _push_to_queue(
)
raise ValueError("Cannot enqueue empty JSON message")

# Check if this is a packet-based execution
if self.packet_id:
# Route to packet queue instead of regular HITL queue
success = PacketQueueUtils.enqueue_to_packet(
packet_id=self.packet_id, queue_result=queue_result_obj
)
if success:
logger.info(f"Pushed {file_name} to packet {self.packet_id}")
else:
logger.error(f"Failed to push {file_name} to packet {self.packet_id}")
return

conn = QueueUtils.get_queue_inst()

# Use the TTL metadata that was already set in the QueueResult object
Expand Down
2 changes: 2 additions & 0 deletions backend/workflow_manager/endpoint_v2/dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class DestinationConfig:
use_file_history: bool
file_execution_id: str | None = None
hitl_queue_name: str | None = None
packet_id: str | None = None

def to_json(self) -> dict[str, Any]:
"""Serialize the DestinationConfig instance to a JSON string."""
Expand All @@ -104,6 +105,7 @@ def to_json(self) -> dict[str, Any]:
"use_file_history": self.use_file_history,
"file_execution_id": file_execution_id,
"hitl_queue_name": self.hitl_queue_name,
"packet_id": self.packet_id,
}

@staticmethod
Expand Down