@@ -189,7 +189,6 @@ async def build_graph_and_get_order() -> tuple[list[str], list[str], Graph]:
189
189
playgroundSuccess = True ,
190
190
),
191
191
)
192
- return first_layer , vertices_to_run , graph
193
192
except Exception as exc :
194
193
background_tasks .add_task (
195
194
telemetry_service .log_package_playground ,
@@ -205,6 +204,8 @@ async def build_graph_and_get_order() -> tuple[list[str], list[str], Graph]:
205
204
logger .exception ("Error checking build status" )
206
205
raise HTTPException (status_code = 500 , detail = str (exc )) from exc
207
206
207
+ return first_layer , vertices_to_run , graph
208
+
208
209
async def _build_vertex (vertex_id : str , graph : Graph , event_manager : EventManager ) -> VertexBuildResponse :
209
210
flow_id_str = str (flow_id )
210
211
@@ -302,7 +303,6 @@ async def _build_vertex(vertex_id: str, graph: Graph, event_manager: EventManage
302
303
componentErrorMessage = error_message ,
303
304
),
304
305
)
305
- return build_response
306
306
except Exception as exc :
307
307
background_tasks .add_task (
308
308
telemetry_service .log_package_component ,
@@ -317,6 +317,8 @@ async def _build_vertex(vertex_id: str, graph: Graph, event_manager: EventManage
317
317
message = parse_exception (exc )
318
318
raise HTTPException (status_code = 500 , detail = message ) from exc
319
319
320
+ return build_response
321
+
320
322
async def build_vertices (
321
323
vertex_id : str ,
322
324
graph : Graph ,
@@ -588,7 +590,6 @@ async def build_vertex(
588
590
componentErrorMessage = error_message ,
589
591
),
590
592
)
591
- return build_response
592
593
except Exception as exc :
593
594
background_tasks .add_task (
594
595
telemetry_service .log_package_component ,
@@ -603,6 +604,90 @@ async def build_vertex(
603
604
message = parse_exception (exc )
604
605
raise HTTPException (status_code = 500 , detail = message ) from exc
605
606
607
+ return build_response
608
+
609
+
610
+ async def _stream_vertex (flow_id : str , vertex_id : str , chat_service : ChatService ):
611
+ graph = None
612
+ try :
613
+ try :
614
+ cache = await chat_service .get_cache (flow_id )
615
+ except Exception as exc : # noqa: BLE001
616
+ logger .exception ("Error building Component" )
617
+ yield str (StreamData (event = "error" , data = {"error" : str (exc )}))
618
+ return
619
+
620
+ if not cache :
621
+ # If there's no cache
622
+ msg = f"No cache found for { flow_id } ."
623
+ logger .error (msg )
624
+ yield str (StreamData (event = "error" , data = {"error" : msg }))
625
+ return
626
+ else :
627
+ graph = cache .get ("result" )
628
+
629
+ try :
630
+ vertex : InterfaceVertex = graph .get_vertex (vertex_id )
631
+ except Exception as exc : # noqa: BLE001
632
+ logger .exception ("Error building Component" )
633
+ yield str (StreamData (event = "error" , data = {"error" : str (exc )}))
634
+ return
635
+
636
+ if not hasattr (vertex , "stream" ):
637
+ msg = f"Vertex { vertex_id } does not support streaming"
638
+ logger .error (msg )
639
+ yield str (StreamData (event = "error" , data = {"error" : msg }))
640
+ return
641
+
642
+ if isinstance (vertex ._built_result , str ) and vertex ._built_result :
643
+ stream_data = StreamData (
644
+ event = "message" ,
645
+ data = {"message" : f"Streaming vertex { vertex_id } " },
646
+ )
647
+ yield str (stream_data )
648
+ stream_data = StreamData (
649
+ event = "message" ,
650
+ data = {"chunk" : vertex ._built_result },
651
+ )
652
+ yield str (stream_data )
653
+
654
+ elif not vertex .frozen or not vertex ._built :
655
+ logger .debug (f"Streaming vertex { vertex_id } " )
656
+ stream_data = StreamData (
657
+ event = "message" ,
658
+ data = {"message" : f"Streaming vertex { vertex_id } " },
659
+ )
660
+ yield str (stream_data )
661
+ try :
662
+ async for chunk in vertex .stream ():
663
+ stream_data = StreamData (
664
+ event = "message" ,
665
+ data = {"chunk" : chunk },
666
+ )
667
+ yield str (stream_data )
668
+ except Exception as exc : # noqa: BLE001
669
+ logger .exception ("Error building Component" )
670
+ exc_message = parse_exception (exc )
671
+ if exc_message == "The message must be an iterator or an async iterator." :
672
+ exc_message = "This stream has already been closed."
673
+ yield str (StreamData (event = "error" , data = {"error" : exc_message }))
674
+ elif vertex .result is not None :
675
+ stream_data = StreamData (
676
+ event = "message" ,
677
+ data = {"chunk" : vertex ._built_result },
678
+ )
679
+ yield str (stream_data )
680
+ else :
681
+ msg = f"No result found for vertex { vertex_id } "
682
+ logger .error (msg )
683
+ yield str (StreamData (event = "error" , data = {"error" : msg }))
684
+ return
685
+ finally :
686
+ logger .debug ("Closing stream" )
687
+ if graph :
688
+ await chat_service .set_cache (flow_id , graph )
689
+ yield str (StreamData (event = "close" , data = {"message" : "Stream closed" }))
690
+
606
691
607
692
@router .get ("/build/{flow_id}/{vertex_id}/stream" , response_class = StreamingResponse )
608
693
async def build_vertex_stream (
@@ -638,70 +723,6 @@ async def build_vertex_stream(
638
723
HTTPException: If an error occurs while building the vertex.
639
724
"""
640
725
try :
641
- flow_id_str = str (flow_id )
642
-
643
- async def stream_vertex ():
644
- graph = None
645
- try :
646
- cache = await chat_service .get_cache (flow_id_str )
647
- if not cache :
648
- # If there's no cache
649
- msg = f"No cache found for { flow_id_str } ."
650
- raise ValueError (msg )
651
- else :
652
- graph = cache .get ("result" )
653
-
654
- vertex : InterfaceVertex = graph .get_vertex (vertex_id )
655
- if not hasattr (vertex , "stream" ):
656
- msg = f"Vertex { vertex_id } does not support streaming"
657
- raise ValueError (msg )
658
- if isinstance (vertex ._built_result , str ) and vertex ._built_result :
659
- stream_data = StreamData (
660
- event = "message" ,
661
- data = {"message" : f"Streaming vertex { vertex_id } " },
662
- )
663
- yield str (stream_data )
664
- stream_data = StreamData (
665
- event = "message" ,
666
- data = {"chunk" : vertex ._built_result },
667
- )
668
- yield str (stream_data )
669
-
670
- elif not vertex .frozen or not vertex ._built :
671
- logger .debug (f"Streaming vertex { vertex_id } " )
672
- stream_data = StreamData (
673
- event = "message" ,
674
- data = {"message" : f"Streaming vertex { vertex_id } " },
675
- )
676
- yield str (stream_data )
677
- async for chunk in vertex .stream ():
678
- stream_data = StreamData (
679
- event = "message" ,
680
- data = {"chunk" : chunk },
681
- )
682
- yield str (stream_data )
683
- elif vertex .result is not None :
684
- stream_data = StreamData (
685
- event = "message" ,
686
- data = {"chunk" : vertex ._built_result },
687
- )
688
- yield str (stream_data )
689
- else :
690
- msg = f"No result found for vertex { vertex_id } "
691
- raise ValueError (msg )
692
-
693
- except Exception as exc : # noqa: BLE001
694
- logger .exception ("Error building Component" )
695
- exc_message = parse_exception (exc )
696
- if exc_message == "The message must be an iterator or an async iterator." :
697
- exc_message = "This stream has already been closed."
698
- yield str (StreamData (event = "error" , data = {"error" : exc_message }))
699
- finally :
700
- logger .debug ("Closing stream" )
701
- if graph :
702
- await chat_service .set_cache (flow_id_str , graph )
703
- yield str (StreamData (event = "close" , data = {"message" : "Stream closed" }))
704
-
705
- return StreamingResponse (stream_vertex (), media_type = "text/event-stream" )
726
+ return StreamingResponse (_stream_vertex (str (flow_id ), vertex_id , chat_service ), media_type = "text/event-stream" )
706
727
except Exception as exc :
707
728
raise HTTPException (status_code = 500 , detail = "Error building Component" ) from exc
0 commit comments